⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clientnotifforwarder.java

📁 JAVA的一些源码 JAVA2 STANDARD EDITION DEVELOPMENT KIT 5.0
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
		    if (clientSequenceNumber >= 0) {			missed = nr.getEarliestSequenceNumber() - 			    clientSequenceNumber;    		    }		    clientSequenceNumber = nr.getNextSequenceNumber();		    final int size = infoList.size();		    listeners  = new HashMap(((size>len)?len:size));		    for (int i = 0 ; i < len ; i++) {			final TargetedNotification tn = notifs[i];			final Integer listenerID = tn.getListenerID();						// check if an mbean unregistration notif			if (!listenerID.equals(mbeanRemovedNotifID)) {			    final ListenerInfo li = 				(ListenerInfo) infoList.get(listenerID);			    if (li != null) 				listeners.put(listenerID,li);			    continue;			}			final Notification notif = tn.getNotification();			final String unreg =			    MBeanServerNotification.UNREGISTRATION_NOTIFICATION;			if (notif instanceof MBeanServerNotification &&			    notif.getType().equals(unreg)) {			    			    MBeanServerNotification mbsn =				(MBeanServerNotification) notif;			    ObjectName name = mbsn.getMBeanName();			    			    removeNotificationListener(name);			}		    }		    myListenerID = mbeanRemovedNotifID;		}		if (missed > 0) {		    final String msg =			"May have lost up to " + missed +			" notification" + (missed == 1 ? "" : "s");		    lostNotifs(msg, missed);		    logger.trace("NotifFetcher.run", msg);		}		// forward		for (int i = 0 ; i < len ; i++) {		    final TargetedNotification tn = notifs[i];		    dispatchNotification(tn,myListenerID,listeners);		}	    }            synchronized (ClientNotifForwarder.this) {		currentFetchThread = null;	    }	    if (nr == null || shouldStop()) {		// tell that the thread is REALLY stopped		setState(STOPPED);	    } else {		executor.execute(this);	    }	}	void dispatchNotification(TargetedNotification tn, 				  Integer myListenerID, Map listeners) {	    final Notification notif = tn.getNotification();	    final Integer listenerID = tn.getListenerID();	    	    if (listenerID.equals(myListenerID)) return;	    final ListenerInfo li = (ClientListenerInfo) 		listeners.get(listenerID);	    if (li == null) {		logger.trace("NotifFetcher.dispatch",			     "Listener ID not in map");		return;	    }	    NotificationListener l = li.getListener();	    Object h = li.getHandback();	    try {		l.handleNotification(notif, h);	    } catch (RuntimeException e) {		final String msg =		    "Failed to forward a notification " +		    "to a listener";		logger.trace("NotifFetcher-run", msg, e);	    }	}	private NotificationResult fetchNotifs() {	    try {		NotificationResult nr = ClientNotifForwarder.this.		    fetchNotifs(clientSequenceNumber,maxNotifications,				timeout);		if (logger.traceOn()) {		    logger.trace("NotifFetcher-run",				 "Got notifications from the server: "+nr);		}		return nr;	    } catch (ClassNotFoundException e) {		logger.trace("NotifFetcher.fetchNotifs", e);		return fetchOneNotif();	    } catch (NotSerializableException e) {		logger.trace("NotifFetcher.fetchNotifs", e);		return fetchOneNotif();	    } catch (IOException ioe) {		if (!shouldStop()) {		    logger.error("NotifFetcher-run",				 "Failed to fetch notification, " +				 "stopping thread. Error is: " + ioe, ioe);		    logger.debug("NotifFetcher-run",ioe);		}		// no more fetching		return null;	    }	}	/* Fetch one notification when we suspect that it might be a	   notification that we can't deserialize (because of a	   missing class).  First we ask for 0 notifications with 0	   timeout.  This allows us to skip sequence numbers for	   notifications that don't match our filters.  Then we ask	   for one notification.  If that produces a	   ClassNotFoundException or a NotSerializableException, we	   increase our sequence number and ask again.  Eventually we	   will either get a successful notification, or a return with	   0 notifications.  In either case we can return a	   NotificationResult.  This algorithm works (albeit less	   well) even if the server implementation doesn't optimize a	   request for 0 notifications to skip sequence numbers for	   notifications that don't match our filters.	   If we had at least one ClassNotFoundException, then we	   must emit a JMXConnectionNotification.LOST_NOTIFS.	*/	private NotificationResult fetchOneNotif() {	    ClientNotifForwarder cnf = ClientNotifForwarder.this;	    long startSequenceNumber = clientSequenceNumber;	    int notFoundCount = 0;	    NotificationResult result = null;	    while (result == null && !shouldStop()) {		NotificationResult nr;		try {		    // 0 notifs to update startSequenceNumber		    nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);		} catch (ClassNotFoundException e) {		    logger.warning("NotifFetcher.fetchOneNotif",				   "Impossible exception: " + e);		    logger.debug("NotifFetcher.fetchOneNotif",e);		    return null;		} catch (IOException e) {		    if (!shouldStop())			logger.trace("NotifFetcher.fetchOneNotif", e);		    return null;		}		if (shouldStop())		    return null;		startSequenceNumber = nr.getNextSequenceNumber();		try {		    // 1 notif to skip possible missing class		    result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);		} catch (Exception e) {		    if (e instanceof ClassNotFoundException			|| e instanceof NotSerializableException) {			logger.warning("NotifFetcher.fetchOneNotif",				     "Failed to deserialize a notification: "+e.toString());			if (logger.traceOn()) {			    logger.trace("NotifFetcher.fetchOneNotif",					 "Failed to deserialize a notification.", e);			}			notFoundCount++;			startSequenceNumber++;		    } else {			if (!shouldStop())			    logger.trace("NotifFetcher.fetchOneNotif", e);			return null;		    }		}	    }	    if (notFoundCount > 0) {		final String msg =		    "Dropped " + notFoundCount + " notification" +		    (notFoundCount == 1 ? "" : "s") +		    " because classes were missing locally";		lostNotifs(msg, notFoundCount);	    }	    return result;	}	private boolean shouldStop() {	    synchronized (ClientNotifForwarder.this) {		if (state != STARTED) {		    return true;		} else if (infoList.size() == 0) {		    // no more listener, stop fetching		    setState(STOPPING);		    return true;		}		return false;	    }	}    }// -------------------------------------------------// private methods// -------------------------------------------------    private synchronized void setState(int newState) {	if (state == TERMINATED) {	    return;	}		state = newState;	this.notifyAll();    }    /*     * Called to decide whether need to start a thread for fetching notifs.     * <P>The parameter reconnected will decide whether to initilize the clientSequenceNumber,     * initilaizing the clientSequenceNumber means to ignore all notifications arrived before.     * If it is reconnected, we will not initialize in order to get all notifications arrived     * during the reconnection. It may cause the newly registered listeners to receive some     * notifications arrived before its registray.     */    private synchronized void init(boolean reconnected) throws IOException {	switch (state) {	case STARTED:	    return;	case STARTING:	    return;	case TERMINATED:	    throw new IOException("The ClientNotifForwarder has been terminated.");	case STOPPING:	    if (beingReconnected == true) {		// wait for another thread to do, which is doing reconnection		return;	    }	    while (state == STOPPING) { // make sure only one fetching thread.				try {		    wait();		} catch (InterruptedException ire) {		    IOException ioe = new IOException(ire.toString());		    EnvHelp.initCause(ioe, ire);		    		    throw ioe;		}	    }			    // re-call this method to check the state again,	    // the state can be other value like TERMINATED.	    init(reconnected);	    return;	case STOPPED:	    if (beingReconnected == true) {		// wait for another thread to do, which is doing reconnection		return;	    }	    if (logger.traceOn()) {		logger.trace("init", "Initializing...");	    }	    // init the clientSequenceNumber if not reconnected.	    if (!reconnected) {		try {		    NotificationResult nr = fetchNotifs(-1, 0, 0);		    clientSequenceNumber = nr.getNextSequenceNumber();		} catch (ClassNotFoundException e) {		    // can't happen		    logger.warning("init", "Impossible exception: "+ e);		    logger.debug("init",e);		}	    }	    // for cleaning	    try {		mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();	    } catch (Exception e) {		final String msg =		    "Failed to register a listener to the mbean " +		    "server: the client will not do clean when an MBean " +		    "is unregistered";		if (logger.traceOn()) {		    logger.trace("init", msg, e);		}		 	    } 	    setState(STARTING);	    // start fetching	    executor.execute(new NotifFetcher());	    return;	default:	    // should not	    throw new IOException("Unknown state.");	}    }    /**     * Import: should not remove a listener dureing reconnection, the reconnection     * needs to change the listener list and that will possibly make removal fail.     */    private synchronized void beforeRemove() throws IOException {        while (beingReconnected) {	    if (state == TERMINATED) {		throw new IOException("Terminated.");	    }	    try {		wait();	    } catch (InterruptedException ire) {		IOException ioe = new IOException(ire.toString());		EnvHelp.initCause(ioe, ire);		throw ioe;	    }	}	if (state == TERMINATED) {	    throw new IOException("Terminated.");	}    }// -------------------------------------------------// private variables// -------------------------------------------------    private final ClassLoader defaultClassLoader;    private final Executor executor;    private final HashMap infoList = new HashMap();    // Integer -> ClientListenerInfo    // notif stuff    private long clientSequenceNumber = -1;    private final int maxNotifications;    private final long timeout;    private Integer mbeanRemovedNotifID = null;    private Thread currentFetchThread;    // admin stuff    private boolean inited = false;    // state    /**     * This state means that a thread is being created for fetching and forwarding notifications.     */    private static final int STARTING = 0;    /**     * This state tells that a thread has been started for fetching and forwarding notifications.     */    private static final int STARTED = 1;    /**     * This state means that the fetching thread is informed to stop.     */    private static final int STOPPING = 2;    /**     * This state means that the fetching thread is already stopped.     */    private static final int STOPPED = 3;    /**     * This state means that this object is terminated and no more thread will be created     * for fetching notifications.     */     private static final int TERMINATED = 4;    private int state = STOPPED;    /**     * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)     * is doing reconnection.     * This variable will be set to true by the method <code>preReconnection</code>, and set     * fase by <code>postReconnection</code>.     * When beingReconnected == true, no thread will be created for fetching notifications.     */    private boolean beingReconnected = false;    private static final ClassLogger logger =	new ClassLogger("javax.management.remote.misc",			"ClientNotifForwarder");}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -