linkedblockingqueue.java

来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 735 行 · 第 1/2 页

JAVA
735
字号
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain. Use, modify, and
 * redistribute this code in any way without acknowledgement.
 */

package edu.emory.mathcs.backport.java.util.concurrent;

import java.util.*;

import edu.emory.mathcs.backport.java.util.AbstractQueue;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.*;

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 *
 **/
public class LinkedBlockingQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    class LinkedNode {
        Object value;
        LinkedNode next;
        LinkedNode() {}
        LinkedNode(Object x) { value = x; }
        LinkedNode(Object x, LinkedNode n) { value = x; next = n; }
    }

    /**
     * Dummy header node of list. The first actual node, if it exists, is always
     * at head_.next. After each take, the old first node becomes the head.
     **/
    protected transient LinkedNode head_;

    /**
     * The last node of list. Put() appends to list, so modifies last_
     **/
    protected transient LinkedNode last_;

    private static class Guard implements java.io.Serializable {}

    /**
     * Helper monitor. Ensures that only one put at a time executes.
     **/

    protected final Object putGuard_ = new Guard();

    /**
     * Helper monitor. Protects and provides wait queue for takes
     **/

    protected final Object takeGuard_ = new Guard();


    /** Number of elements allowed **/
    protected int capacity_;


    /**
     * One side of a split permit count.
     * The counts represent permits to do a put. (The queue is full when zero).
     * Invariant: putSidePutPermits_ + takeSidePutPermits_ = capacity_ - length.
     * (The length is never separately recorded, so this cannot be
     * checked explicitly.)
     * To minimize contention between puts and takes, the
     * put side uses up all of its permits before transfering them from
     * the take side. The take side just increments the count upon each take.
     * Thus, most puts and take can run independently of each other unless
     * the queue is empty or full.
     * Initial value is queue capacity.
     **/

    protected transient int putSidePutPermits_;

    /** Number of takes since last reconcile **/
    protected transient int takeSidePutPermits_ = 0;


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        capacity_ = capacity;
        putSidePutPermits_ = capacity;
        head_ =  new LinkedNode(null);
        last_ = head_;
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection c) {
        this(Integer.MAX_VALUE);
        for (Iterator it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    /**
     * Move put permits from take side to put side;
     * return the number of put side permits that are available.
     * Call only under synch on puGuard_ AND this.
     **/
    protected final int reconcilePutPermits() {
      putSidePutPermits_ += takeSidePutPermits_;
      takeSidePutPermits_ = 0;
      return putSidePutPermits_;
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return  the number of elements in this queue.
     */
    public synchronized int size() {
        /*
          This should ideally synch on putGuard_, but
          doing so would cause it to block waiting for an in-progress
          put, which might be stuck. So we instead use whatever
          value of putSidePutPermits_ that we happen to read.
         */
        return capacity_ - (takeSidePutPermits_ + putSidePutPermits_);
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity_ - size();
    }

    /** Notify a waiting take if needed **/
    protected final void allowTake() {
        synchronized (takeGuard_) {
            takeGuard_.notify();
        }
    }

    /**
     * Create and insert a node.
     * Call only under synch on putGuard_
     **/
    protected void insert(Object x) {
        --putSidePutPermits_;
        LinkedNode p = new LinkedNode(x);
        synchronized (last_) {
            last_.next = p;
            last_ = p;
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(Object o) throws InterruptedException {
        if (o == null)
            throw new NullPointerException();
        if (Thread.interrupted())
            throw new InterruptedException();

        synchronized (putGuard_) {

            if (putSidePutPermits_ <= 0) { // wait for permit.
                synchronized (this) {
                    if (reconcilePutPermits() <= 0) {
                        try {
                            for (; ; ) {
                                wait();
                                if (reconcilePutPermits() > 0) {
                                    break;
                                }
                            }
                        }
                        catch (InterruptedException ex) {
                            notify();
                            throw ex;
                        }
                    }
                }
            }
            insert(o);
        }
        // call outside of lock to loosen put/take coupling
        allowTake();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(Object o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new IllegalArgumentException();
        if (Thread.interrupted()) throw new InterruptedException();
        long nanos = unit.toNanos(timeout);

        synchronized (putGuard_) {

            if (putSidePutPermits_ <= 0) {
                synchronized (this) {
                    if (reconcilePutPermits() <= 0) {
                        if (nanos <= 0)
                            return false;
                        else {
                            try {
                                long deadline = Utils.nanoTime() + nanos;
                                for (; ; ) {
                                    TimeUnit.NANOSECONDS.timedWait(this, nanos);
                                    if (reconcilePutPermits() > 0) {
                                        break;
                                    }
                                    else {
                                        nanos = deadline - Utils.nanoTime();
                                        if (nanos <= 0) {
                                            return false;
                                        }
                                    }
                                }
                            }
                            catch (InterruptedException ex) {
                                notify();
                                throw ex;
                            }
                        }
                    }
                }
            }

            insert(o);
        }

        allowTake();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(Object o) {
        if (o == null) throw new NullPointerException();
        synchronized (putGuard_) {
            if (putSidePutPermits_ <= 0) {
                synchronized (this) {
                    if (reconcilePutPermits() <= 0) return false;
                }
            }
            insert(o);
        }

        allowTake();
        return true;
    }

    /** Main mechanics for take/poll **/
    protected synchronized Object extract() {
        synchronized (head_) {
            Object x = null;
            LinkedNode first = head_.next;
            if (first != null) {
                x = first.value;
                first.value = null;
                head_ = first;
                ++takeSidePutPermits_;
                notify();
            }
            return x;
        }
    }

    public Object take() throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        Object x = extract();
        if (x != null)
            return x;
        else {
            synchronized (takeGuard_) {
                try {
                    for (; ; ) {
                        x = extract();
                        if (x != null) {
                            return x;
                        }
                        else {
                            takeGuard_.wait();
                        }
                    }
                }
                catch (InterruptedException ex) {
                    takeGuard_.notify();
                    throw ex;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?