SynchronousQueue源码分析

简介

  • SynchronousQueue是一种特殊的阻塞队列,它本身没有容量,只有当一个线程从队列取数据的同时,另一个线程才能放一个数据到队列中, 反之亦然。存取过程相当于一个线程把数据(安全的)交给另一个线程的过程。
  • SynchronousQueue也支持公平和非公平模式。

一、SynchronousQueue数据结构实现

1. 实现Transferer抽象类型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
    /**
     * Shared internal API for dual stacks and queues.
     * 共享内部API的双栈和队列。
     */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         * 执行put或take。
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

2. Dual stack的TransferStack实现

(1). Node节点实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

     /** Node class for TransferStacks. */
     static final class SNode {
         volatile SNode next;        // next node in stack  下一个节点堆栈
         volatile SNode match;       // the node matched to this   和当前节点完成匹配的节点
         volatile Thread waiter;     // to control park/unpark 等待线程
         Object item;                // data; or null for REQUESTs 数据;或者是null 为 REQUEST模式
         int mode;                   // 模式
         // Note: item and mode fields don't need to be volatile
         // since they are always written before, and read after,
         // other volatile/atomic operations.
         // 注意:项item and mode字段不需要是volatile,因为它们总是在其他volatile/atomic操作之前写入和读取之后。


         SNode(Object item) {
             this.item = item;
         }
         // CAS next
         boolean casNext(SNode cmp, SNode val) {
             return cmp == next &&
                 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
         }

         /**
          * Tries to match node s to this node, if so, waking up thread.
          * Fulfillers call tryMatch to identify their waiters.
          * Waiters block until they have been matched.
          *
          * 尝试匹配节点s和当前节点,如果匹配成功,唤醒等待线程。
          * (向消费者传递数据或向生产者获取数据)调用tryMatch方法
          * 来确定它们的等待线程,然后唤醒这个等待线程。
          * @param s the node to match
          * @return true if successfully matched to s
          */
         boolean tryMatch(SNode s) {
             if (match == null &&
                 UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                 Thread w = waiter;
                 //如果当前节点的match为空,那么CAS设置s为match,然后唤醒waiter。
                 if (w != null) {    // waiters need at most one unpark
                     waiter = null;
                     LockSupport.unpark(w);
                 }
                 return true;
             }
              //如果match不为null,或者CAS设置match失败,那么比较match和s是否为相同对象。
             //如果相同,说明已经完成匹配,匹配成功。
             return match == s;
         }

         /**
          * Tries to cancel a wait by matching node to itself.
          * 尝试取消当前节点(有线程等待),通过将match设置为自身。
          */
         void tryCancel() {
             UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
         }

         boolean isCancelled() {
             return match == this;
         }

         // Unsafe mechanics
         private static final sun.misc.Unsafe UNSAFE;
         private static final long matchOffset;
         private static final long nextOffset;

         static {
             try {
                 UNSAFE = sun.misc.Unsafe.getUnsafe();
                 Class k = SNode.class;
                 matchOffset = UNSAFE.objectFieldOffset
                     (k.getDeclaredField("match"));
                 nextOffset = UNSAFE.objectFieldOffset
                     (k.getDeclaredField("next"));
             } catch (Exception e) {
                 throw new Error(e);
             }
         }
     }

(2). TransferStack 构造

TransferStack只有默认构造器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 /** Dual stack */
    static final class TransferStack extends Transferer {
        /*
         * This extends Scherer-Scott dual stack algorithm, differing,
         * among other ways, by using "covering" nodes rather than
         * bit-marked pointers: Fulfilling operations push on marker
         * nodes (with FULFILLING bit set in mode) to reserve a spot
         * to match a waiting node.
         *  这扩展了Scherer-Scott双栈算法,不同之处在于使用"covering"节点而不是位标记指针:
         *  Fulfilling操作在标记节点上进行push(FULFILLING位设置为模式)以保留点以匹配等待节点。
         */

        /* Modes for SNodes, ORed together in node fields */
        /** Node represents an unfulfilled consumer */
        //节点代表消费者等待消费资源
        static final int REQUEST    = 0;
        /** Node represents an unfulfilled producer */
        //节点代表生产者等待生产资源
        static final int DATA       = 1;
        /** Node is fulfilling another unfulfilled DATA or REQUEST */
        //节点正在履行另一个未完成的DATA或REQUEST
        static final int FULFILLING = 2;

        /** Return true if m has fulfilling bit set */
        //判断是否包含正在匹配(FULFILLING)的标记
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
        /** The head (top) of the stack */
        //堆栈的头(顶部)
        volatile SNode head;
        
        //设置头部
        boolean casHead(SNode h, SNode nh) {
            return h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
        }

        /**
         * Creates or resets fields of a node. Called only from transfer
         * where the node to push on stack is lazily created and
         * reused when possible to help reduce intervals between reads
         * and CASes of head and to avoid surges of garbage when CASes
         * to push nodes fail due to contention.
         *  创建或重置节点的字段。只能从transfer中调用,推迟堆栈的节点被延迟创建,并在可能时重新使用,
         *  以帮助减少读取和头部CASes之间的时间间隔,并避免由于争用导致节点CASes推送失败时浪费的浪费。
         */
        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

      // Unsafe mechanics
      private static final sun.misc.Unsafe UNSAFE;
      private static final long headOffset;
      static {
          try {
              UNSAFE = sun.misc.Unsafe.getUnsafe();
              Class k = TransferStack.class;
              headOffset = UNSAFE.objectFieldOffset
                  (k.getDeclaredField("head"));
          } catch (Exception e) {
              throw new Error(e);
          }
      }
 }     

(3).transfer(Put or takes an item)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * Basic algorithm is to loop trying one of three actions:
             * 基本算法是循环尝试,执行下面两个步中的,其中一个:
             * 
             * 1. If apparently empty or already containing nodes of same
             *    mode, try to push node on stack and wait for a match,
             *    returning it, or null if cancelled.
             *   如果显然为空或已经包含相同模式的节点,则尝试在堆栈上推送节点并等待匹配,返回该匹配;如果取消,则返回null。
             *   
             * 2. If apparently containing node of complementary mode,
             *    try to push a fulfilling node on to stack, match
             *    with corresponding waiting node, pop both from
             *    stack, and return matched item. The matching or
             *    unlinking might not actually be necessary because of
             *    other threads performing action 3:
             *    如果包含一个互补模式的节点(take(REQUEST)->put(DATA);put(DATA)->take(REQUEST)),
             *     则尝试一个FULFILLING节点入栈,同时匹配等待的协同节点,两个节点同时出栈,返回匹配的元素。
             *     由于其他线程执行步骤3,实际匹配和解除链接指针动作不会发生。
             *
             * 3. If top of stack already holds another fulfilling node,
             *    help it out by doing its match and/or pop
             *    operations, and then continue. The code for helping
             *    is essentially the same as for fulfilling, except
             *    that it doesn't return the item.
             *    如果栈顶存在另外一个FULFILLING的节点,则匹配节点,并出栈。这段的代码
             *      与fulfilling相同,除非没有元素返回
             */

            SNode s = null; // constructed/reused as needed 根据需要构建/重用
            int mode = (e == null) ? REQUEST : DATA; 

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode 1.头节点为空或者模式相同
                    if (timed && nanos <= 0) {      // can't wait            没有等到时间
                        if (h != null && h.isCancelled())                        头节点不为空并且头结点是否取消
                            casHead(h, h.next);     // pop cancelled node            把头结点next设置为头节点  
                        else
                            return null;                                 //      头结点为空或者头结点不为取消节点就返回null
                    } else if (casHead(h, s = snode(s, e, h, mode))) {   //   CAS 头结点 snode创建或者重置节点
                        SNode m = awaitFulfill(s, timed, nanos);         //      自旋转或者等待线程
                        if (m == s) {               // wait was cancelled          等待取消,清理next节点
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)           //      CAS 头节点
                            casHead(h, s.next);     // help s's fulfiller
                        return (mode == REQUEST) ? m.item : s.item;      //       REQUEST返回match.item,反之s.item
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill       2.头节点模式不是FULFILLING
                    if (h.isCancelled())            // already cancelled        头结点取消,CAS 设置头节点    
                        casHead(h, h.next);         // pop and retry                
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {     //CAS 头结点 snode创建或者重置节点
                        for (;;) { // loop until matched or waiters disappear         循环直到匹配或waiters消失
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone             所有waiters都不见了
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;                                      
                            if (m.tryMatch(s)) {                                        //
                                casHead(s, mn);     // pop both s and m
                                return (mode == REQUEST) ? m.item : s.item;
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller              3.FULFILLING  模式
                    SNode m = h.next;               // m is h's match             
                    if (m == null)                  // waiter is gone                   waiter 没有
                        casHead(h, null);           // pop fulfilling node  
                    else {
                        SNode mn = m.next;                                            
                        if (m.tryMatch(h))          // help match                      帮助match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }
         /**
          * Spins/blocks until node s is matched by a fulfill operation.
          * 自旋或阻塞直到节点s匹配完成操作
          * 
          * @param s the waiting node
          * @param timed true if timed wait
          * @param nanos timeout value
          * @return matched node, or s if cancelled
          */
         SNode awaitFulfill(SNode s, boolean timed, long nanos) {
             /*
              * When a node/thread is about to block, it sets its waiter
              * field and then rechecks state at least one more time
              * before actually parking, thus covering race vs
              * fulfiller noticing that waiter is non-null so should be
              * woken.
              *当一个节点/线程即将被阻塞时,它将设置其waiter字段,然后在实际parking之前至少再次检查状态,
              * 从而涵盖竞争与履行者注意到waiter是非空的,所以应该被唤醒。
              * 
              * 
              * When invoked by nodes that appear at the point of call
              * to be at the head of the stack, calls to park are
              * preceded by spins to avoid blocking when producers and
              * consumers are arriving very close in time.  This can
              * happen enough to bother only on multiprocessors.
              * 当被节点调用时,它会出现在堆栈的顶部,为了避免在生产者和消费者及时到达时,调用park进行自旋以避免阻塞。
              * 这种情况只会发生在多处理器上。
              *
              * The order of checks for returning out of main loop
              * reflects fact that interrupts have precedence over
              * normal returns, which have precedence over
              * timeouts. (So, on timeout, one last check for match is
              * done before giving up.) Except that calls from untimed
              * SynchronousQueue.{poll/offer} don't check interrupts
              * and don't wait at all, so are trapped in transfer
              * method rather than calling awaitFulfill.
              * 从主循环返回的检查顺序反映了中断优先于正常返回(优先于超时)的事实。 
              * (所以,在超时之后,最后一次检查匹配是在放弃之前完成的。)
              * 除了不计时的SynchronousQueue。{poll / offer}的调用不检查中断并且根本不等待,
              * 所以被困在传输方法中比呼唤awaitFulfill。
              */
             long lastTime = timed ? System.nanoTime() : 0;
             Thread w = Thread.currentThread();
             SNode h = head;
             int spins = (shouldSpin(s) ?                                 //spins 自旋转的次数
                          (timed ? maxTimedSpins : maxUntimedSpins) : 0);  
             for (;;) {
                 if (w.isInterrupted())            
                     s.tryCancel();           // 把节点match字段设置自己
                 SNode m = s.match;           
                 if (m != null)              // 节点match不为空,就返回match
                     return m;
                 if (timed) {
                     long now = System.nanoTime();    
                     nanos -= now - lastTime;      //计算等待时间
                     lastTime = now;
                     if (nanos <= 0) {            //计算出等待时间小于0,进行取消状态。
                         s.tryCancel();       
                         continue;
                     }
                 }
                 if (spins > 0)                        //进行自旋转操作
                     spins = shouldSpin(s) ? (spins-1) : 0;
                 else if (s.waiter == null)           // 设置等待线程   
                     s.waiter = w; // establish waiter so can park next iter
                 else if (!timed)                    // 判断是否等待超时时间,否 park 是parkNanos
                     LockSupport.park(this);
                 else if (nanos > spinForTimeoutThreshold)
                     LockSupport.parkNanos(this, nanos);
             }
         }  
        /**
         * Returns true if node s is at head or there is an active
         * fulfiller.
         * 如果节点在头或有活跃的Fulfilling,则返回true。
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        } 
  /**
         * Unlinks s from the stack.
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread

            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             * 在最坏的情况下,我们可能需要遍历整个栈来取消链接。如果有多个并发调用需要清理,
             * 如果另一个线程已经删除了它,我们可能看不到s。但是当我们看到任何已知的节点时,我们可以停止。
             * 我们使用s.next,除非它被取消,在这种情况下,我们尝试一个过去的节点。
             * 我们不再进一步检查,因为我们不希望双重穿越而找到哨兵。
             */

            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next;

            // Absorb cancelled nodes at head 在头部吸收取消的节点
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);

            // Unsplice embedded nodes 无法嵌入节点
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }        

3. Dual Queue的TransferQueue实现

(1). Node节点实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
    /*
         * This extends Scherer-Scott dual queue algorithm, differing,
         * among other ways, by using modes within nodes rather than
         * marked pointers. The algorithm is a little simpler than
         * that for stacks because fulfillers do not need explicit
         * nodes, and matching is done by CAS'ing QNode.item field
         * from non-null to null (for put) or vice versa (for take).
         * 本算法实现拓展了Scherer-Scott双队列算法,不同的是用节点模式,
         * 而不是标记指针来区分节点操作类型。这个算法比栈算法的实现简单,
         *  因为fulfillers需要明确指定节点,同时匹配节点用CAS操作QNode的
         *  元素field即可,put操作从非null到null,反则亦然,take从null到非null。
         */

        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;

            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }

            boolean casNext(QNode cmp, QNode val) {
                return next == cmp &&
                    UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
            }

            boolean casItem(Object cmp, Object val) {
                return item == cmp &&
                    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
            }

            /**
             * Tries to cancel by CAS'ing ref to this as item.
             */
            void tryCancel(Object cmp) {
                UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
            }

            boolean isCancelled() {
                return item == this;
            }

            /**
             * Returns true if this node is known to be off the queue
             * because its next pointer has been forgotten due to
             * an advanceHead operation.
             * 如果已知此节点不在队列中,则返回true,因为由于advanceHead操作,其下一个指针已被遗忘。
             *
             */
            boolean isOffList() {
                return next == this;
            }

            // Unsafe mechanics
            private static final sun.misc.Unsafe UNSAFE;
            private static final long itemOffset;
            private static final long nextOffset;

            static {
                try {
                    UNSAFE = sun.misc.Unsafe.getUnsafe();
                    Class k = QNode.class;
                    itemOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("item"));
                    nextOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("next"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }

(2). TransferQueue构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    /** Dual Queue */
    static final class TransferQueue extends Transferer {
       /** Head of queue  头节点*/
        transient volatile QNode head;
        /** Tail of queue  尾节点*/
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it cancelled. 
         * 刚入队列的节点,取消等待,但还没有出队列的节点
         */
        transient volatile QNode cleanMe;

        TransferQueue() {
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         * 尝试设置新的队头节点为nh,并比较旧头节点,成功则,解除旧队列头节点的next链接,及指向自己
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head &&
                UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
                h.next = h; // forget old next
        }

        /**
         * Tries to cas nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
        }

        /**
         * Tries to CAS cleanMe slot.
         */
        boolean casCleanMe(QNode cmp, QNode val) {
            return cleanMe == cmp &&
                UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
        }
        
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long cleanMeOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = TransferQueue.class;
                headOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("head"));
                tailOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("tail"));
                cleanMeOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("cleanMe"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }        
    }

(3).transfer(Put or takes an item)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
  /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             * 基本算法是循环尝试,执行下面两个步中的,其中一个:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *    如果队列为空,或队列中为相同模式的节点,尝试节点入队列等待,
             *      直到fulfilled,返回匹配元素,或者由于中断,超时取消等待。
             *
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *    如果队列中包含节点,transfer方法被一个协同模式的节点调用,
             *    则尝试补给或填充等待线程节点的元素,并出队列,返回匹配元素。
             *    
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             * 在每一种情况,执行的过程中,检查和尝试帮助其他stalled/slow线程移动队列头和尾节点
             * 
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             *  循环开始,首先进行null检查,防止为初始队列头和尾节点。当然这种情况,
             *  在当前同步队列中,不可能发生,如果调用持有transferer的non-volatile/final引用,
             *  可能出现这种情况。一般在循环的开始,都要进行null检查,检查过程非常快,不用过多担心
             *  性能问题。
             *
             */

            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode  1. 如果队列为空,或当前节点与队尾模式相同
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read      读取不一致性,这里有竞争问题。
                        continue;
                    if (tn != null) {               // lagging tail              尾节点.next 不为空,CAS设置新尾节点
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait                不等待 返回null
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);                               //根据元素和模式构造节点
                    if (!t.casNext(null, s))        // failed to link in    设置头节点.next     
                        continue;

                    advanceTail(t, s);              // swing tail and wait         把s节点设置尾节点
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled          节点取消
                        clean(t, s);                                              
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked  如果s节点已经不再队列中,next==this 在advanceHead设置头节点,以前头节点next指定自己本身
                        advanceHead(t, s);          // unlink if head       
                        if (x != null)              // and forget fields
                            s.item = s;             // 指定头节点只是取消节点。
                        s.waiter = null;
                    }
                    return (x != null) ? x : e;

                } else {                            // complementary-mode     如果队列不为空,且与队头的模式不同,及匹配成功
                    QNode m = h.next;               // node to fulfill        匹配头节点 next节点
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled  头节点isData模式
                        x == m ||                   // m cancelled          取消节点
                        !m.casItem(x, e)) {         // lost CAS             更新头节点item值
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled 
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? x : e;
                }
            }
        }
  /**
         * Spins/blocks until node s is fulfilled.
         * 自旋或阻塞直到节点被fulfilled
         * 
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = timed ? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?           //自旋次数
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())      //线程是否中断 中断把节点设置尾取消节点 
                    s.tryCancel(e);     
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;        //计算等待时间
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);            //设置取消节点
                        continue;
                    }
                }
                if (spins > 0)                   //自选次数
                    --spins;
                else if (s.waiter == null)       //设置等待线程
                    s.waiter = w;                
                else if (!timed)      
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }   
       /**
         * Gets rid of cancelled node s with original predecessor pred.
         * 移除队列中取消等待的线程节点
         */
        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * At any given time, exactly one node on list cannot be
             * deleted -- the last inserted node. To accommodate this,
             * if we cannot delete s, we save its predecessor as
             * "cleanMe", deleting the previously saved version
             * first. At least one of node s or the node previously
             * saved can always be deleted, so this always terminates.
             *  在任何时候,最后一个节点入队列时,队列中都有可能存在取消等待,但没有删除的节点。
             *  为了将这些节点删除,如果我们不能删除最后入队列的节点,我们可以用cleanMe记录它的前驱,
             *  删除cleanMe后继节点。s节点和cleanMe后继节点至少一个删除,则停止。
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   // Absorb cancelled first node as head 吸收取消的第一个节点作为头
                if (hn != null && hn.isCancelled()) {   //如果队头不为空,且取消等待,设置后继为新的队头元素
                    advanceHead(h, hn);
                    continue;
                }
                QNode t = tail;      // Ensure consistent read for tail 确保tail一致
                if (t == h)
                    return;
                QNode tn = t.next;
                if (t != tail)
                    continue;
                if (tn != null) {
                    advanceTail(t, tn);
                    continue;
                }
                if (s != t) {        // If not tail, try to unsplice  如果s不是tail,试着解开
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn))             //CAS unlike节点
                        return;
                }
                QNode dp = cleanMe;   
                if (dp != null) {    // Try unlinking previous cancelled node  
                    QNode d = dp.next;
                    QNode dn;
                    //移除前一个取消等待的节点
                    if (d == null ||               // d is gone or   
                        d == dp ||                 // d is off list or   已移除
                        !d.isCancelled() ||        // d not cancelled or 不是取消节点
                        (d != t &&                 // d not tail and     删除节点不是尾节点并且next不为空且不等本身,cas更新
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       // d unspliced
                        casCleanMe(dp, null);        //清楚上一个取消节点
                    if (dp == pred)
                        return;      // s is already saved node
                } else if (casCleanMe(null, pred))     //记录删除节点,前节点记录
                    return;          // Postpone cleaning s
            }
        }
     

4. SynchronousQueue 构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;
    /** The number of CPUs, for spin control  CPU的数量,用于旋转控制*/
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     * 定时等待阻塞之前旋转的次数。价值是从经验中得出的 - 它适用于各种处理器和操作系统。
     * 经验上,最好的价值似乎不会随着CPU数量(超过2)而变化,所以只是一个常数。
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     * 在不计时的等待中阻塞之前旋转的次数。这大于定时值,因为不需要在每次旋转时检查时间,所以不定时地等待旋转更快。
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     * 他用数秒的时间快速旋转,而不是使用定时停车。粗略的估计就足够了。
     */
    static final long spinForTimeoutThreshold = 1000L;
   /**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization.  Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     * transferer只在构造函数中设置,但不能在没有进一步复杂的序列化的情况下声明为final。
     * 由于每个公共方法最多只能访问一次,所以在这里使用volatile来代替final是没有明显的性能损失。
     */
    private transient volatile Transferer transferer;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }
}  

二、入队

1. offer方法

  • offer(E e)
  • offer(E e, long timeout, TimeUnit unit) 将指定元素插入该队列的尾部,如果队列已满,DelayQueue不支持等待时间,主要数据元素保存到优先队列中
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }    

2. add方法

//AbstractQueue类中

1
2
3
4
5
6
   public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

3. put方法

  • 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

三、出队

1. poll方法

  • poll() 获取并移除此队列的头,如果此队列为空,则返回 null
  • E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,队列为空时,在指定的等待时间前等待可用的元素。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }    

2. take方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
 /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
        Thread.interrupted();
        throw new InterruptedException();
    }

3. peek方法

peek方法返回为空,没用容量。

四、移除队列

1. remove方法

没有容量,没有移除值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

2. clear方法

1
2
3
4
5
6
    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {
    }

3. drainTo方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

五、Iterator 迭代器实现

没有容量没有实现迭代器

1
2
3
    public Iterator<E> iterator() {
        return Collections.emptyIterator();
    }