📄 arraynotificationbuffer.java
字号:
* return. May be 0 to indicate a wait for eligible notifications * that will return a usable <code>nextSequenceNumber</code>. The * {@link TargetedNotification} array in the returned {@link * NotificationResult} may contain more than this number of * elements but will not contain more than this number of * different notifications. */ public NotificationResult fetchNotifications(NotificationBufferFilter filter, long startSequenceNumber, long timeout, int maxNotifications) throws InterruptedException { logger.trace("fetchNotifications", "starts"); if (startSequenceNumber < 0 || isDisposed()) { synchronized(this) { return new NotificationResult(earliestSequenceNumber(), nextSequenceNumber(), new TargetedNotification[0]); } } // Check arg validity if (filter == null || startSequenceNumber < 0 || timeout < 0 || maxNotifications < 0) { logger.trace("fetchNotifications", "Bad args"); throw new IllegalArgumentException("Bad args to fetch"); } if (logger.debugOn()) { logger.trace("fetchNotifications", "filter=" + filter + "; startSeq=" + startSequenceNumber + "; timeout=" + timeout + "; max=" + maxNotifications); } if (startSequenceNumber > nextSequenceNumber()) { final String msg = "Start sequence number too big: " + startSequenceNumber + " > " + nextSequenceNumber(); logger.trace("fetchNotifications", msg); throw new IllegalArgumentException(msg); } /* Determine the end time corresponding to the timeout value. Caller may legitimately supply Long.MAX_VALUE to indicate no timeout. In that case the addition will overflow and produce a negative end time. Set end time to Long.MAX_VALUE in that case. We assume System.currentTimeMillis() is positive. */ long endTime = System.currentTimeMillis() + timeout; if (endTime < 0) // overflow endTime = Long.MAX_VALUE; if (logger.debugOn()) logger.debug("fetchNotifications", "endTime=" + endTime); /* We set earliestSeq the first time through the loop. 
If we set it here, notifications could be dropped before we started examining them, so earliestSeq might not correspond to the earliest notification we examined. */ long earliestSeq = -1; long nextSeq = startSequenceNumber; List<TargetedNotification> notifs = new ArrayList<TargetedNotification>(); /* On exit from this loop, notifs, earliestSeq, and nextSeq must all be correct values for the returned NotificationResult. */ while (true) { logger.debug("fetchNotifications", "main loop starts"); NamedNotification candidate; /* Get the next available notification regardless of filters, or wait for one to arrive if there is none. */ synchronized (this) { /* First time through. The current earliestSequenceNumber is the first one we could have examined. */ if (earliestSeq < 0) { earliestSeq = earliestSequenceNumber(); if (logger.debugOn()) { logger.debug("fetchNotifications", "earliestSeq=" + earliestSeq); } if (nextSeq < earliestSeq) { nextSeq = earliestSeq; logger.debug("fetchNotifications", "nextSeq=earliestSeq"); } } else earliestSeq = earliestSequenceNumber(); /* If many notifications have been dropped since the last time through, nextSeq could now be earlier than the current earliest. If so, notifications may have been lost and we return now so the caller can see this next time it calls. */ if (nextSeq < earliestSeq) { logger.trace("fetchNotifications", "nextSeq=" + nextSeq + " < " + "earliestSeq=" + earliestSeq + " so may have lost notifs"); break; } if (nextSeq < nextSequenceNumber()) { candidate = notificationAt(nextSeq); if (logger.debugOn()) { logger.debug("fetchNotifications", "candidate: " + candidate); logger.debug("fetchNotifications", "nextSeq now " + nextSeq); } } else { /* nextSeq is the largest sequence number. If we already got notifications, return them now. Otherwise wait for some to arrive, with timeout. 
*/ if (notifs.size() > 0) { logger.debug("fetchNotifications", "no more notifs but have some so don't wait"); break; } long toWait = endTime - System.currentTimeMillis(); if (toWait <= 0) { logger.debug("fetchNotifications", "timeout"); break; } /* dispose called */ if (isDisposed()) { if (logger.debugOn()) logger.debug("fetchNotifications", "dispose callled, no wait"); return new NotificationResult(earliestSequenceNumber(), nextSequenceNumber(), new TargetedNotification[0]); } if (logger.debugOn()) logger.debug("fetchNotifications", "wait(" + toWait + ")"); wait(toWait); continue; } } /* We have a candidate notification. See if it matches our filters. We do this outside the synchronized block so we don't hold up everyone accessing the buffer (including notification senders) while we evaluate potentially slow filters. */ ObjectName name = candidate.getObjectName(); Notification notif = candidate.getNotification(); List<TargetedNotification> matchedNotifs = new ArrayList<TargetedNotification>(); logger.debug("fetchNotifications", "applying filter to candidate"); filter.apply(matchedNotifs, name, notif); if (matchedNotifs.size() > 0) { /* We only check the max size now, so that our returned nextSeq is as large as possible. This prevents the caller from thinking it missed interesting notifications when in fact we knew they weren't. */ if (maxNotifications <= 0) { logger.debug("fetchNotifications", "reached maxNotifications"); break; } --maxNotifications; if (logger.debugOn()) logger.debug("fetchNotifications", "add: " + matchedNotifs); notifs.addAll(matchedNotifs); } ++nextSeq; } // end while /* Construct and return the result. 
*/ int nnotifs = notifs.size(); TargetedNotification[] resultNotifs = new TargetedNotification[nnotifs]; notifs.toArray(resultNotifs); NotificationResult nr = new NotificationResult(earliestSeq, nextSeq, resultNotifs); if (logger.debugOn()) logger.debug("fetchNotifications", nr.toString()); logger.trace("fetchNotifications", "ends"); return nr; } synchronized long earliestSequenceNumber() { return earliestSequenceNumber; } synchronized long nextSequenceNumber() { return nextSequenceNumber; } synchronized void addNotification(NamedNotification notif) { if (logger.traceOn()) logger.trace("addNotification", notif.toString()); while (queue.size() >= queueSize) { dropNotification(); if (logger.debugOn()) { logger.debug("addNotification", "dropped oldest notif, earliestSeq=" + earliestSequenceNumber); } } queue.add(notif); nextSequenceNumber++; if (logger.debugOn()) logger.debug("addNotification", "nextSeq=" + nextSequenceNumber); notifyAll(); } private void dropNotification() { queue.remove(0); earliestSequenceNumber++; } synchronized NamedNotification notificationAt(long seqNo) { long index = seqNo - earliestSequenceNumber; if (index < 0 || index > Integer.MAX_VALUE) { final String msg = "Bad sequence number: " + seqNo + " (earliest " + earliestSequenceNumber + ")"; logger.trace("notificationAt", msg); throw new IllegalArgumentException(msg); } return queue.get((int) index); } private static class NamedNotification { NamedNotification(ObjectName sender, Notification notif) { this.sender = sender; this.notification = notif; } ObjectName getObjectName() { return sender; } Notification getNotification() { return notification; } public String toString() { return "NamedNotification(" + sender + ", " + notification + ")"; } private final ObjectName sender; private final Notification notification; } /* * Add our listener to every NotificationBroadcaster MBean * currently in the MBean server and to every * NotificationBroadcaster later created. 
     *
     * It would be really nice if we could just do
     * mbs.addNotificationListener(new ObjectName("*:*"), ...);
     * Definitely something for the next version of JMX.
     *
     * There is a nasty race condition that we must handle. We
     * first register for MBean-creation notifications so we can add
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -