java.util.concurrent.locks.AbstractQueuedSynchronizer源码分析

简介:

  • AbstractQueuedSynchronizer(以下简称AQS)是Java并发包提供的一个同步基础机制,大部分的同步器((如:Lock,Semaphore、 CountDownLatch和FutureTask等))都是基于AQS实现。

  • AQS内部包含一个FIFO的同步队列,简单的说,没有成功获取控制权的线程会在这个队列中等待。同步等待队列是 “CLH”(Craig,Landin,和Hagersten)锁队列的变体。

  • AQS内部有一个int(32)的同步状态,同步状态按自己需求灵活使用,比如:ReentrantLock(记录锁重入次数),CountDownLatch(内部的count)和 Semaphore(许可数量)等。

  • AQS提供了独占和共享两种模式。在独占模式下,当一个线程获取了AQS的控制权,其他线程获取控制权的操作就会失败;但在 共享模式下,其他线程的获取控制权操作就可能成功。并发包中的同步机制如ReentrantLock就是典型的独占模式,Semaphore是 共享模式;也有同时使用两种模式的同步机制,如ReentrantReadWriteLock。

  • AQS内部提供了一个ConditionObject类来支持独占模式下的锁队列条件,这个条件的功能比Object的wait和notify/notifyAll的功 能强大。

一、同步队列

  AQS依赖内部的同步等待队列(一个FIFO双向链表队列)来完成同步状态管理。同步队列是CLH锁队列的变体。

  同步队列源码分析

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
      static final class Node {
          /** Marker to indicate a node is waiting in shared mode */
          /**共享锁模式标志*/
          static final Node SHARED = new Node();
          /** Marker to indicate a node is waiting in exclusive mode */
          /**独占模式标志*/
          static final Node EXCLUSIVE = null;
  
          /** waitStatus value to indicate thread has cancelled */
          /**线程取消*/
          static final int CANCELLED =  1;
          /** waitStatus value to indicate successor's thread needs unparking */
          /**线程等待*/
          static final int SIGNAL    = -1;
          /** waitStatus value to indicate thread is waiting on condition */
          /**等待条件*/
          static final int CONDITION = -2;
          /**
           * waitStatus value to indicate the next acquireShared should
           * unconditionally propagate
           */
          /**表明下一次acquireShared应该无条件传播*/
          static final int PROPAGATE = -3;
  
          /**
           * Status field, taking on only the values:
           *   SIGNAL:     The successor of this node is (or will soon be)
           *               blocked (via park), so the current node must
           *               unpark its successor when it releases or
           *               cancels. To avoid races, acquire methods must
           *               first indicate they need a signal,
           *               then retry the atomic acquire, and then,
           *               on failure, block.
           *   CANCELLED:  This node is cancelled due to timeout or interrupt.
           *               Nodes never leave this state. In particular,
           *               a thread with cancelled node never again blocks.
           *   CONDITION:  This node is currently on a condition queue.
           *               It will not be used as a sync queue node
           *               until transferred, at which time the status
           *               will be set to 0. (Use of this value here has
           *               nothing to do with the other uses of the
           *               field, but simplifies mechanics.)
           *   PROPAGATE:  A releaseShared should be propagated to other
           *               nodes. This is set (for head node only) in
           *               doReleaseShared to ensure propagation
           *               continues, even if other operations have
           *               since intervened.
           *   0:          None of the above
           *
           * The values are arranged numerically to simplify use.
           * Non-negative values mean that a node doesn't need to
           * signal. So, most code doesn't need to check for particular
           * values, just for sign.
           *
           * The field is initialized to 0 for normal sync nodes, and
           * CONDITION for condition nodes.  It is modified using CAS
           * (or when possible, unconditional volatile writes).
           */
          /** 状态字段,只能取下面的值: 
          *   SIGNAL : 这个结点的后继是(或很快是)阻塞的(通过park),所以当前结点 
          *             必须unpark它的后继,当它释放或取消时。为了避免竞争,acquire方法必须 
          *             首先表明它们需要一个信号,然后再次尝试原子性acquire,如果失败了就阻塞。 
          *
          *  CANCELLED : 这个结点由于超时或中断已被取消。结点从不离开这种状态。尤其是, 
          *               这种状态的线程从不再次阻塞。 
          *               
          * CONDITION : 这个结点当前在一个条件队列上。它将不会用于同步等待队列的结点,
          *              直到被转移,在那时,结点的状态将被设为0.  这个值在这里的使用
          *              与其他字段的使用没有关系,仅仅是简化结构。 
          *             
          * PROPAGATE : releaseShared应该传递给其他结点。这是在doReleaseShared里设置 
          *             (仅仅是头结点)以确保传递继续,即使其他操作有干涉。 
          *              
          *         0: 非以上任何值。 
          *         
          *   值是组织为数字的用以简化使用。非负值表示结点不需要信号。这样,大部分代码不需要 
          *  检查特定的值,只需要(检查)符号。 
          *  
          *   对于普通同步结点,字段初始化为0;对于条件结点初始化为CONDITION 通过CAS操作修改(或者,当允许时,用无条件volatile写。) 
          */

          volatile int waitStatus;
  
          /**
           * Link to predecessor node that current node/thread relies on
           * for checking waitStatus. Assigned during enqueing, and nulled
           * out (for sake of GC) only upon dequeuing.  Also, upon
           * cancellation of a predecessor, we short-circuit while
           * finding a non-cancelled one, which will always exist
           * because the head node is never cancelled: A node becomes
           * head only as a result of successful acquire. A
           * cancelled thread never succeeds in acquiring, and a thread only
           * cancels itself, not any other node.
           */
           /** 
           *  连接到当前结点/线程依赖的用来检查等待状态的前驱结点。 
           *  在进入队列时赋值,只在出队列时置为空(为了GC考虑)。 
           *  根据前驱结点的取消,我们使查找一个非取消结点的while循环短路,这个总是会退出,
           *  因为头结点从不会是取消了的:一个结点成为头只能是一次成功的acquire操作结果。 
           *  一个取消了的线程从不会在获取操作成功,线程只能取消自己,不能是其他结点。 
           */

          volatile Node prev;
  
          /**
           * Link to the successor node that the current node/thread
           * unparks upon release. Assigned during enqueuing, adjusted
           * when bypassing cancelled predecessors, and nulled out (for
           * sake of GC) when dequeued.  The enq operation does not
           * assign next field of a predecessor until after attachment,
           * so seeing a null next field does not necessarily mean that
           * node is at end of queue. However, if a next field appears
           * to be null, we can scan prev's from the tail to
           * double-check.  The next field of cancelled nodes is set to
           * point to the node itself instead of null, to make life
           * easier for isOnSyncQueue.
           */
          /** 
          * 连接到当前结点/线程释放时解除阻塞的后续结点。 
          * 在入队列时赋值,在绕过已取消前驱节点时调整,出队列时置为空(for GC)。 
          * 入队操作不会给前驱结点的next字段赋值,直到附件后(把新节点赋值给队列的tail属性?),   
          * 所以看到next字段为空不一定表示它就是队列的尾结点。然而,如果next字段看起来是空, 
          * 我们可以从tail向前遍历进行双重检查。 
          * 被取消了的结点的next字段被设置为指向它自己而不是空,这让isOnSyncQueue变得容易。 
          */

          volatile Node next;
  
          /**
           * The thread that enqueued this node.  Initialized on
           * construction and nulled out after use.
           */
          volatile Thread thread;
  
          /**
           * Link to next node waiting on condition, or the special
           * value SHARED.  Because condition queues are accessed only
           * when holding in exclusive mode, we just need a simple
           * linked queue to hold nodes while they are waiting on
           * conditions. They are then transferred to the queue to
           * re-acquire. And because conditions can only be exclusive,
           * we save a field by using special value to indicate shared
           * mode.
           */
           /**
           * 指向下一个条件等待状态节点或者为特殊值(SHARED)。由于条件队列只有在独占模式下才
           * 能访问,所以我们只需要一个普通的链表队列来保存处于等待状态的节点。它们在重新请
           * 求的时候会转移到同步队列。由于条件只存在于独占模式下,所以如果是共享模式,就将
           * 这域保存为一个特殊值(SHARED)。
           */

          Node nextWaiter;
  
          /**
           * Returns true if node is waiting in shared mode
           */
          final boolean isShared() {
              return nextWaiter == SHARED;
          }
  
          /**
           * Returns previous node, or throws NullPointerException if null.
           * Use when predecessor cannot be null.  The null check could
           * be elided, but is present to help the VM.
           *
           * @return the predecessor of this node
           */
          final Node predecessor() throws NullPointerException {
              Node p = prev;
              if (p == null)
                  throw new NullPointerException();
              else
                  return p;
          }
  
          Node() {    // Used to establish initial head or SHARED marker
          }
  
          Node(Thread thread, Node mode) {     // Used by addWaiter
              this.nextWaiter = mode;
              this.thread = thread;
          }
  
          Node(Thread thread, int waitStatus) { // Used by Condition
              this.waitStatus = waitStatus;
              this.thread = thread;
          }
      }

  AQS同步队列头节点和尾节点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
      /**
       * 同步等待队列的头节点,延迟初始化。除了初始化之外,只能通过setHead方法来改变
       * 这个域。注:如果头结点存在,那么它的waitStatus可以保证一定不是CANCELLED。
       */

    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    /**等待队列的尾部,延迟初始化。只有通过方法enq修改才能添加新的等待节点。*/
    private transient volatile Node tail;

二、同步状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a <tt>volatile</tt> read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

三、独占和共享模式

1.独占模式

(1).独占模式–忽略中断:acquire(int arg)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
   /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     * 
     * 独占模式下进行请求,忽略中断。方法实现中至少会调用一次tryAcquire方法,
     * 请求成功后方法返回。否则当前线程会排队,可能会重复的阻塞和解除阻塞,
     * 执行tryAcquire方法,直到成功。这个方法可以用来实现Lock的lock方法。
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. 先调用tryAcquire(arg)方法,该方法由子类实现相应逻辑处理。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    
        /**
         * Attempts to acquire in exclusive mode. This method should query
         * if the state of the object permits it to be acquired in the
         * exclusive mode, and if so to acquire it.
         *
         * <p>This method is always invoked by the thread performing
         * acquire.  If this method reports failure, the acquire method
         * may queue the thread, if it is not already queued, until it is
         * signalled by a release from some other thread. This can be used
         * to implement method {@link Lock#tryLock()}.
         *
         * <p>The default
         * implementation throws {@link UnsupportedOperationException}.
         *
         * 在独占模式下尝试请求(控制权)。这个方法(实现)应该查看一下对象的
         * 状态是否允许在独占模式下请求,如果允许再进行请求。
         *
         * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到
         * 同步等待队列中(如果当前线程还不在同步等待队列中),直到被其他线程的释放
         * 操作唤醒。可以用来实现Lock的tryLock方法。
         *
         * 该方法默认抛出UnsupportedOperationException异常。
         * @param arg the acquire argument. This value is always the one
         *        passed to an acquire method, or is the value saved on entry
         *        to a condition wait.  The value is otherwise uninterpreted
         *        and can represent anything you like.
         * @return {@code true} if successful. Upon success, this object has
         *         been acquired.
         * @throws IllegalMonitorStateException if acquiring would place this
         *         synchronizer in an illegal state. This exception must be
         *         thrown in a consistent fashion for synchronization to work
         *         correctly.
         * @throws UnsupportedOperationException if exclusive mode is not supported
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
  2. 调用tryAcquire方法返回false,调用addWaiter(Node.EXCLUSIVE), arg)方法,把该节点添加到同步队列的尾部。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    
        /**
         * Creates and enqueues node for current thread and given mode.
         * 把Node当前线程和指定模式添加到同步队列中
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            //创建一个Node节点 指定当前线程和指定(独享和共享)模式
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                //CAS 修改tail节点属性
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // tail节点为空或者CAS修改tail节点属性失败就调用enq方法
            enq(node);
            return node;
        }
    
     /**
      * Inserts node into queue, initializing if necessary. See picture above.
      * @param node the node to insert
      * @return node's predecessor
      */
     private Node enq(final Node node) {    
         for (;;) {
             Node t = tail;
             //当head为空时,CAS创建一个空节点,为什么创建一个空节点?原因是有个线程正在执行中,无需加载到同步队列中,以空节点标识。
             if (t == null) { // Must initialize
                 if (compareAndSetHead(new Node()))
                     tail = head;
             } else {
              //注意:新节点添加到同步队列尾部 分为3个步骤:
              // 1.将其prev引用指向tail点 
              // 2.尝试CAS将其设置为tail点 
              // 3.CAS修改成功,将其prev节点(第2步之前的tail点)的next指向其本身。
              // 4.CAS修改失败,失败原因有其他线程也修改tail节点,所以tail节点形成竞争条件。循环重新调用1.步骤
                 node.prev = t;
                 if (compareAndSetTail(t, node)) {
                     t.next = node;
                     return t;
                 }
             }
         }
     }
    
  3. 添加到同步队列成功后,调用acquireQueued

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    
       /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                //线程是否曾被中断是由这个变量记录的。
                boolean interrupted = false;
                for (;;) {
                    //获取node节点prev节点,为null,throw NPE
                    final Node p = node.predecessor();
                    //node节点prev是头节点并且tryAcquire返回ture,把node节点设置为头节点,说明该node节点线程获取锁控制权
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //1.shouldParkAfterFailedAcquire方法检查node节点prev节点状态是否为SIGNAL?
                    //2.是, 执行等待,调用LockSupport.park,等待LockSupport.unpark唤醒当前线程节点。
                    //3.否,循环执行上面操作。
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev
         *  1.检查当前节点的prev节点状态是否为SIGNAL
         *  2.前节点的prev节点状态是为CANCELLED,往prev节点的prev节点查询不为CANCELLED状态,在修改查找到prev节点的next节点。
         *  3.prev节点状态是为0和PROPAGATE,设置为SIGNAL状态
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                //如果当前节点的prev节点的状态为SIGNAL,说明当前节点已经声明了需要唤醒,
               // 所以可以阻塞当前节点了,直接返回true。
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                //如果当前节点的prev节点的状态为CANCELLED
                //往prev节点的prev节点查找,找到prev节点状态不为CANCELLED状态为止。
                //node节点赋值查找不为CANCELLED状态prev节点的next节点
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //这里等待状态一定是0或者PROPAGATE。这里将当前节点的前驱节点(非取消状态)的
                // 等待状态设置为SIGNAL。来声明需要一个(唤醒)信号。
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        
        /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
  4. 当前节点线程中断时,调用cancelAcquire

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    
     /**
         * Cancels an ongoing attempt to acquire.
         *
         * @param node the node
         */
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
       
            node.thread = null;
       
            // Skip cancelled predecessors
            //跳过prev节点的状态为CANCELLED
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
       
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            //predNext是拼接的结点,但(CAS操作)也可能会失败,因为可能存在其他"cancel"或者"singal"的竞争
            Node predNext = pred.next;
       
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED;
       
            // If we are the tail, remove ourselves.
            //如果当前节点是tail节点,那么删除当前节点(将当前节点的prev节点设置为tail节点)
            if (node == tail && compareAndSetTail(node, pred)) {
                ////将prev节点(已经设置为tail节点)的next置null。
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                // 如果prev节点不是head节点,并且prev节点状态SIGNAL,或者状态不为SIGNAL尝试CAS  
                // 状态改成SIGNAL,并CAS修改pred节点next引用指向  
                // 其当前节点next节点。否则,唤醒后继节点。 
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    //只要prev节点为head节点,直接唤醒该取消节点的next节点
                    unparkSuccessor(node);
                }
       
                node.next = node; // help GC
            }
        }
        /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            //如果节点状态为空,CAS修改节点状态值,如果这失败了或status被其他等待线程修改也是没关系的。
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
       
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            //是否当前节点next节点为空或者是CANCELLED状态?
            //是:就从tail节点迭代查找。
            //否:就唤醒当前等待线程
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }    
    
  5. 当前线程释放独占的资源,调用release方法

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    
        /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            //tryRelease 有子类去实现相应逻辑,返回true,唤醒线程
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //唤醒head节点next节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
       
     /**
      * Attempts to set the state to reflect a release in exclusive
      * mode.
      *
      * <p>This method is always invoked by the thread performing release.
      * 尝试设置(AQS的)状态,反映出独占模式下的一个释放动作。
      *
      * 这个方法在线程释放(控制权)的时候被调用。
    
      * <p>The default implementation throws
      * {@link UnsupportedOperationException}.
      *
      * @param arg the release argument. This value is always the one
      *        passed to a release method, or the current state value upon
      *        entry to a condition wait.  The value is otherwise
      *        uninterpreted and can represent anything you like.
      * @return {@code true} if this object is now in a fully released
      *         state, so that any waiting threads may attempt to acquire;
      *         and {@code false} otherwise.
      * @throws IllegalMonitorStateException if releasing would place this
      *         synchronizer in an illegal state. This exception must be
      *         thrown in a consistent fashion for synchronization to work
      *         correctly.
      * @throws UnsupportedOperationException if exclusive mode is not supported
      */
     protected boolean tryRelease(int arg) {
         throw new UnsupportedOperationException();
     }    
    

  独占模式-忽略中断acquire(int arg)调用流程如下:

(2).独占模式–响应中断的请求方法,这个方法会抛出中断异常:acquireInterruptibly(int arg)throws InterruptedException

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

    /**
     * Acquires in exclusive mode, aborting if interrupted.
     * Implemented by first checking interrupt status, then invoking
     * at least once {@link #tryAcquire}, returning on
     * success.  Otherwise the thread is queued, possibly repeatedly
     * blocking and unblocking, invoking {@link #tryAcquire}
     * until success or the thread is interrupted.  This method can be
     * used to implement method {@link Lock#lockInterruptibly}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    /**
     * Acquires in exclusive interruptible mode.
     * 这个方法与acquireQueued基本一样,只不过是检测到中断时则抛出异常。
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }    

(3).独占模式–响应中断并且支持超时的请求方法:tryAcquireNanos(int arg, long nanosTimeout)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
  /**
     * Attempts to acquire in exclusive mode, aborting if interrupted,
     * and failing if the given timeout elapses.  Implemented by first
     * checking interrupt status, then invoking at least once {@link
     * #tryAcquire}, returning on success.  Otherwise, the thread is
     * queued, possibly repeatedly blocking and unblocking, invoking
     * {@link #tryAcquire} until success or the thread is interrupted
     * or the timeout elapses.  This method can be used to implement
     * method {@link Lock#tryLock(long, TimeUnit)}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @param nanosTimeout the maximum number of nanoseconds to wait
     * @return {@code true} if acquired; {@code false} if timed out
     * @throws InterruptedException if the current thread is interrupted
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    /**
     * Acquires in exclusive timed mode.
     * 这个方法与acquireQueued基本逻辑一样,设置超时时间,调用超时等待LockSupport.parkNanos。
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                //计算LockSupport.parkNanos等待的超时时间,
                //tryAcquire(arg)不成立需要重新计算超时等待时间
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }    

2.共享模式

(1).共享模式–忽略中断的请求方法:acquireShared(int arg)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    /**
     * Acquires in shared mode, ignoring interrupts.  Implemented by
     * first invoking at least once {@link #tryAcquireShared},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquireShared} until success.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquireShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
  1. tryAcquireShared(arg)方法,该方法由子类实现相应逻辑处理。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    
      /**
         * Attempts to acquire in shared mode. This method should query if
         * the state of the object permits it to be acquired in the shared
         * mode, and if so to acquire it.
         *
         * <p>This method is always invoked by the thread performing
         * acquire.  If this method reports failure, the acquire method
         * may queue the thread, if it is not already queued, until it is
         * signalled by a release from some other thread.
         * 在共享模式下尝试请求(控制权)。这个方法(实现)应该查看一下对象的
         * 状态是否允许在共享模式下请求,如果允许再进行请求。
         *
         * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到
         * 同步等待队列中(如果当前线程还不在同步等待队列中),直到被其他线程的释放
         * 操作唤醒。
         * <p>The default implementation throws {@link
         * UnsupportedOperationException}.
         *
         * @param arg the acquire argument. This value is always the one
         *        passed to an acquire method, or is the value saved on entry
         *        to a condition wait.  The value is otherwise uninterpreted
         *        and can represent anything you like.
         * @return a negative value on failure; zero if acquisition in shared
         *         mode succeeded but no subsequent shared-mode acquire can
         *         succeed; and a positive value if acquisition in shared
         *         mode succeeded and subsequent shared-mode acquires might
         *         also succeed, in which case a subsequent waiting thread
         *         must check availability. (Support for three different
         *         return values enables this method to be used in contexts
         *         where acquires only sometimes act exclusively.)  Upon
         *         success, this object has been acquired.
         * @throws IllegalMonitorStateException if acquiring would place this
         *         synchronizer in an illegal state. This exception must be
         *         thrown in a consistent fashion for synchronization to work
         *         correctly.
         * @throws UnsupportedOperationException if shared mode is not supported
         */
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
  2. 获取tryAcquireShared(arg)小于等于0,调用doAcquireShared方法,先调addWaiter(Node.SHARED),把该节点添加到同步队列的尾部。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    
      /**
         * Acquires in shared uninterruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            //把共享模式节点添加到同步队列的尾部
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                //中断标识
                boolean interrupted = false;
                for (;;) {
                    // 获取该节点prev节点,为null,throw NPE
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //设置head节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    //检查当前节点prev节点状态是否SIGNAL?
                    //是:LockSupport.park等待线程
                    //否:循环执行上面操作
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node); //取消节点
            }
        }
    
  3. prev节点是head节点,tryAcquireShared获取大于0 执行setHeadAndPropagate方法
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    
     /**
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         * 将node设置为同步等待队列的头节点,并且检测一下node的后继节点是
         * 否在共享模式下等待,如果是,并且propagate > 0 或者之前头节
         * 点的等待状态是PROPAGATE,唤醒后续节点。
         * @param node the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             * 尝试去唤醒队列中的下一个节点,如果满足如下条件:
             * 调用者明确表示"传递"(propagate > 0),
             * 或者h.waitStatus为PROPAGATE(被上一个操作设置)
             * (注:这里使用符号检测是因为PROPAGATE状态可能会变成SIGNAL状态)
             * 并且下一个节点处于共享模式或者为null。
    
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();//唤醒后继节点并保证传递
            }
        }
       
     /**
      * Release action for shared mode -- signal successor and ensure
      * propagation. (Note: For exclusive mode, release just amounts
      * to calling unparkSuccessor of head if it needs signal.)
      * 共享模式下的释放(控制权)动作 -- 唤醒后继节点并保证传递。
      * 注:在独占模式下,释放仅仅意味着如果有必要,唤醒头节点的
      * 后继节点。
      */
     private void doReleaseShared() {
         /*
          * Ensure that a release propagates, even if there are other
          * in-progress acquires/releases.  This proceeds in the usual
          * way of trying to unparkSuccessor of head if it needs
          * signal. But if it does not, status is set to PROPAGATE to
          * ensure that upon release, propagation continues.
          * Additionally, we must loop in case a new node is added
          * while we are doing this. Also, unlike other uses of
          * unparkSuccessor, we need to know if CAS to reset status
          * fails, if so rechecking.
          * 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的
          * 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤
          * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证
          * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必
          * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样
          * 的是,如果(头结点)等待状态设置失败,重新检测。
          */
         for (;;) {
             Node h = head;
             if (h != null && h != tail) {
                 int ws = h.waitStatus;
                 if (ws == Node.SIGNAL) {
                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                         continue;            // loop to recheck cases
                     unparkSuccessor(h);
                 }
                 else if (ws == 0 &&
                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                     continue;                // loop on failed CAS
             }
             if (h == head)                   // loop if head changed
                 break;
         }
     }    
    
  4. 当前线程释放共享的资源,调用releaseShared方法
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    
        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
           
         /**
          * Attempts to set the state to reflect a release in shared mode.
          *
          * <p>This method is always invoked by the thread performing release.
          *  尝试设置(AQS的)状态,反映出共享模式下的一个释放动作。
          *
          * 这个方法在线程释放(控制权)的时候被调用。
          * <p>The default implementation throws
          * {@link UnsupportedOperationException}.
          *
          * @param arg the release argument. This value is always the one
          *        passed to a release method, or the current state value upon
          *        entry to a condition wait.  The value is otherwise
          *        uninterpreted and can represent anything you like.
          * @return {@code true} if this release of shared mode may permit a
          *         waiting acquire (shared or exclusive) to succeed; and
          *         {@code false} otherwise
          * @throws IllegalMonitorStateException if releasing would place this
          *         synchronizer in an illegal state. This exception must be
          *         thrown in a consistent fashion for synchronization to work
          *         correctly.
          * @throws UnsupportedOperationException if shared mode is not supported
          */
         protected boolean tryReleaseShared(int arg) {
             throw new UnsupportedOperationException();
         }   
    

  共享模式-忽略中断acquire(int arg)调用流程如下:

(2).共享模式–响应中断的请求方法,这个方法会抛出中断异常: acquireSharedInterruptibly(int arg)throws InterruptedException

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
 
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }    

(3).共享模式–响应中断并且支持超时的请求方法:tryAcquireSharedNanos(int arg, long nanosTimeout)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    /**
      * Attempts to acquire in shared mode, aborting if interrupted, and
      * failing if the given timeout elapses.  Implemented by first
      * checking interrupt status, then invoking at least once {@link
      * #tryAcquireShared}, returning on success.  Otherwise, the
      * thread is queued, possibly repeatedly blocking and unblocking,
      * invoking {@link #tryAcquireShared} until success or the thread
      * is interrupted or the timeout elapses.
      *
      * @param arg the acquire argument.  This value is conveyed to
      *        {@link #tryAcquireShared} but is otherwise uninterpreted
      *        and can represent anything you like.
      * @param nanosTimeout the maximum number of nanoseconds to wait
      * @return {@code true} if acquired; {@code false} if timed out
      * @throws InterruptedException if the current thread is interrupted
      */
     public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
             throws InterruptedException {
         if (Thread.interrupted())
             throw new InterruptedException();
         return tryAcquireShared(arg) >= 0 ||
             doAcquireSharedNanos(arg, nanosTimeout);
     }
 /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {

        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                //计算剩余超时时间
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
     

3.AQS开放了几个方法交由子类实现(本类中抛出UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively 判断AQS的控制权是否被当前线程以独占的方式持有。 在Condition的signal/signalAll调用了。为false,throws IllegalMonitorStateException

子类(具体同步器的内部同步机制)一般只需按照具体逻辑实现这几个方法就可以,注意这个方法内部需要考虑线程安全问题

4.条件队列

Condition 接口

  java.util.concurrent.locks.ConditionObject 的监视器方法(wait、notifynotifyAll)分解成截然不同的对象,以便通过将这些 对象与任意 Lock 实现组合使用, 为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。Condition 实例实质上被绑定到一个锁上,要获得 Condition 实例时可以在目标 Lock 上调用其 newCondition() 方法。

  Condition提供了await/signal/signalAll来支持与Object wait/notify/nofityAll类似的功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public interface Condition {
    //当前线程进入等待状态,直到被唤醒或者被中断时,当前线程进入运行状态
    void await() throws InterruptedException;
    //当前线程进入等待状态,直到被唤醒,对中断不做响应;
    void awaitUninterruptibly();
    //在接口1的返回条件基础上增加了超时响应,返回值表示当前剩余的时间,如果在nanosTimeout之前被唤醒,
    // 返回值 = nanosTimeout - 实际消耗的时间,返回值 <= 0表示超时
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //同样是在接口1的返回条件基础上增加了超时响应,与接口3不同的是:
    // 1.可以自定义超时时间单位;
    // 2. 返回值返回true/false,在time之前被唤醒,返回true,超时返回false。
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //当前线程进入等待状态直到将来的指定时间被通知,如果没有到指定时间被通知返回true,否则,到达指定时间,
    // 返回false;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //唤醒一个等待在Condition上的线程;
    void signal();
    //唤醒等待在Condition上所有的线程。
    void signalAll();
}

AQS内部类ConditionObject

  ConditionObject是AQS中提供的一种锁的基础机制,实现了接口Condition。

  1. 等待队列

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
              public class ConditionObject implements Condition, java.io.Serializable {
                  private static final long serialVersionUID = 1173984872572414699L;
                  /** First node of condition queue. */
                  private transient Node firstWaiter;
                  /** Last node of condition queue. */
                  private transient Node lastWaiter;
             
                  /**
                   * Creates a new <tt>ConditionObject</tt> instance.
                   */
                  public ConditionObject() { }
              }
    

    等待队列是一个FIFO链表的队列。如图下

  2. await()等待

      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    
        /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with
             *      saved state as argument, throwing
             *      IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())//如果当前线程被中断,抛出InterruptedException异常。
                    throw new InterruptedException();
                //将当前线程添加到条件等待队列。
                Node node = addConditionWaiter();
                //释放当前线程对AQS的控制权,并返回当前AQS中的state值。
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) { //判断是不是同步队列
                    LockSupport.park(this);    //阻塞当前线程,等待同步队列释放资源调用 LockSupport.unpark
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//请求同步队列acquireQueued方法
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled 清楚等待队列中的取消状态节点
                    unlinkCancelledWaiters();
                if (interruptMode != 0)     //中断处理
                    reportInterruptAfterWait(interruptMode);
            }
       
        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //如果最后节点是取消节点,清楚等待队列中的取消状态节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //添加一个条件节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null) //如果是队列中第一个节点,那么将firstWaiter指向这个节点,后面也会将lastWaiter指向这个节点。
                firstWaiter = node;
            else
                t.nextWaiter = node; //如果是队列中已经存在其他节点,那么将原本lastWaiter的nextWaiter指向当前节点。
            lastWaiter = node;
            return node;
        }
          /**
             * Unlinks cancelled waiter nodes from condition queue.
             * Called only while holding lock. This is called when
             * cancellation occurred during condition wait, and upon
             * insertion of a new waiter when lastWaiter is seen to have
             * been cancelled. This method is needed to avoid garbage
             * retention in the absence of signals. So even though it may
             * require a full traversal, it comes into play only when
             * timeouts or cancellations occur in the absence of
             * signals. It traverses all nodes rather than stopping at a
             * particular target to unlink all pointers to garbage nodes
             * without requiring many re-traversals during cancellation
             * storms.
             *  移除条件等待队列中的取消状态节点。这个方法一定是在持有锁
             * (拥有AQS控制权)的情况下被调用的(所以不存在竞争)。
             * 当等待条件时被(节点的线程)取消,或者当lastWaiter被取消后
             * 条件等待队列中进入了一个新节点时会调用这个方法。
             * 这个方法需要避免由于没有signal而引起的垃圾滞留。所以尽管
             * 方法内会做一个完全遍历,也只有超时获或取消时(没有signal的
             * 情况下)才被调用。方法中会遍历所有节点,切断所有指向垃圾节
             * 点的引用,而不是一次取消切断一个引用。
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)  //首节点为取消状态
                            firstWaiter = next;
                        else    //这里做下拼接(移除中间的取消节点)
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail; /最后设置尾节点
                    }
                    else
                        trail = t;
                    t = next;
                }
            } 
       /**
         * Invokes release with current state value; returns saved state.
         * Cancels node and throws exception on failure.
         * 调用release方法并传入当前的state。
         * 调用成功会返回传入release方法之前的state.
         * 失败会抛出异常,并取消当前节点。
         * @param node the condition node for this wait
         * @return previous sync state
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        } 
           
        /**
         * Returns true if a node, always one that was initially placed on
         * a condition queue, is now waiting to reacquire on sync queue.
         * @param node the node
         * @return true if is reacquiring
         */
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue 如果有后继节点,说明肯定在AQS同步等待队列里。
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             * 之前的代码中分析到过,node.prev不为空并不能说明节点在AQS的
             * 同步等待队列里面,因为后续的CAS操作可能会失败,所以这里从尾节
             * 开始反向遍历
             */
               
            return findNodeFromTail(node);
        }
       
        /**
         * Returns true if node is on sync queue by searching backwards from tail.
         * Called only when needed by isOnSyncQueue.
         * @return true if present
         */
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
           /** Mode meaning to reinterrupt on exit from wait */
           //等待退出
            private static final int REINTERRUPT =  1;
            /** Mode meaning to throw InterruptedException on exit from wait */
            private static final int THROW_IE    = -1;
            //在退出时抛出中断异常
            /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             * 对中断的检查,
             * 返回THROW_IE 如果在信号发出之前中断,
             * 返回REINTERRUPT如果在信号发出后中断,
             * 如果没有中断,则返回0
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
       
          /**
           * Transfers node, if necessary, to sync queue after a cancelled
           * wait. Returns true if thread was cancelled before being
           * signalled.
           *  在取消等待后,将节点转移到同步队列中。如果线程在唤醒钱被
           * 取消,返回true。
           * @param current the waiting thread
           * @param node its node
           * @return true if cancelled before the node was signalled
           */
          final boolean transferAfterCancelledWait(Node node) {
              if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                  enq(node);
                  return true;
              }
              /*
               * If we lost out to a signal(), then we can't proceed
               * until it finishes its enq().  Cancelling during an
               * incomplete transfer is both rare and transient, so just
               * spin.
               * 如果我们输出了一个signal(),那么我们就不能继续下去直到enq()结束。在不完整的转让过程中取消既罕见又短暂,所以只需自旋。
               */
              while (!isOnSyncQueue(node))
                  Thread.yield();
              return false;
          }  
            /**
             * Throws InterruptedException, reinterrupts current thread, or
             * does nothing, depending on mode.
             */
            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }      
    
  3. signal()唤醒

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    
            /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {
                //判断AQS的控制权是否被当前线程以独占的方式持有。如果不是,抛出IllegalMonitorStateException异常。
                // isHeldExclusively方法子类去实现
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
        /**
         * Returns {@code true} if synchronization is held exclusively with
         * respect to the current (calling) thread.  This method is invoked
         * upon each call to a non-waiting {@link ConditionObject} method.
         * (Waiting methods instead invoke {@link #release}.)
         *
         * <p>The default implementation throws {@link
         * UnsupportedOperationException}. This method is invoked
         * internally only within {@link ConditionObject} methods, so need
         * not be defined if conditions are not used.
         *
         * @return {@code true} if synchronization is held exclusively;
         *         {@code false} otherwise
         * @throws UnsupportedOperationException if conditions are not supported
         */
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)  //赋值firstWaiter 判断是否为空
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * 将一个节点从条件等待队列转移到同步等待队列。
         * 如果成功,返回true。
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal).
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            //如果设置等待状态失败,说明节点已经被取消了,直接返回false。
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
       
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            //将node加入到AQS同步等待队列中,并返回node的前驱节点。
            Node p = enq(node);
            int ws = p.waitStatus;
            //如果前驱节点被取消,或者尝CAS修改前驱节点的状态为SIGNAL(表示node节点需要唤醒)失败,那么唤醒node节点上的线程。
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }        
    
  4. signalAll()唤醒所有等待队列

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
            /**
             * Moves all threads from the wait queue for this condition to
             * the wait queue for the owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
            /**
              * Removes and transfers all nodes.
              * @param first (non-null) the first node on condition queue
              */
             private void doSignalAll(Node first) {
                 lastWaiter = firstWaiter = null;
                 do {
                     Node next = first.nextWaiter;
                     first.nextWaiter = null;
                     transferForSignal(first);
                     first = next;
                 } while (first != null); //循环所有等待队列
             }       
    
  5. void awaitUninterruptibly();

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
            /**
             * Implements uninterruptible condition wait.
             * <ol>
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with
             *      saved state as argument, throwing
             *      IllegalMonitorStateException if it fails.
             * <li> Block until signalled.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * </ol>
             */
            public final void awaitUninterruptibly() {
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean interrupted = false;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if (Thread.interrupted())
                        interrupted = true;
                }
                if (acquireQueued(node, savedState) || interrupted)
                    selfInterrupt();
            }
    
  6. long awaitNanos(long nanosTimeout) throws InterruptedException;

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    
            /**
             * Implements timed condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with
             *      saved state as argument, throwing
             *      IllegalMonitorStateException if it fails.
             * <li> Block until signalled, interrupted, or timed out.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             * 
             */
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                long lastTime = System.nanoTime();
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);//超时 直接把当前节点加入到同步队列。
                        break;
                    }
                    LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
    
                    long now = System.nanoTime();
                    nanosTimeout -= now - lastTime;
                    lastTime = now;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return nanosTimeout - (System.nanoTime() - lastTime);
            }
    
  7. boolean await(long time, TimeUnit unit) throws InterruptedException;

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    
       /**
             * Implements timed condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with
             *      saved state as argument, throwing
             *      IllegalMonitorStateException if it fails.
             * <li> Block until signalled, interrupted, or timed out.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * <li> If timed out while blocked in step 4, return false, else true.
             * </ol>
             */
            public final boolean await(long time, TimeUnit unit)
                    throws InterruptedException {
                if (unit == null)
                    throw new NullPointerException();
                long nanosTimeout = unit.toNanos(time);
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                long lastTime = System.nanoTime();
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (nanosTimeout <= 0L) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    long now = System.nanoTime();
                    nanosTimeout -= now - lastTime;
                    lastTime = now;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }
    
  8. boolean awaitUntil(Date deadline) throws InterruptedException;

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    
       /**
             * Implements absolute timed condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with
             *      saved state as argument, throwing
             *      IllegalMonitorStateException if it fails.
             * <li> Block until signalled, interrupted, or timed out.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * <li> If timed out while blocked in step 4, return false, else true.
             * </ol>
             */
            public final boolean awaitUntil(Date deadline)
                    throws InterruptedException {
                if (deadline == null)
                    throw new NullPointerException();
                long abstime = deadline.getTime();
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                boolean timedout = false;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    if (System.currentTimeMillis() > abstime) {
                        timedout = transferAfterCancelledWait(node);
                        break;
                    }
                    LockSupport.parkUntil(this, abstime);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return !timedout;
            }