queue.java

来自「JGRoups源码」· Java 代码 · 共 633 行 · 第 1/2 页

JAVA
633
字号
// $Id: Queue.java,v 1.29 2006/08/08 15:34:22 belaban Exp $package org.jgroups.util;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.TimeoutException;import java.util.LinkedList;import java.util.Collection;import java.util.Iterator;import java.util.Enumeration;/** * Elements are added at the tail and removed from the head. Class is thread-safe in that * 1 producer and 1 consumer may add/remove elements concurrently. The class is not * explicitely designed for multiple producers or consumers. Implemented as a linked * list, so that removal of an element at the head does not cause a right-shift of the * remaining elements (as in a Vector-based implementation). * @author Bela Ban */public class Queue {    /*head and the tail of the list so that we can easily add and remove objects*/    private Element head=null, tail=null;    /*flag to determine the state of the queue*/    private boolean closed=false;    /*current size of the queue*/    private int     size=0;    /* Lock object for synchronization. Is notified when element is added */    private final Object  mutex=new Object();    /** Lock object for syncing on removes. It is notified when an object is removed */    // Object  remove_mutex=new Object();    /*the number of end markers that have been added*/    private int     num_markers=0;    /**     * if the queue closes during the runtime     * an endMarker object is added to the end of the queue to indicate that     * the queue will close automatically when the end marker is encountered     * This allows for a "soft" close.     * @see Queue#close     */    private static final Object endMarker=new Object();    protected static final Log log=LogFactory.getLog(Queue.class);    /**     * the class Element indicates an object in the queue.     * This element allows for the linked list algorithm by always holding a     * reference to the next element in the list.     * if Element.next is null, then this element is the tail of the list.     */    static class Element {        /*the actual value stored in the queue*/        Object  obj=null;        /*pointer to the next item in the (queue) linked list*/        Element next=null;        /**         * creates an Element object holding its value         * @param o - the object to be stored in the queue position         */        Element(Object o) {            obj=o;        }        /**         * prints out the value of the object         */        public String toString() {            return obj != null? obj.toString() : "null";        }    }    /**     * creates an empty queue     */    public Queue() {    }    /**     * Returns the first element. Returns null if no elements are available.     */    public Object getFirst() {        synchronized(mutex) {            return head != null? head.obj : null;        }    }    /**     * Returns the last element. Returns null if no elements are available.     */    public Object getLast() {        synchronized(mutex) {            return tail != null? tail.obj : null;        }    }    /**     * returns true if the Queue has been closed     * however, this method will return false if the queue has been closed     * using the close(true) method and the last element has yet not been received.     * @return true if the queue has been closed     */    public boolean closed() {        synchronized(mutex) {            return closed;        }    }    /**     * adds an object to the tail of this queue     * If the queue has been closed with close(true) no exception will be     * thrown if the queue has not been flushed yet.     * @param obj - the object to be added to the queue     * @exception QueueClosedException exception if closed() returns true     */    public void add(Object obj) throws QueueClosedException {        if(obj == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        /*lock the queue from other threads*/        synchronized(mutex) {           if(closed)              throw new QueueClosedException();           if(this.num_markers > 0)              throw new QueueClosedException("queue has been closed. You can not add more elements. " +                                             "Waiting for removal of remaining elements.");            addInternal(obj);            /*wake up all the threads that are waiting for the lock to be released*/            mutex.notifyAll();        }    }    public void addAll(Collection c) throws QueueClosedException {        if(c == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        /*lock the queue from other threads*/        synchronized(mutex) {           if(closed)              throw new QueueClosedException();           if(this.num_markers > 0)              throw new QueueClosedException("queue has been closed. You can not add more elements. " +                                             "Waiting for removal of remaining elements.");            Object obj;            for(Iterator it=c.iterator(); it.hasNext();) {                obj=it.next();                if(obj != null)                    addInternal(obj);            }            /*wake up all the threads that are waiting for the lock to be released*/            mutex.notifyAll();        }    }    public void addAll(List l) throws QueueClosedException {        if(l == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        /*lock the queue from other threads*/        synchronized(mutex) {           if(closed)              throw new QueueClosedException();           if(this.num_markers > 0)              throw new QueueClosedException("queue has been closed. You can not add more elements. " +                                             "Waiting for removal of remaining elements.");            Object obj;            for(Enumeration en=l.elements(); en.hasMoreElements();) {                obj=en.nextElement();                if(obj != null)                    addInternal(obj);            }            /*wake up all the threads that are waiting for the lock to be released*/            mutex.notifyAll();        }    }    /**     * Adds a new object to the head of the queue     * basically (obj.equals(queue.remove(queue.add(obj)))) returns true     * If the queue has been closed with close(true) no exception will be     * thrown if the queue has not been flushed yet.     * @param obj - the object to be added to the queue     * @exception QueueClosedException exception if closed() returns true     */    public void addAtHead(Object obj) throws QueueClosedException {        if(obj == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        /*lock the queue from other threads*/        synchronized(mutex) {           if(closed)              throw new QueueClosedException();           if(this.num_markers > 0)              throw new QueueClosedException("Queue.addAtHead(): queue has been closed. You can not add more elements. " +                                             "Waiting for removal of remaining elements.");            Element el=new Element(obj);            /*check the head element in the list*/            if(head == null) {                /*this is the first object, we could have done add(obj) here*/                head=el;                tail=head;                size=1;            }            else {                /*set the head element to be the child of this one*/                el.next=head;                /*set the head to point to the recently added object*/                head=el;                /*increase the size*/                size++;            }            /*wake up all the threads that are waiting for the lock to be released*/            mutex.notifyAll();        }    }    /**     * Removes 1 element from head or <B>blocks</B>     * until next element has been added or until queue has been closed     * @return the first element to be taken of the queue     */    public Object remove() throws QueueClosedException {        Object retval;        synchronized(mutex) {            /*wait as long as the queue is empty. return when an element is present or queue is closed*/            while(size == 0) {                if(closed)                    throw new QueueClosedException();                try {                    mutex.wait();                }                catch(InterruptedException ex) {                }            }            if(closed)                throw new QueueClosedException();            /*remove the head from the queue, if we make it to this point, retval should not be null !*/            retval=removeInternal();            if(retval == null)                if(log.isErrorEnabled()) log.error("element was null, should never be the case");        }        /*         * we ran into an Endmarker, which means that the queue was closed before         * through close(true)         *///        if(retval == endMarker) {//            close(false); // mark queue as closed//            throw new QueueClosedException();//        }        return retval;    }    /**     * Removes 1 element from the head.     * If the queue is empty the operation will wait for timeout ms.     * if no object is added during the timeout time, a Timout exception is thrown     * @param timeout - the number of milli seconds this operation will wait before it times out     * @return the first object in the queue     */    public Object remove(long timeout) throws QueueClosedException, TimeoutException {        Object retval;        synchronized(mutex) {            if(closed)                throw new QueueClosedException();            /*if the queue size is zero, we want to wait until a new object is added*/            if(size == 0) {                try {                    /*release the mutex lock and wait no more than timeout ms*/                    mutex.wait(timeout);                }                catch(InterruptedException ex) {                }            }            /*we either timed out, or got notified by the mutex lock object*/            if(closed)                throw new QueueClosedException();            /*get the next value*/

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?