arrayblockingqueue.java
来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 702 行 · 第 1/2 页
JAVA
702 行
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package edu.emory.mathcs.backport.java.util.concurrent;
import java.util.*;
import edu.emory.mathcs.backport.java.util.AbstractQueue;
import edu.emory.mathcs.backport.java.util.concurrent.locks.*;
import edu.emory.mathcs.backport.java.util.concurrent.helpers.*;
/**
* A bounded {@linkplain BlockingQueue blocking queue} backed by an
* array. This queue orders elements FIFO (first-in-first-out). The
* <em>head</em> of the queue is that element that has been on the
* queue the longest time. The <em>tail</em> of the queue is that
* element that has been on the queue the shortest time. New elements
* are inserted at the tail of the queue, and the queue retrieval
* operations obtain elements at the head of the queue.
*
* <p>This is a classic "bounded buffer", in which a
* fixed-sized array holds elements inserted by producers and
* extracted by consumers. Once created, the capacity cannot be
* increased. Attempts to put an element to a full queue will
* result in the put operation blocking; attempts to retrieve an
* element from an empty queue will similarly block.
*
* <p> This class supports an optional fairness policy for ordering
* waiting producer and consumer threads. By default, this ordering
* is not guaranteed. However, a queue constructed with fairness set
* to <tt>true</tt> grants threads access in FIFO order. Fairness
* generally decreases throughput but reduces variability and avoids
* starvation.
*
* <p>This class and its iterator implement all of the
* <em>optional</em> methods of the {@link Collection} and {@link
* Iterator} interfaces.
*
* <p>This class is a member of the
* <a href="{@docRoot}/../guide/collections/index.html">
* Java Collections Framework</a>.
*
* @since 1.5
* @author Doug Lea
*/
public class ArrayBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
/**
 * Serialization ID. This class relies on default serialization
 * even for the items array, which is default-serialized, even if
 * it is empty. Otherwise it could not be declared final, which is
 * necessary here.
 */
private static final long serialVersionUID = -817911632652898426L;

/** The queued items */
private final Object[] items;

/**
 * items index for next take, poll or remove.
 *
 * <p>Deliberately NOT transient: the items array and count are written
 * by default serialization with every element at its current slot, so
 * the indices must travel with them. If they were reset to 0 on
 * deserialization while the elements stayed at their old slots, the
 * restored queue would hand out nulls or the wrong elements. Streams
 * written before this field was serialized still read back with the
 * default value 0, because serialVersionUID is unchanged.
 */
private int takeIndex;

/**
 * items index for next put, offer, or add.
 * Serialized together with takeIndex for the same reason.
 */
private int putIndex;

/** Number of items in the queue */
private int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
private final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;
// Internal helper methods
/**
 * Steps an index into the backing array forward by one slot, wrapping
 * around to slot 0 when the step would land on items.length.
 *
 * @param i the index to advance
 * @return the index that circularly follows i
 */
final int inc(int i) {
    int next = i + 1;
    if (next == items.length) {
        return 0;
    }
    return next;
}
/**
 * Stores x in the slot named by putIndex, moves putIndex on to the
 * circularly-next slot, bumps the element count and signals notEmpty.
 * Call only when holding lock.
 */
private void insert(Object x) {
    final int slot = putIndex;
    items[slot] = x;
    putIndex = inc(slot);
    count++;
    notEmpty.signal();
}
/**
 * Takes the element out of the slot named by takeIndex, clears that
 * slot, moves takeIndex on to the circularly-next slot, lowers the
 * element count and signals notFull.
 * Call only when holding lock.
 *
 * @return the element that was stored at the take position
 */
private Object extract() {
    final int slot = takeIndex;
    final Object head = items[slot];
    items[slot] = null; // do not keep a reference to the removed element
    takeIndex = inc(slot);
    count--;
    notFull.signal();
    return head;
}
/**
* Utility for remove and iterator.remove: Delete item at position i.
* Call only when holding lock.
*
* Removing the item at takeIndex only clears that slot and advances
* takeIndex. Removing any other item closes the gap by copying each
* following element one slot back (circularly) until putIndex is
* reached; the last vacated slot is cleared and becomes the new
* putIndex. In both cases count is decremented and notFull is signalled.
*
* @param i index into items of the element to delete
*/
void removeAt(int i) {
final Object[] items = this.items;
// if removing front item, just advance
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// slide over all others up through putIndex.
for (;;) {
int nexti = inc(i);
if (nexti != putIndex) {
// pull the following element back into the hole and move on
items[i] = items[nexti];
i = nexti;
} else {
// i is now the last occupied slot: clear it and make it the
// next put position
items[i] = null;
putIndex = i;
break;
}
}
}
--count;
// one slot has been freed
notFull.signal();
}
/**
* Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
* capacity and default access policy. Delegates to the two-argument
* constructor with <tt>fair</tt> set to <tt>false</tt>.
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
* capacity and the specified access policy.
* @param capacity the capacity of this queue
* @param fair if <tt>true</tt> then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order; if <tt>false</tt>
* the access order is unspecified.
* @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
 * Creates an <tt>ArrayBlockingQueue</tt> of the given (fixed) capacity
 * and access policy, pre-populated with the elements of the given
 * collection. Elements are added one at a time, in the order the
 * collection's iterator returns them.
 *
 * @param capacity the capacity of this queue
 * @param fair if <tt>true</tt> then queue accesses for threads blocked
 *        on insertion or removal are processed in FIFO order; if
 *        <tt>false</tt> the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
 *         <tt>c.size()</tt>, or less than 1.
 * @throws NullPointerException if <tt>c</tt> or any element within it
 *         is <tt>null</tt>
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection c) {
    this(capacity, fair);
    if (c.size() > capacity) {
        throw new IllegalArgumentException();
    }
    Iterator source = c.iterator();
    while (source.hasNext()) {
        add(source.next());
    }
}
/**
 * Non-blocking insert: appends the element at the tail of this queue
 * when a slot is free, and gives up at once when the queue is full.
 *
 * @param o the element to add.
 * @return <tt>true</tt> if the element was added to this queue,
 *         <tt>false</tt> if the queue was full
 * @throws NullPointerException if the specified element is <tt>null</tt>
 */
public boolean offer(Object o) {
    if (o == null) {
        throw new NullPointerException();
    }
    final ReentrantLock mainLock = this.lock;
    mainLock.lock();
    try {
        final boolean hasRoom = (count != items.length);
        if (hasRoom) {
            insert(o);
        }
        return hasRoom;
    } finally {
        mainLock.unlock();
    }
}
/**
 * Timed insert: appends the element at the tail of this queue, waiting
 * on the not-full condition for at most the given time when the queue
 * has no free slot.
 *
 * @param o the element to add
 * @param timeout how long to wait before giving up, in units of
 *        <tt>unit</tt>
 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
 *        <tt>timeout</tt> parameter
 * @return <tt>true</tt> if successful, or <tt>false</tt> if
 *         the specified waiting time elapses before space is available.
 * @throws InterruptedException if interrupted while waiting.
 * @throws NullPointerException if the specified element is <tt>null</tt>.
 */
public boolean offer(Object o, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (o == null) {
        throw new NullPointerException();
    }
    final ReentrantLock mainLock = this.lock;
    mainLock.lockInterruptibly();
    try {
        long remaining = unit.toNanos(timeout);
        final long deadline = Utils.nanoTime() + remaining;
        // Re-check the full condition after every wake-up; the wait is
        // re-armed with whatever time is left until the deadline.
        while (count == items.length) {
            if (remaining <= 0) {
                return false;
            }
            try {
                notFull.await(remaining, TimeUnit.NANOSECONDS);
                remaining = deadline - Utils.nanoTime();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
        }
        insert(o);
        return true;
    } finally {
        mainLock.unlock();
    }
}
/**
 * Non-blocking removal: takes the element at the head of this queue,
 * or returns at once when the queue holds no elements.
 *
 * @return the head of this queue, or <tt>null</tt> if this queue is empty
 */
public Object poll() {
    final ReentrantLock mainLock = this.lock;
    mainLock.lock();
    try {
        return (count == 0) ? null : extract();
    } finally {
        mainLock.unlock();
    }
}
/**
 * Timed removal: takes the element at the head of this queue, waiting
 * on the not-empty condition for at most the given time when the queue
 * holds no elements.
 *
 * @param timeout how long to wait before giving up, in units of
 *        <tt>unit</tt>
 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
 *        <tt>timeout</tt> parameter
 * @return the head of this queue, or <tt>null</tt> if the specified
 *         waiting time elapses before an element is available
 * @throws InterruptedException if interrupted while waiting
 */
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
    final ReentrantLock mainLock = this.lock;
    mainLock.lockInterruptibly();
    try {
        long remaining = unit.toNanos(timeout);
        final long deadline = Utils.nanoTime() + remaining;
        // Re-check the empty condition after every wake-up; the wait is
        // re-armed with whatever time is left until the deadline.
        while (count == 0) {
            if (remaining <= 0) {
                return null;
            }
            try {
                notEmpty.await(remaining, TimeUnit.NANOSECONDS);
                remaining = deadline - Utils.nanoTime();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
        }
        return extract();
    } finally {
        mainLock.unlock();
    }
}
/**
 * Removes a single instance of the specified element from this
 * queue, if it is present. The occupied slots are scanned from the
 * head towards the tail and the first element for which
 * <tt>o.equals(element)</tt> holds is deleted.
 *
 * @param o the element to remove; <tt>null</tt> is never found
 * @return <tt>true</tt> if an element was removed, else <tt>false</tt>
 */
public boolean remove(Object o) {
    if (o == null) {
        return false;
    }
    final Object[] elements = this.items;
    final ReentrantLock mainLock = this.lock;
    mainLock.lock();
    try {
        int idx = takeIndex;
        for (int scanned = 0; scanned < count; scanned++) {
            if (o.equals(elements[idx])) {
                removeAt(idx);
                return true;
            }
            idx = inc(idx);
        }
        return false;
    } finally {
        mainLock.unlock();
    }
}
/**
 * Returns the element at the head of this queue without removing it.
 *
 * @return the element at the take position, or <tt>null</tt> if this
 *         queue is empty
 */
public Object peek() {
    final ReentrantLock mainLock = this.lock;
    mainLock.lock();
    try {
        if (count == 0) {
            return null;
        }
        return items[takeIndex];
    } finally {
        mainLock.unlock();
    }
}
public Object take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?