📄 queue.java
字号:
package com.huawei.comm.smap;
// $Id: Queue.java,v 1.29 2006/08/08 15:34:22 belaban Exp $
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.LinkedList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Enumeration;
/**
* Elements are added at the tail and removed from the head. The class is thread-safe in that
* 1 producer and 1 consumer may add/remove elements concurrently. The class is not
* explicitly designed for multiple producers or consumers. It is implemented as a linked
* list, so that removal of an element at the head does not cause a shift of the
* remaining elements (as in a Vector-based implementation).
* @author Bela Ban
*/
public class Queue
{
/**
 * Sentinel for a "soft" close: according to the original notes, when the queue is
 * closed while still holding elements, this marker is appended so that the queue
 * closes automatically once the marker is reached.
 * @see Queue#close
 */
private static final Object endMarker = new Object();
/* logger shared by all queue instances */
protected static final Log log = LogFactory.getLog(Queue.class);

/* first node of the linked list; null while the list is empty */
private Element head;
/* last node of the linked list; null while the list is empty */
private Element tail;
/* set once the queue has been closed; add and remove check it under the mutex */
private boolean closed;
/* number of elements currently held */
private int size;
/* lock guarding all state above; waiters on it are notified whenever an element is added */
private final Object mutex = new Object();
/* A separate lock for removals (remove_mutex) used to exist here and is disabled: */
// Object remove_mutex=new Object();
/* number of end markers that have been added; while it is above zero, adds are rejected */
private int num_markers;
/**
 * A single node of the queue's singly linked list. Each node carries one queued
 * value plus a link to the node that follows it; a node whose next link is null
 * is the last node of the list.
 */
static class Element
{
    /* the value carried by this node */
    Object obj;
    /* link to the following node, or null when this node is the last one */
    Element next;

    /**
     * Wraps a value in a new, not yet linked node.
     * @param o - the value to be stored in this queue position
     */
    Element(Object o)
    {
        this.obj = o;
    }

    /**
     * Renders the stored value.
     * @return the value's own string form, or the text "null" when no value is stored
     */
    public String toString()
    {
        /* String.valueOf yields "null" for a null reference and obj.toString() otherwise */
        return String.valueOf(obj);
    }
}
/**
 * Constructs a queue that holds no elements; all fields keep the values
 * given by their declarations.
 */
public Queue()
{
    /* nothing to initialise beyond the superclass */
    super();
}
/**
 * Peeks at the element at the head of the queue without removing it.
 * @return the value held by the head node, or null when the queue has no elements
 */
public Object getFirst()
{
    synchronized (mutex)
    {
        if (head == null)
        {
            return null;
        }
        return head.obj;
    }
}
/**
 * Peeks at the element at the tail of the queue without removing it.
 * @return the value held by the tail node, or null when the queue has no elements
 */
public Object getLast()
{
    synchronized (mutex)
    {
        if (tail == null)
        {
            return null;
        }
        return tail.obj;
    }
}
/**
 * Reports the current value of the closed flag, read under the queue lock.
 * NOTE(review): the original notes state that after a soft close via close(true)
 * this keeps returning false until the last element has been received; verify
 * against close().
 * @return true if the queue has been closed
 */
public boolean closed()
{
    final boolean isClosed;
    synchronized (mutex)
    {
        isClosed = closed;
    }
    return isClosed;
}
/**
 * Appends an object to the tail of this queue (the linking itself is delegated
 * to addInternal) and wakes up every thread waiting on the queue lock.
 * A null argument is not added: an error is logged and the call returns normally.
 * @param obj - the object to be added to the queue
 * @exception QueueClosedException if the queue is closed, or if an end marker has
 *            already been added and the remaining elements are still being drained
 */
public void add(Object obj)
        throws QueueClosedException
{
    if (obj != null)
    {
        /* all checks and the insertion happen atomically with respect to other queue users */
        synchronized (mutex)
        {
            if (closed)
            {
                throw new QueueClosedException();
            }
            if (num_markers > 0)
            {
                throw new QueueClosedException(
                        "queue has been closed. You can not add more elements. " +
                        "Waiting for removal of remaining elements.");
            }
            addInternal(obj);
            /* let blocked consumers know that an element is available */
            mutex.notifyAll();
        }
    }
    else if (log.isErrorEnabled())
    {
        log.error("argument must not be null");
    }
}
/**
 * Adds every non-null member of a collection to this queue, in the collection's
 * iteration order, under a single acquisition of the queue lock. Null members
 * are skipped silently. Waiting threads are notified once, after the whole
 * collection has been processed.
 * A null collection is not processed: an error is logged and the call returns normally.
 * @param c - the collection whose members are to be added
 * @exception QueueClosedException if the queue is closed, or if an end marker has
 *            already been added and the remaining elements are still being drained
 */
public void addAll(Collection c)
        throws QueueClosedException
{
    if (c == null)
    {
        if (log.isErrorEnabled())
        {
            log.error("argument must not be null");
        }
        return;
    }
    synchronized (mutex)
    {
        if (closed)
        {
            throw new QueueClosedException();
        }
        if (num_markers > 0)
        {
            throw new QueueClosedException(
                    "queue has been closed. You can not add more elements. " +
                    "Waiting for removal of remaining elements.");
        }
        Iterator members = c.iterator();
        while (members.hasNext())
        {
            Object member = members.next();
            if (member == null)
            {
                continue;
            }
            addInternal(member);
        }
        /* one wake-up for the whole batch */
        mutex.notifyAll();
    }
}
/**
 * Adds every non-null member of a list to this queue, in the order produced by
 * the list's elements() enumeration, under a single acquisition of the queue
 * lock. Null members are skipped silently. Waiting threads are notified once,
 * after the whole list has been processed.
 * A null list is not processed: an error is logged and the call returns normally.
 * NOTE(review): List is not java.util.List here (it exposes elements()); it is
 * presumably a project-local list type - confirm.
 * @param l - the list whose members are to be added
 * @exception QueueClosedException if the queue is closed, or if an end marker has
 *            already been added and the remaining elements are still being drained
 */
public void addAll(List l)
        throws QueueClosedException
{
    if (l == null)
    {
        if (log.isErrorEnabled())
        {
            log.error("argument must not be null");
        }
        return;
    }
    synchronized (mutex)
    {
        if (closed)
        {
            throw new QueueClosedException();
        }
        if (num_markers > 0)
        {
            throw new QueueClosedException(
                    "queue has been closed. You can not add more elements. " +
                    "Waiting for removal of remaining elements.");
        }
        Enumeration members = l.elements();
        while (members.hasMoreElements())
        {
            Object member = members.nextElement();
            if (member == null)
            {
                continue;
            }
            addInternal(member);
        }
        /* one wake-up for the whole batch */
        mutex.notifyAll();
    }
}
/**
 * Inserts an object in front of the current head, so that it becomes the next
 * element to be removed, and wakes up every thread waiting on the queue lock.
 * A null argument is not added: an error is logged and the call returns normally.
 * @param obj - the object to be added to the queue
 * @exception QueueClosedException if the queue is closed, or if an end marker has
 *            already been added and the remaining elements are still being drained
 */
public void addAtHead(Object obj)
        throws QueueClosedException
{
    if (obj == null)
    {
        if (log.isErrorEnabled())
        {
            log.error("argument must not be null");
        }
        return;
    }
    synchronized (mutex)
    {
        if (closed)
        {
            throw new QueueClosedException();
        }
        if (num_markers > 0)
        {
            throw new QueueClosedException(
                    "Queue.addAtHead(): queue has been closed. You can not add more elements. " +
                    "Waiting for removal of remaining elements.");
        }
        Element node = new Element(obj);
        if (head != null)
        {
            /* link the new node in front of the existing chain */
            node.next = head;
            head = node;
            size++;
        }
        else
        {
            /* empty list: the new node is both head and tail */
            head = node;
            tail = node;
            size = 1;
        }
        /* let blocked consumers know that an element is available */
        mutex.notifyAll();
    }
}
/**
 * Removes 1 element from the head or <B>blocks</B> until an element has been
 * added or the queue has been closed.
 * An interrupt received while blocked does not abort the wait; it is remembered
 * and the calling thread's interrupt status is restored before this method
 * exits, so that the caller can still observe it.
 * @return the first element taken off the queue
 * @exception QueueClosedException if the queue is closed, either while it is
 *            empty or at the time an element becomes available
 */
public Object remove()
        throws QueueClosedException
{
    Object retval;
    /* set when wait() was interrupted; the flag is re-asserted on exit instead of being lost */
    boolean interrupted = false;
    try
    {
        synchronized (mutex)
        {
            /* wait as long as the queue is empty; leave when an element is present or the queue is closed */
            while (size == 0)
            {
                if (closed)
                {
                    throw new QueueClosedException();
                }
                try
                {
                    mutex.wait();
                }
                catch (InterruptedException ex)
                {
                    /*
                     * Keep waiting, but do not re-interrupt here: with the flag set,
                     * the next wait() would throw immediately and this loop would spin.
                     */
                    interrupted = true;
                }
            }
            if (closed)
            {
                throw new QueueClosedException();
            }
            /* size is above zero at this point, so removeInternal() is expected to return a value */
            retval = removeInternal();
            if (retval == null)
            {
                if (log.isErrorEnabled())
                {
                    log.error("element was null, should never be the case");
                }
            }
        }
    }
    finally
    {
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }
    }
    /*
     * Handling of an end marker returned here (closing the queue and throwing
     * QueueClosedException) was disabled in the original code and stays disabled.
     */
    return retval;
}
/**
* Removes 1 element from the head.
* If the queue is empty the operation will wait for timeout ms.
* if no object is added during the timeout time, a Timout exception is thrown
* @param timeout - the number of milli seconds this operation will wait before it times out
* @return the first object in the queue
*/
public Object remove(long timeout)
throws QueueClosedException, TimeoutException
{
Object retval;
synchronized (mutex)
{
if (closed)
{
throw new QueueClosedException();
}
/*if the queue size is zero, we want to wait until a new object is added*/
if (size == 0)
{
try
{
/*release the mutex lock and wait no more than timeout ms*/
mutex.wait(timeout);
}
catch (InterruptedException ex)
{
}
}
/*we either timed out, or got notified by the mutex lock object*/
if (closed)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -