📄 arraynotificationbuffer.java
字号:
/* * @(#)ArrayNotificationBuffer.java 1.28 06/03/01 * * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.jmx.remote.internal;import java.io.IOException;import java.security.AccessController;import java.security.PrivilegedAction;import java.security.PrivilegedActionException;import java.security.PrivilegedExceptionAction;import java.util.ArrayList;import java.util.Collection;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Set;import java.util.HashMap;import java.util.Map;import javax.management.InstanceNotFoundException;import javax.management.ListenerNotFoundException;import javax.management.MalformedObjectNameException;import javax.management.MBeanServer;import javax.management.MBeanServerDelegate;import javax.management.MBeanServerNotification;import javax.management.Notification;import javax.management.NotificationBroadcaster;import javax.management.NotificationFilter;import javax.management.NotificationFilterSupport;import javax.management.NotificationListener;import javax.management.ObjectName;import javax.management.QueryEval;import javax.management.QueryExp;import javax.management.remote.NotificationResult;import javax.management.remote.TargetedNotification;import com.sun.jmx.remote.util.EnvHelp;import com.sun.jmx.remote.util.ClassLogger;/** A circular buffer of notifications received from an MBean server. 
*/
public class ArrayNotificationBuffer implements NotificationBuffer {
    /**
     * True once this buffer has been torn down.  Written and read only
     * while holding this buffer's monitor.
     */
    private boolean disposed = false;

    // FACTORY STUFF, INCLUDING SHARING

    /**
     * The single shared buffer for each MBean server.  Accessed only while
     * holding the {@code ArrayNotificationBuffer.class} monitor.
     */
    private static final HashMap<MBeanServer,ArrayNotificationBuffer> mbsToBuffer =
        new HashMap<MBeanServer,ArrayNotificationBuffer>(1);

    /** The handles currently attached to this buffer. */
    private final Collection<ShareBuffer> sharers = new HashSet<ShareBuffer>(1);

    /**
     * Returns a handle on the notification buffer shared by all callers
     * for the given MBean server, creating that buffer on first use.
     *
     * @param mbs the MBean server whose notifications are buffered.
     * @param env environment map from which the requested queue size is
     * read via {@code EnvHelp.getNotifBufferSize}.
     * @return a new handle; disposing it detaches the caller from the
     * shared buffer.
     */
    public static synchronized NotificationBuffer getNotificationBuffer(MBeanServer mbs,
                                                                        Map env) {
        //Find out queue size
        int queueSize = EnvHelp.getNotifBufferSize(env);

        ArrayNotificationBuffer buf = mbsToBuffer.get(mbs);
        if (buf == null) {
            buf = new ArrayNotificationBuffer(mbs, queueSize);
            mbsToBuffer.put(mbs, buf);
        }
        // The ShareBuffer constructor registers itself through addSharer,
        // so this buffer's monitor is taken while the class monitor is held.
        return buf.new ShareBuffer(queueSize);
    }

    /**
     * Removes the mapping from the given MBean server to its shared buffer,
     * if there is one.
     *
     * @param mbs the MBean server whose mapping is removed.
     */
    public static synchronized void removeNotificationBuffer(MBeanServer mbs) {
        mbsToBuffer.remove(mbs);
    }

    /**
     * Attaches a handle to this buffer, growing the queue if the handle
     * asks for more room than is currently available.
     */
    synchronized void addSharer(ShareBuffer sharer) {
        if (sharer.getSize() > queueSize)
            resize(sharer.getSize());
        sharers.add(sharer);
    }

    /**
     * Detaches a handle from this buffer.  If other handles remain, the
     * queue is shrunk to the largest size any of them requested.  If none
     * remain, the buffer is unregistered and torn down.
     *
     * <p>The teardown is performed here rather than by calling
     * {@link #dispose()}: the decision that no sharers remain and the
     * removal of this buffer from the factory map must happen atomically
     * with respect to {@link #getNotificationBuffer}, otherwise a new
     * handle could be attached to a buffer that is about to be torn
     * down.</p>
     */
    void removeSharer(ShareBuffer sharer) {
        boolean empty;
        // Lock order is class monitor first, then this buffer's monitor,
        // which is the same order getNotificationBuffer uses.
        synchronized (ArrayNotificationBuffer.class) {
            synchronized (this) {
                sharers.remove(sharer);
                empty = sharers.isEmpty();
                if (empty) {
                    // Unregister while the class monitor is still held so
                    // that no later getNotificationBuffer call can find
                    // this buffer.
                    removeNotificationBuffer(mBeanServer);
                } else {
                    int max = 0;
                    for (ShareBuffer buf : sharers) {
                        int bufsize = buf.getSize();
                        if (bufsize > max)
                            max = bufsize;
                    }
                    if (max < queueSize)
                        resize(max);
                }
            }
        }
        if (empty) {
            logger.trace("dispose", "starts");
            synchronized (this) {
                disposed = true;
                //Notify potential waiting fetchNotification call
                notifyAll();
            }
            // As in dispose(), the listeners are destroyed with no
            // monitor held.
            destroyListeners();
            logger.trace("dispose", "ends");
        }
    }

    /**
     * Changes the queue capacity to {@code newSize}, first calling
     * {@code dropNotification()} until the queue holds no more than
     * {@code newSize} entries.  Callers hold this buffer's monitor.
     */
    private void resize(int newSize) {
        if (newSize == queueSize)
            return;
        while (queue.size() > newSize)
            dropNotification();
        queue.resize(newSize);
        queueSize = newSize;
    }

    /**
     * A handle on the enclosing shared buffer.  Fetches are forwarded to
     * the enclosing buffer; disposing the handle detaches it.
     */
    private class ShareBuffer implements NotificationBuffer {
        /** The queue size this handle asked for. */
        private final int size;

        ShareBuffer(int size) {
            this.size = size;
            addSharer(this);
        }

        public NotificationResult fetchNotifications(NotificationBufferFilter filter,
                                                     long startSequenceNumber,
                                                     long timeout,
                                                     int maxNotifications)
                throws InterruptedException {
            NotificationBuffer buf = ArrayNotificationBuffer.this;
            return buf.fetchNotifications(filter, startSequenceNumber,
                                          timeout, maxNotifications);
        }

        public void dispose() {
            ArrayNotificationBuffer.this.removeSharer(this);
        }

        int getSize() {
            return size;
        }
    }

    //
ARRAYNOTIFICATIONBUFFER IMPLEMENTATION private ArrayNotificationBuffer(MBeanServer mbs, int queueSize) { if (logger.traceOn()) logger.trace("Constructor", "queueSize=" + queueSize); if (mbs == null || queueSize < 1) throw new IllegalArgumentException("Bad args"); this.mBeanServer = mbs; this.queueSize = queueSize; this.queue = new ArrayQueue<NamedNotification>(queueSize); this.earliestSequenceNumber = System.currentTimeMillis(); this.nextSequenceNumber = this.earliestSequenceNumber; createListeners(); logger.trace("Constructor", "ends"); } private synchronized boolean isDisposed() { return disposed; } public void dispose() { logger.trace("dispose", "starts"); synchronized(this) { removeNotificationBuffer(mBeanServer); disposed = true; //Notify potential waiting fetchNotification call notifyAll(); } destroyListeners(); logger.trace("dispose", "ends"); } /** * <p>Fetch notifications that match the given listeners.</p> * * <p>The operation only considers notifications with a sequence * number at least <code>startSequenceNumber</code>. It will take * no longer than <code>timeout</code>, and will return no more * than <code>maxNotifications</code> different notifications.</p> * * <p>If there are no notifications matching the criteria, the * operation will block until one arrives, subject to the * timeout.</p> * * @param filter an object that will add notifications to a * {@code List<TargetedNotification>} if they match the current * listeners with their filters. * @param startSequenceNumber the first sequence number to * consider. * @param timeout the maximum time to wait. May be 0 to indicate * not to wait if there are no notifications. * @param maxNotifications the maximum number of notifications to * return. May be 0 to indicate a wait for eligible notifications * that will return a usable <code>nextSequenceNumber</code>. 
The * {@link TargetedNotification} array in the returned {@link * NotificationResult} may contain more than this number of * elements but will not contain more than this number of * different notifications. */ public NotificationResult fetchNotifications(NotificationBufferFilter filter, long startSequenceNumber, long timeout, int maxNotifications) throws InterruptedException { logger.trace("fetchNotifications", "starts"); if (startSequenceNumber < 0 || isDisposed()) { synchronized(this) { return new NotificationResult(earliestSequenceNumber(), nextSequenceNumber(), new TargetedNotification[0]); } } // Check arg validity if (filter == null || startSequenceNumber < 0 || timeout < 0 || maxNotifications < 0) { logger.trace("fetchNotifications", "Bad args"); throw new IllegalArgumentException("Bad args to fetch"); } if (logger.debugOn()) { logger.trace("fetchNotifications", "filter=" + filter + "; startSeq=" + startSequenceNumber + "; timeout=" + timeout + "; max=" + maxNotifications); } if (startSequenceNumber > nextSequenceNumber()) { final String msg = "Start sequence number too big: " + startSequenceNumber + " > " + nextSequenceNumber(); logger.trace("fetchNotifications", msg); throw new IllegalArgumentException(msg); } /* Determine the end time corresponding to the timeout value. Caller may legitimately supply Long.MAX_VALUE to indicate no timeout. In that case the addition will overflow and produce a negative end time. Set end time to Long.MAX_VALUE in that case. We assume System.currentTimeMillis() is positive. */ long endTime = System.currentTimeMillis() + timeout; if (endTime < 0) // overflow endTime = Long.MAX_VALUE; if (logger.debugOn()) logger.debug("fetchNotifications", "endTime=" + endTime); /* We set earliestSeq the first time through the loop. If we set it here, notifications could be dropped before we started examining them, so earliestSeq might not correspond to the earliest notification we examined. 
*/ long earliestSeq = -1; long nextSeq = startSequenceNumber; List<TargetedNotification> notifs = new ArrayList<TargetedNotification>(); /* On exit from this loop, notifs, earliestSeq, and nextSeq must all be correct values for the returned NotificationResult. */ while (true) { logger.debug("fetchNotifications", "main loop starts"); NamedNotification candidate; /* Get the next available notification regardless of filters, or wait for one to arrive if there is none. */ synchronized (this) { /* First time through. The current earliestSequenceNumber is the first one we could have examined. */ if (earliestSeq < 0) { earliestSeq = earliestSequenceNumber(); if (logger.debugOn()) { logger.debug("fetchNotifications", "earliestSeq=" + earliestSeq); } if (nextSeq < earliestSeq) { nextSeq = earliestSeq; logger.debug("fetchNotifications", "nextSeq=earliestSeq"); } } else earliestSeq = earliestSequenceNumber(); /* If many notifications have been dropped since the last time through, nextSeq could now be earlier than the current earliest. If so, notifications may have been lost and we return now so the caller can see this next time it calls. */ if (nextSeq < earliestSeq) { logger.trace("fetchNotifications", "nextSeq=" + nextSeq + " < " + "earliestSeq=" + earliestSeq + " so may have lost notifs"); break; } if (nextSeq < nextSequenceNumber()) { candidate = notificationAt(nextSeq); if (logger.debugOn()) { logger.debug("fetchNotifications", "candidate: " + candidate); logger.debug("fetchNotifications", "nextSeq now " + nextSeq); } } else { /* nextSeq is the largest sequence number. If we already got notifications, return them now. Otherwise wait for some to arrive, with timeout. 
*/ if (notifs.size() > 0) { logger.debug("fetchNotifications", "no more notifs but have some so don't wait"); break; } long toWait = endTime - System.currentTimeMillis(); if (toWait <= 0) { logger.debug("fetchNotifications", "timeout"); break; } /* dispose called */ if (isDisposed()) { if (logger.debugOn()) logger.debug("fetchNotifications", "dispose callled, no wait"); return new NotificationResult(earliestSequenceNumber(), nextSequenceNumber(), new TargetedNotification[0]); } if (logger.debugOn()) logger.debug("fetchNotifications", "wait(" + toWait + ")"); wait(toWait); continue; } } /* We have a candidate notification. See if it matches our filters. We do this outside the synchronized block so we don't hold up everyone accessing the buffer (including notification senders) while we evaluate potentially slow filters. */ ObjectName name = candidate.getObjectName(); Notification notif = candidate.getNotification(); List<TargetedNotification> matchedNotifs = new ArrayList<TargetedNotification>(); logger.debug("fetchNotifications", "applying filter to candidate"); filter.apply(matchedNotifs, name, notif); if (matchedNotifs.size() > 0) { /* We only check the max size now, so that our returned nextSeq is as large as possible. This prevents the caller from thinking it missed interesting notifications when in fact we knew they weren't. */ if (maxNotifications <= 0) { logger.debug("fetchNotifications", "reached maxNotifications");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -