📄 unbiasedqueue.java
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.util;import java.util.ArrayList;import java.util.List;import java.util.logging.Logger;import java.util.logging.Level;import net.jxta.logging.Logging;import net.jxta.impl.util.TimeUtils;/** * A generic queue class. This queue is explicitly <b>NOT</b> a synchronized queue. * * <p/>FIXME 20020511 bondolo@jxta.org This could be more efficient with a * circular queue implementation, but its a pain to write since we allow the * queue to be resizable. * * <p/>FIXME 20020511 bondolo@jxta.org Exercise for the reader: Extend this * class so that it does both LIFO and FIFO. * * <p/>FIXME 20020910 bondolo@jxta.org Needs an optional listener callback * for droppped elments. * * <p/>FIXME 20020910 bondolo@jxta.org Needs an optional "strategy" for * element insertion and removal. * * @deprecated Please convert all code to use the java.util.concurrent BlockingQueue instead. * */@Deprecatedpublic class UnbiasedQueue { /** * Logger */ private static final Logger LOG = Logger.getLogger(UnbiasedQueue.class.getName()); /** * Default number of queue elements. */ protected static final int DEFAULT_MAX_OBJECTS = 100; /** * Default object dropping behaviour */ protected static final boolean DROP_OLDEST_OBJECT = true; /** * Number of milliseconds between notifications that objects are being dropped. */ protected static final long DROPPED_OBJECT_WARNING_INTERVAL = 10 * TimeUtils.ASECOND; /** * Contains the objects we currently have queued. */ protected List<Object> queue = null; /** * The maximum number of objects we will hold in the queue at one time. */ protected int maxObjects; /** * If true the queue is being closed and is currently in the process of * being flushed. All new "push" requests will be refused. */ protected volatile boolean closeFlag = false; /** * When we need to drop objects, drop the oldest obj. */ protected boolean dropOldestObject = true; /** * total number of objects which have been enqueued into this queue */ protected long numEnqueued = 0; /** * sum of queue sizes at enqueue time. */ protected long sumOfQueueSizesEnqueue = 0; /** * total number of objects which have been dequeued from this queue */ protected long numDequeued = 0; /** * sum of queue sizes at dequeue time. */ protected long sumOfQueueSizesDequeue = 0; /** * the number of objects we have dropped since we began working. */ protected long numDropped = 0; /** * absolute time in millis when it will be ok to display a obj about * dropping objects. We throttle this so that there is a chance to do work * rather than just always spewing warnings. */ protected long nextDroppedWarn = 0L; /** * An inner class for wrapping arbitrary queues with synchronization. */ protected static class SynchronizedQueue extends UnbiasedQueue { UnbiasedQueue innerqueue; /** * Constructs a SynchronizedQueue from an UnbiasedQueue * instance. */ public SynchronizedQueue(UnbiasedQueue queue) { innerqueue = queue; } /** * {@inheritDoc} */ @Override public boolean isClosed() { synchronized (innerqueue.queue) { return innerqueue.isClosed(); } } /** * {@inheritDoc} */ @Override public void close() { synchronized (innerqueue.queue) { innerqueue.close(); } } /** * {@inheritDoc} */ @Override public void clear() { synchronized (innerqueue.queue) { innerqueue.clear(); } } /** * {@inheritDoc} */ @Override public boolean push(Object obj) { synchronized (innerqueue.queue) { return innerqueue.push(obj); } } /** * {@inheritDoc} */ @Override public boolean pushBack(Object obj) { synchronized (innerqueue.queue) { return innerqueue.pushBack(obj); } } /** * {@inheritDoc} */ @Override public boolean push(Object obj, long timeout) throws InterruptedException { synchronized (innerqueue.queue) { return innerqueue.push(obj, timeout); } } /** * {@inheritDoc} */ @Override public boolean pushBack(Object obj, long timeout) throws InterruptedException { synchronized (innerqueue.queue) { return innerqueue.pushBack(obj, timeout); } } /** * {@inheritDoc} */ @Override public Object peek() { synchronized (innerqueue.queue) { return innerqueue.peek(); } } /** * {@inheritDoc} */ @Override public Object pop() { synchronized (innerqueue.queue) { return innerqueue.pop(); } } /** * {@inheritDoc} */ @Override public Object pop(long timeout) throws InterruptedException { synchronized (innerqueue.queue) { return innerqueue.pop(timeout); } } /** * {@inheritDoc} */ @Override public Object[] popMulti(int maxObjs) { synchronized (innerqueue.queue) { return innerqueue.popMulti(maxObjs); } } /** * {@inheritDoc} */ @Override public int getMaxQueueSize() { synchronized (innerqueue.queue) { return innerqueue.getMaxQueueSize(); } } /** * {@inheritDoc} */ @Override public void setMaxQueueSize(int maxObjs) { synchronized (innerqueue.queue) { innerqueue.setMaxQueueSize(maxObjs); } } @Override public int getCurrentInQueue() { synchronized (innerqueue.queue) { return innerqueue.getCurrentInQueue(); } } /** * {@inheritDoc} */ @Override public long getNumEnqueued() { synchronized (innerqueue.queue) { return innerqueue.getNumEnqueued(); } } /** * {@inheritDoc} */ @Override public double getAvgInQueueAtEnqueue() { synchronized (innerqueue.queue) { return innerqueue.getAvgInQueueAtEnqueue(); } } /** * {@inheritDoc} */ @Override public long getNumDequeued() { synchronized (innerqueue.queue) { return innerqueue.getNumDequeued(); } } /** * {@inheritDoc} */ @Override public double getAvgInQueueAtDequeue() { synchronized (innerqueue.queue) { return innerqueue.getAvgInQueueAtDequeue(); } } /** * {@inheritDoc} */ @Override public long getNumDropped() { synchronized (innerqueue.queue) { return innerqueue.getNumDropped(); } } } /** * Returns a synchronized (thread-safe) list backed by the specified queue. * Most UnbiasedQueue subclasses are either unsynchronized or internally * synchronized. If you need to do any atomic operations upon * UnbiasedQueues (or subclasses) then this method should be used to * "wrap" the queue with synchronization. * * <p/>In order to guarantee serial access, it is critical that all access * to the backing queue is accomplished through the returned queue. * * @param queue the queue to be "wrapped" in a synchronized queue. */ public static UnbiasedQueue synchronizedQueue(UnbiasedQueue queue) { return new SynchronizedQueue(queue); } /** * Default constructor. 100 element FIFO queue which drops oldest element * when full. */ public UnbiasedQueue() { this(DEFAULT_MAX_OBJECTS, DROP_OLDEST_OBJECT); } /** * Full featured constructor for creating a new UnBiasedQueue. * * @param maxsize Queue will not grow larger than this size. Use * {@link java.lang.Integer#MAX_VALUE} for "unbounded" queue size. * @param dropOldest Controls behaviour of element insertion when the * queue is full. If <tt>true</tt> and the queue is full upon a * {@link #push(Object) push} operation then the oldest element will be * dropped to be replaced with the element currently being pushed. If * <tt>false</tt> then the element will not be inserted if the queue is full. */ public UnbiasedQueue(int maxsize, boolean dropOldest) { this(maxsize, dropOldest, new ArrayList<Object>()); } /** * Full featured constructor for creating a new UnBiasedQueue. * * @param maxsize Queue will not grow larger than this size. Use * {@link java.lang.Integer#MAX_VALUE} for "unbounded" queue size. * @param dropOldest Controls behaviour of element insertion when the * queue is full. If <tt>true</tt> and the queue is full upon a * {@link #push(Object) push} operation then the oldest element will be * dropped to be replaced with the element currently being pushed. If * <tt>false</tt> then the element will not be inserted if the queue is * full. * @param queue the List class instance to use. This does not need to be * a synchronized list class. (and it works more effciently if it isn't). */ public UnbiasedQueue(int maxsize, boolean dropOldest, List<Object> queue) { if (maxsize <= 0) { throw new IllegalArgumentException("size must be > 0"); } if (null == queue) { throw new IllegalArgumentException("queue must be non-null"); } maxObjects = maxsize; this.queue = queue; closeFlag = false; dropOldestObject = dropOldest; } /** * {@inheritDoc} *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -