📄 clientnotifforwarder.java
字号:
// NOTE(review): this chunk opens in the middle of a method whose header is above the visible
// source (presumably NotifFetcher.run() -- confirm against the preceding chunk).
                    if (clientSequenceNumber >= 0) {
                        // Gap between what the server still holds and what we expected next.
                        missed = nr.getEarliestSequenceNumber() - clientSequenceNumber;
                    }

                    clientSequenceNumber = nr.getNextSequenceNumber();

                    final int size = infoList.size();
                    listeners = new HashMap(((size>len)?len:size));

                    for (int i = 0 ; i < len ; i++) {
                        final TargetedNotification tn = notifs[i];
                        final Integer listenerID = tn.getListenerID();

                        // check if an mbean unregistration notif
                        if (!listenerID.equals(mbeanRemovedNotifID)) {
                            final ListenerInfo li = (ListenerInfo) infoList.get(listenerID);
                            if (li != null)
                                listeners.put(listenerID,li);
                            continue;
                        }

                        final Notification notif = tn.getNotification();
                        final String unreg = MBeanServerNotification.UNREGISTRATION_NOTIFICATION;
                        if (notif instanceof MBeanServerNotification
                                && notif.getType().equals(unreg)) {
                            MBeanServerNotification mbsn = (MBeanServerNotification) notif;
                            ObjectName name = mbsn.getMBeanName();

                            removeNotificationListener(name);
                        }
                    }
                    myListenerID = mbeanRemovedNotifID;
                }

                if (missed > 0) {
                    final String msg = "May have lost up to " + missed + " notification"
                        + (missed == 1 ? "" : "s");
                    lostNotifs(msg, missed);
                    logger.trace("NotifFetcher.run", msg);
                }

                // forward
                for (int i = 0 ; i < len ; i++) {
                    final TargetedNotification tn = notifs[i];
                    dispatchNotification(tn,myListenerID,listeners);
                }
            }

            synchronized (ClientNotifForwarder.this) {
                currentFetchThread = null;
            }

            if (nr == null || shouldStop()) {
                // tell that the thread is REALLY stopped
                setState(STOPPED);
            } else {
                // Re-submit this task for the next fetch round.
                executor.execute(this);
            }
        }

        /**
         * Hands one notification to the listener registered under its listener ID.
         * Notifications whose listener ID equals <code>myListenerID</code> are skipped,
         * and a listener ID that is absent from <code>listeners</code> is only traced.
         * A <code>RuntimeException</code> thrown by the listener is traced and not
         * propagated.
         *
         * @param tn the notification together with the ID of its target listener
         * @param myListenerID the listener ID to skip
         * @param listeners map from listener ID to listener info
         */
        void dispatchNotification(TargetedNotification tn, Integer myListenerID, Map listeners) {
            final Notification notif = tn.getNotification();
            final Integer listenerID = tn.getListenerID();

            if (listenerID.equals(myListenerID)) return;
            final ListenerInfo li = (ClientListenerInfo) listeners.get(listenerID);

            if (li == null) {
                logger.trace("NotifFetcher.dispatch", "Listener ID not in map");
                return;
            }

            NotificationListener l = li.getListener();
            Object h = li.getHandback();
            try {
                l.handleNotification(notif, h);
            } catch (RuntimeException e) {
                final String msg = "Failed to forward a notification " + "to a listener";
                logger.trace("NotifFetcher-run", msg, e);
            }
        }

        /**
         * Fetches the next batch of notifications starting at
         * <code>clientSequenceNumber</code>.
         * On <code>ClassNotFoundException</code> or <code>NotSerializableException</code>
         * it falls back to {@link #fetchOneNotif()}.
         *
         * @return the fetched result, or <code>null</code> when an
         *         <code>IOException</code> occurred (no more fetching)
         */
        private NotificationResult fetchNotifs() {
            try {
                NotificationResult nr = ClientNotifForwarder.this.
                    fetchNotifs(clientSequenceNumber,maxNotifications, timeout);

                if (logger.traceOn()) {
                    logger.trace("NotifFetcher-run", "Got notifications from the server: "+nr);
                }

                return nr;
            } catch (ClassNotFoundException e) {
                logger.trace("NotifFetcher.fetchNotifs", e);
                return fetchOneNotif();
            } catch (NotSerializableException e) {
                logger.trace("NotifFetcher.fetchNotifs", e);
                return fetchOneNotif();
            } catch (IOException ioe) {
                if (!shouldStop()) {
                    logger.error("NotifFetcher-run", "Failed to fetch notification, "
                                 + "stopping thread. Error is: " + ioe, ioe);
                    logger.debug("NotifFetcher-run",ioe);
                }

                // no more fetching
                return null;
            }
        }

        /* Fetch one notification when we suspect that it might be a
           notification that we can't deserialize (because of a
           missing class).  First we ask for 0 notifications with 0
           timeout.  This allows us to skip sequence numbers for
           notifications that don't match our filters.  Then we ask
           for one notification.  If that produces a
           ClassNotFoundException or a NotSerializableException, we
           increase our sequence number and ask again.  Eventually we
           will either get a successful notification, or a return with
           0 notifications.  In either case we can return a
           NotificationResult.  This algorithm works (albeit less
           well) even if the server implementation doesn't optimize a
           request for 0 notifications to skip sequence numbers for
           notifications that don't match our filters.

           If we had at least one ClassNotFoundException, then we
           must emit a JMXConnectionNotification.LOST_NOTIFS.

           Returns null if fetching should stop or an unexpected
           exception occurred.  */
        private NotificationResult fetchOneNotif() {
            ClientNotifForwarder cnf = ClientNotifForwarder.this;

            long startSequenceNumber = clientSequenceNumber;

            int notFoundCount = 0;

            NotificationResult result = null;

            while (result == null && !shouldStop()) {
                NotificationResult nr;

                try {
                    // 0 notifs to update startSequenceNumber
                    nr = cnf.fetchNotifs(startSequenceNumber, 0, 0L);
                } catch (ClassNotFoundException e) {
                    logger.warning("NotifFetcher.fetchOneNotif", "Impossible exception: " + e);
                    logger.debug("NotifFetcher.fetchOneNotif",e);
                    return null;
                } catch (IOException e) {
                    if (!shouldStop())
                        logger.trace("NotifFetcher.fetchOneNotif", e);
                    return null;
                }

                if (shouldStop())
                    return null;

                startSequenceNumber = nr.getNextSequenceNumber();

                try {
                    // 1 notif to skip possible missing class
                    result = cnf.fetchNotifs(startSequenceNumber, 1, 0L);
                } catch (Exception e) {
                    if (e instanceof ClassNotFoundException
                            || e instanceof NotSerializableException) {
                        logger.warning("NotifFetcher.fetchOneNotif",
                                       "Failed to deserialize a notification: "+e.toString());
                        if (logger.traceOn()) {
                            logger.trace("NotifFetcher.fetchOneNotif",
                                         "Failed to deserialize a notification.", e);
                        }

                        // Skip the notification we could not deserialize and retry.
                        notFoundCount++;
                        startSequenceNumber++;
                    } else {
                        if (!shouldStop())
                            logger.trace("NotifFetcher.fetchOneNotif", e);
                        return null;
                    }
                }
            }

            if (notFoundCount > 0) {
                final String msg = "Dropped " + notFoundCount + " notification"
                    + (notFoundCount == 1 ? "" : "s")
                    + " because classes were missing locally";
                lostNotifs(msg, notFoundCount);
            }

            return result;
        }

        /**
         * Tells whether the fetcher should stop.
         * Returns <code>true</code> when the state is not <code>STARTED</code>; when the
         * state is <code>STARTED</code> but no listener is registered any more, moves the
         * state to <code>STOPPING</code> and also returns <code>true</code>.
         */
        private boolean shouldStop() {
            synchronized (ClientNotifForwarder.this) {
                if (state != STARTED) {
                    return true;
                } else if (infoList.size() == 0) {
                    // no more listener, stop fetching
                    setState(STOPPING);

                    return true;
                }

                return false;
            }
        }
    }

// -------------------------------------------------
// private methods
// -------------------------------------------------

    /**
     * Changes the state and wakes up every thread waiting on this object.
     * Does nothing once the state is <code>TERMINATED</code>.
     */
    private synchronized void setState(int newState) {
        if (state == TERMINATED) {
            return;
        }

        state = newState;
        this.notifyAll();
    }

    /*
     * Called to decide whether need to start a thread for fetching notifs.
     * <P>The parameter reconnected will decide whether to initialize the clientSequenceNumber,
     * initializing the clientSequenceNumber means to ignore all notifications arrived before.
     * If it is reconnected, we will not initialize in order to get all notifications arrived
     * during the reconnection. It may cause the newly registered listeners to receive some
     * notifications arrived before its registration.
     *
     * Throws IOException if this object has been terminated, if the state is unknown,
     * or if the calling thread is interrupted while waiting for a stopping fetcher
     * (the thread's interrupt status is restored before the exception is thrown).
     */
    private synchronized void init(boolean reconnected) throws IOException {
        switch (state) {
        case STARTED:
            return;
        case STARTING:
            return;
        case TERMINATED:
            throw new IOException("The ClientNotifForwarder has been terminated.");
        case STOPPING:
            if (beingReconnected) {
                // wait for another thread to do, which is doing reconnection
                return;
            }

            while (state == STOPPING) { // make sure only one fetching thread.
                try {
                    wait();
                } catch (InterruptedException ire) {
                    // Keep the interrupt visible to the caller after the conversion.
                    Thread.currentThread().interrupt();

                    IOException ioe = new IOException(ire.toString());
                    EnvHelp.initCause(ioe, ire);

                    throw ioe;
                }
            }

            // re-call this method to check the state again,
            // the state can be other value like TERMINATED.
            init(reconnected);

            return;
        case STOPPED:
            if (beingReconnected) {
                // wait for another thread to do, which is doing reconnection
                return;
            }

            if (logger.traceOn()) {
                logger.trace("init", "Initializing...");
            }

            // init the clientSequenceNumber if not reconnected.
            if (!reconnected) {
                try {
                    NotificationResult nr = fetchNotifs(-1, 0, 0);
                    clientSequenceNumber = nr.getNextSequenceNumber();
                } catch (ClassNotFoundException e) {
                    // can't happen
                    logger.warning("init", "Impossible exception: "+ e);
                    logger.debug("init",e);
                }
            }

            // for cleaning
            try {
                mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
            } catch (Exception e) {
                final String msg =
                    "Failed to register a listener to the mbean "
                    + "server: the client will not do clean when an MBean "
                    + "is unregistered";
                if (logger.traceOn()) {
                    logger.trace("init", msg, e);
                }
            }

            setState(STARTING);

            // start fetching
            executor.execute(new NotifFetcher());

            return;
        default:
            // should not
            throw new IOException("Unknown state.");
        }
    }

    /**
     * Important: should not remove a listener during reconnection, the reconnection
     * needs to change the listener list and that will possibly make removal fail.
     * <P>Waits while a reconnection is in progress.
     *
     * @throws IOException if this object is terminated, or if the calling thread is
     *         interrupted while waiting (its interrupt status is restored first)
     */
    private synchronized void beforeRemove() throws IOException {
        while (beingReconnected) {
            if (state == TERMINATED) {
                throw new IOException("Terminated.");
            }

            try {
                wait();
            } catch (InterruptedException ire) {
                // Keep the interrupt visible to the caller after the conversion.
                Thread.currentThread().interrupt();

                IOException ioe = new IOException(ire.toString());
                EnvHelp.initCause(ioe, ire);

                throw ioe;
            }
        }

        if (state == TERMINATED) {
            throw new IOException("Terminated.");
        }
    }

// -------------------------------------------------
// private variables
// -------------------------------------------------

    private final ClassLoader defaultClassLoader;
    private final Executor executor;

    private final HashMap infoList = new HashMap();
    // Integer -> ClientListenerInfo

    // notif stuff
    private long clientSequenceNumber = -1;
    private final int maxNotifications;
    private final long timeout;

    private Integer mbeanRemovedNotifID = null;

    private Thread currentFetchThread;

    // admin stuff
    private boolean inited = false;

    // state
    /**
     * This state means that a thread is being created for fetching and forwarding notifications.
     */
    private static final int STARTING = 0;

    /**
     * This state tells that a thread has been started for fetching and forwarding notifications.
     */
    private static final int STARTED = 1;

    /**
     * This state means that the fetching thread is informed to stop.
     */
    private static final int STOPPING = 2;

    /**
     * This state means that the fetching thread is already stopped.
     */
    private static final int STOPPED = 3;

    /**
     * This state means that this object is terminated and no more thread will be created
     * for fetching notifications.
     */
    private static final int TERMINATED = 4;

    private int state = STOPPED;

    /**
     * This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
     * is doing reconnection.
     * This variable will be set to true by the method <code>preReconnection</code>, and set
     * false by <code>postReconnection</code>.
     * When beingReconnected == true, no thread will be created for fetching notifications.
     */
    private boolean beingReconnected = false;

    private static final ClassLogger logger =
        new ClassLogger("javax.management.remote.misc", "ClientNotifForwarder");
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -