queue2.java

来自「JGroups源码」· Java 代码 · 共 688 行 · 第 1/2 页

JAVA
688
字号
// $Id: Queue2.java,v 1.5 2004/12/31 14:10:40 belaban Exp $package org.jgroups.util;import EDU.oswego.cs.dl.util.concurrent.CondVar;import EDU.oswego.cs.dl.util.concurrent.Mutex;import EDU.oswego.cs.dl.util.concurrent.Sync;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.TimeoutException;import java.util.Vector;/** * Elements are added at the tail and removed from the head. Class is thread-safe in that * 1 producer and 1 consumer may add/remove elements concurrently. The class is not * explicitely designed for multiple producers or consumers. Implemented as a linked * list, so that removal of an element at the head does not cause a right-shift of the * remaining elements (as in a Vector-based implementation). * <p>Implementation is based on util.concurrent.* classes * @author Bela Ban * @author Filip Hanik */public class Queue2 {    /*head and the tail of the list so that we can easily add and remove objects*/    Element head=null, tail=null;    /*flag to determine the state of the queue*/    boolean closed=false;    /*current size of the queue*/    int     size=0;    /* Lock object for synchronization. Is notified when element is added */    final Sync    mutex=new Mutex();    /** Signals to listeners when an element has been added */    final CondVar add_condvar=new CondVar(mutex);    /** Signals to listeners when an element has been removed */    final CondVar remove_condvar=new CondVar(mutex);    /*the number of end markers that have been added*/    int     num_markers=0;    protected static final Log log=LogFactory.getLog(Queue2.class);    /**     * if the queue closes during the runtime     * an endMarker object is added to the end of the queue to indicate that     * the queue will close automatically when the end marker is encountered     * This allows for a "soft" close.     
* @see Queue#close     */    private static final Object endMarker=new Object();    /**     * the class Element indicates an object in the queue.     * This element allows for the linked list algorithm by always holding a     * reference to the next element in the list.     * if Element.next is null, then this element is the tail of the list.     */    class Element {        /*the actual value stored in the queue*/        Object obj=null;        /*pointer to the next item in the (queue) linked list*/        Element next=null;        /**         * creates an Element object holding its value         * @param o - the object to be stored in the queue position         */        Element(Object o) {            obj=o;        }        /**         * prints out the value of the object         */        public String toString() {            return obj != null? obj.toString() : "null";        }    }    /**     * creates an empty queue     */    public Queue2() {    }    /**     * Returns the first element. Returns null if no elements are available.     */    public Object getFirst() {        return head != null? head.obj : null;    }    /**     * Returns the last element. Returns null if no elements are available.     */    public Object getLast() {        return tail != null? tail.obj : null;    }    /**     * returns true if the Queue has been closed     * however, this method will return false if the queue has been closed     * using the close(true) method and the last element has yet not been received.     * @return true if the queue has been closed     */    public boolean closed() {        return closed;    }    /**     * adds an object to the tail of this queue     * If the queue has been closed with close(true) no exception will be     * thrown if the queue has not been flushed yet.     
* @param obj - the object to be added to the queue     * @exception QueueClosedException exception if closed() returns true     */    public void add(Object obj) throws QueueClosedException {        if(obj == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        if(closed)            throw new QueueClosedException();        if(this.num_markers > 0)            throw new QueueClosedException("Queue2.add(): queue has been closed. You can not add more elements. " +                                           "Waiting for removal of remaining elements.");        try {            mutex.acquire();            /*create a new linked list element*/            Element el=new Element(obj);            /*check the first element*/            if(head == null) {                /*the object added is the first element*/                /*set the head to be this object*/                head=el;                /*set the tail to be this object*/                tail=head;                /*set the size to be one, since the queue was empty*/                size=1;            }            else {                /*add the object to the end of the linked list*/                tail.next=el;                /*set the tail to point to the last element*/                tail=el;                /*increase the size*/                size++;            }            /*wake up all the threads that are waiting for the lock to be released*/            add_condvar.broadcast(); // todo: maybe signal is all we need ?        }        catch(InterruptedException e) {        }        finally {            mutex.release();        }    }    /**     * Adds a new object to the head of the queue     * basically (obj.equals(queue.remove(queue.add(obj)))) returns true     * If the queue has been closed with close(true) no exception will be     * thrown if the queue has not been flushed yet.     
* @param obj - the object to be added to the queue     * @exception QueueClosedException exception if closed() returns true     *     */    public void addAtHead(Object obj) throws QueueClosedException {        if(obj == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        if(closed)            throw new QueueClosedException();        if(this.num_markers > 0)            throw new QueueClosedException("Queue2.addAtHead(): queue has been closed. You can not add more elements. " +                                           "Waiting for removal of remaining elements.");        try {            mutex.acquire();            Element el=new Element(obj);            /*check the head element in the list*/            if(head == null) {                /*this is the first object, we could have done add(obj) here*/                head=el;                tail=head;                size=1;            }            else {                /*set the head element to be the child of this one*/                el.next=head;                /*set the head to point to the recently added object*/                head=el;                /*increase the size*/                size++;            }            /*wake up all the threads that are waiting for the lock to be released*/            add_condvar.broadcast();        }        catch(InterruptedException e) {        }        finally {            mutex.release();        }    }    /**     * Removes 1 element from head or <B>blocks</B>     * until next element has been added or until queue has been closed     * @return the first element to be taken of the queue     */    public Object remove() throws QueueClosedException {        /*initialize the return value*/        Object retval=null;        try {            mutex.acquire();            /*wait as long as the queue is empty. 
return when an element is present or queue is closed*/            while(size == 0) {                if(closed)                    throw new QueueClosedException();                try {                    add_condvar.await();                }                catch(InterruptedException ex) {                }            }            if(closed)                throw new QueueClosedException();            /*remove the head from the queue, if we make it to this point, retval should not be null !*/            retval=removeInternal();            if(retval == null)                if(log.isErrorEnabled()) log.error("element was null, should never be the case");        }        catch(InterruptedException e) {            ;        }        finally {            mutex.release();        }        /*         * we ran into an Endmarker, which means that the queue was closed before         * through close(true)         */        if(retval == endMarker) {            close(false); // mark queue as closed            throw new QueueClosedException();        }        /*return the object*/        return retval;    }    /**     * Removes 1 element from the head.     * If the queue is empty the operation will wait for timeout ms.     
* if no object is added during the timeout time, a Timout exception is thrown     * @param timeout - the number of milli seconds this operation will wait before it times out     * @return the first object in the queue     */    public Object remove(long timeout) throws QueueClosedException, TimeoutException {        Object retval=null;        try {            mutex.acquire();                      /*if the queue size is zero, we want to wait until a new object is added*/            if(size == 0) {                if(closed)                    throw new QueueClosedException();                try {                    /*release the mutex lock and wait no more than timeout ms*/                    add_condvar.timedwait(timeout);                }                catch(InterruptedException ex) {                }            }            /*we either timed out, or got notified by the mutex lock object*/            /*check to see if the object closed*/            if(closed)                throw new QueueClosedException();            /*get the next value*/            retval=removeInternal();            /*null result means we timed out*/            if(retval == null) throw new TimeoutException();            /*if we reached an end marker we are going to close the queue*/            if(retval == endMarker) {                close(false);                throw new QueueClosedException();            }        }        catch(InterruptedException e) {        }        finally {            mutex.release();        }        /*at this point we actually did receive a value from the queue, return it*/        return retval;    }    /**     * removes a specific object from the queue.     * the object is matched up using the Object.equals method.     
* @param   obj the actual object to be removed from the queue     */    public void removeElement(Object obj) throws QueueClosedException {        Element el, tmp_el;        boolean removed=false;        if(obj == null) {            if(log.isErrorEnabled()) log.error("argument must not be null");            return;        }        try {            mutex.acquire();            el=head;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?