synchronousqueue.java

来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 708 行 · 第 1/2 页

JAVA
708
字号
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package edu.emory.mathcs.backport.java.util.concurrent;

import java.util.*;

import edu.emory.mathcs.backport.java.util.AbstractQueue;
import edu.emory.mathcs.backport.java.util.concurrent.locks.*;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.*;

/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity, not even a
 * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
 * because an element is only present when you try to take it; you
 * cannot add an element (using any method) unless another thread is
 * trying to remove it; you cannot iterate as there is nothing to
 * iterate.  The <em>head</em> of the queue is the element that the
 * first queued thread is trying to add to the queue; if there are no
 * queued threads then no element is being added and the head is
 * <tt>null</tt>.  For purposes of other <tt>Collection</tt> methods
 * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
 * as an empty collection.  This queue does not permit <tt>null</tt>
 * elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p> This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class SynchronousQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving producer that does not already have a waiting consumer
      creates a node holding item, and then waits for a consumer to take it.
      * An arriving producer that does already have a waiting consumer fills
      the slot node created by the consumer, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving consumer that does not already have a waiting producer
      creates an empty slot node, and then waits for a producer to fill it.
      * An arriving consumer that does already have a waiting producer takes
      item from the node created by the producer, and notifies it to continue.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.

      This requires keeping two simple queues, waitingProducers and
      waitingConsumers. Each of these can be FIFO (preserves fairness)
      or LIFO (improves throughput).
    */

    /**
     * Lock protecting both wait queues. Constructed as a fair lock when
     * this queue is created with <tt>fair == true</tt>.
     */
    private final ReentrantLock qlock;
    /**
     * Queue holding waiting puts: nodes enqueued by producers that found
     * no waiting consumer.
     */
    private final WaitQueue waitingProducers;
    /**
     * Queue holding waiting takes; an arriving producer dequeues from it
     * first to look for a consumer to hand its item to.
     */
    private final WaitQueue waitingConsumers;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     * Delegates to {@link #SynchronousQueue(boolean)} with <tt>false</tt>,
     * which selects a default-constructed lock and LIFO wait queues.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> using the given fairness policy.
     * A fair queue uses a fair lock and FIFO wait queues; a nonfair queue
     * uses a nonfair lock and LIFO wait queues.
     *
     * @param fair if true, threads contend in FIFO order for access;
     * otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        // ReentrantLock(false) is the same nonfair lock the no-arg
        // constructor produces, so the flag can be passed straight through.
        qlock = new ReentrantLock(fair);
        if (!fair) {
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        else {
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
    }

    /**
     * Base type for the queues of waiting puts/takes; the FIFO and LIFO
     * variants below supply the actual ordering. The concrete queues keep
     * only transient fields, yet the type is serializable so that the
     * fairness setting (i.e. which variant is in use) survives
     * deserialization.
     */
    abstract static class WaitQueue implements java.io.Serializable {
        /** Creates a node holding x, adds it to the queue, and returns it. */
        abstract Node enq(Object x);
        /** Removes and returns the next node, or null if the queue is empty. */
        abstract Node deq();
    }

    /**
     * First-in-first-out queue of waiting puts/takes, kept as a singly
     * linked list: nodes are appended after <tt>last</tt> and removed
     * from <tt>head</tt>.
     */
    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3623113410248163686L;
        private transient Node head;
        private transient Node last;

        /** Appends a new node holding x at the tail and returns it. */
        Node enq(Object x) {
            Node added = new Node(x);
            if (last != null)
                last.next = added;
            else
                head = added;   // queue was empty: new node is also the head
            last = added;
            return added;
        }

        /** Unlinks and returns the head node, or null if the queue is empty. */
        Node deq() {
            Node first = head;
            if (first == null)
                return null;
            head = first.next;
            if (head == null)
                last = null;    // removed the only node: queue is empty again
            first.next = null;
            return first;
        }
    }

    /**
     * Last-in-first-out queue (a stack) of waiting puts/takes: nodes are
     * both pushed onto and popped from <tt>head</tt>.
     */
    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3633113410248163686L;
        private transient Node head;

        /** Pushes a new node holding x on top of the stack and returns it. */
        Node enq(Object x) {
            Node pushed = new Node(x, head);
            head = pushed;
            return pushed;
        }

        /** Pops and returns the top node, or null if the stack is empty. */
        Node deq() {
            Node top = head;
            if (top == null)
                return null;
            head = top.next;
            top.next = null;
            return top;
        }
    }

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. Blocking is managed with the node's own
     * monitor (synchronized methods plus wait/notify) and an int
     * <tt>state</tt> field: 0 for waiting, 1 for ack, -1 for cancelled.
     */
    static final class Node {
        /** Synchronization state value representing that node acked */
        private static final int ACK    =  1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /**
         * Handoff state: 0 while waiting, ACK or CANCEL afterwards.
         * Within this class it is only read and written while holding
         * this node's monitor.
         */
        int state = 0;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Creates a node with initial item */
        Node(Object x) { item = x; }

        /** Creates a node with initial item and next */
        Node(Object x, Node n) { item = x; next = n; }

        /**
         * Takes item and nulls out field (for sake of GC)
         *
         * PRE: lock owned (callers in this class are synchronized on
         * this node)
         *
         * @return the item that was held, possibly null
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Tries to cancel on interrupt; if so rethrowing,
         * else setting interrupt state
         *
         * PRE: lock owned (this node's monitor; required by notify())
         *
         * @param ie the exception caught by the interrupted waiter
         * @throws InterruptedException <tt>ie</tt> itself, if the node was
         *         still waiting (state 0) and has now been cancelled
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
            throws InterruptedException
        {
            if (state == 0) {
                state = CANCEL;
                notify();
                throw ie;
            }
            // State already left 0, so the wait is over; keep that outcome
            // and just restore the thread's interrupt status.
            Thread.currentThread().interrupt();
        }

        /**
         * Fills in the slot created by the consumer and signal consumer to
         * continue.
         *
         * @param x the item to hand over
         * @return true if the node was still waiting and is now acked;
         *         false if its state had already left 0
         */
        synchronized boolean setItem(Object x) {
            if (state != 0) return false;
            item = x;
            state = ACK;
            notify();
            return true;
        }

        /**
         * Removes item from slot created by producer and signal producer
         * to continue.
         *
         * @return the extracted item, or null if the node's state had
         *         already left 0
         */
        synchronized Object getItem() {
            if (state != 0) return null;
            state = ACK;
            notify();
            return extract();
        }

        /**
         * Waits for a consumer to take item placed by producer.
         *
         * @throws InterruptedException if interrupted while the node is
         *         still waiting; the node is marked cancelled first
         */
        synchronized void waitForTake() throws InterruptedException {
            try {
                // Loop guards against wake-ups that happen while state is still 0.
                while (state == 0) wait();
            } catch (InterruptedException ie) {
                // Either cancels and rethrows, or (if state already left 0)
                // restores the interrupt status and lets us return normally.
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Waits for a producer to put item placed by consumer.
         *
         * @return the item extracted from this node once the wait ends
         * @throws InterruptedException if interrupted while the node is
         *         still waiting; the node is marked cancelled first
         */
        synchronized Object waitForPut() throws InterruptedException {
            try {
                while (state == 0) wait();
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Waits until state leaves 0 or the given time runs out. On
         * timeout the node is marked CANCEL and notify() is called.
         *
         * PRE: lock owned (this node's monitor; callers in this class are
         * synchronized)
         *
         * @param nanos maximum time to wait, in nanoseconds; a value
         *        &lt;= 0 means do not wait at all
         * @return true if state is non-zero (on entry or after waiting);
         *         false if the node was cancelled because time ran out
         * @throws InterruptedException propagated from the timed wait
         */
        private boolean attempt(long nanos) throws InterruptedException {
            if (state != 0) return true;
            if (nanos <= 0) {
                state = CANCEL;
                notify();
                return false;
            }
            long deadline = Utils.nanoTime() + nanos;
            while (true) {
                // NOTE(review): presumably a timed Object.wait on this node
                // for at most nanos -- confirm against TimeUnit.timedWait.
                TimeUnit.NANOSECONDS.timedWait(this, nanos);
                if (state != 0) return true;
                // Still waiting: recompute the remaining time from the deadline.
                nanos = deadline - Utils.nanoTime();
                if (nanos <= 0) {
                    state = CANCEL;
                    notify();
                    return false;
                }
            }
        }

        /**
         * Waits for a consumer to take item placed by producer or time out.
         *
         * @param nanos maximum time to wait, in nanoseconds
         * @return false if the wait timed out (node cancelled); true
         *         otherwise
         * @throws InterruptedException if interrupted while the node is
         *         still waiting; the node is marked cancelled first
         */
        synchronized boolean waitForTake(long nanos) throws InterruptedException {
            try {
                if (!attempt(nanos)) return false;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Waits for a producer to put item placed by consumer, or time out.
         *
         * @param nanos maximum time to wait, in nanoseconds
         * @return null if the wait timed out (node cancelled); otherwise
         *         the item extracted from this node
         * @throws InterruptedException if interrupted while the node is
         *         still waiting; the node is marked cancelled first
         */
        synchronized Object waitForPut(long nanos) throws InterruptedException {
            try {
                if (!attempt(nanos)) return null;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(Object o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                node.waitForTake();
                return;
            }

            else if (node.setItem(o))
                return;

            // else consumer cancelled, so retry
        }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?