synchronousqueue.java

来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 708 行 · 第 1/2 页

JAVA
708
字号
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a consumer appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        final long waitNanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.qlock;

        while (true) {
            // Honor a pending interrupt before every attempt, including retries.
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            // Under the lock: claim a waiting consumer if there is one,
            // otherwise register this element on the producer list.
            Node node;
            boolean enqueued;
            lock.lock();
            try {
                node = waitingConsumers.deq();
                enqueued = (node == null);
                if (enqueued) {
                    node = waitingProducers.enq(o);
                }
            } finally {
                lock.unlock();
            }

            if (enqueued) {
                // No consumer was available; the wait happens outside the lock
                // and its result is the result of this call.
                return node.waitForTake(waitNanos);
            }
            if (node.setItem(o)) {
                return true;
            }
            // setItem refused the element (the dequeued consumer cancelled), so retry.
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @throws InterruptedException if interrupted while waiting.
     * @return the head of this queue
     */
    public Object take() throws InterruptedException {
        final ReentrantLock lock = this.qlock;

        while (true) {
            // Honor a pending interrupt before every attempt, including retries.
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            // Under the lock: claim a waiting producer if there is one,
            // otherwise register an empty node on the consumer list.
            Node node;
            boolean enqueued;
            lock.lock();
            try {
                node = waitingProducers.deq();
                enqueued = (node == null);
                if (enqueued) {
                    node = waitingConsumers.enq(null);
                }
            } finally {
                lock.unlock();
            }

            if (enqueued) {
                // No producer was available; wait (outside the lock) on our own node.
                return node.waitForPut();
            }
            Object item = node.getItem();
            if (item != null) {
                return item;
            }
            // getItem yielded null (the dequeued producer cancelled), so retry.
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        final long waitNanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.qlock;

        while (true) {
            // Honor a pending interrupt before every attempt, including retries.
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }

            // Under the lock: claim a waiting producer if there is one,
            // otherwise register an empty node on the consumer list.
            Node node;
            boolean enqueued;
            lock.lock();
            try {
                node = waitingProducers.deq();
                enqueued = (node == null);
                if (enqueued) {
                    node = waitingConsumers.enq(null);
                }
            } finally {
                lock.unlock();
            }

            if (enqueued) {
                // No producer was available; the timed wait happens outside the lock
                // and its result is the result of this call.
                return node.waitForPut(waitNanos);
            }
            Object item = node.getItem();
            if (item != null) {
                return item;
            }
            // getItem yielded null (the dequeued producer cancelled), so retry.
        }
    }

    // Untimed nonblocking versions

   /**
    * Inserts the specified element into this queue, if another thread is
    * waiting to receive it.
    *
    * @param o the element to add.
    * @return <tt>true</tt> if it was possible to add the element to
    *         this queue, else <tt>false</tt>
    * @throws NullPointerException if the specified element is <tt>null</tt>
    */
    public boolean offer(Object o) {
        if (o == null) {
            throw new NullPointerException();
        }
        final ReentrantLock lock = this.qlock;

        Node consumer;
        do {
            // Only the dequeue itself runs under the lock.
            lock.lock();
            try {
                consumer = waitingConsumers.deq();
            } finally {
                lock.unlock();
            }
            if (consumer == null) {
                // Nobody is waiting to receive, and this variant never blocks.
                return false;
            }
            // A consumer whose setItem returns false did not accept the element;
            // move on to the next waiting consumer.
        } while (!consumer.setItem(o));
        return true;
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public Object poll() {
        final ReentrantLock lock = this.qlock;

        Object item;
        do {
            // Only the dequeue itself runs under the lock.
            Node producer;
            lock.lock();
            try {
                producer = waitingProducers.deq();
            } finally {
                lock.unlock();
            }
            if (producer == null) {
                // Nobody is waiting to hand over an element, and this variant never blocks.
                return null;
            }
            // A null item means this producer had nothing to hand over;
            // move on to the next waiting producer.
            item = producer.getItem();
        } while (item == null);
        return item;
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        // Constant by design: the queue has no internal capacity, so it never holds an element.
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        // Constant by design: the queue has no internal capacity, so there is nothing to count.
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        // Constant by design: the queue has no internal capacity at all, so none can remain.
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {} // Intentionally a no-op: with no internal capacity there is nothing to discard.

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        // Constant by design: the argument is not inspected because no element is ever held.
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        // Constant by design: no element is ever held, so nothing is removed
        // and the argument is not inspected.
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless the given collection is empty
     */
    public boolean containsAll(Collection c) {
        // This queue never holds an element, so only an empty collection can be
        // fully "contained". A null argument fails here with a NullPointerException.
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection c) {
        // Constant by design: nothing is held, so nothing is removed.
        // The argument is not inspected (a null collection is accepted silently).
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection c) {
        // Constant by design: nothing is held, so there is nothing to drop.
        // The argument is not inspected (a null collection is accepted silently).
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public Object peek() {
        // Constant by design: elements are only handed to callers that actively
        // wait for them, so there is never a head element to look at.
        return null;
    }


    /**
     * Iterator over an empty sequence: it never has a next element, so
     * <tt>next</tt> and <tt>remove</tt> always fail.
     */
    static class EmptyIterator implements Iterator {

        /** @return <tt>false</tt>, always */
        public boolean hasNext() { return false; }

        /** @throws NoSuchElementException always, since there is no element to return */
        public Object next() { throw new NoSuchElementException(); }

        /** @throws IllegalStateException always, since no element was ever returned by <tt>next</tt> */
        public void remove() { throw new IllegalStateException(); }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator iterator() {
        // A fresh EmptyIterator is allocated on every call; it reports no elements.
        return new EmptyIterator();
    }


    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        // A new zero-length array is allocated on every call; the queue never has contents to copy.
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     */
    public Object[] toArray(Object[] a) {
        // A zero-length array has no slot to clear and is handed back untouched.
        if (a.length == 0) {
            return a;
        }
        // Otherwise only the first slot is overwritten; any later slots keep their contents.
        a[0] = null;
        return a;
    }


    /**
     * Moves every element that <tt>poll()</tt> can obtain without blocking
     * into the given collection, stopping at the first <tt>null</tt> result.
     *
     * @param c the collection to add the elements to
     * @return the number of elements added to <tt>c</tt>
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }

        int transferred = 0;
        for (Object item = poll(); item != null; item = poll()) {
            c.add(item);
            transferred++;
        }
        return transferred;
    }

    /**
     * Moves at most <tt>maxElements</tt> elements that <tt>poll()</tt> can
     * obtain without blocking into the given collection, stopping early at
     * the first <tt>null</tt> result.
     *
     * @param c the collection to add the elements to
     * @param maxElements the maximum number of elements to move; a value of
     *        zero or less moves nothing
     * @return the number of elements added to <tt>c</tt>
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }

        int transferred = 0;
        // The limit is tested before each poll() so that no element is taken
        // from a producer once the quota has been reached.
        while (transferred < maxElements) {
            Object item = poll();
            if (item == null) {
                break;
            }
            c.add(item);
            transferred++;
        }
        return transferred;
    }
}





⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?