⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 unbiasedqueue.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     *  <p/>A diagnostic toString implementation.     */    @Override    public synchronized String toString() {                return this.getClass().getName() + " :" + " size=" + getCurrentInQueue() + " capacity=" + getMaxQueueSize() + " enqueued="                + getNumEnqueued() + " avgAtEnqueue=" + getAvgInQueueAtEnqueue() + " dequeued=" + getNumDequeued()                + " avgAtDequeue=" + getAvgInQueueAtDequeue();    }        /**     *  Atomically return whether or not this queue has been closed. Closed     *  queues will not accept "push" requests, but elements will still be     *  returned with "pop".     *     *  @return boolean indicating whether this queue has been closed.     */    public boolean isClosed() {        return closeFlag; // closeFlag is volatile.    }        /**     * Close the queue. This will prevent any further objects from being enqueued.     */    public void close() {        closeFlag = true;        synchronized (queue) {            queue.notifyAll();        }    }        /**     *  Flush the queue of all pending objects.     */    public void clear() {        numDropped += queue.size();        queue.clear();    }        /**     *  Attempt to push an object onto the queue. If the queue is full then the     *  object will not be pushed. This method does not use any synchronization     *  and should not be used if other threads are using {@link #pop(long)} to     *  retrieve elements.     *     *  @param obj object to push     *  @return true if the obj was pushed, otherwise false.     */    public boolean push(Object obj) {        if (queue.size() >= maxObjects) {            return false;        }                numEnqueued++;        sumOfQueueSizesEnqueue += queue.size();        queue.add(obj);                return true;    }        /**     *  Attempt to push an object back at the head the queue. If the queue is     *  full then the object will not be pushed. This method does not use any     * synchronization and should not be used if other threads are using     * {@link #pop(long)} to retrieve elements.     *     *  @param obj object to push     *  @return true if the obj was pushed, otherwise false.     */    public boolean pushBack(Object obj) {        if (queue.size() >= maxObjects) {            return false;        }                numEnqueued++;        sumOfQueueSizesEnqueue += queue.size();        queue.add(0, obj);                return true;    }        /**     * Push an object onto the queue. If the queue is full then the push will     *  wait for up to "timeout" milliseconds to push the object. At the end of     *  "timeout" milliseconds, the push will either return false or remove the     *  oldest item from the queue and insert "obj". This behaviour is contolled     *  by the constructor parameter "dropOldest".     *     * @param obj Object to be pushed onto the queue     * @param timeout Time in milliseconds to try to insert the item into a full     *  queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly.     *  Negative values force no wait period at all.     *  @return true if the object was intersted into the queue, otherwise false.     *  @throws InterruptedException    if the operation is interrupted before     *      the timeout interval is completed.     */    public boolean push(Object obj, long timeout) throws InterruptedException {        return push3(obj, timeout, false);    }        /**     *  Push an object back at the head of the queue. If the queue is full then     *  the push will wait for up to "timeout" milliseconds to push the object.     *  At the end of "timeout" milliseconds, the push will either return false     *  or remove the oldest item from the queue and insert "obj". This behaviour     *  is contolled by the constructor parameter "dropOldest".     *     *  <p/>Timeout control is accomplished via synchronization and     *  {@link Object#wait(long)}. {@link #pushBack(Object,long)} should only     *  be used in conjunction with {@link #push(Object,long)} and     *  {@link #pop(long)}     *     * @param obj Object to be pushed onto the queue     * @param timeout Time in milliseconds to try to insert the item into a full     *  queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly.     *  Negative values force no wait period at all.     *  @return <tt>true</tt> if the object was intersted into the queue,     *  otherwise <tt>false</tt>.     *  @throws InterruptedException    if the operation is interrupted before     *  the timeout interval is completed.     */    public boolean pushBack(Object obj, long timeout) throws InterruptedException {        return push3(obj, timeout, true);    }        private boolean push3(Object obj, long timeout, boolean atHead) throws InterruptedException {        if (null == obj) {            throw new AssertionError("obj is null");        }        if (null == queue) {            throw new AssertionError("queue is null");        }               if (0 == timeout) {            timeout = Long.MAX_VALUE;        }                long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);                synchronized (queue) {            // this is the loop we stay in until there is space in the queue,            // the queue becomes closed or we get tired of waiting.            do {                // This queue is closed. No additional objects allowed.                if (isClosed()) {                    queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.                    return false;                }                                if (queue.size() >= maxObjects) {                    long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);                    if (waitfor > 0) {                        queue.wait(waitfor);                                                // something happened, we check again.                        continue;                    }                                        // Queue is full but its time to do something.                    // discard an element or simply return.                    if (dropOldestObject) {                        // Issue a warning if we have not done so recently.                        long now = TimeUtils.timeNow();                        if ((now > nextDroppedWarn) && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.warning("Queue full, dropped one or more elements. Now dropped " + numDropped + " elements.");                            nextDroppedWarn = now + DROPPED_OBJECT_WARNING_INTERVAL;                        }                                                if (atHead) {                            // we have chosen to drop this element since it is                            // the oldest. We can safely return true because we                            // took the right action for this element.                                                        numEnqueued++; // one was queued.                            numDropped++; // one was dropped.                            // (happens they are the same)                            queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.                            return true;                        } else {                            // Due to queue resizing, we have have to drop more than                            // one element                            while (queue.size() >= maxObjects) {                                numDropped++;                                queue.remove(0);                            }                        }                    } else {                        queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.                        return false;                    }                                    } else {                    break;                }            } while (!isClosed());                        boolean pushed = (atHead ? pushBack(obj) : push(obj));                        queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.            return pushed;        }    }        /**     * Return the next Object from the queue without removing it.     *     * @return Object, null if the queue is empty.     */    public Object peek() {        Object result = null;                if (queue.isEmpty()) {            return null;        }                result = queue.get(0);                return  result;    }        /**     * Remove and return the next Object from the queue.     *     * @return Object, null if the queue is empty.     */    public Object pop() {        Object result = null;                if (queue.isEmpty()) {            return null;        }                sumOfQueueSizesDequeue += queue.size();        numDequeued++;                result = queue.remove(0);                return  result;    }        /**     * Gets a Object from the queue. If no Object is immediately available,     * then wait the specified amount of time for an Object to be inserted.     *     * @param timeout   Amount of time to wait in milliseconds for an object to     * be available. Per Java convention, a timeout of zero (0) means wait an     * infinite amount of time. Negative values mean do not wait at all.     * @return The next object in the queue.     * @throws InterruptedException    if the operation is interrupted before     * the timeout interval is completed.     */    public Object pop(long timeout) throws InterruptedException {                if (0 == timeout) {            timeout = Long.MAX_VALUE;        }                long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout);                Object result = null;                synchronized (queue) {            do {                /*                 *  Because there may be more than one thread waiting on this                 *  queue, when we are woken up we do not necessarily get the                 *  next obj in the queue. In this case, rather than terminating                 *  because we didn't get the obj we resume waiting, but we                 *  ensure that we never wait longer than the amount of time                 *  which was originally requested. (if we fail to get the obj                 *  after being woken its actually a little less than the                 *  requested time)                 */                result = pop();                                if (null != result) {                    break;                }          // we have an obj                                if (isClosed()) { // we didn't get one and its closed so there                    break;                }          // is no chance there will ever be one.                                long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut);                                if (waitfor <= 0) { // there is no wait time left.                    break;                }                                queue.wait(waitfor);            } while (!isClosed());                        // wake someone else who might be waiting. This is apparently better            // than just letting the scheduler notice the synchro is no longer            // occupied.            queue.notify();        }                return result;    }        /**     *  Returns an array of objects, possibly empty, from the queue.     *     *  @param maxObjs  the maximum number of items to return.     *  @return an array of objects, possibly empty containing the returned     *  queue elements.     */    public Object[] popMulti(int maxObjs) {        if (maxObjs <= 0) {            throw new IllegalArgumentException("maxObjs must be > 0");        }                maxObjs = Math.min(maxObjs, queue.size());        Object[] result = new Object[maxObjs];        for (int eachElement = 0; eachElement < maxObjs; eachElement++) {            sumOfQueueSizesDequeue += queue.size();            numDequeued++;            result[eachElement] = queue.remove(0);        }                return result;    }        /**     *  How many objects will fit in this queue     *     *  @return int indicating how many objects will fit in the queue.     */    public int getMaxQueueSize() {        return maxObjects;    }        /**     *  Set how many objects this queue may store. Note that if there are more     *  objects already in the queue than the specified amount then the queue     *  will retain its current capacity.     *     *  @param maxObjs  The number of objects which the queue must be able to     *  store.     */    public void setMaxQueueSize(int maxObjs) {        maxObjects = maxObjs;    }        /**     *  Return the number of elements currently in the queue. This method is     *  useful for statistical sampling, but should not be used to determine     *  program logic due to the multi-threaded behaviour of these queues.     *     *  <p/>You should use the return values and timeout behaviour of the     *  {@link #push(Object)} and {@link #pop(long)} methods to regulate how you     *  use the queue.     *     *  @return the number of elements currently in the queue. Be warned that     *  even two sequential calls to this method may return different answers     *  due to activity on other threads.     */    public int getCurrentInQueue() {        return queue.size();    }        /**     *  Return the total number of objects which have been enqueued on to this     *  queue during its existance.     *     *  @return how many objects have been queued.     */    public long getNumEnqueued() {        return numEnqueued;    }        /**     *  Return the average number of elements in the queue at Enqueue time.     *     *  @return average number of elements which were in the queue at during all     *  of the "push" operations which returned a "true" result. Does not     *  include the item being pushed. If no elements have ever been enqueued     *  then "NaN" will be returned.     */    public double getAvgInQueueAtEnqueue() {        if (numEnqueued > 0) {            return (double) sumOfQueueSizesEnqueue / numEnqueued;        } else {            return Double.NaN;        }    }        /**     *  Return the total number of objects which have been dequeued from this     *  queue during its existance.     *     *  @return how many objects have been queued.     */    public long getNumDequeued() {        return numDequeued;    }        /**     *  Return the average number of elements in the queue at dequeue time.     *     *  @return average number of elements which were in the queue at during all     *  of the "pop" operations which returned a non-null result. Includes the     * item being "pop"ed in the average. If no elements have ever been dequeued     *  then "NaN" will be returned.     */    public double getAvgInQueueAtDequeue() {        if (numDequeued > 0) {            return (double) sumOfQueueSizesDequeue / numDequeued;        } else {            return Double.NaN;        }    }        /**     *  Return the total number of objects which have been dropped by this queue     *  during its existance.     *     *  @return how many objects have been dropped.     */    public long getNumDropped() {        return numDropped;    }        public void interrupt() {        synchronized (queue) {            queue.notify();        }    }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -