linkedblockingqueue.java
来自「SRI international 发布的OAA框架软件」· Java 代码 · 共 735 行 · 第 1/2 页
JAVA
735 行
}
}
}
}
/**
 * Retrieves and removes an element from this queue, waiting up to the
 * given time if none can be extracted immediately.
 *
 * @param timeout how long to wait before giving up, in units of
 *        <tt>unit</tt>; a non-positive value means no waiting
 * @param unit the time unit used to interpret <tt>timeout</tt>
 * @return the extracted element, or <tt>null</tt> if nothing could be
 *         extracted before the wait time ran out
 * @throws InterruptedException if the calling thread is already
 *         interrupted on entry, or is interrupted while waiting
 */
public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }

    // Fast path: one attempt before taking the takeGuard_ monitor.
    Object item = extract();
    if (item != null) {
        return item;
    }

    long remaining = unit.toNanos(timeout);
    synchronized (takeGuard_) {
        try {
            final long deadline = Utils.nanoTime() + remaining;
            while (true) {
                item = extract();
                if (item != null || remaining <= 0) {
                    // Either we got an element or the time budget is spent
                    // (in which case item is null).
                    return item;
                }
                TimeUnit.NANOSECONDS.timedWait(takeGuard_, remaining);
                // Recompute from the fixed deadline so repeated wake-ups
                // do not extend the total wait.
                remaining = deadline - Utils.nanoTime();
            }
        }
        catch (InterruptedException ex) {
            // Wake one other thread waiting on takeGuard_ before rethrowing.
            takeGuard_.notify();
            throw ex;
        }
    }
}
/**
 * Retrieves and removes an element from this queue without waiting.
 *
 * @return the result of a single <tt>extract()</tt> attempt
 *         (presumably <tt>null</tt> when the queue is empty -- confirm
 *         against <tt>extract()</tt>)
 */
public Object poll() {
    final Object item = extract();
    return item;
}
/**
 * Returns, without removing it, the value held by the node that follows
 * the head node.
 *
 * @return the value of <tt>head_.next</tt>, or <tt>null</tt> if the head
 *         node has no successor
 */
public Object peek() {
    synchronized (head_) {
        final LinkedNode front = head_.next;
        return (front == null) ? null : front.value;
    }
}
/**
 * Removes a single instance of the specified element from this
 * queue, if it is present.
 *
 * <p>The list is scanned from the node after <tt>head_</tt>; the first
 * node whose value satisfies <tt>o.equals(value)</tt> is unlinked, one
 * put permit is credited to <tt>takeSidePutPermits_</tt>, and one thread
 * waiting on this queue's monitor is notified.
 *
 * @param o the element to remove; <tt>null</tt> is never matched
 * @return <tt>true</tt> if a matching element was found and removed,
 *         <tt>false</tt> otherwise
 */
public boolean remove(Object o) {
    if (o == null) return false;
    boolean removed = false;
    // Lock order: putGuard_, takeGuard_, this, head_.
    synchronized (putGuard_) {
        synchronized (takeGuard_) {
            synchronized (this) {
                synchronized (head_) {
                    LinkedNode trail = head_;
                    LinkedNode p = head_.next;
                    while (p != null) {
                        if (o.equals(p.value)) {
                            removed = true;
                            break;
                        }
                        trail = p;
                        p = p.next;
                    }
                    if (removed) {
                        p.value = null;
                        trail.next = p.next;
                        // If the tail node was unlinked, the tail reference
                        // must move back to its predecessor; otherwise later
                        // appends would chain onto the detached node and be
                        // unreachable from head_.
                        // NOTE(review): assumes appends go through last_
                        // while holding putGuard_ -- confirm against insert.
                        if (last_ == p) {
                            last_ = trail;
                        }
                        ++takeSidePutPermits_;
                        notify();
                    }
                }
            }
        }
    }
    return removed;
}
/**
 * Copies the values of the linked nodes, in list order, into a newly
 * allocated array whose length is the value of <tt>size()</tt> at the
 * time of the call.
 *
 * @return a new array holding the node values starting from
 *         <tt>head_.next</tt>
 */
public Object[] toArray() {
    synchronized (putGuard_) {
        synchronized (takeGuard_) {
            final Object[] snapshot = new Object[size()];
            int idx = 0;
            LinkedNode node = head_.next;
            while (node != null) {
                snapshot[idx++] = node.value;
                node = node.next;
            }
            return snapshot;
        }
    }
}
/**
 * Copies the values of the linked nodes, in list order, into the given
 * array if it is large enough; otherwise into a new array of the same
 * component type whose length is <tt>size()</tt>.
 *
 * <p>As required by <tt>Collection.toArray(Object[])</tt>, when the
 * array has room to spare the slot immediately after the last copied
 * element is set to <tt>null</tt>.
 *
 * @param a the array to fill, if big enough
 * @return the array holding the elements (either <tt>a</tt> or a newly
 *         allocated array of the same runtime component type)
 * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
 */
public Object[] toArray(Object[] a) {
    synchronized (putGuard_) {
        synchronized (takeGuard_) {
            int size = size();
            if (a.length < size)
                a = (Object[]) java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);
            int k = 0;
            for (LinkedNode p = head_.next; p != null; p = p.next)
                a[k++] = p.value;
            // Mark the end of the copied elements when the array is
            // longer than what was written.
            if (k < a.length)
                a[k] = null;
            return a;
        }
    }
}
/**
 * Returns the superclass string representation, computed while holding
 * both <tt>putGuard_</tt> and <tt>takeGuard_</tt>.
 *
 * @return the value of <tt>super.toString()</tt>
 */
public String toString() {
    synchronized (putGuard_) {
        synchronized (takeGuard_) {
            final String text = super.toString();
            return text;
        }
    }
}
/**
 * Atomically removes all of the elements from this queue.
 * The queue will be empty after this call returns.
 *
 * <p>Walks at most <tt>size()</tt> links forward from <tt>head_</tt>,
 * nulling node values on the way, then makes the node it stopped at the
 * new head. The number of links walked is credited to
 * <tt>takeSidePutPermits_</tt> and one thread waiting on this queue's
 * monitor is notified.
 */
public void clear() {
    int count = 0;
    synchronized (this) {
        int sizeBound = this.size();
        LinkedNode node = head_;
        while (count < sizeBound) {
            synchronized (node) {
                node.value = null;
                // Stop early if fewer nodes are linked than size() reported.
                if (node.next == null) break;
                node = node.next;
            }
            count++;
        }
        // The node we stopped on becomes the new head; drop its element
        // too so the last cleared element is not kept reachable.
        synchronized (node) {
            node.value = null;
        }
        head_ = node;
        takeSidePutPermits_ += count;
        notify();
    }
}
/**
 * Removes the elements currently in this queue (at most <tt>size()</tt>
 * of them, as read at the start of the call) and adds them to the given
 * collection in list order.
 *
 * <p>The nodes are detached while holding this queue's monitor; the
 * values are added to <tt>c</tt> afterwards, outside that monitor.
 *
 * @param c the collection to transfer elements into
 * @return the number of elements detached from this queue
 * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
 * @throws IllegalArgumentException if <tt>c</tt> is this queue
 */
public int drainTo(Collection c) {
    if (c == null) {
        throw new NullPointerException();
    }
    if (c == this) {
        throw new IllegalArgumentException();
    }

    LinkedNode oldHead;
    LinkedNode newHead;
    int drained = 0;
    synchronized (this) {
        final int limit = this.size();
        oldHead = head_;
        newHead = head_;
        for (; drained < limit; drained++) {
            synchronized (newHead) {
                // Fewer nodes linked than size() reported: stop here.
                if (newHead.next == null) break;
                newHead = newHead.next;
            }
        }
        head_ = newHead;
        takeSidePutPermits_ += drained;
        notify();
    }

    // Hand the detached values over without holding the queue monitor.
    for (LinkedNode node = oldHead; node != newHead; ) {
        node = node.next;
        c.add(node.value);
        node.value = null;
    }
    return drained;
}
/**
 * Removes at most <tt>maxElements</tt> elements from this queue and adds
 * them to the given collection in list order.
 *
 * <p>The effective limit is the smaller of <tt>maxElements</tt> and
 * <tt>size()</tt> as read at the start of the call. The nodes are
 * detached while holding this queue's monitor; the values are added to
 * <tt>c</tt> afterwards, outside that monitor.
 *
 * @param c the collection to transfer elements into
 * @param maxElements the maximum number of elements to transfer;
 *        a non-positive value transfers nothing
 * @return the number of elements detached from this queue
 * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
 * @throws IllegalArgumentException if <tt>c</tt> is this queue
 */
public int drainTo(Collection c, int maxElements) {
    if (c == null) {
        throw new NullPointerException();
    }
    if (c == this) {
        throw new IllegalArgumentException();
    }
    if (maxElements <= 0) {
        return 0;
    }

    LinkedNode oldHead;
    LinkedNode newHead;
    int drained = 0;
    synchronized (this) {
        final int available = this.size();
        if (maxElements > available) {
            maxElements = available;
        }
        oldHead = head_;
        newHead = head_;
        for (; drained < maxElements; drained++) {
            synchronized (newHead) {
                // Fewer nodes linked than size() reported: stop here.
                if (newHead.next == null) break;
                newHead = newHead.next;
            }
        }
        head_ = newHead;
        takeSidePutPermits_ += drained;
        notify();
    }

    // Hand the detached values over without holding the queue monitor.
    for (LinkedNode node = oldHead; node != newHead; ) {
        node = node.next;
        c.add(node.value);
        node.value = null;
    }
    return drained;
}
/**
 * Creates an iterator that walks this queue's elements in proper
 * sequence.
 *
 * <p>The returned <tt>Iterator</tt> is "weakly consistent": it never
 * throws {@link java.util.ConcurrentModificationException}, it is
 * guaranteed to traverse elements as they existed when the iterator was
 * constructed, and it may (but is not guaranteed to) reflect
 * modifications made after construction.
 *
 * @return an iterator over the elements in this queue in proper sequence.
 */
public Iterator iterator() {
    final Iterator it = new Itr();
    return it;
}
/**
 * Iterator over the queue's linked nodes, starting at the node after
 * <tt>head_</tt> as observed at construction time.
 */
private class Itr implements Iterator {
    /*
     * Basic weak-consistent iterator.  At all times hold the next
     * item to hand out so that if hasNext() reports true, we will
     * still have it to return even if lost race with a take etc.
     */
    /** Node whose element will be returned by the next call to next(). */
    private LinkedNode current;
    /** Node most recently returned by next(); null if none or already removed. */
    private LinkedNode lastRet;
    /** Element cached from <tt>current</tt> at the time it became current. */
    private Object currentElement;

    /** Positions the iterator on the first node after <tt>head_</tt>. */
    Itr() {
        synchronized (LinkedBlockingQueue.this) {
            synchronized (head_) {
                current = head_.next;
                if (current != null)
                    currentElement = current.value;
            }
        }
    }

    /**
     * @return <tt>true</tt> if a node is currently held for the next call
     *         to <tt>next()</tt>
     */
    public boolean hasNext() {
        return current != null;
    }

    /**
     * Returns the cached element and advances to the following node,
     * caching that node's value in turn.
     *
     * @return the element that was cached for the current node
     * @throws NoSuchElementException if there is no current node
     */
    public Object next() {
        synchronized (LinkedBlockingQueue.this) {
            if (current == null)
                throw new NoSuchElementException();
            synchronized (current) {
                Object x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.value;
                return x;
            }
        }
    }

    /**
     * Unlinks the node last returned by <tt>next()</tt>, if it is still
     * reachable from <tt>head_</tt>; otherwise does nothing.
     *
     * @throws IllegalStateException if <tt>next()</tt> has not been called
     *         since construction or since the last <tt>remove()</tt>
     */
    public void remove() {
        if (lastRet == null)
            throw new IllegalStateException();
        synchronized (LinkedBlockingQueue.this) {
            LinkedNode node = lastRet;
            lastRet = null;
            // Search from the head for the predecessor of the node.
            LinkedNode trail = head_;
            while (trail != null) {
                synchronized (trail) {
                    if (trail.next == node) {
                        synchronized (node) {
                            trail.next = node.next;
                            node.value = null;
                            // If the tail node was unlinked, move the tail
                            // reference back to its predecessor; otherwise
                            // later appends would chain onto the detached
                            // node and be unreachable from head_.
                            // NOTE(review): the put side's locking is not
                            // visible here; an append racing with this
                            // update may still need extra coordination.
                            if (last_ == node)
                                last_ = trail;
                            takeSidePutPermits_++;
                            LinkedBlockingQueue.this.notify();
                            break;
                        }
                    }
                    trail = trail.next;
                }
            }
        }
    }
}
/**
 * Serializes this queue to the given stream.
 *
 * @serialData The capacity is emitted (int), followed by all of
 * its elements (each an <tt>Object</tt>) in the proper order,
 * followed by a null
 * @param s the stream
 * @throws java.io.IOException if writing to the stream fails
 */
private void writeObject(java.io.ObjectOutputStream s)
    throws java.io.IOException {
    synchronized (this) {
        // Default fields first: hidden state plus the capacity.
        s.defaultWriteObject();

        // Then every element in list order.
        LinkedNode node;
        synchronized (head_) {
            node = head_.next;
        }
        while (node != null) {
            synchronized (node) {
                s.writeObject(node.value);
                node = node.next;
            }
        }

        // A trailing null marks the end of the element sequence.
        s.writeObject(null);
    }
}
/**
 * Rebuilds this queue from a stream (that is, deserializes it).
 *
 * <p>After the default fields are read, the permit counters are reset
 * and a fresh empty list is installed; elements are then read and added
 * one by one until a <tt>null</tt> is read.
 *
 * @param s the stream
 * @throws java.io.IOException if reading from the stream fails
 * @throws ClassNotFoundException if the class of a serialized object
 *         cannot be found
 */
private void readObject(java.io.ObjectInputStream s)
    throws java.io.IOException, ClassNotFoundException {
    synchronized (this) {
        // Capacity and any hidden state come from the default fields.
        s.defaultReadObject();
        putSidePutPermits_ = capacity_;
        takeSidePutPermits_ = 0;
        // Start from an empty list: a single node with a null value
        // serves as both head and tail.
        final LinkedNode dummy = new LinkedNode(null);
        head_ = dummy;
        last_ = dummy;
    }

    // Re-add the elements in order until the null end marker is read.
    Object item;
    while ((item = s.readObject()) != null) {
        add(item);
    }
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?