queue.java
来自「JGRoups源码」· Java 代码 · 共 633 行 · 第 1/2 页
JAVA
633 行
// $Id: Queue.java,v 1.29 2006/08/08 15:34:22 belaban Exp $package org.jgroups.util;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.TimeoutException;import java.util.LinkedList;import java.util.Collection;import java.util.Iterator;import java.util.Enumeration;/** * Elements are added at the tail and removed from the head. Class is thread-safe in that * 1 producer and 1 consumer may add/remove elements concurrently. The class is not * explicitely designed for multiple producers or consumers. Implemented as a linked * list, so that removal of an element at the head does not cause a right-shift of the * remaining elements (as in a Vector-based implementation). * @author Bela Ban */public class Queue { /*head and the tail of the list so that we can easily add and remove objects*/ private Element head=null, tail=null; /*flag to determine the state of the queue*/ private boolean closed=false; /*current size of the queue*/ private int size=0; /* Lock object for synchronization. Is notified when element is added */ private final Object mutex=new Object(); /** Lock object for syncing on removes. It is notified when an object is removed */ // Object remove_mutex=new Object(); /*the number of end markers that have been added*/ private int num_markers=0; /** * if the queue closes during the runtime * an endMarker object is added to the end of the queue to indicate that * the queue will close automatically when the end marker is encountered * This allows for a "soft" close. * @see Queue#close */ private static final Object endMarker=new Object(); protected static final Log log=LogFactory.getLog(Queue.class); /** * the class Element indicates an object in the queue. * This element allows for the linked list algorithm by always holding a * reference to the next element in the list. * if Element.next is null, then this element is the tail of the list. 
*/ static class Element { /*the actual value stored in the queue*/ Object obj=null; /*pointer to the next item in the (queue) linked list*/ Element next=null; /** * creates an Element object holding its value * @param o - the object to be stored in the queue position */ Element(Object o) { obj=o; } /** * prints out the value of the object */ public String toString() { return obj != null? obj.toString() : "null"; } } /** * creates an empty queue */ public Queue() { } /** * Returns the first element. Returns null if no elements are available. */ public Object getFirst() { synchronized(mutex) { return head != null? head.obj : null; } } /** * Returns the last element. Returns null if no elements are available. */ public Object getLast() { synchronized(mutex) { return tail != null? tail.obj : null; } } /** * returns true if the Queue has been closed * however, this method will return false if the queue has been closed * using the close(true) method and the last element has yet not been received. * @return true if the queue has been closed */ public boolean closed() { synchronized(mutex) { return closed; } } /** * adds an object to the tail of this queue * If the queue has been closed with close(true) no exception will be * thrown if the queue has not been flushed yet. * @param obj - the object to be added to the queue * @exception QueueClosedException exception if closed() returns true */ public void add(Object obj) throws QueueClosedException { if(obj == null) { if(log.isErrorEnabled()) log.error("argument must not be null"); return; } /*lock the queue from other threads*/ synchronized(mutex) { if(closed) throw new QueueClosedException(); if(this.num_markers > 0) throw new QueueClosedException("queue has been closed. You can not add more elements. 
" + "Waiting for removal of remaining elements."); addInternal(obj); /*wake up all the threads that are waiting for the lock to be released*/ mutex.notifyAll(); } } public void addAll(Collection c) throws QueueClosedException { if(c == null) { if(log.isErrorEnabled()) log.error("argument must not be null"); return; } /*lock the queue from other threads*/ synchronized(mutex) { if(closed) throw new QueueClosedException(); if(this.num_markers > 0) throw new QueueClosedException("queue has been closed. You can not add more elements. " + "Waiting for removal of remaining elements."); Object obj; for(Iterator it=c.iterator(); it.hasNext();) { obj=it.next(); if(obj != null) addInternal(obj); } /*wake up all the threads that are waiting for the lock to be released*/ mutex.notifyAll(); } } public void addAll(List l) throws QueueClosedException { if(l == null) { if(log.isErrorEnabled()) log.error("argument must not be null"); return; } /*lock the queue from other threads*/ synchronized(mutex) { if(closed) throw new QueueClosedException(); if(this.num_markers > 0) throw new QueueClosedException("queue has been closed. You can not add more elements. " + "Waiting for removal of remaining elements."); Object obj; for(Enumeration en=l.elements(); en.hasMoreElements();) { obj=en.nextElement(); if(obj != null) addInternal(obj); } /*wake up all the threads that are waiting for the lock to be released*/ mutex.notifyAll(); } } /** * Adds a new object to the head of the queue * basically (obj.equals(queue.remove(queue.add(obj)))) returns true * If the queue has been closed with close(true) no exception will be * thrown if the queue has not been flushed yet. 
* @param obj - the object to be added to the queue
     * @exception QueueClosedException exception if closed() returns true
     */
    public void addAtHead(Object obj) throws QueueClosedException {
        if(obj == null) {
            if(log.isErrorEnabled()) log.error("argument must not be null");
            return;
        }

        /* all state checks and the insertion happen under the queue lock */
        synchronized(mutex) {
            if(closed)
                throw new QueueClosedException();
            if(this.num_markers > 0)
                throw new QueueClosedException("Queue.addAtHead(): queue has been closed. You can not add more elements. " +
                                               "Waiting for removal of remaining elements.");

            Element node=new Element(obj);

            if(head != null) {
                /* link the old head behind the new node, then make the new node the head */
                node.next=head;
                head=node;
                size++;
            }
            else {
                /* no head node yet: the new node is both head and tail */
                head=node;
                tail=node;
                size=1;
            }

            /* wake up every thread waiting on the lock */
            mutex.notifyAll();
        }
    }


    /**
     * Takes one element from the head, blocking while the queue is empty until
     * an element has been added or the queue has been closed.
     * NOTE(review): an InterruptedException raised while waiting is swallowed and
     * the wait simply resumes; the thread's interrupt status is not restored.
     * @return the first element to be taken of the queue
     * @exception QueueClosedException if the queue is found closed, either while
     *            waiting on an empty queue or after the wait has ended
     */
    public Object remove() throws QueueClosedException {
        synchronized(mutex) {
            /* keep waiting for as long as there is nothing to take */
            while(size == 0) {
                if(closed)
                    throw new QueueClosedException();
                try {
                    mutex.wait();
                }
                catch(InterruptedException ex) {
                }
            }

            if(closed)
                throw new QueueClosedException();

            /* having made it to this point, the removed value should not be null */
            Object retval=removeInternal();
            if(retval == null && log.isErrorEnabled())
                log.error("element was null, should never be the case");

            /*
             * Disabled handling for running into an end marker, which would mean
             * that the queue was closed before through close(true):
             */
            // if(retval == endMarker) {
            //     close(false); // mark queue as closed
            //     throw new QueueClosedException();
            // }

            return retval;
        }
    }


    /**
     * Removes 1 element from the head.
     * If the queue is empty the operation will wait for timeout ms.
     * if no object is added during the timeout time, a Timout exception is thrown
     * @param timeout - the number of milli seconds this operation will wait before it times out
     * @return the first object in the queue
     */
    public Object remove(long timeout) throws QueueClosedException, TimeoutException {
        Object retval;

        synchronized(mutex) {
            if(closed) throw new QueueClosedException();

            /*if the queue size is zero, we want to wait until a new object is added*/
            if(size == 0) {
                try {
                    /*release the mutex lock and wait no more than timeout ms*/
                    mutex.wait(timeout);
                }
                catch(InterruptedException ex) {
                }
            }
            /*we either timed out, or got notified by the mutex lock object*/
            if(closed) throw new QueueClosedException();

            /*get the next value*/
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?