📄 unbiasedqueue.java
字号:
public synchronized String toString() { return this.getClass().getName() + " :" + " size=" + getCurrentInQueue() + " capacity=" + getMaxQueueSize() + " enqueued=" + getNumEnqueued() + " avgAtEnqueue=" + getAvgInQueueAtEnqueue() + " dequeued=" + getNumDequeued() + " avgAtDequeue=" + getAvgInQueueAtDequeue(); } /** * Atomically return whether or not this queue has been closed. Closed * queues will not accept "push" requests, but elements will still be * returned with "pop". * * @return boolean indicating whether this queue has been closed. **/ public boolean isClosed() { return closeFlag; // closeFlag is volatile. } /** * Close the queue. This will prevent any further objects from being enqueued. **/ public void close() { closeFlag = true; synchronized( queue ) { queue.notifyAll(); } } /** * Flush the queue of all pending objects. **/ public void clear() { numDropped += queue.size(); queue.clear(); } /** * Attempt to push an object onto the queue. If the queue is full then the * object will not be pushed. This method does not use any synchronization * and should not be used if other threads are using {@link #pop(long)} to * retrieve elements. * * @param obj object to push * @return true if the obj was pushed, otherwise false. **/ public boolean push( Object obj ) { if ( queue.size() >= maxObjects ) return false; numEnqueued++; sumOfQueueSizesEnqueue += queue.size(); queue.add(obj); return true; } /** * Attempt to push an object back at the head the queue. If the queue is * full then the object will not be pushed. This method does not use any * synchronization and should not be used if other threads are using * {@link #pop(long)} to retrieve elements. * * @param obj object to push * @return true if the obj was pushed, otherwise false. **/ public boolean pushBack( Object obj ) { if (queue.size() >= maxObjects ) return false; numEnqueued++; sumOfQueueSizesEnqueue += queue.size(); queue.add(0, obj); return true; } /** * Push an object onto the queue. If the queue is full then the push will * wait for up to "timeout" milliseconds to push the object. At the end of * "timeout" milliseconds, the push will either return false or remove the * oldest item from the queue and insert "obj". This behaviour is contolled * by the constructor parameter "dropOldest". * * @param obj Object to be pushed onto the queue * @param timeout Time in milliseconds to try to insert the item into a full * queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly. * Negative values force no wait period at all. * @return true if the object was intersted into the queue, otherwise false. * @throws InterruptedException if the operation is interrupted before * the timeout interval is completed. **/ public boolean push( Object obj, long timeout ) throws InterruptedException { return push3(obj, timeout, false); } /** * Push an object back at the head of the queue. If the queue is full then * the push will wait for up to "timeout" milliseconds to push the object. * At the end of "timeout" milliseconds, the push will either return false * or remove the oldest item from the queue and insert "obj". This behaviour * is contolled by the constructor parameter "dropOldest". * * <p/>Timeout control is accomplished via synchronization and * {@link Object#wait(long)}. {@link #pushBack(Object,long)} should only * be used in conjunction with {@link #push(Object,long)} and * {@link #pop(long)} * * @param obj Object to be pushed onto the queue * @param timeout Time in milliseconds to try to insert the item into a full * queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly. * Negative values force no wait period at all. * @return <tt>true</tt> if the object was intersted into the queue, * otherwise <tt>false</tt>. * @throws InterruptedException if the operation is interrupted before * the timeout interval is completed. **/ public boolean pushBack( Object obj, long timeout ) throws InterruptedException { return push3(obj, timeout, true); } private boolean push3( Object obj, long timeout, boolean atHead ) throws InterruptedException { if( 0 == timeout ) timeout = Long.MAX_VALUE; long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis( timeout ); synchronized ( queue ) { // this is the loop we stay in until there is space in the queue, // the queue becomes closed or we get tired of waiting. do { // This queue is closed. No additional objects allowed. if ( isClosed() ) { return false; } if (queue.size() >= maxObjects ) { long waitfor = TimeUtils.toRelativeTimeMillis( absoluteTimeOut ); if( waitfor > 0) { queue.wait( waitfor ); // something happened, we check again. continue; } // Queue is full but its time to do something. // discard an element or simply return. if( dropOldestObject ) { // Issue a warning if we have not done so recently. long now = TimeUtils.timeNow(); if ( (now > nextDroppedWarn) && LOG.isEnabledFor(Level.WARN) ) { LOG.warn( "Queue full, dropped one or more elements. Now dropped " + numDropped + " elements." ); nextDroppedWarn = now + DROPPED_OBJECT_WARNING_INTERVAL; } if( atHead ) { // we have chosen to drop this element since it is // the oldest. We can safely return true because we // took the right action for this element. numEnqueued++; // one was queued. numDropped++; // one was dropped. // (happens they are the same) queue.notify(); // inform someone who is waiting. we dont have to tell everyone though. return true; } else { // Due to queue resizing, we have have to drop more than // one element while( queue.size() >= maxObjects ) { numDropped++; queue.remove(0); } } } else return false; } else break; } while( !isClosed() ); boolean pushed = (atHead ? pushBack( obj ) : push( obj )); queue.notify(); // inform someone who is waiting. we dont have to tell everyone though. return pushed; } } /** * Return the next Object from the queue without removing it. * * @return Object, null if the queue is empty. **/ public Object peek() { Object result = null; if ( queue.isEmpty() ) { return null; } result = queue.get(0); return result; } /** * Remove and return the next Object from the queue. * * @return Object, null if the queue is empty. **/ public Object pop() { Object result = null; if ( queue.isEmpty() ) { return null; } sumOfQueueSizesDequeue += queue.size(); numDequeued++; result = queue.remove(0); return result; } /** * Gets a Object from the queue. If no Object is immediately available, * then wait the specified amount of time for an Object to be inserted. * * @param timeout Amount of time to wait in milliseconds for an object to * be available. Per Java convention, a timeout of zero (0) means wait an * infinite amount of time. Negative values mean do not wait at all. * @return The next object in the queue. * @throws InterruptedException if the operation is interrupted before * the timeout interval is completed. */ public Object pop( long timeout ) throws InterruptedException { if( 0 == timeout ) timeout = Long.MAX_VALUE; long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis( timeout ); Object result = null; synchronized ( queue ) { do { /* * Because there may be more than one thread waiting on this * queue, when we are woken up we do not necessarily get the * next obj in the queue. In this case, rather than terminating * because we didn't get the obj we resume waiting, but we * ensure that we never wait longer than the amount of time * which was originally requested. (if we fail to get the obj * after being woken its actually a little less than the * requested time) */ result = pop(); if( null != result ) break; // we have an obj if( isClosed() ) // we didn't get one and its closed so there break; // is no chance there will ever be one. long waitfor = TimeUtils.toRelativeTimeMillis( absoluteTimeOut ); if( waitfor <= 0 ) // there is no wait time left. break; queue.wait( waitfor ); } while( !isClosed() ); // wake someone else who might be waiting. This is apparently better // than just letting the scheduler notice the synchro is no longer // occupied. queue.notify(); } return result; } /** * Returns an array of objects, possibly empty, from the queue. * * @param maxObjs the maximum number of items to return. * @return an array of objects, possibly empty containing the returned * queue elements. **/ public Object [] popMulti( int maxObjs ) { if( maxObjs <= 0 ) throw new IllegalArgumentException( "maxObjs must be > 0" ); maxObjs = Math.min( maxObjs, queue.size() ); Object [] result = new Object [maxObjs]; for( int eachElement = 0; eachElement < maxObjs; eachElement++ ) { sumOfQueueSizesDequeue += queue.size(); numDequeued++; result [eachElement] = queue.remove(0); } return result; } /** * How many objects will fit in this queue * * @return int indicating how many objects will fit in the queue. **/ public int getMaxQueueSize() { return maxObjects; } /** * Set how many objects this queue may store. Note that if there are more * objects already in the queue than the specified amount then the queue * will retain its current capacity. * * @param maxObjs The number of objects which the queue must be able to * store. **/ public void setMaxQueueSize( int maxObjs ) { maxObjects = maxObjs; } /** * Return the number of elements currently in the queue. This method is * useful for statistical sampling, but should not be used to determine * program logic due to the multi-threaded behaviour of these queues. * * <p/>You should use the return values and timeout behaviour of the * {@link #push(Object)} and {@link #pop(long)} methods to regulate how you * use the queue. * * @return the number of elements currently in the queue. Be warned that * even two sequential calls to this method may return different answers * due to activity on other threads. **/ public int getCurrentInQueue() { return queue.size(); } /** * Return the total number of objects which have been enqueued on to this * queue during its existance. * * @return how many objects have been queued. **/ public long getNumEnqueued() { return numEnqueued; } /** * Return the average number of elements in the queue at Enqueue time. * * @return average number of elements which were in the queue at during all * of the "push" operations which returned a "true" result. Does not * include the item being pushed. If no elements have ever been enqueued * then "NaN" will be returned. **/ public double getAvgInQueueAtEnqueue() { if( numEnqueued > 0 ) return (double) sumOfQueueSizesEnqueue / numEnqueued; else return Double.NaN; } /** * Return the total number of objects which have been dequeued from this * queue during its existance. * * @return how many objects have been queued. **/ public long getNumDequeued() { return numDequeued; } /** * Return the average number of elements in the queue at dequeue time. * * @return average number of elements which were in the queue at during all * of the "pop" operations which returned a non-null result. Includes the * item being "pop"ed in the average. If no elements have ever been dequeued * then "NaN" will be returned. **/ public double getAvgInQueueAtDequeue() { if( numDequeued > 0 ) return (double) sumOfQueueSizesDequeue / numDequeued; else return Double.NaN; } /** * Return the total number of objects which have been dropped by this queue * during its existance. * * @return how many objects have been dropped. **/ public long getNumDropped() { return numDropped; } public void interrupt() { synchronized (queue) { queue.notify(); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -