📄 queue.java
字号:
{
throw new QueueClosedException();
}
/*get the next value*/
retval = removeInternal();
/*null result means we timed out*/
if (retval == null)
{
throw new TimeoutException("timeout=" + timeout + "ms");
}
/*if we reached an end marker we are going to close the queue*/
// if(retval == endMarker) {
// close(false);
// throw new QueueClosedException();
// }
/*at this point we actually did receive a value from the queue, return it*/
return retval;
}
}
/**
 * Removes the first element that matches the given object.
 * Elements are scanned from the head and matched with <code>element.equals(obj)</code>.
 * If removing the head leaves the end-of-entries marker at the head, the queue is
 * marked as closed and waiters are notified, the same way <code>removeInternal()</code>
 * does it.
 * @param obj the object to be removed from the queue; a null argument is logged and ignored
 * @throws QueueClosedException if the queue is closed when the method is called
 */
public void removeElement(Object obj)
throws QueueClosedException
{
    Element el, tmp_el;
    if (obj == null)
    {
        if (log.isErrorEnabled())
        {
            log.error("argument must not be null");
        }
        return;
    }
    synchronized (mutex)
    {
        if (closed) /*check to see if the queue is closed*/
        {
            throw new QueueClosedException();
        }
        el = head;
        /*the queue is empty*/
        if (el == null)
        {
            return;
        }
        /*check to see if the head element is the one to be removed*/
        if (el.obj.equals(obj))
        {
            /*unlink the head and drop its references*/
            head = el.next;
            el.next = null;
            el.obj = null;
            /*the queue became empty: tail must not keep pointing at the removed element*/
            if (head == null)
            {
                tail = null;
            }
            decrementSize();
            /*only the end marker is left in front: close the queue, as removeInternal() does*/
            if (head != null && head.obj == endMarker)
            {
                closed = true;
                mutex.notifyAll();
            }
            return;
        }
        /*look through the other elements; el is always the predecessor of the candidate*/
        while (el.next != null)
        {
            tmp_el = el.next;
            if (tmp_el.obj.equals(obj))
            {
                /*removing the last element moves the tail one to the left*/
                if (tmp_el == tail)
                {
                    tail = el;
                }
                el.next = tmp_el.next; // may be null
                tmp_el.next = null;
                tmp_el.obj = null;
                decrementSize();
                break;
            }
            el = tmp_el;
        }
    }
}
/**
 * Returns the first object on the queue without removing it.
 * Blocks while the queue is empty. If the first object is the end-of-entries
 * marker, the queue is closed and a QueueClosedException is thrown instead.
 * @return the first object on the queue
 * @throws QueueClosedException if the queue is (or becomes) closed
 */
public Object peek()
throws QueueClosedException
{
    Object first;
    synchronized (mutex)
    {
        for (;;)
        {
            if (size != 0)
            {
                break;
            }
            /*empty and closed: nothing will ever arrive*/
            if (closed)
            {
                throw new QueueClosedException();
            }
            try
            {
                mutex.wait();
            }
            catch (InterruptedException ex)
            {
                /*interruption is ignored; the loop re-checks the queue state*/
            }
        }
        if (closed)
        {
            throw new QueueClosedException();
        }
        if (head == null)
        {
            first = null;
        }
        else
        {
            first = head.obj;
        }
    }
    if (first == endMarker)
    {
        close(false); // mark queue as closed
        throw new QueueClosedException();
    }
    return first;
}
/**
 * Returns the first object on the queue without removing it.
 * If the queue is empty, this method blocks until an object is available, the queue is
 * closed, the calling thread is interrupted, or the timeout elapses. Notifications that
 * leave the queue empty (and spurious wakeups) do not end the wait early.
 * @param timeout how long, in milliseconds, to wait for an object to be added to the queue;
 *                as with <code>Object.wait(long)</code>, 0 waits without a time limit and a
 *                negative value causes an IllegalArgumentException when a wait is needed
 * @return the first object on the queue
 * @throws QueueClosedException if the queue is closed, or the first object is the
 *                              end-of-entries marker
 * @throws TimeoutException if no object is available after waiting
 */
public Object peek(long timeout)
throws QueueClosedException, TimeoutException
{
    Object retval;
    synchronized (mutex)
    {
        long deadline = System.currentTimeMillis() + timeout;
        if (timeout > 0 && deadline < 0)
        {
            deadline = Long.MAX_VALUE; // saturate instead of overflowing for huge timeouts
        }
        long remaining = timeout;
        /*wait in a loop: a notifyAll() that leaves the queue empty must not count as a timeout*/
        while (size == 0 && !closed)
        {
            try
            {
                mutex.wait(remaining);
            }
            catch (InterruptedException ex)
            {
                /*as before, an interrupt ends the wait and the queue state decides the outcome*/
                break;
            }
            if (timeout > 0)
            {
                remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                {
                    break; // timeout elapsed
                }
            }
        }
        if (closed)
        {
            throw new QueueClosedException();
        }
        retval = head != null ? head.obj : null;
        /*still nothing to look at: report a timeout*/
        if (retval == null)
        {
            throw new TimeoutException("timeout=" + timeout + "ms");
        }
        if (retval == endMarker)
        {
            close(false);
            throw new QueueClosedException();
        }
        return retval;
    }
}
/**
 * Empties the queue: drops all elements, resets the size and marker counters and
 * notifies all threads waiting on the mutex. Does not check whether the queue is closed.
 */
public void clear()
{
    synchronized (mutex)
    {
        tail = null;
        head = null;
        size = 0;
        num_markers = 0;
        mutex.notifyAll();
    }
}
/**
 * Closes the queue, either immediately or after the pending entries.
 * @param flush_entries When true and the queue is not empty, an end-of-entries marker is
 *                      added via <code>add()</code> and the marker count is incremented;
 *                      the queue is not marked closed by this call. Otherwise the queue is
 *                      marked closed right away and all waiting threads are notified.
 */
public void close(boolean flush_entries)
{
    synchronized (mutex)
    {
        if (!flush_entries || size <= 0)
        {
            /*nothing to flush: close immediately*/
            closed = true;
            mutex.notifyAll();
            return;
        }
        try
        {
            add(endMarker); // add an end-of-entries marker to the end of the queue
            num_markers++;
        }
        catch (QueueClosedException already_closed)
        {
            /*ignored: the marker could not be added*/
        }
    }
}
/**
 * Waits until the queue has been closed. Returns immediately if already closed.
 * Notifications that do not close the queue do not end the wait. Returns early if the
 * calling thread is interrupted while waiting.
 * @param timeout Number of milliseconds to wait. A value <= 0 means to wait forever
 */
public void waitUntilClosed(long timeout)
{
    synchronized (mutex)
    {
        final boolean forever = timeout <= 0;
        long deadline = 0;
        if (!forever)
        {
            deadline = System.currentTimeMillis() + timeout;
            if (deadline < 0)
            {
                deadline = Long.MAX_VALUE; // saturate instead of overflowing for huge timeouts
            }
        }
        while (!closed)
        {
            long remaining = 0; // 0 makes Object.wait() wait without a time limit
            if (!forever)
            {
                remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                {
                    return; // timed out, queue still open
                }
            }
            try
            {
                mutex.wait(remaining);
            }
            catch (InterruptedException e)
            {
                /*as before, an interrupt ends the wait silently*/
                return;
            }
        }
    }
}
/**
 * Resets the queue: all elements and markers are discarded and the queue ends up open.
 * An open queue is first closed with <code>close(false)</code>; afterwards the closed
 * flag is cleared and all waiting threads are notified.
 */
public void reset()
{
    synchronized (mutex)
    {
        num_markers = 0;
        boolean wasOpen = !closed;
        if (wasOpen)
        {
            close(false);
        }
        /*drop the contents*/
        size = 0;
        head = null;
        tail = null;
        /*re-open*/
        closed = false;
        mutex.notifyAll();
    }
}
/**
 * Copies the objects currently linked in the queue, in head-to-tail order, into a new list.
 * @return a new LinkedList holding the queue's objects
 */
public LinkedList values()
{
    LinkedList copy = new LinkedList();
    synchronized (mutex)
    {
        for (Element cur = head; cur != null; cur = cur.next)
        {
            copy.add(cur.obj);
        }
    }
    return copy;
}
/**
 * Returns the number of objects currently in the queue, not counting end-of-entries markers.
 * @return the element count minus the marker count
 */
public int size()
{
    int count;
    synchronized (mutex)
    {
        count = size - num_markers;
    }
    return count;
}
/**
 * Returns a short description containing the value reported by <code>size()</code>.
 */
public String toString()
{
    int count = size();
    return "Queue (" + count + ") elements";
}
/* ------------------------------------- Private Methods ----------------------------------- */
/**
 * Appends an object to the end of the linked list and updates the size.
 * Performs no locking and no notification itself.
 */
private final void addInternal(Object obj)
{
    Element node = new Element(obj);
    if (head != null)
    {
        /*non-empty list: link behind the current tail*/
        tail.next = node;
        tail = node;
        size++;
        return;
    }
    /*empty list: the new element is both head and tail*/
    head = node;
    tail = node;
    size = 1;
}
/**
 * Unlinks the first element and returns its object, or null if the list is empty.
 * If the element that becomes the new head holds the end-of-entries marker, the queue is
 * marked closed and all waiting threads are notified.
 * Does no locking of its own; callers are expected to hold the mutex.
 */
private Object removeInternal()
{
    /*empty list*/
    if (head == null)
    {
        return null;
    }
    Element first = head;
    Element successor = first.next;
    head = successor;
    if (successor == null)
    {
        tail = null;
    }
    decrementSize();
    if (successor != null && successor.obj == endMarker)
    {
        closed = true;
        mutex.notifyAll();
    }
    /*detach the removed element completely before handing out its object*/
    Object payload = first.obj;
    first.next = null;
    first.obj = null;
    return payload;
}
/** Decrements the size and resets it to 0 if it dropped below 0. Does no locking itself. */
final private void decrementSize()
{
    if (--size < 0)
    {
        size = 0;
    }
}
/* ---------------------------------- End of Private Methods -------------------------------- */
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -