📄 unbiasedqueue.java
字号:
* <p/>A diagnostic toString implementation. */
    @Override
    public synchronized String toString() {
        return this.getClass().getName() + " :" + " size=" + getCurrentInQueue() + " capacity=" + getMaxQueueSize()
                + " enqueued=" + getNumEnqueued() + " avgAtEnqueue=" + getAvgInQueueAtEnqueue()
                + " dequeued=" + getNumDequeued() + " avgAtDequeue=" + getAvgInQueueAtDequeue();
    }

    /**
     * Atomically return whether or not this queue has been closed. Closed
     * queues will not accept "push" requests, but elements will still be
     * returned with "pop".
     *
     * @return boolean indicating whether this queue has been closed.
     */
    public boolean isClosed() {
        return closeFlag; // closeFlag is volatile.
    }

    /**
     * Close the queue. This will prevent any further objects from being
     * enqueued. Every thread waiting on the queue is woken so that it can
     * observe the closed state.
     */
    public void close() {
        closeFlag = true;
        synchronized (queue) {
            queue.notifyAll();
        }
    }

    /**
     * Flush the queue of all pending objects. The discarded objects are
     * counted as dropped. This method does not use any synchronization.
     */
    public void clear() {
        numDropped += queue.size();
        queue.clear();
    }

    /**
     * Attempt to push an object onto the queue. If the queue is full then the
     * object will not be pushed. This method does not use any synchronization
     * and should not be used if other threads are using {@link #pop(long)} to
     * retrieve elements.
     *
     * @param obj object to push
     * @return true if the obj was pushed, otherwise false.
     */
    public boolean push(Object obj) {
        if (queue.size() >= maxObjects) {
            return false;
        }

        numEnqueued++;
        // Sampled before the add, so the average excludes the new element.
        sumOfQueueSizesEnqueue += queue.size();
        queue.add(obj);

        return true;
    }

    /**
     * Attempt to push an object back at the head of the queue. If the queue
     * is full then the object will not be pushed. This method does not use
     * any synchronization and should not be used if other threads are using
     * {@link #pop(long)} to retrieve elements.
     *
     * @param obj object to push
     * @return true if the obj was pushed, otherwise false.
     */
    public boolean pushBack(Object obj) {
        if (queue.size() >= maxObjects) {
            return false;
        }

        numEnqueued++;
        // Sampled before the add, so the average excludes the new element.
        sumOfQueueSizesEnqueue += queue.size();
        queue.add(0, obj);

        return true;
    }

    /**
     * Push an object onto the queue. If the queue is full then the push will
     * wait for up to "timeout" milliseconds to push the object. At the end of
     * "timeout" milliseconds, the push will either return false or remove the
     * oldest item from the queue and insert "obj". This behaviour is controlled
     * by the constructor parameter "dropOldest".
     *
     * @param obj     Object to be pushed onto the queue
     * @param timeout Time in milliseconds to try to insert the item into a full
     *                queue. Per Java standards, a timeout of "0" (zero) will wait indefinitely.
     *                Negative values force no wait period at all.
     * @return true if the object was inserted into the queue, otherwise false.
     * @throws InterruptedException if the operation is interrupted before
     *                              the timeout interval is completed.
     */
    public boolean push(Object obj, long timeout) throws InterruptedException {
        return push3(obj, timeout, false);
    }

    /**
     * Push an object back at the head of the queue. If the queue is full then
     * the push will wait for up to "timeout" milliseconds to push the object.
     * At the end of "timeout" milliseconds, the push will either return false
     * or remove the oldest item from the queue and insert "obj". This behaviour
     * is controlled by the constructor parameter "dropOldest".
     *
     * <p/>Timeout control is accomplished via synchronization and
     * {@link Object#wait(long)}. {@link #pushBack(Object,long)} should only
     * be used in conjunction with {@link #push(Object,long)} and
     * {@link #pop(long)}
     *
     * @param obj     Object to be pushed onto the queue
     * @param timeout Time in milliseconds to try to insert the item into a full
     *                queue. Per Java standards, a timeout of "0" (zero) will wait indefinitely.
     *                Negative values force no wait period at all.
     * @return <tt>true</tt> if the object was inserted into the queue,
     *         otherwise <tt>false</tt>.
     * @throws InterruptedException if the operation is interrupted before
     *                              the timeout interval is completed.
     */
    public boolean pushBack(Object obj, long timeout) throws InterruptedException {
        return push3(obj, timeout, true);
    }

    /**
     * Shared implementation of the timed push operations.
     *
     * <p/>Holds the queue's monitor while it waits for space. The closed
     * state is re-examined at the top of every pass, so a pusher that is
     * woken after the queue was closed returns <tt>false</tt> instead of
     * enqueueing.
     *
     * @param obj     Object to be pushed onto the queue; must not be null.
     * @param timeout Time in milliseconds to wait for space; zero is replaced
     *                by {@link Long#MAX_VALUE}.
     * @param atHead  if <tt>true</tt> the object is inserted at the head of
     *                the queue, otherwise it is appended.
     * @return <tt>true</tt> if the object was enqueued, or if it was itself
     *         discarded as the oldest element of a full drop-oldest queue;
     *         otherwise <tt>false</tt>.
     * @throws InterruptedException if the thread is interrupted while waiting.
     */
    private boolean push3(Object obj, long timeout, boolean atHead) throws InterruptedException {
        if (null == obj) {
            throw new AssertionError("obj is null");
        }

        if (null == queue) {
            throw new AssertionError("queue is null");
        }

        if (0 == timeout) {
            timeout = Long.MAX_VALUE;
        }

        long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);

        synchronized (queue) {
            // this is the loop we stay in until there is space in the queue,
            // the queue becomes closed or we get tired of waiting.
            //
            // The loop is unconditional on purpose: every exit is decided by
            // the checks inside it. With a trailing "while (!isClosed())"
            // test, a close that happened during wait() would leave the loop
            // and fall through to the insertion below.
            while (true) {
                // This queue is closed. No additional objects allowed.
                if (isClosed()) {
                    queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.
                    return false;
                }

                if (queue.size() < maxObjects) {
                    // There is room.
                    break;
                }

                long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);

                if (waitfor > 0) {
                    queue.wait(waitfor);
                    // something happened, we check again.
                    continue;
                }

                // Queue is full but its time to do something.
                // discard an element or simply return.
                if (!dropOldestObject) {
                    queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.
                    return false;
                }

                // Issue a warning if we have not done so recently.
                long now = TimeUtils.timeNow();

                if ((now > nextDroppedWarn) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("Queue full, dropped one or more elements. Now dropped " + numDropped + " elements.");
                    nextDroppedWarn = now + DROPPED_OBJECT_WARNING_INTERVAL;
                }

                if (atHead) {
                    // we have chosen to drop this element since it is
                    // the oldest. We can safely return true because we
                    // took the right action for this element.
                    numEnqueued++; // one was queued.
                    numDropped++; // one was dropped.
                    // (happens they are the same)
                    queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.
                    return true;
                }

                // Due to queue resizing, we may have to drop more than
                // one element
                while (queue.size() >= maxObjects) {
                    numDropped++;
                    queue.remove(0);
                }
                // Loop again: the next pass re-checks the closed flag and
                // then finds the space that was just made.
            }

            boolean pushed = (atHead ? pushBack(obj) : push(obj));

            queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.
            return pushed;
        }
    }

    /**
     * Return the next Object from the queue without removing it.
     *
     * @return Object, null if the queue is empty.
     */
    public Object peek() {
        if (queue.isEmpty()) {
            return null;
        }

        return queue.get(0);
    }

    /**
     * Remove and return the next Object from the queue.
     *
     * @return Object, null if the queue is empty.
     */
    public Object pop() {
        if (queue.isEmpty()) {
            return null;
        }

        // Sampled before the removal, so the average includes this element.
        sumOfQueueSizesDequeue += queue.size();
        numDequeued++;

        return queue.remove(0);
    }

    /**
     * Gets a Object from the queue. If no Object is immediately available,
     * then wait the specified amount of time for an Object to be inserted.
     *
     * @param timeout Amount of time to wait in milliseconds for an object to
     *                be available. Per Java convention, a timeout of zero (0) means wait an
     *                infinite amount of time. Negative values mean do not wait at all.
     * @return The next object in the queue, or null if no object became
     *         available before the wait ended or the queue is closed and empty.
     * @throws InterruptedException if the operation is interrupted before
     *                              the timeout interval is completed.
     */
    public Object pop(long timeout) throws InterruptedException {
        if (0 == timeout) {
            timeout = Long.MAX_VALUE;
        }

        long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);

        Object result = null;

        synchronized (queue) {
            /*
             * Because there may be more than one thread waiting on this
             * queue, when we are woken up we do not necessarily get the
             * next obj in the queue. In this case, rather than terminating
             * because we didn't get the obj we resume waiting, but we
             * ensure that we never wait longer than the amount of time
             * which was originally requested. (if we fail to get the obj
             * after being woken its actually a little less than the
             * requested time)
             *
             * The loop is unconditional on purpose: after every wake-up,
             * including one caused by close(), we attempt one more pop so
             * that an element enqueued just before the close is still
             * returned. The closed check below ends the wait once the
             * queue is closed and empty.
             */
            while (true) {
                result = pop();
                if (null != result) {
                    break; // we have an obj
                }

                if (isClosed()) {
                    // we didn't get one and its closed so there
                    // is no chance there will ever be one.
                    break;
                }

                long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);

                if (waitfor <= 0) {
                    // there is no wait time left.
                    break;
                }

                queue.wait(waitfor);
            }

            // wake someone else who might be waiting. This is apparently better
            // than just letting the scheduler notice the synchro is no longer
            // occupied.
            queue.notify();
        }

        return result;
    }

    /**
     * Returns an array of objects, possibly empty, from the queue. This
     * method does not use any synchronization.
     *
     * @param maxObjs the maximum number of items to return.
     * @return an array of objects, possibly empty containing the returned
     *         queue elements.
     * @throws IllegalArgumentException if maxObjs is not greater than zero.
     */
    public Object[] popMulti(int maxObjs) {
        if (maxObjs <= 0) {
            throw new IllegalArgumentException("maxObjs must be > 0");
        }

        maxObjs = Math.min(maxObjs, queue.size());
        Object[] result = new Object[maxObjs];

        for (int eachElement = 0; eachElement < maxObjs; eachElement++) {
            sumOfQueueSizesDequeue += queue.size();
            numDequeued++;
            result[eachElement] = queue.remove(0);
        }

        return result;
    }

    /**
     * How many objects will fit in this queue
     *
     * @return int indicating how many objects will fit in the queue.
     */
    public int getMaxQueueSize() {
        return maxObjects;
    }

    /**
     * Set how many objects this queue may store. The new limit is applied
     * unconditionally; this method does not remove anything, so objects
     * already in the queue beyond the new limit stay queued.
     *
     * @param maxObjs The number of objects which the queue must be able to
     *                store.
     */
    public void setMaxQueueSize(int maxObjs) {
        maxObjects = maxObjs;
    }

    /**
     * Return the number of elements currently in the queue. This method is
     * useful for statistical sampling, but should not be used to determine
     * program logic due to the multi-threaded behaviour of these queues.
     *
     * <p/>You should use the return values and timeout behaviour of the
     * {@link #push(Object)} and {@link #pop(long)} methods to regulate how you
     * use the queue.
     *
     * @return the number of elements currently in the queue. Be warned that
     *         even two sequential calls to this method may return different answers
     *         due to activity on other threads.
     */
    public int getCurrentInQueue() {
        return queue.size();
    }

    /**
     * Return the total number of objects which have been enqueued on to this
     * queue during its existence.
     *
     * @return how many objects have been queued.
     */
    public long getNumEnqueued() {
        return numEnqueued;
    }

    /**
     * Return the average number of elements in the queue at Enqueue time.
     *
     * @return average number of elements which were in the queue at during all
     *         of the "push" operations which returned a "true" result. Does not
     *         include the item being pushed. If no elements have ever been enqueued
     *         then "NaN" will be returned.
     */
    public double getAvgInQueueAtEnqueue() {
        if (numEnqueued > 0) {
            return (double) sumOfQueueSizesEnqueue / numEnqueued;
        } else {
            return Double.NaN;
        }
    }

    /**
     * Return the total number of objects which have been dequeued from this
     * queue during its existence.
     *
     * @return how many objects have been dequeued.
     */
    public long getNumDequeued() {
        return numDequeued;
    }

    /**
     * Return the average number of elements in the queue at dequeue time.
     *
     * @return average number of elements which were in the queue at during all
     *         of the "pop" operations which returned a non-null result. Includes the
     *         item being "pop"ed in the average. If no elements have ever been dequeued
     *         then "NaN" will be returned.
     */
    public double getAvgInQueueAtDequeue() {
        if (numDequeued > 0) {
            return (double) sumOfQueueSizesDequeue / numDequeued;
        } else {
            return Double.NaN;
        }
    }

    /**
     * Return the total number of objects which have been dropped by this queue
     * during its existence.
     *
     * @return how many objects have been dropped.
     */
    public long getNumDropped() {
        return numDropped;
    }

    /**
     * Wake a single thread that is waiting on the queue's monitor, by
     * calling {@link Object#notify()} on it. The woken thread re-evaluates
     * its wait condition; the queue contents are not changed.
     */
    public void interrupt() {
        synchronized (queue) {
            queue.notify();
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -