📄 abstractqueuedsynchronizer.java
字号:
static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: Node is cancelled due to timeout or interrupt * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: Node is currently on a condition queue * It will not be used as a sync queue node until * transferred. (Use of this value here * has nothing to do with the other uses * of the field, but simplifies mechanics.) * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified only using * CAS. */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned once during enqueuing, and * nulled out (for sake of GC) when no longer needed. Upon * cancellation, we cannot adjust this field, but can notice * status and bypass the node if cancelled. The enq operation * does not assign next field of a predecessor until after * attachment, so seeing a null next field does not * necessarily mean that node is at end of queue. However, if * a next field appears to be null, we can scan prev's from * the tail to double-check. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if * null. Use when predecessor cannot be null. * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a <tt>volatile</tt> read * and write. * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that * the actual value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // Queuing utilities /** * Insert node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } /** * Create and enq node for given thread and mode * @param current the thread * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Set head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * @param node the node */ private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } /** * Wake up node's successor, if one exists. * @param node the node */ private void unparkSuccessor(Node node) { /* * Try to clear status in anticipation of signalling. It is * OK if this fails or if status is changed by waiting thread. */ compareAndSetWaitStatus(node, Node.SIGNAL, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Thread thread; Node s = node.next; if (s != null && s.waitStatus <= 0) thread = s.thread; else { thread = null; for (s = tail; s != null && s != node; s = s.prev) if (s.waitStatus <= 0) thread = s.thread; } LockSupport.unpark(thread); } /** * Set head of queue, and check if successor may be waiting * in shared mode, if so propagating if propagate > 0. * @param pred the node holding waitStatus for node * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0) { /* * Don't bother fully figuring out successor. If it * looks null, call unparkSuccessor anyway to be safe. */ Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } } // Utilities for various versions of acquire /** * Cancel an ongoing attempt to acquire. * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // Getting this before setting waitStatus ensures staleness Node predNext = pred.next; // Can use unconditional write instead of CAS here node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If "active" predecessor found... if (pred != head && (pred.waitStatus == Node.SIGNAL || compareAndSetWaitStatus(pred, 0, Node.SIGNAL)) && pred.thread != null) { // If successor is active, set predecessor's next link Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * @param pred node's predecessor holding status * @param node the node * @return true if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int s = pred.waitStatus; if (s < 0) /* * This node has already set status asking a release
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -