📄 unbiasedqueue.java
字号:
/* * Copyright (c) 2001-2002 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * * $Id: UnbiasedQueue.java,v 1.14 2006/02/13 23:24:25 bondolo Exp $ */package net.jxta.impl.util;import java.util.ArrayList;import java.util.List;import org.apache.log4j.Logger;import org.apache.log4j.Level;import net.jxta.impl.util.TimeUtils;/** * A generic queue class. This queue is explicitly <b>NOT</b> a synchronized queue. * * <p/>FIXME 20020511 bondolo@jxta.org This could be more efficient with a * circular queue implementation, but its a pain to write since we allow the * queue to be resizable. * * <p/>FIXME 20020511 bondolo@jxta.org Exercise for the reader: Extend this * class so that it does both LIFO and FIFO. * * <p/>FIXME 20020910 bondolo@jxta.org Needs an optional listener callback * for droppped elments. * * <p/>FIXME 20020910 bondolo@jxta.org Needs an optional "strategy" for * element insertion and removal. * * @deprecated Please convert all code to use the java.util.concurrent BlockingQueue instead. * **/public class UnbiasedQueue { /** * Log4J Logger **/ private static final Logger LOG = Logger.getLogger(UnbiasedQueue.class.getName()); /** * Default number of queue elements. 
**/ protected static final int DEFAULT_MAX_OBJECTS = 100; /** * Default object dropping behaviour **/ protected static final boolean DROP_OLDEST_OBJECT = true; /** * Number of milliseconds between notifications that objects are being dropped. **/ protected static final long DROPPED_OBJECT_WARNING_INTERVAL = 10 * TimeUtils.ASECOND; /** * Contains the objects we currently have queued. **/ protected List queue = null; /** * The maximum number of objects we will hold in the queue at one time. **/ protected int maxObjects; /** * If true the queue is being closed and is currently in the process of * being flushed. All new "push" requests will be refused. **/ protected volatile boolean closeFlag = false; /** * When we need to drop objects, drop the oldest obj. **/ protected boolean dropOldestObject = true; /** * total number of objects which have been enqueued into this queue **/ protected long numEnqueued = 0; /** * sum of queue sizes at enqueue time. **/ protected long sumOfQueueSizesEnqueue = 0; /** * total number of objects which have been dequeued from this queue **/ protected long numDequeued = 0; /** * sum of queue sizes at dequeue time. **/ protected long sumOfQueueSizesDequeue = 0; /** * the number of objects we have dropped since we began working. **/ protected long numDropped = 0; /** * absolute time in millis when it will be ok to display a obj about * dropping objects. We throttle this so that there is a chance to do work * rather than just always spewing warnings. */ protected long nextDroppedWarn = 0L; /** * An inner class for wrapping arbitrary queues with synchronization. **/ protected static class SynchronizedQueue extends UnbiasedQueue { UnbiasedQueue innerqueue; /** * Constructs a SynchronizedQueue from an UnbiasedQueue * instance. 
**/ public SynchronizedQueue( UnbiasedQueue queue ) { innerqueue = queue; } /** * {@inheritDoc} **/ public boolean isClosed() { synchronized( innerqueue.queue ) { return innerqueue.isClosed(); } } /** * {@inheritDoc} **/ public void close() { synchronized( innerqueue.queue ) { innerqueue.close(); } } /** * {@inheritDoc} **/ public void clear() { synchronized( innerqueue.queue ) { innerqueue.clear(); } } /** * {@inheritDoc} **/ public boolean push( Object obj ) { synchronized( innerqueue.queue ) { return innerqueue.push( obj ); } } /** * {@inheritDoc} **/ public boolean pushBack( Object obj ) { synchronized( innerqueue.queue ) { return innerqueue.pushBack( obj ); } } /** * {@inheritDoc} **/ public boolean push( Object obj, long timeout ) throws InterruptedException { synchronized( innerqueue.queue ) { return innerqueue.push( obj, timeout ); } } /** * {@inheritDoc} **/ public boolean pushBack( Object obj, long timeout ) throws InterruptedException { synchronized( innerqueue.queue ) { return innerqueue.pushBack( obj, timeout ); } } /** * {@inheritDoc} **/ public Object peek() { synchronized( innerqueue.queue ) { return innerqueue.peek(); } } /** * {@inheritDoc} **/ public Object pop() { synchronized( innerqueue.queue ) { return innerqueue.pop(); } } /** * {@inheritDoc} **/ public Object pop( long timeout ) throws InterruptedException { synchronized( innerqueue.queue ) { return innerqueue.pop( timeout ); } } /** * {@inheritDoc} **/ public Object [] popMulti( int maxObjs ) { synchronized( innerqueue.queue ) { return innerqueue.popMulti(maxObjs); } } /** * {@inheritDoc} **/ public int getMaxQueueSize() { synchronized( innerqueue.queue ) { return innerqueue.getMaxQueueSize(); } } /** * {@inheritDoc} **/ public void setMaxQueueSize( int maxObjs ) { synchronized( innerqueue.queue ) { innerqueue.setMaxQueueSize( maxObjs ); } } public int getCurrentInQueue() { synchronized( innerqueue.queue ) { return innerqueue.getCurrentInQueue(); } } /** * {@inheritDoc} **/ public long 
getNumEnqueued() { synchronized( innerqueue.queue ) { return innerqueue.getNumEnqueued(); } } /** * {@inheritDoc} **/ public double getAvgInQueueAtEnqueue() { synchronized( innerqueue.queue ) { return innerqueue.getAvgInQueueAtEnqueue(); } } /** * {@inheritDoc} **/ public long getNumDequeued() { synchronized( innerqueue.queue ) { return innerqueue.getNumDequeued(); } } /** * {@inheritDoc} **/ public double getAvgInQueueAtDequeue() { synchronized( innerqueue.queue ) { return innerqueue.getAvgInQueueAtDequeue(); } } /** * {@inheritDoc} **/ public long getNumDropped() { synchronized( innerqueue.queue ) { return innerqueue.getNumDropped(); } } } /** * Returns a synchronized (thread-safe) list backed by the specified queue. * Most UnbiasedQueue subclasses are either unsynchronized or internally * synchronized. If you need to do any atomic operations upon * UnbiasedQueues (or subclasses) then this method should be used to * "wrap" the queue with synchronization. * * <p/>In order to guarantee serial access, it is critical that all access * to the backing queue is accomplished through the returned queue. * * @param queue the queue to be "wrapped" in a synchronized queue. **/ public static UnbiasedQueue synchronizedQueue( UnbiasedQueue queue ) { return new SynchronizedQueue( queue ); } /** * Default constructor. 100 element FIFO queue which drops oldest element * when full. */ public UnbiasedQueue() { this( DEFAULT_MAX_OBJECTS, DROP_OLDEST_OBJECT ); } /** * Full featured constructor for creating a new UnBiasedQueue. * * @param maxsize Queue will not grow larger than this size. Use * {@link java.lang.Integer#MAX_VALUE} for "unbounded" queue size. * @param dropOldest Controls behaviour of element insertion when the * queue is full. If <tt>true</tt> and the queue is full upon a * {@link #push(Object) push} operation then the oldest element will be * dropped to be replaced with the element currently being pushed. 
If
     * <tt>false</tt> then the element will not be inserted if the queue is full.
     */
    public UnbiasedQueue( int maxsize, boolean dropOldest ) {
        this( maxsize, dropOldest, new ArrayList() );
    }

    /**
     * Creates a bounded queue that stores its elements in a caller-supplied
     * list.
     *
     * @param maxsize Upper bound on the number of queued elements. Pass
     * {@link java.lang.Integer#MAX_VALUE} for an effectively "unbounded"
     * queue.
     * @param dropOldest Selects what happens when a
     * {@link #push(Object) push} finds the queue full: <tt>true</tt> drops
     * the oldest element to make room for the new one, <tt>false</tt>
     * leaves the queue unchanged and the new element is not inserted.
     * @param queue the List instance used to hold the elements. It is kept
     * by reference, not copied. It does not need to be a synchronized list
     * class (and it works more efficiently if it isn't).
     * @throws IllegalArgumentException if <tt>maxsize</tt> is not greater
     * than zero or <tt>queue</tt> is null.
     **/
    public UnbiasedQueue(int maxsize, boolean dropOldest, List queue ) {
        // The size is validated before the list, so an invalid size is
        // reported first when both arguments are bad.
        if( maxsize <= 0 ) {
            throw new IllegalArgumentException( "size must be > 0" );
        }

        if( queue == null ) {
            throw new IllegalArgumentException( "queue must be non-null" );
        }

        this.queue = queue;
        this.maxObjects = maxsize;
        this.closeFlag = false;
        this.dropOldestObject = dropOldest;
    }

    /**
     * {@inheritDoc}
     *
     * <p/>A diagnostic toString implementation.
     **/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -