📄 unbiasedqueue.java
字号:
public void close() {
closeFlag = true;
}
/**
* Flush the queue of all pending objects.
**/
public void clear() {
numDropped += queue.size();
queue.clear();
}
/**
* Push an object onto the queue. If the queue is full then the push will
* wait for up to "timeout" milliseconds to push the object. At the end of
* "timeout" milliseconds, the push will either return false or remove the
* oldest item from the queue and insert "obj". This behaviour is contolled
* by the constructor parameter "dropOldest".
*
* @param obj Object to be pushed onto the queue
* @param timeout Time in milliseconds to try to insert the item into a full
* queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly.
* Negative values force no wait period at all.
* @return true if the object was intersted into the queue, otherwise false.
* @throws InterruptedException if the operation is interrupted before
* the timeout interval is completed.
**/
public boolean push( Object obj, long timeout ) throws InterruptedException {
if( 0 == timeout )
timeout = Long.MAX_VALUE;
long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis( timeout );
synchronized ( queue ) {
// this is the loop we stay in until there is space in the queue,
// the queue becomes closed or we get tired of waiting.
do {
// This queue is closed. No additional objects allowed.
if ( isClosed() ) {
return false;
}
if (queue.size() >= maxObjects ) {
timeout = TimeUtils.toRelativeTimeMillis( absoluteTimeOut );
if( timeout > 0) {
queue.wait( timeout );
// something happened, we check again.
continue;
}
// Queue is full but its time to do something.
// discard an element or simply return.
if( dropOldestObject ) {
// Due to queue resizing, we have have to drop more than
// one element
while( queue.size() >= maxObjects ) {
numDropped++;
queue.remove(0);
}
// Issue a warning if we have not done so recently.
long now = TimeUtils.timeNow();
if ( (now > nextDroppedWarn) && LOG.isEnabledFor(Priority.WARN) ) {
LOG.warn( "Queue full, dropped one or more elements. Now dropped " + numDropped + " elements." );
nextDroppedWarn = now + DROPPED_OBJECT_WARNING_INTERVAL;
}
}
else
return false;
} else
break;
} while( !isClosed() );
numEnqueued++;
sumOfQueueSizesEnqueue += queue.size();
queue.add(obj);
queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.
}
return true;
}
/**
* Return next obj in the queue if there is one.
*
* @return Object, null if the queue is empty
**/
public Object pop() {
Object result = null;
if ( queue.isEmpty() ) {
return null;
}
sumOfQueueSizesDequeue += queue.size();
numDequeued++;
result = queue.remove(0);
return result;
}
/**
* Gets a Object from the queue. If no Object is immediately available,
* then wait the specified amount of time. Per Java convention, a timeout
* of zero (0) means wait an infinite amount of time. Negative values mean
* do not wait at all.
*
* @param timeout amount of time to wait in milliseconds for an object to
* be available. Per Java convention, a timeout of zero (0) means wait an
* infinite amount of time. Negative values mean do not wait at all.
* @return The next object in the queue.
* @throws InterruptedException if the operation is interrupted before
* the timeout interval is completed.
*/
public Object pop( long timeout ) throws InterruptedException {
/*
* Because there may be more than one thread waiting on this queue,
* when we are woken up we do not necessarily get the next obj in
* the queue. In this case, rather than terminating because we didn't
* get the obj we resume waiting, but we ensure that we never wait
* longer than the amount of time which was originally requested.
* (if we fail to get the obj after being woken its actually a
* little less than the requested time)
*/
if( 0 == timeout )
timeout = Long.MAX_VALUE;
long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis( timeout );
Object result = null;
synchronized ( queue ) {
do {
result = pop();
if( null != result )
break; // we have an obj
if( isClosed() ) // we didn't get one and its closed so there
break; // is no chance there will ever be one.
timeout = TimeUtils.toRelativeTimeMillis( absoluteTimeOut );
if( timeout <= 0 ) // there is no wait time left.
break;
queue.wait( timeout );
} while( !isClosed() );
// wake someone else who might be waiting. This is apparently better
// than just letting the scheduler notice the synchro is no longer
// occupied.
queue.notify();
}
return result;
}
/**
* Returns an array of objects, possibly empty, from the queue.
*
* @param maxObjs the maximum number of items to return.
* @return an array of objects, possibly empty containing the returned
* queue elements.
**/
public Object [] popMulti( int maxObjs ) {
if( maxObjs <= 0 )
throw new IllegalArgumentException( "maxObjs must be > 0" );
maxObjs = Math.min( maxObjs, queue.size() );
Object [] result = new Object [maxObjs];
for( int eachElement = 0; eachElement < maxObjs; eachElement++ ) {
sumOfQueueSizesDequeue += queue.size();
numDequeued++;
result [eachElement] = queue.remove(0);
}
return result;
}
/**
* How many objects will fit in this queue
*
* @return int indicating how many objects will fit in the queue.
**/
public int getMaxQueueSize() {
return maxObjects;
}
/**
* Set how many objects this queue may store. Note that if there are more
* objects already in the queue than the specified amount then the queue
* will retain its current capacity.
*
* @param maxObjs The number of objects which the queue must be able to
* store.
**/
public void setMaxQueueSize( int maxObjs ) {
maxObjects = maxObjs;
}
/**
* Return the number of elements currently in the queue. This method is
* useful for statistical sampling, but should not be used to determine
* program logic due to the multi-threaded behaviour of these queues. You
* should use the return values and timeout behaviour of the push() and
* pop() methods to regulate how you use the queue.
*
* @return the number of elements currently in the queue. Be warned that
* even two sequential calls to this method may return different answers
* due to activity on other threads.
*
**/
public int getCurrentInQueue() {
return queue.size();
}
/**
* Return the total number of objects which have been enqueued on to this
* receive queue during its existance.
*
* @return how many objects have been queued.
**/
public long getNumEnqueued() {
return numEnqueued;
}
/**
* Return the average number of elements in the queue at Enqueue time.
*
* @return average number of elements which were in the queue at during all
* of the "push" operations which returned a "true" result. Does not
* include the item being pushed. If no elements have ever been enqueued
* then "NaN" will be returned.
**/
public double getAvgInQueueAtEnqueue() {
if( numEnqueued > 0 )
return (double) sumOfQueueSizesEnqueue / numEnqueued;
else return Double.NaN;
}
/**
* Return the total number of objects which have been dequeued from this
* queue during its existance.
*
* @return how many objects have been queued.
**/
public long getNumDequeued() {
return numDequeued;
}
/**
* Return the average number of elements in the queue at dequeue time.
*
* @return average number of elements which were in the queue at during all
* of the "pop" operations which returned a non-null result. Includes the
* item being "pop"ed in the average. If no elements have ever been dequeued
* then "NaN" will be returned.
**/
public double getAvgInQueueAtDequeue() {
if( numDequeued > 0 )
return (double) sumOfQueueSizesDequeue / numDequeued;
else return Double.NaN;
}
/**
* Return the total number of objects which have been dropped by this queue
* during its existance.
*
* @return how many objects have been dropped.
**/
public long getNumDropped() {
return numDropped;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -