⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 unbiasedqueue.java

📁 jxta平台的开发包
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    public synchronized String toString() {                return this.getClass().getName() + " :" +        " size=" + getCurrentInQueue() +        " capacity=" + getMaxQueueSize() +        " enqueued=" + getNumEnqueued() +        " avgAtEnqueue=" + getAvgInQueueAtEnqueue() +        " dequeued=" + getNumDequeued() +        " avgAtDequeue=" + getAvgInQueueAtDequeue();    }        /**     *  Atomically return whether or not this queue has been closed. Closed     *  queues will not accept "push" requests, but elements will still be     *  returned with "pop".     *     *  @return boolean indicating whether this queue has been closed.     **/    public boolean isClosed() {        return closeFlag; // closeFlag is volatile.    }        /**     * Close the queue. This will prevent any further objects from being enqueued.     **/    public void close() {        closeFlag = true;        synchronized( queue ) {            queue.notifyAll();        }    }        /**     *  Flush the queue of all pending objects.     **/    public void clear() {        numDropped += queue.size();        queue.clear();    }        /**     *  Attempt to push an object onto the queue. If the queue is full then the     *  object will not be pushed. This method does not use any synchronization     *  and should not be used if other threads are using {@link #pop(long)} to     *  retrieve elements.     *     *  @param obj object to push     *  @return true if the obj was pushed, otherwise false.     **/    public boolean push( Object obj ) {        if ( queue.size() >= maxObjects )            return false;                numEnqueued++;        sumOfQueueSizesEnqueue += queue.size();        queue.add(obj);                return true;    }        /**     *  Attempt to push an object back at the head the queue. If the queue is     *  full then the object will not be pushed. This method does not use any     * synchronization and should not be used if other threads are using     * {@link #pop(long)} to retrieve elements.     *     *  @param obj object to push     *  @return true if the obj was pushed, otherwise false.     **/    public boolean pushBack( Object obj ) {        if (queue.size() >= maxObjects )            return false;                numEnqueued++;        sumOfQueueSizesEnqueue += queue.size();        queue.add(0, obj);                return true;    }        /**     * Push an object onto the queue. If the queue is full then the push will     *  wait for up to "timeout" milliseconds to push the object. At the end of     *  "timeout" milliseconds, the push will either return false or remove the     *  oldest item from the queue and insert "obj". This behaviour is contolled     *  by the constructor parameter "dropOldest".     *     * @param obj Object to be pushed onto the queue     * @param timeout Time in milliseconds to try to insert the item into a full     *  queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly.     *  Negative values force no wait period at all.     *  @return true if the object was intersted into the queue, otherwise false.     *  @throws InterruptedException    if the operation is interrupted before     *      the timeout interval is completed.     **/    public boolean push( Object obj, long timeout ) throws InterruptedException {        return push3(obj, timeout, false);    }        /**     *  Push an object back at the head of the queue. If the queue is full then     *  the push will wait for up to "timeout" milliseconds to push the object.     *  At the end of "timeout" milliseconds, the push will either return false     *  or remove the oldest item from the queue and insert "obj". This behaviour     *  is contolled by the constructor parameter "dropOldest".     *     *  <p/>Timeout control is accomplished via synchronization and     *  {@link Object#wait(long)}. {@link #pushBack(Object,long)} should only     *  be used in conjunction with {@link #push(Object,long)} and     *  {@link #pop(long)}     *     * @param obj Object to be pushed onto the queue     * @param timeout Time in milliseconds to try to insert the item into a full     *  queue. Per Java standards, a timeout of "0" (zero) will wait indefinitly.     *  Negative values force no wait period at all.     *  @return <tt>true</tt> if the object was intersted into the queue,     *  otherwise <tt>false</tt>.     *  @throws InterruptedException    if the operation is interrupted before     *  the timeout interval is completed.     **/    public boolean pushBack( Object obj, long timeout ) throws InterruptedException {        return push3(obj, timeout, true);    }        private boolean push3( Object obj, long timeout, boolean atHead ) throws InterruptedException {                if( 0 == timeout )            timeout = Long.MAX_VALUE;                long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis( timeout );                synchronized ( queue ) {            // this is the loop we stay in until there is space in the queue,            // the queue becomes closed or we get tired of waiting.            do {                // This queue is closed. No additional objects allowed.                if ( isClosed() ) {                    return false;                }                                if (queue.size() >= maxObjects ) {                    long waitfor = TimeUtils.toRelativeTimeMillis( absoluteTimeOut );                    if( waitfor > 0) {                        queue.wait( waitfor );                                                // something happened, we check again.                        continue;                    }                                        // Queue is full but its time to do something.                    // discard an element or simply return.                    if( dropOldestObject ) {                        //  Issue a warning if we have not done so recently.                        long now = TimeUtils.timeNow();                        if ( (now > nextDroppedWarn) && LOG.isEnabledFor(Level.WARN) ) {                            LOG.warn( "Queue full, dropped one or more elements. Now dropped " + numDropped + " elements." );                            nextDroppedWarn = now + DROPPED_OBJECT_WARNING_INTERVAL;                        }                                                if( atHead ) {                            // we have chosen to drop this element since it is                            // the oldest. We can safely return true because we                            // took the right action for this element.                                                        numEnqueued++;  // one was queued.                            numDropped++;   // one was dropped.                                            // (happens they are the same)                            queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.                            return true;                        } else {                            // Due to queue resizing, we have have to drop more than                            // one element                            while( queue.size() >= maxObjects ) {                                numDropped++;                                queue.remove(0);                            }                        }                    }                    else                        return false;                } else                    break;            } while( !isClosed() );                        boolean pushed = (atHead ? pushBack( obj ) : push( obj ));                        queue.notify(); // inform someone who is waiting. we dont have to tell everyone though.            return pushed;        }    }        /**     * Return the next Object from the queue without removing it.     *     * @return Object, null if the queue is empty.     **/    public Object peek() {        Object result = null;                if ( queue.isEmpty() ) {            return null;        }                result = queue.get(0);                return  result;    }        /**     * Remove and return the next Object from the queue.     *     * @return Object, null if the queue is empty.     **/    public Object pop() {        Object result = null;                if ( queue.isEmpty() ) {            return null;        }                sumOfQueueSizesDequeue += queue.size();        numDequeued++;                result = queue.remove(0);                return  result;    }        /**     * Gets a Object from the queue. If no Object is immediately available,     * then wait the specified amount of time for an Object to be inserted.     *     * @param timeout   Amount of time to wait in milliseconds for an object to     * be available. Per Java convention, a timeout of zero (0) means wait an     * infinite amount of time. Negative values mean do not wait at all.     * @return The next object in the queue.     * @throws InterruptedException    if the operation is interrupted before     * the timeout interval is completed.     */    public Object pop( long timeout ) throws InterruptedException {                if( 0 == timeout )            timeout = Long.MAX_VALUE;                long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis( timeout );                Object result = null;                synchronized ( queue ) {            do {                /*                 *  Because there may be more than one thread waiting on this                 *  queue, when we are woken up we do not necessarily get the                 *  next obj in the queue. In this case, rather than terminating                 *  because we didn't get the obj we resume waiting, but we                 *  ensure that we never wait longer than the amount of time                 *  which was originally requested. (if we fail to get the obj                 *  after being woken its actually a little less than the                 *  requested time)                 */                result = pop();                                if( null != result )                    break;          // we have an obj                                if( isClosed() )    // we didn't get one and its closed so there                    break;          // is no chance there will ever be one.                                long waitfor = TimeUtils.toRelativeTimeMillis( absoluteTimeOut );                                if( waitfor <= 0 )  // there is no wait time left.                    break;                                queue.wait( waitfor );            } while( !isClosed() );                        // wake someone else who might be waiting. This is apparently better            // than just letting the scheduler notice the synchro is no longer            // occupied.            queue.notify();        }                return result;    }        /**     *  Returns an array of objects, possibly empty, from the queue.     *     *  @param maxObjs  the maximum number of items to return.     *  @return an array of objects, possibly empty containing the returned     *  queue elements.     **/    public Object [] popMulti( int maxObjs ) {        if( maxObjs <= 0 )            throw new IllegalArgumentException( "maxObjs must be > 0" );                maxObjs = Math.min( maxObjs, queue.size() );        Object [] result = new Object [maxObjs];        for( int eachElement = 0; eachElement < maxObjs; eachElement++ ) {            sumOfQueueSizesDequeue += queue.size();            numDequeued++;            result [eachElement] = queue.remove(0);        }                        return result;    }        /**     *  How many objects will fit in this queue     *     *  @return int indicating how many objects will fit in the queue.     **/    public int getMaxQueueSize() {        return maxObjects;    }        /**     *  Set how many objects this queue may store. Note that if there are more     *  objects already in the queue than the specified amount then the queue     *  will retain its current capacity.     *     *  @param maxObjs  The number of objects which the queue must be able to     *  store.     **/    public void setMaxQueueSize( int maxObjs ) {        maxObjects = maxObjs;    }        /**     *  Return the number of elements currently in the queue. This method is     *  useful for statistical sampling, but should not be used to determine     *  program logic due to the multi-threaded behaviour of these queues.     *     *  <p/>You should use the return values and timeout behaviour of the     *  {@link #push(Object)} and {@link #pop(long)} methods to regulate how you     *  use the queue.     *     *  @return the number of elements currently in the queue. Be warned that     *  even two sequential calls to this method may return different answers     *  due to activity on other threads.     **/    public int getCurrentInQueue() {        return queue.size();    }        /**     *  Return the total number of objects which have been enqueued on to this     *  queue during its existance.     *     *  @return how many objects have been queued.     **/    public long getNumEnqueued() {        return numEnqueued;    }        /**     *  Return the average number of elements in the queue at Enqueue time.     *     *  @return average number of elements which were in the queue at during all     *  of the "push" operations which returned a "true" result. Does not     *  include the item being pushed. If no elements have ever been enqueued     *  then "NaN" will be returned.     **/    public double getAvgInQueueAtEnqueue() {        if( numEnqueued > 0 )            return (double) sumOfQueueSizesEnqueue / numEnqueued;        else            return Double.NaN;    }        /**     *  Return the total number of objects which have been dequeued from this     *  queue during its existance.     *     *  @return how many objects have been queued.     **/    public long getNumDequeued() {        return numDequeued;    }        /**     *  Return the average number of elements in the queue at dequeue time.     *     *  @return average number of elements which were in the queue at during all     *  of the "pop" operations which returned a non-null result. Includes the     * item being "pop"ed in the average. If no elements have ever been dequeued     *  then "NaN" will be returned.     **/    public double getAvgInQueueAtDequeue() {        if( numDequeued > 0 )            return (double) sumOfQueueSizesDequeue / numDequeued;        else return Double.NaN;    }        /**     *  Return the total number of objects which have been dropped by this queue     *  during its existance.     *     *  @return how many objects have been dropped.     **/    public long getNumDropped() {        return numDropped;    }        public void interrupt() {        synchronized (queue) {            queue.notify();        }    }    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -