linkedblockingqueue.java

来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 735 行 · 第 1/2 页

JAVA
735
字号
                }
            }
        }
    }

    /**
     * Retrieves and removes an element via {@code extract()}, waiting on
     * {@code takeGuard_} for up to the given time if none is obtained
     * immediately.
     *
     * @param timeout how long to wait before giving up, in units of {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the extracted element, or {@code null} if the wait time
     *         ran out before {@code extract()} produced one
     * @throws InterruptedException if the calling thread is already
     *         interrupted on entry, or is interrupted while waiting
     */
    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        // Fast path: try once before converting the timeout or taking takeGuard_.
        Object item = extract();
        if (item != null) {
            return item;
        }

        long remaining = unit.toNanos(timeout);
        synchronized (takeGuard_) {
            try {
                final long deadline = Utils.nanoTime() + remaining;
                while (true) {
                    item = extract();
                    // Either we got an element, or the time budget is spent
                    // (in which case item is null and null is returned).
                    if (item != null || remaining <= 0) {
                        return item;
                    }
                    TimeUnit.NANOSECONDS.timedWait(takeGuard_, remaining);
                    // Recompute from the fixed deadline so repeated wakeups
                    // do not extend the total wait.
                    remaining = deadline - Utils.nanoTime();
                }
            }
            catch (InterruptedException ex) {
                // Wake one other thread waiting on takeGuard_ before propagating
                // (presumably so a notification delivered to this thread is not lost).
                takeGuard_.notify();
                throw ex;
            }
        }
    }

    /**
     * Non-blocking variant of poll: delegates directly to {@code extract()}
     * and returns whatever it yields.
     *
     * @return the result of {@code extract()}; presumably {@code null} when
     *         nothing is available (extract() is defined elsewhere in this file)
     */
    public Object poll() {
        final Object item = extract();
        return item;
    }


    /**
     * Returns, without removing it, the value held by the node that
     * follows {@code head_}.
     *
     * @return the value of the first node after {@code head_}, or
     *         {@code null} if {@code head_} has no successor
     */
    public Object peek() {
        synchronized (head_) {
            final LinkedNode front = head_.next;
            return (front == null) ? null : front.value;
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * <p>The list is searched from the node after {@code head_} for the
     * first node whose value satisfies {@code o.equals(value)}. The search
     * and unlink run while holding, in this order, {@code putGuard_},
     * {@code takeGuard_}, this queue's monitor and {@code head_}'s monitor.
     * On success one put permit is credited to {@code takeSidePutPermits_}
     * and a thread waiting on this queue's monitor is notified.
     *
     * @param o the element to remove; {@code null} is never found
     * @return {@code true} if a matching node was unlinked, {@code false}
     *         otherwise
     */
    public boolean remove(Object o) {
        if (o == null) return false;

        synchronized (putGuard_) {
            synchronized (takeGuard_) {
                synchronized (this) {
                    synchronized (head_) {
                        // Walk the list keeping the predecessor so the match can be unlinked.
                        LinkedNode trail = head_;
                        LinkedNode p = head_.next;
                        while (p != null && !o.equals(p.value)) {
                            trail = p;
                            p = p.next;
                        }
                        if (p == null) {
                            return false;
                        }

                        p.value = null;
                        trail.next = p.next;
                        // If the tail node was removed, last_ must move back to its
                        // predecessor; otherwise it would keep referring to a node
                        // that is no longer reachable from head_.
                        // NOTE(review): assumes new nodes are appended at last_
                        // (insert is not visible here) -- confirm.
                        if (last_ == p) {
                            last_ = trail;
                        }
                        ++takeSidePutPermits_;
                        notify();
                        return true;
                    }
                }
            }
        }
    }

    /**
     * Copies the queue's values, in list order starting at the node after
     * {@code head_}, into a newly allocated array of length {@code size()}.
     * Both {@code putGuard_} and {@code takeGuard_} are held during the copy.
     *
     * @return a new array containing the node values in list order
     */
    public Object[] toArray() {
        synchronized (putGuard_) {
            synchronized (takeGuard_) {
                final Object[] snapshot = new Object[size()];
                int idx = 0;
                LinkedNode node = head_.next;
                while (node != null) {
                    snapshot[idx++] = node.value;
                    node = node.next;
                }
                return snapshot;
            }
        }
    }

    /**
     * Copies the queue's values, in list order starting at the node after
     * {@code head_}, into the supplied array if it is large enough, or
     * otherwise into a new array of the same component type and of length
     * {@code size()}. Both {@code putGuard_} and {@code takeGuard_} are held
     * during the copy.
     *
     * <p>As required by the {@code Collection.toArray(Object[])} contract,
     * when the array has room to spare the slot immediately after the last
     * copied element is set to {@code null}.
     *
     * @param a the array to fill if it is big enough; its runtime component
     *          type is used when a larger array has to be allocated
     * @return the array holding the queue's values
     * @throws NullPointerException if {@code a} is {@code null}
     * @throws ArrayStoreException if a value cannot be stored in an array
     *         of {@code a}'s runtime component type
     */
    public Object[] toArray(Object[] a) {
        synchronized (putGuard_) {
            synchronized (takeGuard_) {
                int size = size();
                if (a.length < size)
                    a = (Object[]) java.lang.reflect.Array.newInstance
                        (a.getClass().getComponentType(), size);

                int k = 0;
                for (LinkedNode p = head_.next; p != null; p = p.next)
                    a[k++] = p.value;

                // Mark the end of the data when the caller's array is longer
                // than the number of elements copied.
                if (k < a.length)
                    a[k] = null;
                return a;
            }
        }
    }

    /**
     * Returns the superclass string representation, computed while holding
     * both {@code putGuard_} and {@code takeGuard_}.
     *
     * @return the value of {@code super.toString()}
     */
    public String toString() {
        synchronized (putGuard_) {
            synchronized (takeGuard_) {
                final String text = super.toString();
                return text;
            }
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     *
     * <p>While holding this queue's monitor, the method advances from
     * {@code head_} over at most {@code size()} nodes, nulling each value on
     * the way, then installs the node it stopped on as the new {@code head_}.
     * The number of nodes skipped is credited to {@code takeSidePutPermits_}
     * and a thread waiting on this queue's monitor is notified.
     */
    public void clear() {
        synchronized (this) {
            final int sizeBound = this.size();
            int count = 0;
            LinkedNode node = head_;
            while (count < sizeBound) {
                synchronized (node) {
                    node.value = null;
                    if (node.next == null) break;
                    node = node.next;
                }
                count++;
            }
            // The node that becomes the new head may still carry the last
            // removed element when the loop ended on the size bound; drop the
            // reference so the element is not retained by the queue (both
            // drainTo variants null the new head's value in the same way).
            synchronized (node) {
                node.value = null;
            }
            head_ = node;
            takeSidePutPermits_ += count;
            notify();
        }
    }

    /**
     * Moves the elements currently in this queue into the given collection.
     *
     * <p>Under this queue's monitor the head is advanced past at most
     * {@code size()} nodes and the corresponding number of put permits is
     * credited to {@code takeSidePutPermits_}; the detached values are then
     * added to {@code c} after the monitor has been released.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }

        final LinkedNode oldHead;
        LinkedNode newHead;
        int drained = 0;

        synchronized (this) {
            final int limit = this.size();
            oldHead = head_;
            newHead = oldHead;
            // Step forward one node per drained element, stopping early
            // if the end of the list is reached.
            for (; drained < limit; drained++) {
                synchronized (newHead) {
                    if (newHead.next == null) break;
                    newHead = newHead.next;
                }
            }
            head_ = newHead;
            takeSidePutPermits_ += drained;
            notify();
        }

        // Hand the detached values to the target outside of the locks.
        for (LinkedNode node = oldHead; node != newHead; ) {
            node = node.next;
            c.add(node.value);
            node.value = null;
        }
        return drained;
    }

    /**
     * Moves at most {@code maxElements} elements from this queue into the
     * given collection.
     *
     * <p>Under this queue's monitor the head is advanced past at most
     * {@code min(maxElements, size())} nodes and the corresponding number of
     * put permits is credited to {@code takeSidePutPermits_}; the detached
     * values are then added to {@code c} after the monitor has been released.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer; values
     *        less than or equal to zero transfer nothing
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }

        final LinkedNode oldHead;
        LinkedNode newHead;
        int drained = 0;

        synchronized (this) {
            // Never try to take more than the queue reports holding.
            final int limit = Math.min(maxElements, this.size());
            oldHead = head_;
            newHead = oldHead;
            for (; drained < limit; drained++) {
                synchronized (newHead) {
                    if (newHead.next == null) break;
                    newHead = newHead.next;
                }
            }
            head_ = newHead;
            takeSidePutPermits_ += drained;
            notify();
        }

        // Hand the detached values to the target outside of the locks.
        for (LinkedNode node = oldHead; node != newHead; ) {
            node = node.next;
            c.add(node.value);
            node.value = null;
        }
        return drained;
    }

    /**
     * Creates an iterator that walks this queue's elements in queue order.
     * The iterator is "weakly consistent": it never throws
     * {@link java.util.ConcurrentModificationException}, it is guaranteed to
     * traverse elements as they existed when the iterator was constructed,
     * and it may (but is not guaranteed to) reflect modifications made after
     * construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator iterator() {
        final Iterator it = new Itr();
        return it;
    }

    /**
     * Iterator over the queue's linked nodes, starting at the node after
     * {@code head_} at construction time.
     */
    private class Itr implements Iterator {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        /** Node whose element will be returned by the next call to next(). */
        private LinkedNode current;
        /** Node returned by the most recent next(); null if none or already removed. */
        private LinkedNode lastRet;
        /** Value captured from {@code current} when the iterator moved onto it. */
        private Object currentElement;

        /** Positions the iterator on the first node after {@code head_}. */
        Itr() {
            synchronized (LinkedBlockingQueue.this) {
                synchronized (head_) {
                    current = head_.next;
                    if (current != null)
                        currentElement = current.value;
                }
            }
        }

        /** @return {@code true} if a further element is available from next() */
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the element captured for the current node and advances to
         * the following node, capturing its value in turn.
         *
         * @return the next element
         * @throws NoSuchElementException if the iteration has no more elements
         */
        public Object next() {
            synchronized (LinkedBlockingQueue.this) {
                if (current == null)
                    throw new NoSuchElementException();
                synchronized (current) {
                    Object x = currentElement;
                    lastRet = current;
                    current = current.next;
                    if (current != null)
                        currentElement = current.value;
                    return x;
                }
            }
        }

        /**
         * Unlinks the node most recently returned by next(), if it is still
         * reachable from {@code head_}. On success one put permit is credited
         * to {@code takeSidePutPermits_} and a thread waiting on the queue's
         * monitor is notified.
         *
         * @throws IllegalStateException if next() has not been called, or
         *         remove() was already called after the last next()
         */
        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            synchronized (LinkedBlockingQueue.this) {
                LinkedNode node = lastRet;
                lastRet = null;
                // Search for the predecessor of the node to unlink.
                LinkedNode trail = head_;
                while (trail != null) {
                    synchronized (trail) {
                        if (trail.next == node) {
                            synchronized (node) {
                                trail.next = node.next;
                                node.value = null;
                                // If the tail node was removed, move last_ back to
                                // its predecessor so it does not keep referring to
                                // a node that is no longer reachable from head_.
                                // NOTE(review): assumes new nodes are appended at
                                // last_ (insert is not visible here) -- confirm.
                                if (last_ == node)
                                    last_ = trail;
                                takeSidePutPermits_++;
                                LinkedBlockingQueue.this.notify();
                                break;
                            }
                        }
                        trail = trail.next;
                    }
                }
            }
        }
    }

    /**
     * Serializes this queue to the given stream.
     *
     * @serialData The default serializable fields (including the capacity)
     * are written first, followed by every element
     * (each an <tt>Object</tt>) in queue order, followed by a
     * terminating null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        synchronized (LinkedBlockingQueue.this) {

            // Default fields first (hidden state plus capacity).
            s.defaultWriteObject();

            // Start from the first real node, i.e. the one after head_.
            LinkedNode node;
            synchronized (head_) {
                node = head_.next;
            }

            // Emit each value in list order, locking the node being read.
            while (node != null) {
                synchronized (node) {
                    s.writeObject(node.value);
                    node = node.next;
                }
            }

            // A trailing null marks the end of the element sequence.
            s.writeObject(null);
        }
    }

    /**
     * Rebuilds this queue from a stream (that is, deserializes it):
     * restores the default fields, resets the permit counters and the
     * linked list to an empty state, then re-adds every element read from
     * the stream until a null marker is encountered.
     *
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {

        synchronized (this) {
            // Default fields first (capacity and any hidden state).
            s.defaultReadObject();

            // Start out empty: all permits on the put side, a single
            // value-less node serving as both head and last.
            putSidePutPermits_ = capacity_;
            takeSidePutPermits_ = 0;
            last_ = head_ = new LinkedNode(null);
        }

        // Re-insert the elements in the order they were written; a null
        // object marks the end of the sequence.
        Object item;
        while ((item = s.readObject()) != null) {
            add(item);
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?