LinkedTransferQueue源码分析

简介

  LinkedTransferQueue是TransferQueue接口的实现类,其定义为一个无界的队列,具有先进先出(FIFO)的特性。

  有人这样评价它:“TransferQueue是是ConcurrentLinkedQueue、SynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。”

一、LinkedTransferQueue数据结构实现

1. Node 实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
    static final class Node {
        final boolean isData;   // false if this is a request node 如果这是一个请求节点,则为false
        volatile Object item;   // initially non-null if isData; CASed to match  isData为true时,item存放数据,后面匹配后置为null
        volatile Node next;
        volatile Thread waiter; // null until waiting

        // CAS methods for fields
        final boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        final boolean casItem(Object cmp, Object val) {
            // assert cmp == null || cmp.getClass() != Node.class;
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(Object item, boolean isData) {
            UNSAFE.putObject(this, itemOffset, item); // relaxed write
            this.isData = isData;
        }

        /**
         * Links node to itself to avoid garbage retention.  Called
         * only after CASing head field, so uses relaxed write.
         * 将节点链接到自身以避免垃圾留存。只有在CAS 头节点之后调用,所以使用宽松的写法。
         */
        final void forgetNext() {
            UNSAFE.putObject(this, nextOffset, this);
        }

        /**
         * Sets item to self and waiter to null, to avoid garbage
         * retention after matching or cancelling. Uses relaxed writes
         * because order is already constrained in the only calling
         * contexts: item is forgotten only after volatile/atomic
         * mechanics that extract items.  Similarly, clearing waiter
         * follows either CAS or return from park (if ever parked;
         * else we don't care).
         * 将item 设置为自己并将waiter 设置为空,以避免匹配或取消后的垃圾留存。
         * 使用轻松的写入,因为顺序已经被限制在唯一的调用上下文中:只有在提取item的 volatile/atomic 之后,item才被遗忘。同样,
         * 清空 waiter 要么遵循CAS,要么从park 返回(如果 parked;,否则我们不在乎)。
         */
        final void forgetContents() {
            UNSAFE.putObject(this, itemOffset, this);
            UNSAFE.putObject(this, waiterOffset, null);
        }

        /**
         * Returns true if this node has been matched, including the
         * case of artificial matches due to cancellation.
         * 如果此节点已匹配,则返回true,包括由于取消而导致的人为匹配。
         */
        final boolean isMatched() {
            Object x = item;
            return (x == this) || ((x == null) == isData);
        }

        /**
         * Returns true if this is an unmatched request node.
         * 是否请求节点
         */
        final boolean isUnmatchedRequest() {
            return !isData && item == null;
        }

        /**
         * Returns true if a node with the given mode cannot be
         * appended to this node because this node is unmatched and
         * has opposite data mode.
         * 如果具有给定模式的节点不能附加到该节点,则返回true,因为此节点不匹配,并且具有相反的数据模式。
         */
        final boolean cannotPrecede(boolean haveData) {
            boolean d = isData;
            Object x;
            return d != haveData && (x = item) != this && (x != null) == d;
        }

        /**
         * Tries to artificially match a data node -- used by remove.
         */
        final boolean tryMatchData() {
            // assert isData;
            Object x = item;
            if (x != null && x != this && casItem(x, null)) {
                LockSupport.unpark(waiter);
                return true;
            }
            return false;
        }

        private static final long serialVersionUID = -3375979862319811754L;

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
        private static final long waiterOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
                waiterOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiter"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

2. LinkedTransferQueue 构造

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {
 
 /*
  * *** Overview of Dual Queues with Slack *** 双重队列的概述
  *
  * Dual Queues, introduced by Scherer and Scott
  * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
  * (linked) queues in which nodes may represent either data or
  * requests.  When a thread tries to enqueue a data node, but
  * encounters a request node, it instead "matches" and removes it;
  * and vice versa for enqueuing requests. Blocking Dual Queues
  * arrange that threads enqueuing unmatched requests block until
  * other threads provide the match. Dual Synchronous Queues (see
  * Scherer, Lea, & Scott
  * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
  * additionally arrange that threads enqueuing unmatched data also
  * block.  Dual Transfer Queues support all of these modes, as
  * dictated by callers.
  *
  * A FIFO dual queue may be implemented using a variation of the
  * Michael & Scott (M&S) lock-free queue algorithm
  * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
  * It maintains two pointer fields, "head", pointing to a
  * (matched) node that in turn points to the first actual
  * (unmatched) queue node (or null if empty); and "tail" that
  * points to the last node on the queue (or again null if
  * empty). For example, here is a possible queue with four data
  * elements:
  * 使用Michael & Scott(M & S)无锁队列算法的变体,可以实现FIFO双队列.
  * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
  *  它维护两个指针字段“head”,指向一个(匹配的)节点,该节点指向第一个实际(未匹配的)队列节点(如果空);
  *  以及指向队列上最后一个节点的“tail”(如果是空的,则返回null)。例如,这里有一个带有四个数据元素的队列:
  *  head                tail
  *    |                   |
  *    v                   v
  *    M -> U -> U -> U -> U
  *
  * The M&S queue algorithm is known to be prone to scalability and
  * overhead limitations when maintaining (via CAS) these head and
  * tail pointers. This has led to the development of
  * contention-reducing variants such as elimination arrays (see
  * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
  * optimistic back pointers (see Ladan-Mozes & Shavit
  * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
  * However, the nature of dual queues enables a simpler tactic for
  * improving M&S-style implementations when dual-ness is needed.
  * 已知M&S队列算法在维护(通过CAS)这些头部和尾部指针时易于出现可扩展性和开销限制。
  * 这导致了减少竞争的变体的开发,如消除数组和乐观的后退指针。然而,当双重队列需要时,
  * 双重队列的本质使得更简单的策略可以改进M&S风格的实现。
  * 
  *
  * In a dual queue, each node must atomically maintain its match
  * status. While there are other possible variants, we implement
  * this here as: for a data-mode node, matching entails CASing an
  * "item" field from a non-null data value to null upon match, and
  * vice-versa for request nodes, CASing from null to a data
  * value. (Note that the linearization properties of this style of
  * queue are easy to verify -- elements are made available by
  * linking, and unavailable by matching.) Compared to plain M&S
  * queues, this property of dual queues requires one additional
  * successful atomic operation per enq/deq pair. But it also
  * enables lower cost variants of queue maintenance mechanics. (A
  * variation of this idea applies even for non-dual queues that
  * support deletion of interior elements, such as
  * j.u.c.ConcurrentLinkedQueue.)
  * 在双队列中,每个节点必须自动维护其匹配状态。虽然还有其他可能的变体,但我们在这里实现:对于数据模式节点,
  * 匹配需要将非空数据值中的"item"字段填充为匹配时为空,反之亦然,请求节点null为数据值。 
  *(请注意,这种类型的队列的线性化属性很容易验证 - 元素通过链接提供,而不能通过匹配)。与普通的M&S队列相比,双队列的这个属性需要每个enq / deq对。
  * 但是它也可以实现队列维护机制的更低成本的变体。 (即使对支持删除内部元素的非双重队列,如j.u.c.ConcurrentLinkedQueue也适用这种思路的变体。)
  *
  * Once a node is matched, its match status can never again
  * change.  We may thus arrange that the linked list of them
  * contain a prefix of zero or more matched nodes, followed by a
  * suffix of zero or more unmatched nodes. (Note that we allow
  * both the prefix and suffix to be zero length, which in turn
  * means that we do not use a dummy header.)  If we were not
  * concerned with either time or space efficiency, we could
  * correctly perform enqueue and dequeue operations by traversing
  * from a pointer to the initial node; CASing the item of the
  * first unmatched node on match and CASing the next field of the
  * trailing node on appends. (Plus some special-casing when
  * initially empty).  While this would be a terrible idea in
  * itself, it does have the benefit of not requiring ANY atomic
  * updates on head/tail fields.
  * 一旦节点匹配,其匹配状态就不会再改变。因此,我们可以安排它们的链表包含零个或多个匹配节点的前缀,后跟零个或多个不匹配节点的后缀。 
  * (请注意,我们允许前缀和后缀为零长度,这意味着我们不使用虚拟标头。)如果我们不关心时间或空间的效率,我们可以正确执行入队和出队操作从指针遍历到初始节点; 
  * CASing 将匹配的第一个未匹配节点的item,并在追加的末尾处CASing下一个字段。。(加上一些特殊的外壳时,最初是空的)。虽然这本身就是一个可怕的想法,
  * 但它的好处是不需要在头部/尾部字段上进行任何原子更新。
  *
  *
  * We introduce here an approach that lies between the extremes of
  * never versus always updating queue (head and tail) pointers.
  * This offers a tradeoff between sometimes requiring extra
  * traversal steps to locate the first and/or last unmatched
  * nodes, versus the reduced overhead and contention of fewer
  * updates to queue pointers. For example, a possible snapshot of
  * a queue is:
  * 我们在这里介绍一种位于从不与总是更新队列(头部和尾部)指针的极端之间的方法。
  * 这提供了有时需要额外的遍历步骤来定位第一个和/或最后一个不匹配的节点与降低的开销和对队列指针的较少更新的争用之间的折衷。
  * 例如,一个可能的队列快照是
  *
  *  head           tail
  *    |              |
  *    v              v
  *    M -> M -> U -> U -> U -> U
  *
  * The best value for this "slack" (the targeted maximum distance
  * between the value of "head" and the first unmatched node, and
  * similarly for "tail") is an empirical matter. We have found
  * that using very small constants in the range of 1-3 work best
  * over a range of platforms. Larger values introduce increasing
  * costs of cache misses and risks of long traversal chains, while
  * smaller values increase CAS contention and overhead.
  * 对于这种"松弛"("头"的值和第一个不匹配的节点之间的目标最大距离,以及类似于"尾巴")的最佳值是经验问题。
  * 我们发现使用1-3范围内的非常小的常数在一系列平台上工作得最好。较大的值会导致缓存未命中的增加成本以及长遍历链的风险,而较小的值会增加CAS竞争和开销。
  *
  * Dual queues with slack differ from plain M&S dual queues by
  * virtue of only sometimes updating head or tail pointers when
  * matching, appending, or even traversing nodes; in order to
  * maintain a targeted slack.  The idea of "sometimes" may be
  * operationalized in several ways. The simplest is to use a
  * per-operation counter incremented on each traversal step, and
  * to try (via CAS) to update the associated queue pointer
  * whenever the count exceeds a threshold. Another, that requires
  * more overhead, is to use random number generators to update
  * with a given probability per traversal step.
  * 由于在匹配,追加或遍历节点时只有时更新头部或尾部指针,所以具有松弛的双重队列不同于普通的M&S双重队列;以维持目标松懈。 
  * "有时"的想法可以通过几种方式来实施。最简单的方法是在每个遍历步骤中使用逐个计数的计数器,并在计数超过阈值时尝试(通过CAS)更新关联的队列指针。
  * 另一个需要更多开销的方法是使用随机数生成器,以每个遍历步骤的给定概率进行更新。
  *
  * In any strategy along these lines, because CASes updating
  * fields may fail, the actual slack may exceed targeted
  * slack. However, they may be retried at any time to maintain
  * targets.  Even when using very small slack values, this
  * approach works well for dual queues because it allows all
  * operations up to the point of matching or appending an item
  * (hence potentially allowing progress by another thread) to be
  * read-only, thus not introducing any further contention. As
  * described below, we implement this by performing slack
  * maintenance retries only after these points.
  * 在沿着这些路线的任何策略中,因为CASes更新字段可能失败,所以实际的松弛可能超过目标松弛。
  * 但是,他们可能会随时重试以维持目标。即使使用非常小的松弛值,这种方法也适用于双重队列,因为它允许所有的操作直到匹配或附加一个item(因此可能允许另一个线程的进度)
  * 为只读的,因此不会引入任何进一步的争。如下所述,我们通过仅在这些点之后执行松弛维护重试来实现这一点。
  *
  * As an accompaniment to such techniques, traversal overhead can
  * be further reduced without increasing contention of head
  * pointer updates: Threads may sometimes shortcut the "next" link
  * path from the current "head" node to be closer to the currently
  * known first unmatched node, and similarly for tail. Again, this
  * may be triggered with using thresholds or randomization.
  * 作为这种技术的伴随,在不增加头指针更新争用的情况下,可以进一步减少遍历开销:线程有时可以使从当前"头"节点开始的"下一个"链接路径更接近当前已知的第一个不匹配节点,
  * 类似的尾巴。再一次地,这可以通过使用阈值或随机化来触发。
  *
  * These ideas must be further extended to avoid unbounded amounts
  * of costly-to-reclaim garbage caused by the sequential "next"
  * links of nodes starting at old forgotten head nodes: As first
  * described in detail by Boehm
  * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
  * delays noticing that any arbitrarily old node has become
  * garbage, all newer dead nodes will also be unreclaimed.
  * (Similar issues arise in non-GC environments.)  To cope with
  * this in our implementation, upon CASing to advance the head
  * pointer, we set the "next" link of the previous head to point
  * only to itself; thus limiting the length of connected dead lists.
  * (We also take similar care to wipe out possibly garbage
  * retaining values held in other Node fields.)  However, doing so
  * adds some further complexity to traversal: If any "next"
  * pointer links to itself, it indicates that the current thread
  * has lagged behind a head-update, and so the traversal must
  * continue from the "head".  Traversals trying to find the
  * current tail starting from "tail" may also encounter
  * self-links, in which case they also continue at "head".
  *这些想法必须进一步扩展,以避免从旧的被遗忘的头节点开始的连续的"下一个"节点链接造成的成本高昂的回收垃圾的无限量:
  * 如Boehm(http://portal.acm。 org / citation.cfm?doid = 503272.503282)如果GC延迟注意到任何旧的节点已经变成垃圾,所有更新的死节点也将被废弃。 
  * (在非GC环境中也会出现类似的问题)在我们的实现中,为了应对这种情况,在CASing提高头指针时,我们设置前一个头的"下一个"链接指向它自己;从而限制连接的死亡列表的长度。
  * (我们也采取类似的措施来消除其他节点字段中可能存在的垃圾保留值)。但是,这样做会增加一些进一步的复杂性:如果任何"下一个"指针链接到自身,则表示当前线程已经落后
  *  头部更新,所以遍历必须从"头部"继续。遍历试图从"尾巴"开始寻找当前的尾巴也可能遇到自连接,在这种情况下,它们也继续在"头部"。
  *  
  * It is tempting in slack-based scheme to not even use CAS for
  * updates (similarly to Ladan-Mozes & Shavit). However, this
  * cannot be done for head updates under the above link-forgetting
  * mechanics because an update may leave head at a detached node.
  * And while direct writes are possible for tail updates, they
  * increase the risk of long retraversals, and hence long garbage
  * chains, which can be much more costly than is worthwhile
  * considering that the cost difference of performing a CAS vs
  * write is smaller when they are not triggered on each operation
  * (especially considering that writes and CASes equally require
  * additional GC bookkeeping ("write barriers") that are sometimes
  * more costly than the writes themselves because of contention).
  * 基于松弛的方案很诱人,甚至不使用CAS进行更新(类似于Ladan-Mozes&Shavit)。
  * 但是,在上述链接遗忘机制下,这不能用于头部更新,因为更新可能会使头部处于分离节点。
  * 尽管直接写入对于尾部更新是可能的,但是它们增加了长时间回缩的风险,并且因此增加了长的垃圾链,
  * 考虑到执行CAS与写入的成本差异当不是更小时,可能成本更高(尤其是考虑到写入和CASes同样需要额外的GC簿记(“写入障碍”),
  * 由于争用的原因,有时比写入本身更昂贵)。
  *
  * *** Overview of implementation ***
  *
  * We use a threshold-based approach to updates, with a slack
  * threshold of two -- that is, we update head/tail when the
  * current pointer appears to be two or more steps away from the
  * first/last node. The slack value is hard-wired: a path greater
  * than one is naturally implemented by checking equality of
  * traversal pointers except when the list has only one element,
  * in which case we keep slack threshold at one. Avoiding tracking
  * explicit counts across method calls slightly simplifies an
  * already-messy implementation. Using randomization would
  * probably work better if there were a low-quality dirt-cheap
  * per-thread one available, but even ThreadLocalRandom is too
  * heavy for these purposes.
  * 我们使用基于阈值的方法进行更新,其中松弛阈值为2--
  * 也就是说,当当前指针显示为离第一个/最后一个节点的两个或更多的步骤时,我们将更新head / tail。
  * 他的松弛值是硬连线的:大于1的路径自然是通过检查遍历指针的相等性来实现的,除非列表只有一个元素,在这种情况下,我们保持松弛阈值为1。
  * 避免跟踪方法调用中的显式计数会略微简化已经很混乱的实现.
  * 使用随机化可能会更好,如果有一个低质量的廉价的每线程可用,但即使ThreadLocalRandom太重这些目的。
  *
  * With such a small slack threshold value, it is not worthwhile
  * to augment this with path short-circuiting (i.e., unsplicing
  * interior nodes) except in the case of cancellation/removal (see
  * below).
  * 有了这样一个小的松弛门限值,除了取消/删除(见下文)的情况之外,不值得用路径短的路径(即,非拼接的内部节点)来增加这个值 
  *
  * We allow both the head and tail fields to be null before any
  * nodes are enqueued; initializing upon first append.  This
  * simplifies some other logic, as well as providing more
  * efficient explicit control paths instead of letting JVMs insert
  * implicit NullPointerExceptions when they are null.  While not
  * currently fully implemented, we also leave open the possibility
  * of re-nulling these fields when empty (which is complicated to
  * arrange, for little benefit.)
  * 在任何节点入队之前,我们允许头字段和尾字段为空;初次追加。
  * 这简化了一些其他的逻辑,并提供了更高效的显式控制路径,而不是让JVMs在其为空时插入隐式NullPointerExceptions 。
  * 虽然目前还没有完全实施,但是我们也可以在空的时候重新确定这些领域的可能性(这很复杂,没有什么好处)。
  *
  * All enqueue/dequeue operations are handled by the single method
  * "xfer" with parameters indicating whether to act as some form
  * of offer, put, poll, take, or transfer (each possibly with
  * timeout). The relative complexity of using one monolithic
  * method outweighs the code bulk and maintenance problems of
  * using separate methods for each case.
  * 所有enqueue / dequeue操作都由单个方法“xfer”来处理,参数指示是否作为某种形式的offer、put、poll、take或transfer(每个可能有超时)。
  * 使用单方法的相对复杂性超过了对每种情况使用单独方法的代码批量和维护问题。
  * 
  * Operation consists of up to three phases. The first is
  * implemented within method xfer, the second in tryAppend, and
  * the third in method awaitMatch.
  * 操作包括三个阶段。第一个在方法xfer中实现,第二个在tryAppend,第三个在方法awaitMatch。
  *
  * 1. Try to match an existing node 尝试匹配现有节点
  *
  *    Starting at head, skip already-matched nodes until finding
  *    an unmatched node of opposite mode, if one exists, in which
  *    case matching it and returning, also if necessary updating
  *    head to one past the matched node (or the node itself if the
  *    list has no other unmatched nodes). If the CAS misses, then
  *    a loop retries advancing head by two steps until either
  *    success or the slack is at most two. By requiring that each
  *    attempt advances head by two (if applicable), we ensure that
  *    the slack does not grow without bound. Traversals also check
  *    if the initial head is now off-list, in which case they
  *    start at the new head.
  *    从head开始,跳过已匹配的节点,直到找到与模式相反的未匹配节点,如果存在,在这种情况下,匹配它并返回,
  *    如果需要更新头到一个经过匹配的节点(或者节点本身,如果列表具有没有其他不匹配的节点)。
  *    如果CAS未命中,则循环重试前进两步,直到成功或松弛最多为两次。
  *    通过要求每次尝试都提前两次(如果适用),我们确保松弛不会无限制地增长。遍历还会检查最初的头是否已经不在列表中,在这种情况下,他们从新的头开始。
  *
  *    If no candidates are found and the call was untimed 
  *    poll/offer, (argument "how" is NOW) return.
  *    如果没有找到候选人,并且该调用是不计时的poll/offer,(参数"how"是NOW)返回。
  *
  * 2. Try to append a new node (method tryAppend) 尝试添加一个新节点(方法tryAppend)
  *
  *    Starting at current tail pointer, find the actual last node
  *    and try to append a new node (or if head was null, establish
  *    the first node). Nodes can be appended only if their
  *    predecessors are either already matched or are of the same
  *    mode. If we detect otherwise, then a new node with opposite
  *    mode must have been appended during traversal, so we must
  *    restart at phase 1. The traversal and update steps are
  *    otherwise similar to phase 1: Retrying upon CAS misses and
  *    checking for staleness.  In particular, if a self-link is
  *    encountered, then we can safely jump to a node on the list
  *    by continuing the traversal at current head.
  *    从当前的tail指针开始,找到实际的最后一个节点,并尝试添加一个新节点(或者如果head为null,则建立第一个节点)。
  *    如果节点已经匹配或处于相同的模式,则可以添加节点。如果我们检测到其他情况,则必须在遍历过程中追加一个具有相反模式的新节点,因此我们必须在第1阶段重新启动。
  *    遍历和更新步骤与第1阶段类似:对CAS的重试失败和检查过时。特别地,如果遇到一个自链接,那么我们可以通过在当前head上继续遍历来安全地跳转到列表中的节点。
  *    
  *    On successful append, if the call was ASYNC, return.
  *    在成功追加的上,如果调用是ASYNC,则返回。
  *
  * 3. Await match or cancellation (method awaitMatch) 等待匹配或取消(方法awaitMatch)
  *
  *    Wait for another thread to match node; instead cancelling if
  *    the current thread was interrupted or the wait timed out. On
  *    multiprocessors, we use front-of-queue spinning: If a node
  *    appears to be the first unmatched node in the queue, it
  *    spins a bit before blocking. In either case, before blocking
  *    it tries to unsplice any nodes between the current "head"
  *    and the first unmatched node.
  *    等待另一个线程匹配节点;如果当前线程被中断或等待超时,则取消。在多处理器上,我们使用front-of-queue旋转:
  *    如果一个节点看起来是队列中第一个不匹配的节点,它将在阻塞之前旋转一点。在任何一种情况下,在阻塞之前,它都会试图在当前的"头"和第一个不匹配的节点之间的任何节点上解开。
  *
  *    Front-of-queue spinning vastly improves performance of
  *    heavily contended queues. And so long as it is relatively
  *    brief and "quiet", spinning does not much impact performance
  *    of less-contended queues.  During spins threads check their
  *    interrupt status and generate a thread-local random number
  *    to decide to occasionally perform a Thread.yield. While
  *    yield has underdefined specs, we assume that might it help,
  *    and will not hurt in limiting impact of spinning on busy
  *    systems.  We also use smaller (1/2) spins for nodes that are
  *    not known to be front but whose predecessors have not
  *    blocked -- these "chained" spins avoid artifacts of
  *    front-of-queue rules which otherwise lead to alternating
  *    nodes spinning vs blocking. Further, front threads that
  *    represent phase changes (from data to request node or vice
  *    versa) compared to their predecessors receive additional
  *    chained spins, reflecting longer paths typically required to
  *    unblock threads during phase changes.
  *    Front-of-queue队列旋转极大地提高了大量争用队列的性能。只要它比较简短和"quiet",旋转对排队较少的队列的性能影响不大。
  *    在旋转线程中检查他们的中断状态并生成一个thread-local随机数来决定偶尔执行一个Thread.yield。
  *    虽然yield 有未定义的规格,但我们认为它可能会有所帮助,而且不会影响在繁忙的系统中旋转的影响。
  *    我们也使用较小的(1/2)的自旋为节点,这些节点不知道是前面的,但其前身没有被阻塞——这些“链”自旋避免了front-of-queue规则的工件,否则会导致交错的节点旋转vs阻塞。
  *    此外,与他们的前辈相比,代表阶段变化的前端线程(从数据到请求节点,或者反之亦然),会得到额外的链接自旋,这反映了在阶段变化过程中通常需要打开线程的更长的路径。
  *    此外,表示相位变化(从数据到请求节点或反之亦然)的前线程相比于其前身接收额外的链式自旋,反映了通常在相位变化期间解锁线程所需的较长路径。
  *    
  * ** Unlinking removed interior nodes ** 链接删除内部节点
  *
  * In addition to minimizing garbage retention via self-linking
  * described above, we also unlink removed interior nodes. These
  * may arise due to timed out or interrupted waits, or calls to
  * remove(x) or Iterator.remove.  Normally, given a node that was
  * at one time known to be the predecessor of some node s that is
  * to be removed, we can unsplice s by CASing the next field of
  * its predecessor if it still points to s (otherwise s must
  * already have been removed or is now offlist). But there are two
  * situations in which we cannot guarantee to make node s
  * unreachable in this way: (1) If s is the trailing node of list
  * (i.e., with null next), then it is pinned as the target node
  * for appends, so can only be removed later after other nodes are
  * appended. (2) We cannot necessarily unlink s given a
  * predecessor node that is matched (including the case of being
  * cancelled): the predecessor may already be unspliced, in which
  * case some previous reachable node may still point to s.
  * (For further explanation see Herlihy & Shavit "The Art of
  * Multiprocessor Programming" chapter 9).  Although, in both
  * cases, we can rule out the need for further action if either s
  * or its predecessor are (or can be made to be) at, or fall off
  * from, the head of list.
  * 
  * 除了通过上述自链接最小化垃圾留存之外,我们还取消了删除的内部节点的链接。这些可能是由于超时或中断等待,或调用删除(x)或Iterator.remove而引起的。
  * 通常情况下,如果一个节点曾经被认为是要删除的某个节点的前身,那么如果它的前一个字段仍然指向s,我们就可以通过加入它的下一个字段来取消修改
  * (否则s必须已经被删除删除或现在是offlist)。
  * 但有两种情况我们不能保证以这种方式使节点不可达:
  *   (1)如果s是列表的尾节点(即下一个为空),那么它被固定为附加的目标节点,所以只能在其他节点附加后才能删除。 
  *   (2)给定一个匹配的前驱节点(包 ),我们不一定解除链接:前驱可能已经是未被剪接的,在这种情况下,前一个可到达节点可能仍然指向s。 
  *   
  *   (进一步的解释见Herlihy&Shavit"多处理器编程的艺术"第9章)。虽然在这两种情况下,如果任何一方或其前任(或可以被制造)处于或脱离名单的首位,
  *   我们可以排除进一步采取行动的必要性。
  * 
  * Without taking these into account, it would be possible for an
  * unbounded number of supposedly removed nodes to remain
  * reachable.  Situations leading to such buildup are uncommon but
  * can occur in practice; for example when a series of short timed
  * calls to poll repeatedly time out but never otherwise fall off
  * the list because of an untimed call to take at the front of the
  * queue.
  * 
  * 如果不考虑这些因素,那么可能会有无数的假定被删除的节点保持可达状态。导致这种积聚的情况是不常见的,但可以在实践中发生;
  * 例如,当一系列短时间调用重复轮询时间超时,但是由于在队列前面进行不计时的调用,从不会以其他方式脱离列表
  *
  * When these cases arise, rather than always retraversing the
  * entire list to find an actual predecessor to unlink (which
  * won't help for case (1) anyway), we record a conservative
  * estimate of possible unsplice failures (in "sweepVotes").
  * We trigger a full sweep when the estimate exceeds a threshold
  * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
  * removal failures to tolerate before sweeping through, unlinking
  * cancelled nodes that were not unlinked upon initial removal.
  * We perform sweeps by the thread hitting threshold (rather than
  * background threads or by spreading work to other threads)
  * because in the main contexts in which removal occurs, the
  * caller is already timed-out, cancelled, or performing a
  * potentially O(n) operation (e.g. remove(x)), none of which are
  * time-critical enough to warrant the overhead that alternatives
  * would impose on other threads.
  * 
  * 当出现这些情况时,我们不是总是回顾整个列表以找到一个实际的前任解除联系(对于情况(1)无助于此),
  * 我们记录保守估计的可能的失败失败(在"sweepVotes"中)。当估计值超过阈值("SWEEP_THRESHOLD")时,
  * 我们会触发完全扫描,指示在扫描之前允许的最大移除失败次数的最大值,取消初始移除时取消关联的取消节点。
  * 我们通过线程触发阈值(而不是后台线程或将工作分散到其他线程)执行扫描,因为在发生删除的主要上下文中,
  * 调用者已经超时,取消或执行潜在的O(n)操作(例如remove(x)),其中没有一个对时间要求太高,不足以保证备选方案会对其他线程施加的开销。
  *
  * Because the sweepVotes estimate is conservative, and because
  * nodes become unlinked "naturally" as they fall off the head of
  * the queue, and because we allow votes to accumulate even while
  * sweeps are in progress, there are typically significantly fewer
  * such nodes than estimated.  Choice of a threshold value
  * balances the likelihood of wasted effort and contention, versus
  * providing a worst-case bound on retention of interior nodes in
  * quiescent queues. The value defined below was chosen
  * empirically to balance these under various timeout scenarios.
  *  由于清除投票的估计是保守的,并且由于节点脱离队列头部而"自然地"断开连接,并且因为即使在扫描正在进行时我们也允许投票累计,所以通常显着少于估计的节点。
  *  阈值的选择平衡了浪费的努力和争用的可能性,而不是在静止队列中保留内部节点的最坏情况。下面定义的值是根据经验选择的,以在各种超时情况下平衡这些值。
  * 
  *
  * Note that we cannot self-link unlinked interior nodes during
  * sweeps. However, the associated garbage chains terminate when
  * some successor ultimately falls off the head of the list and is
  * self-linked.
  * 请注意,在扫描期间,我们不能自行链接未连接的内部节点。然而,当一些后继者最终脱离列表头并且是自联系的时,相关的垃圾链终止。
  */
    private static final long serialVersionUID = -3223113410248163686L;
    /** True if on multiprocessor  如果在多处理器上,则为真*/
    private static final boolean MP =
        Runtime.getRuntime().availableProcessors() > 1;

    /**
     * The number of times to spin (with randomly interspersed calls
     * to Thread.yield) on multiprocessor before blocking when a node
     * is apparently the first waiter in the queue.  See above for
     * explanation. Must be a power of two. The value is empirically
     * derived -- it works pretty well across a variety of processors,
     * numbers of CPUs, and OSes.
     * 当一个节点显然是队列中的第一个waiter时,在阻塞之前对多处理器进行旋转的次数(对Thread.yield进行随机散布的调用)。请参阅上面的解释。必须是2的幂。
     * 这个值是凭经验推导出来的 - 在各种处理器,CPU数量和操作系统上都能很好地工作。
     * 
     * 自旋次数
     */
    private static final int FRONT_SPINS   = 1 << 7;

    /**
     * The number of times to spin before blocking when a node is
     * preceded by another node that is apparently spinning.  Also
     * serves as an increment to FRONT_SPINS on phase changes, and as
     * base average frequency for yielding during spins. Must be a
     * power of two.
     * 当一个节点位于另一个明显旋转的节点的前面时,在阻塞之前旋转的次数。也用作相位变化的FRONT_SPINS增量,以及旋转期间的屈服平均频率。必须是2的幂。
     */
    private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;

    /**
     * The maximum number of estimated removal failures (sweepVotes)
     * to tolerate before sweeping through the queue unlinking
     * cancelled nodes that were not unlinked upon initial
     * removal. See above for explanation. The value must be at least
     * two to avoid useless sweeps when removing trailing nodes.
     * 在清除整个队列之前,可以容忍的最大清除失败次数(sweepVote)取消链接已取消的节点,这些节点在初始删除时并未解除链接。
     * 请参阅上面的解释。删除尾随节点时,该值必须至少为2以避免无用的扫描。
     */
    static final int SWEEP_THRESHOLD = 32;
    /** head of the queue; null until first enqueue */
    //头节点
    transient volatile Node head;

    /** tail of the queue; null until first append */
    //尾节点
    private transient volatile Node tail;

    /** The number of apparent failures to unsplice removed nodes */
    //清除删除节点的明显故障数
    private transient volatile int sweepVotes;

    // CAS methods for fields
    private boolean casTail(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node cmp, Node val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    private boolean casSweepVotes(int cmp, int val) {
        return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
    }

    /**
     * Creates an initially empty {@code LinkedTransferQueue}.
     */
    public LinkedTransferQueue() {
    }

    /**
     * Creates a {@code LinkedTransferQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }

 // Unsafe mechanics

 private static final sun.misc.Unsafe UNSAFE;
 private static final long headOffset;
 private static final long tailOffset;
 private static final long sweepVotesOffset;
 static {
     try {
         UNSAFE = sun.misc.Unsafe.getUnsafe();
         Class k = LinkedTransferQueue.class;
         headOffset = UNSAFE.objectFieldOffset
             (k.getDeclaredField("head"));
         tailOffset = UNSAFE.objectFieldOffset
             (k.getDeclaredField("tail"));
         sweepVotesOffset = UNSAFE.objectFieldOffset
             (k.getDeclaredField("sweepVotes"));
     } catch (Exception e) {
         throw new Error(e);
     }
 }
 }

3. LinkedTransferQueue实现TransferQueue接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public interface TransferQueue<E> extends BlockingQueue<E> {
    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     * 立即转交一个元素给消费者,如果此时队列没有消费者,那就false
     * 
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @param e the element to transfer
     * @return {@code true} if the element was transferred, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e);

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     * 转交一个元素给消费者,如果此时队列没有消费者,那就阻塞
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer.
     *
     * @param e the element to transfer
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void transfer(E e) throws InterruptedException;

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     * 如果在超时之前可以这样做,将元素转移给消费者。
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @param e the element to transfer
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before completion,
     *         in which case the element is not left enqueued
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Returns {@code true} if there is at least one consumer waiting
     * to receive an element via {@link #take} or
     * timed {@link #poll(long,TimeUnit) poll}.
     * The return value represents a momentary state of affairs.
     * 是否有消费者等待接收数据,瞬时状态,不一定准
     * @return {@code true} if there is at least one waiting consumer
     */
    boolean hasWaitingConsumer();

    /**
     * Returns an estimate of the number of consumers waiting to
     * receive elements via {@link #take} or timed
     * {@link #poll(long,TimeUnit) poll}.  The return value is an
     * approximation of a momentary state of affairs, that may be
     * inaccurate if consumers have completed or given up waiting.
     * The value may be useful for monitoring and heuristics, but
     * not for synchronization control.  Implementations of this
     * method are likely to be noticeably slower than those for
     * {@link #hasWaitingConsumer}.
     *  返回还有多少个等待的消费者,跟上面那个一样,都是一种瞬时状态,不一定准
     *
     * @return the number of consumers waiting to receive elements
     */
    int getWaitingConsumerCount();
}

二、xfer LinkedTransferQueue核心方法

put和take等都调用这个方法。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
 /*
     * Possible values for "how" argument in xfer method.
     * xfer方法中的“how”参数的可能值。
     */
    private static final int NOW   = 0; // for untimed poll, tryTransfer
    private static final int ASYNC = 1; // for offer, put, add
    private static final int SYNC  = 2; // for transfer, take
    private static final int TIMED = 3; // for timed poll,timed tryTransfer

    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        // assert item == null || item.getClass() != Node.class;
        return (E) item;
    }

    /**
     * Implements all queuing methods. See above for explanation.
     *
     * @param e the item or null for take
     * @param haveData true if this is a put, else a take
     * @param how NOW, ASYNC, SYNC, or TIMED
     * @param nanos timeout in nanosecs, used only if mode is TIMED
     * @return an item if matched, else e
     * @throws NullPointerException if haveData mode but e is null
     */
    private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race

            for (Node h = head, p = h; p != null;) { // find & match first node  查找&匹配第一个节点
                boolean isData = p.isData;
                Object item = p.item;
                if (item != p && (item != null) == isData) { // unmatched    不相配
                    if (isData == haveData)   // can't match 不能匹配
                        break;
                    if (p.casItem(item, e)) { // match 匹配成功
                        for (Node q = p; q != h;) {   // 松弛一次进行for 循环
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) { //如果CAS未命中,则循环重试前进两步,直到成功或松弛最多为两次。
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2   除非松弛小于2
                        }
                        LockSupport.unpark(p.waiter);
                        return this.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);
                Node pred = tryAppend(s, haveData);  //尝试将节点s作为tail。
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
    /**
     * Tries to append node s as tail.
     * 尝试将节点s作为tail。
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append  将p移动到最后一个节点并追加
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) { // 第一次调用
                if (casHead(null, s))
                    return s;                 // initialize 初始化
            }
            else if (p.cannotPrecede(haveData))    //不匹配isData,这里匹配相反模式,重新遍历,
                return null;                  // lost race vs opposite mode 
            else if ((n = p.next) != null)    // not last; keep traversing    tail没更新,松弛
                p = p != t && t != (u = tail) ? (t = u) : // stale tail       以前tail数据  p != t && t != (u = tail) ?tail被其他线程修改过,这里重新赋值tail
                    (p != n) ? n : null;      // restart if off list          p != n 判断该节点是否取消或者出队列
            else if (!p.casNext(null, s))      //更新next值
                p = p.next;                   // re-read on CAS failure       cas更新失败,重新read
            else {
                if (p != t) {                 // update if slack now >= 2     松弛大于2
                    while ((tail != t || !casTail(t, s)) &&                   更新tail
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t);                  //判断tail更新松弛在2里面
                }
                return p;
            }
        }
    }

    /**
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return this.<E>cast(item);
            }
            if ((w.isInterrupted() || (timed && nanos <= 0)) &&
                    s.casItem(e, s)) {        // cancel      取消操作
                unsplice(pred, s);
                return e;
            }

            if (spins < 0) {                  // establish spins at/near front   自旋 
                if ((spins = spinsFor(pred, s.isData)) > 0)
                    randomYields = ThreadLocalRandom.current();
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                    Thread.yield();           // occasionally yield
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                long now = System.nanoTime();
                if ((nanos -= now - lastTime) > 0)
                    LockSupport.parkNanos(this, nanos);
                lastTime = now;
            }
            else {
                LockSupport.park(this);
            }
        }
    }

    /**
     * Returns spin/yield value for a node with given predecessor and
     * data mode. See above for explanation.
     */
    private static int spinsFor(Node pred, boolean haveData) {
        if (MP && pred != null) {                                 //前提多核cpu
            if (pred.isData != haveData)      // phase change      模式不相同
                return FRONT_SPINS + CHAINED_SPINS;
            if (pred.isMatched())             // probably at front  
                return FRONT_SPINS;
            if (pred.waiter == null)          // pred apparently spinning
                return CHAINED_SPINS;
        }
        return 0;
    }

二、入队

1. offer方法

  • offer(E e)
  • offer(E e, long timeout, TimeUnit unit) 跟offer(E e)实现一样,没有实现等待时间
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}.
     *
     * @return {@code true} (as specified by
     *  {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }    

2. add方法

add(E e) 无界堵塞队列 不支持throw IllegalStateException异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }

3. put方法

  • 将指定的元素插入到这个延迟队列中。由于队列是无限的,此方法将永不阻塞。
1
2
3
4
5
6
7
8
9
   /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block.
     *
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }

三、出队

1. poll方法

  • poll() 获取并移除此队列的头,如果此队列为空,则返回 null
  • E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,队列为空时,在指定的等待时间前等待可用的元素。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = xfer(null, false, TIMED, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }

    public E poll() {
        return xfer(null, false, NOW, 0);
    }

2. take方法

1
2
3
4
5
6
7
    public E take() throws InterruptedException {
        E e = xfer(null, false, SYNC, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

3. peek方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
    public E peek() {
        return firstDataItem();
    }
    /**
     * Returns the item in the first unmatched node with isData; or
     * null if none.  Used by peek.
     */
    private E firstDataItem() {
        for (Node p = head; p != null; p = succ(p)) {
            Object item = p.item;
            if (p.isData) {
                if (item != null && item != p)
                    return this.<E>cast(item);
            }
            else if (item == null)
                return null;
        }
        return null;
    }    

四、移除队列

1. remove方法

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        return findAndRemove(o);
    }
 /**
     * Main implementation of remove(Object)
     */
    private boolean findAndRemove(Object e) {
        if (e != null) {
            for (Node pred = null, p = head; p != null; ) {
                Object item = p.item;
                if (p.isData) {
                    if (item != null && item != p && e.equals(item) &&
                        p.tryMatchData()) {
                        unsplice(pred, p);
                        return true;
                    }
                }
                else if (item == null)
                    break;
                pred = p;
                if ((p = p.next) == pred) { // stale
                    pred = null;
                    p = head;
                }
            }
        }
        return false;
    }  
  /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.forgetContents(); // forget unneeded fields
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s.  If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
         * nor s are head or offlist, add to sweepVotes, and if enough
         * votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            if (n == null ||
                (n != s && pred.casNext(s, n) && pred.isMatched())) {
                for (;;) {               // check if at, or could be, head  检查一下,或者可能是头部。
                    Node h = head;
                    if (h == pred || h == s || h == null)
                        return;          // at head or list empty
                    if (!h.isMatched())
                        break;
                    Node hn = h.next;
                    if (hn == null)
                        return;          // now empty
                    if (hn != h && casHead(h, hn))  
                        h.forgetNext();  // advance head
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            if (casSweepVotes(v, v + 1))
                                break;
                        }
                        else if (casSweepVotes(v, 0)) {
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }

    /**
     * Unlinks matched (typically cancelled) nodes encountered in a
     * traversal from head.
     */
    private void sweep() {
        for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
            if (!s.isMatched())
                // Unmatched nodes are never self-linked
                p = s;
            else if ((n = s.next) == null) // trailing node is pinned
                break;
            else if (s == n)    // stale
                // No need to also check for p == s, since that implies s == n
                p = head;
            else
                p.casNext(s, n);
        }
    }    

2. clear方法

//AbstractQueue抽象类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    /**
     * Removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     *
     * <p>This implementation repeatedly invokes {@link #poll poll} until it
     * returns <tt>null</tt>.
     */
    public void clear() {
        while (poll() != null)
            ;
    }

3. drainTo方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * @throws NullPointerException     {@inheritDoc}
     * @throws IllegalArgumentException {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

五、TransferQueue 接口方法实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

 /**
  * Transfers the element to a waiting consumer immediately, if possible.
  *
  * <p>More precisely, transfers the specified element immediately
  * if there exists a consumer already waiting to receive it (in
  * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
  * otherwise returning {@code false} without enqueuing the element.
  *
  * @throws NullPointerException if the specified element is null
  */
 public boolean tryTransfer(E e) {
     return xfer(e, true, NOW, 0) == null;
 }

 /**
  * Transfers the element to a consumer, waiting if necessary to do so.
  *
  * <p>More precisely, transfers the specified element immediately
  * if there exists a consumer already waiting to receive it (in
  * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
  * else inserts the specified element at the tail of this queue
  * and waits until the element is received by a consumer.
  *
  * @throws NullPointerException if the specified element is null
  */
 public void transfer(E e) throws InterruptedException {
     if (xfer(e, true, SYNC, 0) != null) {
         Thread.interrupted(); // failure possible only due to interrupt
         throw new InterruptedException();
     }
 }

 /**
  * Transfers the element to a consumer if it is possible to do so
  * before the timeout elapses.
  *
  * <p>More precisely, transfers the specified element immediately
  * if there exists a consumer already waiting to receive it (in
  * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
  * else inserts the specified element at the tail of this queue
  * and waits until the element is received by a consumer,
  * returning {@code false} if the specified wait time elapses
  * before the element can be transferred.
  *
  * @throws NullPointerException if the specified element is null
  */
 public boolean tryTransfer(E e, long timeout, TimeUnit unit)
     throws InterruptedException {
     if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
         return true;
     if (!Thread.interrupted())
         return false;
     throw new InterruptedException();
 }
  public boolean hasWaitingConsumer() {
      return firstOfMode(false) != null;
  } 
  
      /**
       * Returns the first unmatched node of the given mode, or null if
       * none.  Used by methods isEmpty, hasWaitingConsumer.
       */
      private Node firstOfMode(boolean isData) {
          for (Node p = head; p != null; p = succ(p)) {
              if (!p.isMatched())
                  return (p.isData == isData) ? p : null;
          }
          return null;
      }
      
   public int getWaitingConsumerCount() {
        return countOfMode(false);
    }
    /**
     * Traverses and counts unmatched nodes of the given mode.
     * Used by methods size and getWaitingConsumerCount.
     */
    private int countOfMode(boolean data) {
        int count = 0;
        for (Node p = head; p != null; ) {
            if (!p.isMatched()) {
                if (p.isData != data)
                    return 0;
                if (++count == Integer.MAX_VALUE) // saturated
                    break;
            }
            Node n = p.next;
            if (n != p)
                p = n;
            else {
                count = 0;
                p = head;
            }
        }
        return count;
    }    

六、Iterator 迭代器实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
  final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched())
                lastPred = r;    // next lastPred is old lastRet
            else if ((b = lastPred) == null || b.isMatched())
                lastPred = null; // at start of list
            else {
                Node s, n;       // help with removal of lastPred.next
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s)
                    b.casNext(s, n);
            }

            this.lastRet = prev;

            for (Node p = prev, s, n;;) {
                s = (p == null) ? head : p.next;
                if (s == null)
                    break;
                else if (s == p) {
                    p = null;
                    continue;
                }
                Object item = s.item;
                if (s.isData) {
                    if (item != null && item != s) {
                        nextItem = LinkedTransferQueue.<E>cast(item);
                        nextNode = s;
                        return;
                    }
                }
                else if (item == null)
                    break;
                // assert s.isMatched();
                if (p == null)
                    p = s;
                else if ((n = s.next) == null)
                    break;
                else if (s == n)
                    p = null;
                else
                    p.casNext(s, n);
            }
            nextNode = null;
            nextItem = null;
        }

        Itr() {
            advance(null);
        }

        public final boolean hasNext() {
            return nextNode != null;
        }

        public final E next() {
            Node p = nextNode;
            if (p == null) throw new NoSuchElementException();
            E e = nextItem;
            advance(p);
            return e;
        }

        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null)
                throw new IllegalStateException();
            this.lastRet = null;
            if (lastRet.tryMatchData())
                unsplice(lastPred, lastRet);
        }
    }