synchronousqueue.java
来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 708 行 · 第 1/2 页
JAVA
708 行
}
/**
* Inserts the specified element into this queue, waiting if necessary
* up to the specified wait time for another thread to receive it.
* @param o the element to add
* @param timeout how long to wait before giving up, in units of
* <tt>unit</tt>
* @param unit a <tt>TimeUnit</tt> determining how to interpret the
* <tt>timeout</tt> parameter
* @return <tt>true</tt> if successful, or <tt>false</tt> if
* the specified waiting time elapses before a consumer appears.
* @throws InterruptedException if interrupted while waiting.
* @throws NullPointerException if the specified element is <tt>null</tt>.
*/
public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException {
    if (o == null) {
        throw new NullPointerException();
    }
    final long waitNanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.qlock;
    while (true) {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node consumer;
        Node self = null;
        // Queue manipulation is done under the lock; the hand-off or the
        // wait itself happens only after the lock has been released.
        lock.lock();
        try {
            consumer = waitingConsumers.deq();
            if (consumer == null) {
                // Nobody is waiting to receive: register as a waiting producer.
                self = waitingProducers.enq(o);
            }
        } finally {
            lock.unlock();
        }
        if (consumer == null) {
            // Outcome of the timed wait on our own node is the result.
            return self.waitForTake(waitNanos);
        }
        if (consumer.setItem(o)) {
            return true;
        }
        // The dequeued consumer had cancelled, so go around again.
    }
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
* for another thread to insert it.
* @throws InterruptedException if interrupted while waiting.
* @return the head of this queue
*/
public Object take() throws InterruptedException {
    final ReentrantLock lock = this.qlock;
    while (true) {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node producer;
        Node self = null;
        // Queue manipulation is done under the lock; the hand-off or the
        // wait itself happens only after the lock has been released.
        lock.lock();
        try {
            producer = waitingProducers.deq();
            if (producer == null) {
                // Nobody is offering: register as a waiting consumer.
                self = waitingConsumers.enq(null);
            }
        } finally {
            lock.unlock();
        }
        if (producer == null) {
            // Whatever the wait on our own node yields is the result.
            return self.waitForPut();
        }
        final Object item = producer.getItem();
        if (item != null) {
            return item;
        }
        // A null item means the dequeued producer had cancelled; retry.
    }
}
/**
* Retrieves and removes the head of this queue, waiting
* if necessary up to the specified wait time, for another thread
* to insert it.
* @param timeout how long to wait before giving up, in units of
* <tt>unit</tt>
* @param unit a <tt>TimeUnit</tt> determining how to interpret the
* <tt>timeout</tt> parameter
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element is present.
* @throws InterruptedException if interrupted while waiting.
*/
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
    final long waitNanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.qlock;
    while (true) {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        Node producer;
        Node self = null;
        // Queue manipulation is done under the lock; the hand-off or the
        // wait itself happens only after the lock has been released.
        lock.lock();
        try {
            producer = waitingProducers.deq();
            if (producer == null) {
                // Nobody is offering: register as a waiting consumer.
                self = waitingConsumers.enq(null);
            }
        } finally {
            lock.unlock();
        }
        if (producer == null) {
            // Whatever the timed wait on our own node yields is the result.
            return self.waitForPut(waitNanos);
        }
        final Object item = producer.getItem();
        if (item != null) {
            return item;
        }
        // A null item means the dequeued producer had cancelled; retry.
    }
}
// Untimed nonblocking versions
/**
* Inserts the specified element into this queue, if another thread is
* waiting to receive it.
*
* @param o the element to add.
* @return <tt>true</tt> if it was possible to add the element to
* this queue, else <tt>false</tt>
* @throws NullPointerException if the specified element is <tt>null</tt>
*/
public boolean offer(Object o) {
    if (o == null) {
        throw new NullPointerException();
    }
    final ReentrantLock lock = this.qlock;
    while (true) {
        Node consumer;
        lock.lock();
        try {
            consumer = waitingConsumers.deq();
        } finally {
            lock.unlock();
        }
        if (consumer == null) {
            // No consumer is waiting, and this variant never blocks.
            return false;
        }
        if (consumer.setItem(o)) {
            return true;
        }
        // setItem was refused by this consumer; try the next one.
    }
}
/**
* Retrieves and removes the head of this queue, if another thread
* is currently making an element available.
*
* @return the head of this queue, or <tt>null</tt> if no
* element is available.
*/
public Object poll() {
    final ReentrantLock lock = this.qlock;
    while (true) {
        Node producer;
        lock.lock();
        try {
            producer = waitingProducers.deq();
        } finally {
            lock.unlock();
        }
        if (producer == null) {
            // No producer is waiting, and this variant never blocks.
            return null;
        }
        final Object item = producer.getItem();
        if (item != null) {
            return item;
        }
        // This producer yielded no item; try the next one.
    }
}
/**
* Always returns <tt>true</tt>.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @return <tt>true</tt>
*/
public boolean isEmpty() {
    // With no internal capacity the queue is empty by definition.
    return true;
}
/**
* Always returns zero.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @return zero.
*/
public int size() {
    // With no internal capacity there is never anything to count.
    return 0;
}
/**
* Always returns zero.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @return zero.
*/
public int remainingCapacity() {
    // There is no internal capacity at all, hence none remaining.
    return 0;
}
/**
* Does nothing.
* A <tt>SynchronousQueue</tt> has no internal capacity.
*/
public void clear() {
    // Intentionally empty: with no internal capacity there is nothing to clear.
}
/**
* Always returns <tt>false</tt>.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @param o the element
* @return <tt>false</tt>
*/
public boolean contains(Object o) {
    // With no internal capacity no element can ever be found, whatever o is.
    return false;
}
/**
* Always returns <tt>false</tt>.
* A <tt>SynchronousQueue</tt> has no internal capacity.
*
* @param o the element to remove
* @return <tt>false</tt>
*/
public boolean remove(Object o) {
    // With no internal capacity there is never an element to remove.
    return false;
}
/**
* Returns <tt>false</tt> unless given collection is empty.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @param c the collection
* @return <tt>false</tt> unless given collection is empty
*/
public boolean containsAll(Collection c) {
    // Only the empty collection can be "contained" by a queue that has
    // no internal capacity.
    final boolean nothingAskedFor = c.isEmpty();
    return nothingAskedFor;
}
/**
* Always returns <tt>false</tt>.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @param c the collection
* @return <tt>false</tt>
*/
public boolean removeAll(Collection c) {
    // With no internal capacity there is nothing to remove, so the queue
    // is never modified.
    return false;
}
/**
* Always returns <tt>false</tt>.
* A <tt>SynchronousQueue</tt> has no internal capacity.
* @param c the collection
* @return <tt>false</tt>
*/
public boolean retainAll(Collection c) {
    // With no internal capacity there is nothing to discard, so the queue
    // is never modified.
    return false;
}
/**
* Always returns <tt>null</tt>.
* A <tt>SynchronousQueue</tt> does not return elements
* unless actively waited on.
* @return <tt>null</tt>
*/
public Object peek() {
    // Elements are only returned to callers that actively wait, so there
    // is never a head to look at.
    return null;
}
/**
 * An iterator over nothing: <tt>hasNext</tt> is always <tt>false</tt>,
 * and both <tt>next</tt> and <tt>remove</tt> always throw.
 */
static class EmptyIterator implements Iterator {

    /**
     * @return <tt>false</tt>, always
     */
    public boolean hasNext() {
        return false;
    }

    /**
     * Never returns normally.
     * @throws NoSuchElementException always
     */
    public Object next() {
        throw new NoSuchElementException();
    }

    /**
     * Never returns normally.
     * @throws IllegalStateException always
     */
    public void remove() {
        throw new IllegalStateException();
    }
}
/**
* Returns an empty iterator in which <tt>hasNext</tt> always returns
* <tt>false</tt>.
*
* @return an empty iterator
*/
public Iterator iterator() {
    // A fresh EmptyIterator is handed out on every call.
    final Iterator nothing = new EmptyIterator();
    return nothing;
}
/**
* Returns a zero-length array.
* @return a zero-length array
*/
public Object[] toArray() {
    // A new zero-length array is allocated on every call.
    final Object[] none = new Object[0];
    return none;
}
/**
* Sets the zeroth element of the specified array to <tt>null</tt>
* (if the array has non-zero length) and returns it.
* @param a the array
* @return the specified array
*/
public Object[] toArray(Object[] a) {
    // Only the first slot is touched, and only when the array has one;
    // the caller's array itself is what gets returned.
    if (a.length != 0) {
        a[0] = null;
    }
    return a;
}
/**
 * Repeatedly calls {@link #poll()} and adds each element obtained to the
 * given collection, stopping as soon as <tt>poll</tt> returns
 * <tt>null</tt>.
 * @param c the collection to transfer elements into
 * @return the number of elements transferred
 * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
 * @throws IllegalArgumentException if <tt>c</tt> is this queue
 */
public int drainTo(Collection c) {
    if (c == null) {
        throw new NullPointerException();
    }
    if (c == this) {
        throw new IllegalArgumentException();
    }
    int transferred = 0;
    for (Object item = poll(); item != null; item = poll()) {
        c.add(item);
        transferred++;
    }
    return transferred;
}
/**
 * Repeatedly calls {@link #poll()} and adds each element obtained to the
 * given collection, stopping once <tt>maxElements</tt> elements have been
 * transferred or <tt>poll</tt> returns <tt>null</tt>, whichever comes
 * first.
 * @param c the collection to transfer elements into
 * @param maxElements the maximum number of elements to transfer
 * @return the number of elements transferred
 * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
 * @throws IllegalArgumentException if <tt>c</tt> is this queue
 */
public int drainTo(Collection c, int maxElements) {
    if (c == null) {
        throw new NullPointerException();
    }
    if (c == this) {
        throw new IllegalArgumentException();
    }
    int transferred = 0;
    // The limit is checked before each poll so that no element is taken
    // once the quota has been reached.
    while (transferred < maxElements) {
        final Object item = poll();
        if (item == null) {
            break;
        }
        c.add(item);
        transferred++;
    }
    return transferred;
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?