linkedlistqueue.java

来自「JGRoups源码」· Java 代码 · 共 407 行

JAVA
407
字号
// $Id: LinkedListQueue.java,v 1.6 2004/09/23 16:29:56 belaban Exp $package org.jgroups.util;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.TimeoutException;import java.util.Iterator;import java.util.LinkedList;import java.util.NoSuchElementException;import java.util.Vector;/** * LinkedListQueue implementation based on java.util.Queue. Can be renamed to Queue.java and compiled if someone wants to * use this implementation rather than the original Queue. However, a simple insertion and removal of 1 million * objects into this queue shoed that it was ca. 15-20% slower than the original queue. We just include it in the * JGroups distribution to maybe use it at a later point when it has become faster. * * @author Bela Ban */public class LinkedListQueue {    final LinkedList l=new LinkedList();    /*flag to determine the state of the Queue*/    boolean closed=false;    /*lock object for synchronization*/    final Object mutex=new Object();    /*the number of end markers that have been added*/    int num_markers=0;    /**     * if the queue closes during the runtime     * an endMarker object is added to the end of the queue to indicate that     * the queue will close automatically when the end marker is encountered     * This allows for a "soft" close.     *     * @see LinkedListQueue#close     */    private static final Object endMarker=new Object();    protected static final Log log=LogFactory.getLog(LinkedListQueue.class);    /**     * creates an empty queue     */    public LinkedListQueue() {    }    /**     * returns true if the Queue has been closed     * however, this method will return false if the queue has been closed     * using the close(true) method and the last element has yet not been received.     *     * @return true if the queue has been closed     */    public boolean closed() {        return closed;    }    /**     * adds an object to the tail of this queue     * If the queue has been closed with close(true) no exception will be     * thrown if the queue has not been flushed yet.     *     * @param obj - the object to be added to the queue     * @throws QueueClosed exception if closed() returns true     */    public void add(Object obj) throws QueueClosedException {        if(closed)            throw new QueueClosedException();        if(this.num_markers > 0)            throw new QueueClosedException("LinkedListQueue.add(): queue has been closed. You can not add more elements. " +                                           "Waiting for removal of remaining elements.");        /*lock the queue from other threads*/        synchronized(mutex) {            l.add(obj);            /*wake up all the threads that are waiting for the lock to be released*/            mutex.notifyAll();        }    }    /**     * Adds a new object to the head of the queue     * basically (obj.equals(LinkedListQueue.remove(LinkedListQueue.add(obj)))) returns true     * If the queue has been closed with close(true) no exception will be     * thrown if the queue has not been flushed yet.     *     * @param obj - the object to be added to the queue     * @throws QueueClosed exception if closed() returns true     */    public void addAtHead(Object obj) throws QueueClosedException {        if(closed)            throw new QueueClosedException();        if(this.num_markers > 0)            throw new QueueClosedException("LinkedListQueue.addAtHead(): queue has been closed. You can not add more elements. " +                                           "Waiting for removal of remaining elements.");        /*lock the queue from other threads*/        synchronized(mutex) {            l.addFirst(obj);            /*wake up all the threads that are waiting for the lock to be released*/            mutex.notifyAll();        }    }    /**     * Removes 1 element from head or <B>blocks</B>     * until next element has been added     *     * @return the first element to be taken of the queue     */    public Object remove() throws QueueClosedException {        Object retval=null;        /*lock the queue*/        synchronized(mutex) {            /*wait as long as the queue is empty*/            while(l.size() == 0) {                if(closed)                    throw new QueueClosedException();                try {                    mutex.wait();                }                catch(InterruptedException ex) {                }            }            if(closed)                throw new QueueClosedException();            /*remove the head from the queue*/            try {                retval=l.removeFirst();                if(l.size() == 1 && l.getFirst().equals(endMarker))                    closed=true;            }            catch(NoSuchElementException ex) {                if(log.isErrorEnabled()) log.error("retval == null, size()=" + l.size());                return null;            }            // we ran into an Endmarker, which means that the queue was closed before            // through close(true)            if(retval == endMarker) {                close(false); // mark queue as closed                throw new QueueClosedException();            }        }        /*return the object, should be never null*/        return retval;    }    /**     * Removes 1 element from the head.     * If the queue is empty the operation will wait for timeout ms.     * if no object is added during the timeout time, a Timout exception is thrown     *     * @param timeout - the number of milli seconds this operation will wait before it times out     * @return the first object in the queue     */    public Object remove(long timeout) throws QueueClosedException, TimeoutException {        Object retval=null;        /*lock the queue*/        synchronized(mutex) {            /*if the queue size is zero, we want to wait until a new object is added*/            if(l.size() == 0) {                if(closed)                    throw new QueueClosedException();                try {                    /*release the mutex lock and wait no more than timeout ms*/                    mutex.wait(timeout);                }                catch(InterruptedException ex) {                }            }            /*we either timed out, or got notified by the mutex lock object*/            /*check to see if the object closed*/            if(closed)                throw new QueueClosedException();            /*get the next value*/            try {                retval=l.removeFirst();                if(l.size() == 1 && l.getFirst().equals(endMarker))                    closed=true;            }            catch(NoSuchElementException ex) {                /*null result means we timed out*/                throw new TimeoutException();            }	                /*if we reached an end marker we are going to close the queue*/            if(retval == endMarker) {                close(false);                throw new QueueClosedException();            }            /*at this point we actually did receive a value from the queue, return it*/            return retval;        }    }    /**     * removes a specific object from the queue.     * the object is matched up using the Object.equals method.     *     * @param obj the actual object to be removed from the queue     */    public void removeElement(Object obj) throws QueueClosedException {        boolean removed;        if(obj == null) return;	        /*lock the queue*/        synchronized(mutex) {            removed=l.remove(obj);            if(!removed)                if(log.isWarnEnabled()) log.warn("element " + obj + " was not found in the queue");        }    }    /**     * returns the first object on the queue, without removing it.     * If the queue is empty this object blocks until the first queue object has     * been added     *     * @return the first object on the queue     */    public Object peek() throws QueueClosedException {        Object retval=null;        synchronized(mutex) {            while(l.size() == 0) {                if(closed)                    throw new QueueClosedException();                try {                    mutex.wait();                }                catch(InterruptedException ex) {                }            }            if(closed)                throw new QueueClosedException();            try {                retval=l.getFirst();            }            catch(NoSuchElementException ex) {                if(log.isErrorEnabled()) log.error("retval == null, size()=" + l.size());                return null;            }        }        if(retval == endMarker) {            close(false); // mark queue as closed            throw new QueueClosedException();        }        return retval;    }    /**     * returns the first object on the queue, without removing it.     * If the queue is empty this object blocks until the first queue object has     * been added or the operation times out     *     * @param timeout how long in milli seconds will this operation wait for an object to be added to the queue     *                before it times out     * @return the first object on the queue     */    public Object peek(long timeout) throws QueueClosedException, TimeoutException {        Object retval=null;        synchronized(mutex) {            if(l.size() == 0) {                if(closed)                    throw new QueueClosedException();                try {                    mutex.wait(timeout);                }                catch(InterruptedException ex) {                }            }            if(closed)                throw new QueueClosedException();            try {                retval=l.getFirst();            }            catch(NoSuchElementException ex) {                /*null result means we timed out*/                throw new TimeoutException();            }            if(retval == endMarker) {                close(false);                throw new QueueClosedException();            }            return retval;        }    }    /**     * Marks the queues as closed. When an <code>add</code> or <code>remove</code> operation is     * attempted on a closed queue, an exception is thrown.     *     * @param flush_entries When true, a end-of-entries marker is added to the end of the queue.     *                      Entries may be added and removed, but when the end-of-entries marker     *                      is encountered, the queue is marked as closed. This allows to flush     *                      pending messages before closing the queue.     */    public void close(boolean flush_entries) {        if(flush_entries) {            try {                add(endMarker); // add an end-of-entries marker to the end of the queue                num_markers++;            }            catch(QueueClosedException closed) {            }            return;        }        synchronized(mutex) {            closed=true;            try {                mutex.notifyAll();            }            catch(Exception e) {                if(log.isErrorEnabled()) log.error("exception=" + e);            }        }    }    /**     * resets the queue.     * This operation removes all the objects in the queue and marks the queue open     */    public void reset() {        num_markers=0;        if(!closed)            close(false);        synchronized(mutex) {            l.clear();            closed=false;        }    }    /**     * returns the number of objects that are currently in the queue     */    public int size() {        return l.size() - num_markers;    }    /**     * prints the size of the queue     */    public String toString() {        return "LinkedListQueue (" + size() + ") messages [closed=" + closed + ']';    }    /**     * returns a vector with all the objects currently in the queue     */    public Vector getContents() {        Vector retval=new Vector();        synchronized(mutex) {            for(Iterator it=l.iterator(); it.hasNext();) {                retval.addElement(it.next());            }        }        return retval;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?