queue2.java
来自「JGRoups源码」· Java 代码 · 共 688 行 · 第 1/2 页
JAVA
688 行
// $Id: Queue2.java,v 1.5 2004/12/31 14:10:40 belaban Exp $package org.jgroups.util;import EDU.oswego.cs.dl.util.concurrent.CondVar;import EDU.oswego.cs.dl.util.concurrent.Mutex;import EDU.oswego.cs.dl.util.concurrent.Sync;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.TimeoutException;import java.util.Vector;/** * Elements are added at the tail and removed from the head. Class is thread-safe in that * 1 producer and 1 consumer may add/remove elements concurrently. The class is not * explicitely designed for multiple producers or consumers. Implemented as a linked * list, so that removal of an element at the head does not cause a right-shift of the * remaining elements (as in a Vector-based implementation). * <p>Implementation is based on util.concurrent.* classes * @author Bela Ban * @author Filip Hanik */public class Queue2 { /*head and the tail of the list so that we can easily add and remove objects*/ Element head=null, tail=null; /*flag to determine the state of the queue*/ boolean closed=false; /*current size of the queue*/ int size=0; /* Lock object for synchronization. Is notified when element is added */ final Sync mutex=new Mutex(); /** Signals to listeners when an element has been added */ final CondVar add_condvar=new CondVar(mutex); /** Signals to listeners when an element has been removed */ final CondVar remove_condvar=new CondVar(mutex); /*the number of end markers that have been added*/ int num_markers=0; protected static final Log log=LogFactory.getLog(Queue2.class); /** * if the queue closes during the runtime * an endMarker object is added to the end of the queue to indicate that * the queue will close automatically when the end marker is encountered * This allows for a "soft" close. * @see Queue#close */ private static final Object endMarker=new Object(); /** * the class Element indicates an object in the queue. * This element allows for the linked list algorithm by always holding a * reference to the next element in the list. * if Element.next is null, then this element is the tail of the list. */ class Element { /*the actual value stored in the queue*/ Object obj=null; /*pointer to the next item in the (queue) linked list*/ Element next=null; /** * creates an Element object holding its value * @param o - the object to be stored in the queue position */ Element(Object o) { obj=o; } /** * prints out the value of the object */ public String toString() { return obj != null? obj.toString() : "null"; } } /** * creates an empty queue */ public Queue2() { } /** * Returns the first element. Returns null if no elements are available. */ public Object getFirst() { return head != null? head.obj : null; } /** * Returns the last element. Returns null if no elements are available. */ public Object getLast() { return tail != null? tail.obj : null; } /** * returns true if the Queue has been closed * however, this method will return false if the queue has been closed * using the close(true) method and the last element has yet not been received. * @return true if the queue has been closed */ public boolean closed() { return closed; } /** * adds an object to the tail of this queue * If the queue has been closed with close(true) no exception will be * thrown if the queue has not been flushed yet. * @param obj - the object to be added to the queue * @exception QueueClosedException exception if closed() returns true */ public void add(Object obj) throws QueueClosedException { if(obj == null) { if(log.isErrorEnabled()) log.error("argument must not be null"); return; } if(closed) throw new QueueClosedException(); if(this.num_markers > 0) throw new QueueClosedException("Queue2.add(): queue has been closed. You can not add more elements. " + "Waiting for removal of remaining elements."); try { mutex.acquire(); /*create a new linked list element*/ Element el=new Element(obj); /*check the first element*/ if(head == null) { /*the object added is the first element*/ /*set the head to be this object*/ head=el; /*set the tail to be this object*/ tail=head; /*set the size to be one, since the queue was empty*/ size=1; } else { /*add the object to the end of the linked list*/ tail.next=el; /*set the tail to point to the last element*/ tail=el; /*increase the size*/ size++; } /*wake up all the threads that are waiting for the lock to be released*/ add_condvar.broadcast(); // todo: maybe signal is all we need ? } catch(InterruptedException e) { } finally { mutex.release(); } } /** * Adds a new object to the head of the queue * basically (obj.equals(queue.remove(queue.add(obj)))) returns true * If the queue has been closed with close(true) no exception will be * thrown if the queue has not been flushed yet. * @param obj - the object to be added to the queue * @exception QueueClosedException exception if closed() returns true * */ public void addAtHead(Object obj) throws QueueClosedException { if(obj == null) { if(log.isErrorEnabled()) log.error("argument must not be null"); return; } if(closed) throw new QueueClosedException(); if(this.num_markers > 0) throw new QueueClosedException("Queue2.addAtHead(): queue has been closed. You can not add more elements. " + "Waiting for removal of remaining elements."); try { mutex.acquire(); Element el=new Element(obj); /*check the head element in the list*/ if(head == null) { /*this is the first object, we could have done add(obj) here*/ head=el; tail=head; size=1; } else { /*set the head element to be the child of this one*/ el.next=head; /*set the head to point to the recently added object*/ head=el; /*increase the size*/ size++; } /*wake up all the threads that are waiting for the lock to be released*/ add_condvar.broadcast(); } catch(InterruptedException e) { } finally { mutex.release(); } } /** * Removes 1 element from head or <B>blocks</B> * until next element has been added or until queue has been closed * @return the first element to be taken of the queue */ public Object remove() throws QueueClosedException { /*initialize the return value*/ Object retval=null; try { mutex.acquire(); /*wait as long as the queue is empty. return when an element is present or queue is closed*/ while(size == 0) { if(closed) throw new QueueClosedException(); try { add_condvar.await(); } catch(InterruptedException ex) { } } if(closed) throw new QueueClosedException(); /*remove the head from the queue, if we make it to this point, retval should not be null !*/ retval=removeInternal(); if(retval == null) if(log.isErrorEnabled()) log.error("element was null, should never be the case"); } catch(InterruptedException e) { ; } finally { mutex.release(); } /* * we ran into an Endmarker, which means that the queue was closed before * through close(true) */ if(retval == endMarker) { close(false); // mark queue as closed throw new QueueClosedException(); } /*return the object*/ return retval; } /** * Removes 1 element from the head. * If the queue is empty the operation will wait for timeout ms. * if no object is added during the timeout time, a Timout exception is thrown * @param timeout - the number of milli seconds this operation will wait before it times out * @return the first object in the queue */ public Object remove(long timeout) throws QueueClosedException, TimeoutException { Object retval=null; try { mutex.acquire(); /*if the queue size is zero, we want to wait until a new object is added*/ if(size == 0) { if(closed) throw new QueueClosedException(); try { /*release the mutex lock and wait no more than timeout ms*/ add_condvar.timedwait(timeout); } catch(InterruptedException ex) { } } /*we either timed out, or got notified by the mutex lock object*/ /*check to see if the object closed*/ if(closed) throw new QueueClosedException(); /*get the next value*/ retval=removeInternal(); /*null result means we timed out*/ if(retval == null) throw new TimeoutException(); /*if we reached an end marker we are going to close the queue*/ if(retval == endMarker) { close(false); throw new QueueClosedException(); } } catch(InterruptedException e) { } finally { mutex.release(); } /*at this point we actually did receive a value from the queue, return it*/ return retval; } /** * removes a specific object from the queue. * the object is matched up using the Object.equals method. * @param obj the actual object to be removed from the queue */ public void removeElement(Object obj) throws QueueClosedException { Element el, tmp_el; boolean removed=false; if(obj == null) { if(log.isErrorEnabled()) log.error("argument must not be null"); return; } try { mutex.acquire(); el=head;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?