AQS
文章目录
【注意】最后更新于 March 15, 2017,文中内容可能已过时,请谨慎使用。
java.util.concurrent.locks.AbstractQueuedSynchronizer源码分析
简介:
-
AbstractQueuedSynchronizer(以下简称AQS)是Java并发包提供的一个同步基础机制,大部分的同步器((如:Lock,Semaphore、 CountDownLatch和FutureTask等))都是基于AQS实现。
-
AQS内部包含一个FIFO的同步队列,简单的说,没有成功获取控制权的线程会在这个队列中等待。同步等待队列是 “CLH”(Craig,Landin,和Hagersten)锁队列的变体。
-
AQS内部有一个int(32)的同步状态,同步状态按自己需求灵活使用,比如:ReentrantLock(记录锁重入次数),CountDownLatch(内部的count)和 Semaphore(许可数量)等。
-
AQS提供了独占和共享两种模式。在独占模式下,当一个线程获取了AQS的控制权,其他线程获取控制权的操作就会失败;但在 共享模式下,其他线程的获取控制权操作就可能成功。并发包中的同步机制如ReentrantLock就是典型的独占模式,Semaphore是 共享模式;也有同时使用两种模式的同步机制,如ReentrantReadWriteLock。
-
AQS内部提供了一个ConditionObject类来支持独占模式下的锁队列条件,这个条件的功能比Object的wait和notify/notifyAll的功 能强大。
一、同步队列
AQS依赖内部的同步等待队列(一个FIFO双向链表队列)来完成同步状态管理。同步队列是CLH锁队列的变体。
同步队列源码分析
|
|
AQS同步队列头节点和尾节点
|
|
二、同步状态
|
|
三、独占和共享模式
1.独占模式
(1).独占模式–忽略中断:acquire(int arg)
|
|
-
先调用tryAcquire(arg)方法,该方法由子类实现相应逻辑处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
/** * Attempts to acquire in exclusive mode. This method should query * if the state of the object permits it to be acquired in the * exclusive mode, and if so to acquire it. * * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {@link Lock#tryLock()}. * * <p>The default * implementation throws {@link UnsupportedOperationException}. * * 在独占模式下尝试请求(控制权)。这个方法(实现)应该查看一下对象的 * 状态是否允许在独占模式下请求,如果允许再进行请求。 * * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到 * 同步等待队列中(如果当前线程还不在同步等待队列中),直到被其他线程的释放 * 操作唤醒。可以用来实现Lock的tryLock方法。 * * 该方法默认抛出UnsupportedOperationException异常。 * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
-
调用tryAcquire方法返回false,调用addWaiter(Node.EXCLUSIVE), arg)方法,把该节点添加到同步队列的尾部。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
/** * Creates and enqueues node for current thread and given mode. * 把Node当前线程和指定模式添加到同步队列中 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //创建一个Node节点 指定当前线程和指定(独享和共享)模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; //CAS 修改tail节点属性 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // tail节点为空或者CAS修改tail节点属性失败就调用enq方法 enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; //当head为空时,CAS创建一个空节点,为什么创建一个空节点?原因是有个线程正在执行中,无需加载到同步队列中,以空节点标识。 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { //注意:新节点添加到同步队列尾部 分为3个步骤: // 1.将其prev引用指向tail点 // 2.尝试CAS将其设置为tail点 // 3.CAS修改成功,将其prev节点(第2步之前的tail点)的next指向其本身。 // 4.CAS修改失败,失败原因有其他线程也修改tail节点,所以tail节点形成竞争条件。循环重新调用1.步骤 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
-
添加到同步队列成功后,调用acquireQueued
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //线程是否曾被中断是由这个变量记录的。 boolean interrupted = false; for (;;) { //获取node节点prev节点,为null,throw NPE final Node p = node.predecessor(); //node节点prev是头节点并且tryAcquire返回ture,把node节点设置为头节点,说明该node节点线程获取锁控制权 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //1.shouldParkAfterFailedAcquire方法检查node节点prev节点状态是否为SIGNAL? //2.是, 执行等待,调用LockSupport.park,等待LockSupport.unpark唤醒当前线程节点。 //3.否,循环执行上面操作。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * 1.检查当前节点的prev节点状态是否为SIGNAL * 2.前节点的prev节点状态是为CANCELLED,往prev节点的prev节点查询不为CANCELLED状态,在修改查找到prev节点的next节点。 * 3.prev节点状态是为0和PROPAGATE,设置为SIGNAL状态 * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果当前节点的prev节点的状态为SIGNAL,说明当前节点已经声明了需要唤醒, // 所以可以阻塞当前节点了,直接返回true。 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //如果当前节点的prev节点的状态为CANCELLED //往prev节点的prev节点查找,找到prev节点状态不为CANCELLED状态为止。 //node节点赋值查找不为CANCELLED状态prev节点的next节点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //这里等待状态一定是0或者PROPAGATE。这里将当前节点的前驱节点(非取消状态)的 // 等待状态设置为SIGNAL。来声明需要一个(唤醒)信号。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
-
当前节点线程中断时,调用cancelAcquire
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors //跳过prev节点的状态为CANCELLED Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. //predNext是拼接的结点,但(CAS操作)也可能会失败,因为可能存在其他"cancel"或者"singal"的竞争 Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. //如果当前节点是tail节点,那么删除当前节点(将当前节点的prev节点设置为tail节点) if (node == tail && compareAndSetTail(node, pred)) { ////将prev节点(已经设置为tail节点)的next置null。 compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. // 如果prev节点不是head节点,并且prev节点状态SIGNAL,或者状态不为SIGNAL尝试CAS // 状态改成SIGNAL,并CAS修改pred节点next引用指向 // 其当前节点next节点。否则,唤醒后继节点。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //只要prev节点为head节点,直接唤醒该取消节点的next节点 unparkSuccessor(node); } node.next = node; // help GC } } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //如果节点状态为空,CAS修改节点状态值,如果这失败了或status被其他等待线程修改也是没关系的。 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //是否当前节点next节点为空或者是CANCELLED状态? //是:就从tail节点迭代查找。 //否:就唤醒当前等待线程 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
-
当前线程释放独占的资源,调用release方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //tryRelease 有子类去实现相应逻辑,返回true,唤醒线程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒head节点next节点 unparkSuccessor(h); return true; } return false; } /** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * 尝试设置(AQS的)状态,反映出独占模式下的一个释放动作。 * * 这个方法在线程释放(控制权)的时候被调用。 * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
独占模式-忽略中断acquire(int arg)调用流程如下:
(2).独占模式–响应中断的请求方法,这个方法会抛出中断异常:acquireInterruptibly(int arg)throws InterruptedException
|
|
(3).独占模式–响应中断并且支持超时的请求方法:tryAcquireNanos(int arg, long nanosTimeout)
|
|
2.共享模式
(1).共享模式–忽略中断的请求方法:acquireShared(int arg)
|
|
- tryAcquireShared(arg)方法,该方法由子类实现相应逻辑处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
/** * Attempts to acquire in shared mode. This method should query if * the state of the object permits it to be acquired in the shared * mode, and if so to acquire it. * * <p>This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. * 在共享模式下尝试请求(控制权)。这个方法(实现)应该查看一下对象的 * 状态是否允许在共享模式下请求,如果允许再进行请求。 * * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到 * 同步等待队列中(如果当前线程还不在同步等待队列中),直到被其他线程的释放 * 操作唤醒。 * <p>The default implementation throws {@link * UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
- 获取tryAcquireShared(arg)小于等于0,调用doAcquireShared方法,先调addWaiter(Node.SHARED),把该节点添加到同步队列的尾部。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { //把共享模式节点添加到同步队列的尾部 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //中断标识 boolean interrupted = false; for (;;) { // 获取该节点prev节点,为null,throw NPE final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //设置head节点 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //检查当前节点prev节点状态是否SIGNAL? //是:LockSupport.park等待线程 //否:循环执行上面操作 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); //取消节点 } }
- prev节点是head节点,tryAcquireShared获取大于0 执行setHeadAndPropagate方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * 将node设置为同步等待队列的头节点,并且检测一下node的后继节点是 * 否在共享模式下等待,如果是,并且propagate > 0 或者之前头节 * 点的等待状态是PROPAGATE,唤醒后续节点。 * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * 尝试去唤醒队列中的下一个节点,如果满足如下条件: * 调用者明确表示"传递"(propagate > 0), * 或者h.waitStatus为PROPAGATE(被上一个操作设置) * (注:这里使用符号检测是因为PROPAGATE状态可能会变成SIGNAL状态) * 并且下一个节点处于共享模式或者为null。 * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared();//唤醒后继节点并保证传递 } } /** * Release action for shared mode -- signal successor and ensure * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) * 共享模式下的释放(控制权)动作 -- 唤醒后继节点并保证传递。 * 注:在独占模式下,释放仅仅意味着如果有必要,唤醒头节点的 * 后继节点。 */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. * 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的 * 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤 * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证 * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必 * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样 * 的是,如果(头结点)等待状态设置失败,重新检测。 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
- 当前线程释放共享的资源,调用releaseShared方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /** * Attempts to set the state to reflect a release in shared mode. * * <p>This method is always invoked by the thread performing release. * 尝试设置(AQS的)状态,反映出共享模式下的一个释放动作。 * * 这个方法在线程释放(控制权)的时候被调用。 * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this release of shared mode may permit a * waiting acquire (shared or exclusive) to succeed; and * {@code false} otherwise * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
共享模式-忽略中断acquire(int arg)调用流程如下:
(2).共享模式–响应中断的请求方法,这个方法会抛出中断异常: acquireSharedInterruptibly(int arg)throws InterruptedException
|
|
(3).共享模式–响应中断并且支持超时的请求方法:tryAcquireSharedNanos(int arg, long nanosTimeout)
|
|
3.AQS开放了几个方法交由子类实现(本类中抛出UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively 判断AQS的控制权是否被当前线程以独占的方式持有。 在Condition的signal/signalAll调用了。为false,throws IllegalMonitorStateException
子类(具体同步器的内部同步机制)一般只需按照具体逻辑实现这几个方法就可以,注意这个方法内部需要考虑线程安全问题
4.条件队列
Condition 接口
java.util.concurrent.locks.Condition
将 Object
的监视器方法(wait、notify
和 notifyAll
)分解成截然不同的对象,以便通过将这些
对象与任意 Lock
实现组合使用, 为每个对象提供多个等待 set(wait-set)。其中,Lock
替代了 synchronized
方法和语句的使用,Condition
替代了 Object
监视器方法的使用。Condition
实例实质上被绑定到一个锁上,要获得 Condition
实例时可以在目标 Lock
上调用其 newCondition()
方法。
Condition提供了await/signal/signalAll来支持与Object wait/notify/nofityAll类似的功能。
|
|
AQS内部类ConditionObject
ConditionObject是AQS中提供的一种锁的基础机制,实现了接口Condition。
-
等待队列
1 2 3 4 5 6 7 8 9 10 11 12
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new <tt>ConditionObject</tt> instance. */ public ConditionObject() { } }
等待队列是一个FIFO链表的队列。如图下
-
await()等待
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted())//如果当前线程被中断,抛出InterruptedException异常。 throw new InterruptedException(); //将当前线程添加到条件等待队列。 Node node = addConditionWaiter(); //释放当前线程对AQS的控制权,并返回当前AQS中的state值。 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //判断是不是同步队列 LockSupport.park(this); //阻塞当前线程,等待同步队列释放资源调用 LockSupport.unpark if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//请求同步队列acquireQueued方法 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled 清楚等待队列中的取消状态节点 unlinkCancelledWaiters(); if (interruptMode != 0) //中断处理 reportInterruptAfterWait(interruptMode); } /** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果最后节点是取消节点,清楚等待队列中的取消状态节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //添加一个条件节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //如果是队列中第一个节点,那么将firstWaiter指向这个节点,后面也会将lastWaiter指向这个节点。 firstWaiter = node; else t.nextWaiter = node; //如果是队列中已经存在其他节点,那么将原本lastWaiter的nextWaiter指向当前节点。 lastWaiter = node; return node; } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. * 移除条件等待队列中的取消状态节点。这个方法一定是在持有锁 * (拥有AQS控制权)的情况下被调用的(所以不存在竞争)。 * 当等待条件时被(节点的线程)取消,或者当lastWaiter被取消后 * 条件等待队列中进入了一个新节点时会调用这个方法。 * 这个方法需要避免由于没有signal而引起的垃圾滞留。所以尽管 * 方法内会做一个完全遍历,也只有超时获或取消时(没有signal的 * 情况下)才被调用。方法中会遍历所有节点,切断所有指向垃圾节 * 点的引用,而不是一次取消切断一个引用。 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) //首节点为取消状态 firstWaiter = next; else //这里做下拼接(移除中间的取消节点) trail.nextWaiter = next; if (next == null) lastWaiter = trail; /最后设置尾节点。 } else trail = t; t = next; } } /** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * 调用release方法并传入当前的state。 * 调用成功会返回传入release方法之前的state. * 失败会抛出异常,并取消当前节点。 * @param node the condition node for this wait * @return previous sync state */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } /** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue 如果有后继节点,说明肯定在AQS同步等待队列里。 return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. * 之前的代码中分析到过,node.prev不为空并不能说明节点在AQS的 * 同步等待队列里面,因为后续的CAS操作可能会失败,所以这里从尾节 * 开始反向遍历 */ return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } /** Mode meaning to reinterrupt on exit from wait */ //等待退出 private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; //在退出时抛出中断异常 /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. * 对中断的检查, * 返回THROW_IE 如果在信号发出之前中断, * 返回REINTERRUPT如果在信号发出后中断, * 如果没有中断,则返回0 */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Transfers node, if necessary, to sync queue after a cancelled * wait. Returns true if thread was cancelled before being * signalled. * 在取消等待后,将节点转移到同步队列中。如果线程在唤醒钱被 * 取消,返回true。 * @param current the waiting thread * @param node its node * @return true if cancelled before the node was signalled */ final boolean transferAfterCancelledWait(Node node) { if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. * 如果我们输出了一个signal(),那么我们就不能继续下去直到enq()结束。在不完整的转让过程中取消既罕见又短暂,所以只需自旋。 */ while (!isOnSyncQueue(node)) Thread.yield(); return false; } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
-
signal()唤醒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { //判断AQS的控制权是否被当前线程以独占的方式持有。如果不是,抛出IllegalMonitorStateException异常。 // isHeldExclusively方法子类去实现 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Returns {@code true} if synchronization is held exclusively with * respect to the current (calling) thread. This method is invoked * upon each call to a non-waiting {@link ConditionObject} method. * (Waiting methods instead invoke {@link #release}.) * * <p>The default implementation throws {@link * UnsupportedOperationException}. This method is invoked * internally only within {@link ConditionObject} methods, so need * not be defined if conditions are not used. * * @return {@code true} if synchronization is held exclusively; * {@code false} otherwise * @throws UnsupportedOperationException if conditions are not supported */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) //赋值firstWaiter 判断是否为空 lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * 将一个节点从条件等待队列转移到同步等待队列。 * 如果成功,返回true。 * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal). */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //如果设置等待状态失败,说明节点已经被取消了,直接返回false。 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //将node加入到AQS同步等待队列中,并返回node的前驱节点。 Node p = enq(node); int ws = p.waitStatus; //如果前驱节点被取消,或者尝CAS修改前驱节点的状态为SIGNAL(表示node节点需要唤醒)失败,那么唤醒node节点上的线程。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
-
signalAll()唤醒所有等待队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); //循环所有等待队列 }
-
void awaitUninterruptibly();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
-
long awaitNanos(long nanosTimeout) throws InterruptedException;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> * */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node);//超时 直接把当前节点加入到同步队列。 break; } LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); }
-
boolean await(long time, TimeUnit unit) throws InterruptedException;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { if (unit == null) throw new NullPointerException(); long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
-
boolean awaitUntil(Date deadline) throws InterruptedException;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
/** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ public final boolean awaitUntil(Date deadline) throws InterruptedException { if (deadline == null) throw new NullPointerException(); long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }