ForkJoin模式分析

简介

Fork/Join框架介绍不错博客,阿里方腾飞

Fork/Join使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制, 通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中, 进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

一、ForkJoinPool概述

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
    /*
     * Implementation Overview
     *
     * This class and its nested classes provide the main
     * functionality and control for a set of worker threads:
     * Submissions from non-FJ threads enter into submission queues.
     * Workers take these tasks and typically split them into subtasks
     * that may be stolen by other workers.  Preference rules give
     * first priority to processing tasks from their own queues (LIFO
     * or FIFO, depending on mode), then to randomized FIFO steals of
     * tasks in other queues.  This framework began as vehicle for
     * supporting tree-structured parallelism using work-stealing.
     * Over time, its scalability advantages led to extensions and
     * changes to better support more diverse usage contexts.  Because
     * most internal methods and nested classes are interrelated,
     * their main rationale and descriptions are presented here;
     * individual methods and nested classes contain only brief
     * comments about details.
     * 该类及其嵌套类为一组工作线程提供主要功能和控制:来自non-FJ线程的提交进入提交队列。工人们承担这些任务,
     * 通常将他们分解成可能被其他工人窃取的子任务。优先规则优先处理来自其自己的队列(LIFO或FIFO,取决于模式)的任务,
     * 然后是其他队列中任务的随机FIFO窃取。这个框架开始作为支持使用工作窃取的树形结构并行的工具。
     * 随着时间的推移,其可扩展性的优势导致扩展和变化,以更好地支持更多不同的使用情况。因为大多数内部方法和嵌套类是相互关联的,
     * 所以它们的主要基本原理和描述在这里给出;个别方法和嵌套类只包含关于细节的简短评论。
     *
     * WorkQueues 工作队列
     * ==========
     *
     * Most operations occur within work-stealing queues (in nested
     * class WorkQueue).  These are special forms of Deques that
     * support only three of the four possible end-operations -- push,
     * pop, and poll (aka steal), under the further constraints that
     * push and pop are called only from the owning thread (or, as
     * extended here, under a lock), while poll may be called from
     * other threads.  (If you are unfamiliar with them, you probably
     * want to read Herlihy and Shavit's book "The Art of
     * Multiprocessor programming", chapter 16 describing these in
     * more detail before proceeding.)  The main work-stealing queue
     * design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from GC requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs poll (steal) from being on the indices
     * ("base" and "top") to the slots themselves.
     * 
     * 大多数操作发生在工作窃取队列中(在嵌套的类WorkQueue中)。
     * 这些是Deques的特殊形式,它只支持四种可能的最终操作中的三种 - push,pop和poll(又名steal),
     * 在进一步的约束条件下push和pop只能从拥有的线程中调用(或者像扩展这里,在一个锁)下,而轮询可能会从其他线程调用。 
     * (如果你不熟悉它们,你可能要阅读Herlihy和Shavit的书"多处理器编程的艺术",第16章在进行更详细的描述之前)。
     * 主要的工作队列设计大致类似于Chase和Lev撰写的"Dynamic Circular Work-Stealing Deque",SPAA 2005(http://research.sun.com/scalable/pubs/index.html)
     * 和Michael,Saraswat和Vechev的"幂等工作窃取"PPoPP 2009(http://portal.acm.org/citation.cfm?id=1504186)。
     * 主要的差异最终来自GC的要求,我们尽可能地将其空缺,尽可能保持尽可能小的占用空间,即使在产生大量任务的程序中也是如此。
     * 为了实现这一点,我们将CAS仲裁pop与poll(steal)从指标("base"和"top")转移到角色本身。
     *
     * Adding tasks then takes the form of a classic array push(task): 然后添加任务,然后采取经典数组push(task)的形式:
     *    q.array[q.top] = task; ++q.top;
     *
     * (The actual code needs to null-check and size-check the array,
     * properly fence the accesses, and possibly signal waiting
     * workers to start scanning -- see below.)  Both a successful pop
     * and poll mainly entail a CAS of a slot from non-null to null.
     * (实际的代码需要对数组进行空值检查和大小检查,对访问进行适当的隔离,并可能发出信号等待工作人员开始扫描 - 见下文)。
     *  成功的pop和poll主要是由 non-null 到null的槽的CAS.
     *
     * The pop operation (always performed by owner) is:
     *   if ((base != top) and
     *        (the task at top slot is not null) and
     *        (CAS slot to null))
     *           decrement top and return task;
     *
     * And the poll operation (usually by a stealer) is
     *    if ((base != top) and
     *        (the task at base slot is not null) and
     *        (base has not changed) and
     *        (CAS slot to null))
     *           increment base and return task;
     *
     * Because we rely on CASes of references, we do not need tag bits
     * on base or top.  They are simple ints as used in any circular
     * array-based queue (see for example ArrayDeque).  Updates to the
     * indices guarantee that top == base means the queue is empty,
     * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or poll have not fully
     * committed. (Method isEmpty() checks the case of a partially
     * completed removal of the last element.)  Because of this, the
     * poll operation, considered individually, is not wait-free. One
     * thief cannot successfully continue until another in-progress
     * one (or, if previously empty, a push) completes.  However, in
     * the aggregate, we ensure at least probabilistic
     * non-blockingness.  If an attempted steal fails, a thief always
     * chooses a different random victim target to try next. So, in
     * order for one thief to progress, it suffices for any
     * in-progress poll or new push on any empty queue to
     * complete. (This is why we normally use method pollAt and its
     * variants that try once at the apparent base index, else
     * consider alternative actions, rather than method poll, which
     * retries.)
     * 
     * 因为我们依赖引用的CASes,所以我们不需要在底部或顶部标记位。它们是用于任何基于循环数组的队列中的简单整数(参见例如ArrayDeque)。
     * 对索引的更新保证top == base意味着队列是空的,但是否则可能会在push,pop或poll未完全提交时可能使队列显示为非空。 
     * (方法isEmpty()检查部分完成删除最后一个元素的情况。)因此,单独考虑的poll操作不是等待的。
     * 一个小偷不能成功地继续下去,直到另一个小偷(或者,如果先前是空的,推)完成。但总的来说,我们至少保证概率性的非阻塞性。
     * 如果一次偷窃失败,一个小偷总是选择一个不同的随机受害者目标来尝试下一个。
     * 所以,为了让一个小偷进步,只要有任何进行中的poll或者任何空队列的新push就足够了。 
     * (这就是为什么我们通常使用方法pollAt及其在变量基础上尝试一次的方法,否则考虑替代操作,而不是方法poll,重试。)
     *
     * This approach also enables support of a user mode in which
     * local task processing is in FIFO, not LIFO order, simply by
     * using poll rather than pop.  This can be useful in
     * message-passing frameworks in which tasks are never joined.
     * However neither mode considers affinities, loads, cache
     * localities, etc, so rarely provide the best possible
     * performance on a given machine, but portably provide good
     * throughput by averaging over these factors.  Further, even if
     * we did try to use such information, we do not usually have a
     * basis for exploiting it.  For example, some sets of tasks
     * profit from cache affinities, but others are harmed by cache
     * pollution effects. Additionally, even though it requires
     * scanning, long-term throughput is often best using random
     * selection rather than directed selection policies, so cheap
     * randomization of sufficient quality is used whenever
     * applicable.  Various Marsaglia XorShifts (some with different
     * shift constants) are inlined at use points.
     * 这种方法还支持用本地任务处理在FIFO中而不是LIFO顺序的用户模式,只需使用poll而不是pop。
     * 这在消息传递框架中是非常有用的,在这个框架中任务永远不会被加入。
     * 然而,这两种模式都不考虑亲和性,负载,缓存区域等,因此很少在给定的机器上提供最好的性能,
     * 但是通过对这些因素进行平均,可移植性提供了良好的吞吐量。
     * 此外,即使我们试图使用这些信息,我们通常也没有发现它的依据。
     * 例如,一些任务集合从缓存关系中获益,而另外一些则受到缓存污染影响的伤害。
     * 另外,即使需要扫描,长期吞吐量通常也是使用随机选择而不是定向选择策略的最佳选择,
     * 所以在适用的情况下使用足够质量的廉价随机化。各种Marsaglia XorShifts(一些具有不同的移位常数)在使用点内联。
     *
     * WorkQueues are also used in a similar way for tasks submitted
     * to the pool. We cannot mix these tasks in the same queues used
     * by workers. Instead, we randomly associate submission queues
     * with submitting threads, using a form of hashing.  The
     * ThreadLocalRandom probe value serves as a hash code for
     * choosing existing queues, and may be randomly repositioned upon
     * contention with other submitters.  In essence, submitters act
     * like workers except that they are restricted to executing local
     * tasks that they submitted (or in the case of CountedCompleters,
     * others with the same root task).  Insertion of tasks in shared
     * mode requires a lock (mainly to protect in the case of
     * resizing) but we use only a simple spinlock (using field
     * qlock), because submitters encountering a busy queue move on to
     * try or create other queues -- they block only when creating and
     * registering new queues. Additionally, "qlock" saturates to an
     * unlockable value (-1) at shutdown. Unlocking still can be and
     * is performed by cheaper ordered writes of "qlock" in successful
     * cases, but uses CAS in unsuccessful cases.
     * 
     * WorkQueues也以类似的方式用于提交给池的任务。我们不能把这些任务混合在工人使用的同一个队列中。
     * 相反,我们使用散列形式将提交队列与提交线程随机关联。
      * ThreadLocalRandom探测值用作选择现有队列的哈希代码,并且可以在与其他提交者竞争时随机地重新定位。
      * 实质上,提交者的行为与工作者相似,只是他们被限制为执行他们提交的本地任务(或者在CountedCompleters的情况下,其他人则具有相同的根任务)。
      * 在共享模式下插入任务需要一个锁(主要是为了保护在调整大小的情况下),
      * 但是我们只使用一个简单的自旋锁(使用字段qlock),因为提交者遇到一个忙碌的队列后继续尝试或创建其他队列 - 只有在创建和注册新的队列时
      * 。此外,"qlock"在关机时饱和到一个不可解锁的值(-1)。解锁仍然可以和成功的情况下执行更便宜的有序的"qlock"写入,但使用CAS不成功的情况下。
     *
     * Management 管理
     * ==========
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other, at rates that can exceed a billion
     * per second.  The pool itself creates, activates (enables
     * scanning for and running tasks), deactivates, blocks, and
     * terminates threads, all with minimal central information.
     * There are only a few properties that we can globally track or
     * maintain, so we pack them into a small number of variables,
     * often maintaining atomicity without blocking or locking.
     * Nearly all essentially atomic control state is held in two
     * volatile variables that are by far most often read (not
     * written) as status and consistency checks. (Also, field
     * "config" holds unchanging configuration state.)
     * 
     * 偷窃工作的主要优势在于分散控制 - 工人们大多是从自己或彼此的任务中,以每秒超过10亿的速度进行工作。
     * 池本身创建,激活(启用扫描和运行任务),停用,阻塞和终止线程,所有这些都以最少的中央信息。
     * 我们可以全局跟踪或维护的属性只有少数,所以我们将它们打包成少量的变量,通常保持原子性而不会阻塞或锁定。
     * 几乎所有本质上的原子控制状态都被保存在两个非常常被读取(不被写入)的易失变量中作为状态和一致性检查。 (另外,"config"字段保持不变的配置状态。)

     * 
     *
     * Field "ctl" contains 64 bits holding information needed to
     * atomically decide to add, inactivate, enqueue (on an event
     * queue), dequeue, and/or re-activate workers.  To enable this
     * packing, we restrict maximum parallelism to (1<<15)-1 (which is
     * far in excess of normal operating range) to allow ids, counts,
     * and their negations (used for thresholding) to fit into 16bit
     * subfields.
     * 
     * 字段"ctl"包含64位,用于存储原子级决定添加,禁用,入队(在事件队列上),出队和/或重新激活工作人员所需的信息。
     * 为了实现这种打包,我们限制最大平行度为(1 << 15)-1(远远超过正常工作范围),以允许ID,计数和它们的否定(用于阈值处理)适合16位子域。
     *
     * Field "runState" holds lockable state bits (STARTED, STOP, etc)
     * also protecting updates to the workQueues array.  When used as
     * a lock, it is normally held only for a few instructions (the
     * only exceptions are one-time array initialization and uncommon
     * resizing), so is nearly always available after at most a brief
     * spin. But to be extra-cautious, after spinning, method
     * awaitRunStateLock (called only if an initial CAS fails), uses a
     * wait/notify mechanics on a builtin monitor to block when
     * (rarely) needed. This would be a terrible idea for a highly
     * contended lock, but most pools run without the lock ever
     * contending after the spin limit, so this works fine as a more
     * conservative alternative. Because we don't otherwise have an
     * internal Object to use as a monitor, the "stealCounter" (an
     * AtomicLong) is used when available (it too must be lazily
     * initialized; see externalSubmit).
     * 
     * 字段"runState"保存可锁定状态位(STARTED,STOP等),同时保护对workQueues数组的更新。
     * 当作为锁使用时,通常只保留几条指令(唯一的例外是一次性数组初始化和不常见的调整大小),
     * 所以在最多短暂旋转后几乎总是可用的。但要谨慎,在旋转之后,方法awaitRunStateLock(仅在初始CAS失败时调用)
     * 在内置监视器上使用等待/通知机制来阻止何时(很少)需要。这对于高度锁定来说是一个可怕的想法,
     * 但是大多数游戏池在没有锁定的情况下仍然在旋转限制之后竞争,所以这可以作为更保守的选择。
     * 因为我们没有其他内部对象用作监视器,所以在可用时使用"stealCounter"(AtomicLong)(它也必须被懒惰地初始化;参见externalSubmit)
     *
     * Usages of "runState" vs "ctl" interact in only one case:
     * deciding to add a worker thread (see tryAddWorker), in which
     * case the ctl CAS is performed while the lock is held.
     * “runState”vs“ctl”的用法仅在一个例子中进行交互:决定添加一个工作线程(请参阅“添加工作人员”),在这种情况下,在锁定锁的情况下执行ctl cas。
     *
     * Recording WorkQueues.  WorkQueues are recorded in the
     * "workQueues" array. The array is created upon first use (see
     * externalSubmit) and expanded if necessary.  Updates to the
     * array while recording new workers and unrecording terminated
     * ones are protected from each other by the runState lock, but
     * the array is otherwise concurrently readable, and accessed
     * directly. We also ensure that reads of the array reference
     * itself never become too stale. To simplify index-based
     * operations, the array size is always a power of two, and all
     * readers must tolerate null slots. Worker queues are at odd
     * indices. Shared (submission) queues are at even indices, up to
     * a maximum of 64 slots, to limit growth even if array needs to
     * expand to add more workers. Grouping them together in this way
     * simplifies and speeds up task scanning.
     * 
     * 录制工作流程WorkQueues被记录在"workQueues"数组中。该数组在首次使用时创建(请参阅externalSubmit)并在必要时展开。
     * 录制新工作人员和不录制已终止人员时对阵列的更新通过runState锁相互隔离,但数组可以同时读取并直接访问。我们还确保读取数组引用本身不会变得过时。
     * 为了简化基于索引的操作,数组大小始终是2的乘方,所有读者都必须容忍空位。工人队列在奇数指数。共享(提交)队列处于偶数索引,最多可达64个时隙,
     * 即使数组需要扩展以添加更多工作人员,也会限制增长。以这种方式将它们分组在一起简化和加快了任务扫描。
     *
     * All worker thread creation is on-demand, triggered by task
     * submissions, replacement of terminated workers, and/or
     * compensation for blocked workers. However, all other support
     * code is set up to work with other policies.  To ensure that we
     * do not hold on to worker references that would prevent GC, All
     * accesses to workQueues are via indices into the workQueues
     * array (which is one source of some of the messy code
     * constructions here). In essence, the workQueues array serves as
     * a weak reference mechanism. Thus for example the stack top
     * subfield of ctl stores indices, not references.
     * 
     * 所有工作线程的创建都是按需提供的,由任务提交,更换终止的工作人员和/或对被阻止的工作人员进行补偿。
     * 但是,所有其他支持代码都已设置为与其他策略配合使用。为了确保我们不要拘泥于可能阻止GC的工作者引用,
     * 所有对workQueues的访问都是通过指向workQueues数组(这是这里一些混乱的代码构造的一个来源)的索引。
     * 本质上,workQueues数组是一个弱引用机制。因此,例如,ctl的堆栈顶部子字段存储索引,而不是引用。
     *
     * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
     * cannot let workers spin indefinitely scanning for tasks when
     * none can be found immediately, and we cannot start/resume
     * workers unless there appear to be tasks available.  On the
     * other hand, we must quickly prod them into action when new
     * tasks are submitted or generated. In many usages, ramp-up time
     * to activate workers is the main limiting factor in overall
     * performance, which is compounded at program start-up by JIT
     * compilation and allocation. So we streamline this as much as
     * possible.
     * 
     * 排队空闲的工人。不像高性能计算机窃取工作的框架,我们不能让工作人员无限期地扫描任务,
     * 而不能立即找到任何工作,除非看起来有任务可用,否则我们不能启动/恢复工人。
     * 另一方面,当新任务提交或产生时,我们必须迅速采取行动。在许多用法中,启动员工的加速时间是整体绩效的主要限制因素,
     * 在程序启动时通过JIT汇编和分配加剧。所以我们尽可能简化它。
     *
     * The "ctl" field atomically maintains active and total worker
     * counts as well as a queue to place waiting threads so they can
     * be located for signalling. Active counts also play the role of
     * quiescence indicators, so are decremented when workers believe
     * that there are no more tasks to execute. The "queue" is
     * actually a form of Treiber stack.  A stack is ideal for
     * activating threads in most-recently used order. This improves
     * performance and locality, outweighing the disadvantages of
     * being prone to contention and inability to release a worker
     * unless it is topmost on stack.  We park/unpark workers after
     * pushing on the idle worker stack (represented by the lower
     * 32bit subfield of ctl) when they cannot find work.  The top
     * stack state holds the value of the "scanState" field of the
     * worker: its index and status, plus a version counter that, in
     * addition to the count subfields (also serving as version
     * stamps) provide protection against Treiber stack ABA effects.
     * 
     * "ctl"字段自动维护活动和总工人数以及放置等待线程的队列,以便它们可以定位用于信令。
     * 积极的计数也起到静止指标的作用,所以当工人认为没有更多的任务要执行时,积分计数就会减少。 
     * "队列"实际上是一个Treiber栈的形式。堆栈非常适合以最近使用的顺序激活线程。
     * 这提高了性能和局部性,超过了容易发生争用和无法释放工人的缺点,除非它是最上面的堆栈。
     * 我们在闲置的工作人员堆栈(由ctl的低32位子域表示)找不到工作时,将工人停放/停放。
     * 顶层堆栈状态保存worker的"scanState"字段的值:索引和状态,以及版本计数器,除了计数子字段(也用作版本标记)之外,
     * 还提供针对Treiber堆栈ABA效果的保护。
     *
     * Field scanState is used by both workers and the pool to manage
     * and track whether a worker is INACTIVE (possibly blocked
     * waiting for a signal), or SCANNING for tasks (when neither hold
     * it is busy running tasks).  When a worker is inactivated, its
     * scanState field is set, and is prevented from executing tasks,
     * even though it must scan once for them to avoid queuing
     * races. Note that scanState updates lag queue CAS releases so
     * usage requires care. When queued, the lower 16 bits of
     * scanState must hold its pool index. So we place the index there
     * upon initialization (see registerWorker) and otherwise keep it
     * there or restore it when necessary.
     * 
     * 现场scanState由worker和pool用来管理和跟踪worker是否处于INACTIVE状态(可能被阻塞在等待信号),
     * 或者SCANNING用于任务(当两个任务都没有忙于运行任务时)。当工作人员被禁用时,它的scanState字段被设置,
     * 并且被阻止执行任务,即使它必须扫描一次以避免排队竞争。请注意,scanState更新滞后队列CAS版本,因此使用需要谨慎。
     * 排队时,scanState的低16位必须保存其索引。所以我们在初始化时将索引放在那里(请参阅registerWorker),否则将它保留在那里或在必要时恢复。
     *
     * Memory ordering.  See "Correct and Efficient Work-Stealing for
     * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
     * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
     * analysis of memory ordering requirements in work-stealing
     * algorithms similar to the one used here.  We usually need
     * stronger than minimal ordering because we must sometimes signal
     * workers, requiring Dekker-like full-fences to avoid lost
     * signals.  Arranging for enough ordering without expensive
     * over-fencing requires tradeoffs among the supported means of
     * expressing access constraints. The most central operations,
     * taking from queues and updating ctl state, require full-fence
     * CAS.  Array slots are read using the emulation of volatiles
     * provided by Unsafe.  Access from other threads to WorkQueue
     * base, top, and array requires a volatile load of the first of
     * any of these read.  We use the convention of declaring the
     * "base" index volatile, and always read it before other fields.
     * The owner thread must ensure ordered updates, so writes use
     * ordered intrinsics unless they can piggyback on those for other
     * writes.  Similar conventions and rationales hold for other
     * WorkQueue fields (such as "currentSteal") that are only written
     * by owners but observed by others.
     * 
     * 内存排序。 
     * Le,Pop,Cohen和Nardelli在PPoPP 2013(http://www.di.ens.fr/~zappa/readings/ppopp13.pdf)上的
     * "正确和高效的偷窃工作"的工作窃取算法中的内存排序要求类似于这里使用的。我们通常需要比最低限度的排序更有力,
     * 因为我们有时必须向工人发出信号,要求像德克一样的全面防护,以避免丢失信号。在没有昂贵的过度围栏的情况下进行足够的
     * 排序需要在表达访问约束的支持手段之间权衡。最重要的操作是从队列中取出并更新ctl状态,这需要全面的CAS。使用不安全提
     * 供的挥发物模拟来读取阵列插槽。从其他线程访问WorkQueue base,top和array需要对这些读取中的第一个进行非易失性加载。
     * 我们使用声明"base"索引的惯例volatile,并且总是在其他字段之前读取它。所有者线程必须确保有序更新,因此写入使用有序
     * 的内在函数,除非它们可以搭载在其他写入的那些函数上。其他WorkQueue字段(如"currentSteal")的其他WorkQueue字段也有
     * 类似的约定和理由,这些字段只能由所有者书写,但是由其他人观察。
     *
     * Creating workers. To create a worker, we pre-increment total
     * count (serving as a reservation), and attempt to construct a
     * ForkJoinWorkerThread via its factory. Upon construction, the
     * new thread invokes registerWorker, where it constructs a
     * WorkQueue and is assigned an index in the workQueues array
     * (expanding the array if necessary). The thread is then
     * started. Upon any exception across these steps, or null return
     * from factory, deregisterWorker adjusts counts and records
     * accordingly.  If a null return, the pool continues running with
     * fewer than the target number workers. If exceptional, the
     * exception is propagated, generally to some external caller.
     * Worker index assignment avoids the bias in scanning that would
     * occur if entries were sequentially packed starting at the front
     * of the workQueues array. We treat the array as a simple
     * power-of-two hash table, expanding as needed. The seedIndex
     * increment ensures no collisions until a resize is needed or a
     * worker is deregistered and replaced, and thereafter keeps
     * probability of collision low. We cannot use
     * ThreadLocalRandom.getProbe() for similar purposes here because
     * the thread has not started yet, but do so for creating
     * submission queues for existing external threads.
     * 
     * 创建工作。为了创建一个工人,我们预先增加总数(作为保留),并尝试通过其工厂构建一个ForkJoinWorkerThread。在构建时,新的线程调用registerWorker,
     * 在那里构造一个WorkQueue,并在workQueues数组中分配一个索引(如果需要,扩展数组)。线程然后启动。如果在这些步骤中发生任何异常,
     * 或者从工厂返回null,则deregisterWorker会相应地调整计数和记录。如果返回空值,那么池将继续运行,而目标编号的工人数少于此值。如果例外,
     * 则将异常传播给一些外部呼叫者。工作者索引分配避免了在从workQueues数组开始的顺序打包条目时发生的扫描偏差。我们把这个数组视为一个简单的二元哈希表,
     * 根据需要进行扩展。 seedIndex增量确保不会发生冲突,直到需要调整大小或者取消注册和替换工作人员,并且此后保持低冲突概率。
     * 我们不能在这里使用ThreadLocalRandom.getProbe()来达到类似的目的,因为线程还没有启动,但是为了创建现有外部线程的提交队列。
     *
     * Deactivation and waiting. Queuing encounters several intrinsic
     * races; most notably that a task-producing thread can miss
     * seeing (and signalling) another thread that gave up looking for
     * work but has not yet entered the wait queue.  When a worker
     * cannot find a task to steal, it deactivates and enqueues. Very
     * often, the lack of tasks is transient due to GC or OS
     * scheduling. To reduce false-alarm deactivation, scanners
     * compute checksums of queue states during sweeps.  (The
     * stability checks used here and elsewhere are probabilistic
     * variants of snapshot techniques -- see Herlihy & Shavit.)
     * Workers give up and try to deactivate only after the sum is
     * stable across scans. Further, to avoid missed signals, they
     * repeat this scanning process after successful enqueuing until
     * again stable.  In this state, the worker cannot take/run a task
     * it sees until it is released from the queue, so the worker
     * itself eventually tries to release itself or any successor (see
     * tryRelease).  Otherwise, upon an empty scan, a deactivated
     * worker uses an adaptive local spin construction (see awaitWork)
     * before blocking (via park). Note the unusual conventions about
     * Thread.interrupts surrounding parking and other blocking:
     * Because interrupts are used solely to alert threads to check
     * termination, which is checked anyway upon blocking, we clear
     * status (using Thread.interrupted) before any call to park, so
     * that park does not immediately return due to status being set
     * via some other unrelated call to interrupt in user code.
     * 
     * 取消激活并等待。排队遇到几个固有的种族;最值得注意的是,一个任务生成线程可能会错过看到(和发信号)另一个线程,
     * 放弃寻找工作,但还没有进入等待队列。当一个工作人员找不到任务要偷取时,就会停用并排队。很多时候,由于GC或OS调度,
     * 缺乏任务是短暂的。为了减少错误警报停用,扫描器在扫描期间计算队列状态的校验和。 
     * (在这里和其他地方使用的稳定性检查是快照技术的概率变体 - 参见Herlihy&Shavit。)工人放弃并且在整个扫描期间总和
     * 稳定后才尝试去激活。此外,为了避免错过信号,他们在成功排队之后重复这个扫描过程直到再次稳定。在这种状态下,工作者
     * 不能接受/运行它所看到的任务,直到它从队列中释放出来,所以工作者本身最终试图释放自己或任何后继者(参见tryRelease)。
     * 否则,在空的扫描中,停用的工人在阻塞(通过停车)之前使用自适应本地旋转构造(参见awaitWork)。注意关于围绕停车和其
     * 他阻塞的Thread.interrupts的不寻常约定:因为中断仅用于提醒线程检查终止,无论如何,在阻塞之后检查终止,所以我们在任何
     * 呼叫停放之前清除状态(使用Thread.interrupted)由于通过其他不相关的呼叫来设置状态,用户代码中断将不会立即返回。
     *
     * Signalling and activation.  Workers are created or activated
     * only when there appears to be at least one task they might be
     * able to find and execute.  Upon push (either by a worker or an
     * external submission) to a previously (possibly) empty queue,
     * workers are signalled if idle, or created if fewer exist than
     * the given parallelism level.  These primary signals are
     * buttressed by others whenever other threads remove a task from
     * a queue and notice that there are other tasks there as well.
     * On most platforms, signalling (unpark) overhead time is
     * noticeably long, and the time between signalling a thread and
     * it actually making progress can be very noticeably long, so it
     * is worth offloading these delays from critical paths as much as
     * possible. Also, because inactive workers are often rescanning
     * or spinning rather than blocking, we set and clear the "parker"
     * field of WorkQueues to reduce unnecessary calls to unpark.
     * (This requires a secondary recheck to avoid missed signals.)
     * 
     * 信令和激活。只有在看起来至少有一项他们可能能够找到并执行的任务时,才会创建或激活工人。
     * 一旦(通过工作人员或外部提交)推送(先前(可能)空队列),工作人员在空闲时发信号通知,
     * 或者如果存在比给定的并行性级别更少的信号,则发出信号。当其他线程从队列中删除一个任务时,
     * 这些主要信号由其他线程支持,并注意到这里还有其他的任务。在大多数平台上,信令(unpark)的开销时间明显较长,
     * 从一个线程到实际进展的时间可能会非常长,所以尽可能地将这些延迟从关键路径上卸载。另外,
     * 由于不活跃的工作人员经常重新扫描或旋转,而不是阻止,我们设置并清除WorkQueues的"parker"字段,以减少不必要的停靠站点。 
     * (这需要二次重新检查以避免错过信号。)
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate (see awaitWork) if the pool has remained
     * quiescent for period IDLE_TIMEOUT, increasing the period as the
     * number of threads decreases, eventually removing all workers.
     * Also, when more than two spare threads exist, excess threads
     * are immediately terminated at the next quiescent point.
     * (Padding by two avoids hysteresis.)
     * 
     * 整理工人。为了在不使用期间释放资源,当池静止时开始等待的工作者将超时并终止(参见awaitWork)如果池保持静止持续期间IDLE_TIMEOUT,
     * 则最终随着线程数量的减少而增加删除所有的工人。另外,当存在两个以上的备用线程时,多余的线程立即在下一个静止点终止。 (填充两个避免迟滞。)
     *
     * Shutdown and Termination. A call to shutdownNow invokes
     * tryTerminate to atomically set a runState bit. The calling
     * thread, as well as every other worker thereafter terminating,
     * helps terminate others by setting their (qlock) status,
     * cancelling their unprocessed tasks, and waking them up, doing
     * so repeatedly until stable (but with a loop bounded by the
     * number of workers).  Calls to non-abrupt shutdown() preface
     * this by checking whether termination should commence. This
     * relies primarily on the active count bits of "ctl" maintaining
     * consensus -- tryTerminate is called from awaitWork whenever
     * quiescent. However, external submitters do not take part in
     * this consensus.  So, tryTerminate sweeps through queues (until
     * stable) to ensure lack of in-flight submissions and workers
     * about to process them before triggering the "STOP" phase of
     * termination. (Note: there is an intrinsic conflict if
     * helpQuiescePool is called when shutdown is enabled. Both wait
     * for quiescence, but tryTerminate is biased to not trigger until
     * helpQuiescePool completes.)
     * 
     * 关机和终止。对shutdownNow的调用调用tryTerminate来自动设置一个runState位。调用线程以及之后的其他每个工作者终止,
     * 通过设置(qlock)状态,取消未处理的任务并唤醒它们来帮助终止其他人,重复这样做直到稳定(但是由工人数量限制的循环)。
     * 调用非突发关机()通过检查终止是否应该开始。这主要依赖于保持一致的"ctl"的活动计数位 - 每当静止时,tryTerminate就从await工作中调用。
     * 但是,外部提交者不参与这个共识。因此,尝试终止扫描队列(直到稳定),以确保在触发终止的"停止"阶段之前缺少空中提交和工作人员处理它们。 
     * (注意:如果在启用shutdown的情况下调用helpQuiescePool,则存在内部冲突,两者都等待静止,但tryTerminate有偏见,直到helpQuiescePool完成时才触发。
     *
     *
     * Joining Tasks 加入任务
     * =============
     *
     * Any of several actions may be taken when one worker is waiting
     * to join a task stolen (or always held) by another.  Because we
     * are multiplexing many tasks on to a pool of workers, we can't
     * just let them block (as in Thread.join).  We also cannot just
     * reassign the joiner's run-time stack with another and replace
     * it later, which would be a form of "continuation", that even if
     * possible is not necessarily a good idea since we may need both
     * an unblocked task and its continuation to progress.  Instead we
     * combine two tactics:
     * 当一名工作人员正在等待加入被他人盗取(或总是被盗)的任务时,可能会采取任何一种行动。
     * 因为我们将许多任务复用到工作池中,所以我们不能让它们阻塞(如在Thread.join中)。我们也不能仅仅把joiner的运行时间栈重新分配给另一个栈,
     * 并且稍后将其替换,这将是一种"延续"形式,即使可能也不一定是一个好主意,因为我们可能需要一个畅通无阻的任务,进展。相反,
     * 我们结合了两种策略:
     *
     *   Helping: Arranging for the joiner to execute some task that it
     *      would be running if the steal had not occurred.  帮助:安排joiner执行一些任务,如果没有发生,它将运行。
     *
     *   Compensating: Unless there are already enough live threads,
     *      method tryCompensate() may create or re-activate a spare
     *      thread to compensate for blocked joiners until they unblock. 
     *      补偿:除非已经有足够的活线程,方法尝试补偿()可以创建或重新激活一个备用线程来补偿阻塞的连接,直到它们解除阻塞为止。
     *
     * A third form (implemented in tryRemoveAndExec) amounts to
     * helping a hypothetical compensator: If we can readily tell that
     * a possible action of a compensator is to steal and execute the
     * task being joined, the joining thread can do so directly,
     * without the need for a compensation thread (although at the
     * expense of larger run-time stacks, but the tradeoff is
     * typically worthwhile).
     * 
     * 第三种形式(在tryRemoveAndExec中实现)相当于帮助一个假设的补偿器:如果我们可以很容易地告诉补偿器的一个可能的动作是窃取和执行被加入的任务,
     * 那么加入的线程可以直接完成,而不需要补偿线程(尽管以较大的运行时间堆栈为代价,但折衷通常是值得的)。
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker.
     * 管理的拦截器扩展API不能使用帮助,因此只依赖于方法中的补偿等待拦截器。
     *
     * The algorithm in helpStealer entails a form of "linear
     * helping".  Each worker records (in field currentSteal) the most
     * recent task it stole from some other worker (or a submission).
     * It also records (in field currentJoin) the task it is currently
     * actively joining. Method helpStealer uses these markers to try
     * to find a worker to help (i.e., steal back a task from and
     * execute it) that could hasten completion of the actively joined
     * task.  Thus, the joiner executes a task that would be on its
     * own local deque had the to-be-joined task not been stolen. This
     * is a conservative variant of the approach described in Wagner &
     * Calder "Leapfrogging: a portable technique for implementing
     * efficient futures" SIGPLAN Notices, 1993
     * (http://portal.acm.org/citation.cfm?id=155354). It differs in
     * that: (1) We only maintain dependency links across workers upon
     * steals, rather than use per-task bookkeeping.  This sometimes
     * requires a linear scan of workQueues array to locate stealers,
     * but often doesn't because stealers leave hints (that may become
     * stale/wrong) of where to locate them.  It is only a hint
     * because a worker might have had multiple steals and the hint
     * records only one of them (usually the most current).  Hinting
     * isolates cost to when it is needed, rather than adding to
     * per-task overhead.  (2) It is "shallow", ignoring nesting and
     * potentially cyclic mutual steals.  (3) It is intentionally
     * racy: field currentJoin is updated only while actively joining,
     * which means that we miss links in the chain during long-lived
     * tasks, GC stalls etc (which is OK since blocking in such cases
     * is usually a good idea).  (4) We bound the number of attempts
     * to find work using checksums and fall back to suspending the
     * worker and if necessary replacing it with another.
     * 
     * helpStealer中的算法需要一种"线性帮助"的形式。
     * 每个工作人员记录(在currentSteal字段中)从其他工作人员(或提交)偷取的最近任务。它还记录(在currentJoin字段中)当前正在加入的任务。
     * 方法helpStealer使用这些标记来试图找到一个工作人员来帮助(即,从任务中取回并执行任务),这可以加快主动联合任务的完成。
     * 因此,加盟者执行一项任务,如果被加入的任务没有被盗,那么这个任务本身就是本地的。
     * 这是Wagner&Calder在"跳跃:一种实现高效期货的便携式技术"(SIGPLAN Notices,1993)(http://portal.acm.org/citation.cfm?id=155354)
     * 中描述的方法的保守变体。它的不同之处在于:
     * (1)我们只保留偷工作中的工人之间的依赖关系,而不是使用按任务记账。这有时需要对workQueues数组进行线性扫描来定位窃听器,
     * 但通常不会因为窃听者留下提示(可能变得陈旧/错误)而将其定位到何处。这只是一个暗示,因为工人可能有多次抢断,
     * 而提示只记录其中的一次(通常是最新的)。暗示隔离成本需要时,而不是增加到每个任务的开销。 
     * (2)"浅",忽略嵌套和潜在的循环互抢。 
     * (3)故意激活:字段currentJoin只在主动加入时更新,这意味着我们在长期任务,GC停顿等期间错过了链中的链接(这是可以的,因为
     * 在这种情况下阻塞通常是个好主意) 。 
     * (4)我们限制了使用校验和的尝试次数,并回退到暂停工作,必要时替换为另一次。
     *
     * Helping actions for CountedCompleters do not require tracking
     * currentJoins: Method helpComplete takes and executes any task
     * with the same root as the task being waited on (preferring
     * local pops to non-local polls). However, this still entails
     * some traversal of completer chains, so is less efficient than
     * using CountedCompleters without explicit joins.
     * 
     * 对计数完成者的帮助操作不需要跟踪当前的联接:方法帮助完成占用和执行任何具有与正在等待的任务相同的根目录的任务
     * (首选本地弹出到非本地轮询)。然而,这仍然需要一些完成链的遍历,所以比没有显式连接的使用计数完成者效率低。
     *
     * Compensation does not aim to keep exactly the target
     * parallelism number of unblocked threads running at any given
     * time. Some previous versions of this class employed immediate
     * compensations for any blocked join. However, in practice, the
     * vast majority of blockages are transient byproducts of GC and
     * other JVM or OS activities that are made worse by replacement.
     * Currently, compensation is attempted only after validating that
     * all purportedly active threads are processing tasks by checking
     * field WorkQueue.scanState, which eliminates most false
     * positives.  Also, compensation is bypassed (tolerating fewer
     * threads) in the most common case in which it is rarely
     * beneficial: when a worker with an empty queue (thus no
     * continuation tasks) blocks on a join and there still remain
     * enough threads to ensure liveness.
     * 
     * 补偿并不是要确保在任何给定的时间保持未被阻塞的线程的目标并行数量。这个类的一些以前的版本采用立即补偿任何封锁的连接。
     * 但实际上,绝大多数堵塞是GC和其他JVM或OS活动的暂时性副产品,通过替换而变得更糟。
     * 目前,只有在通过检查字段"工作队列"来验证所有据称活动的线程正在处理任务之后,才会尝试进行补偿。扫描状态,
     * 从而消除了大多数误报。而且,在最不常见的情况下,
     * 补偿会被绕过(容忍的线程更少):当队列空闲的工作人员(因此没有连续任务)阻塞连接时,仍然有足够的线程来保证活跃性。
     *
     * The compensation mechanism may be bounded.  Bounds for the
     * commonPool (see commonMaxSpares) better enable JVMs to cope
     * with programming errors and abuse before running out of
     * resources to do so. In other cases, users may supply factories
     * that limit thread construction. The effects of bounding in this
     * pool (like all others) is imprecise.  Total worker counts are
     * decremented when threads deregister, not when they exit and
     * resources are reclaimed by the JVM and OS. So the number of
     * simultaneously live threads may transiently exceed bounds.
     * 
     * 补偿机制可能是有限的。 commonPool的界限(请参阅commonMaxSpares)更好地使JVM能够在耗尽资源之前处理编程错误和滥用。
     * 在其他情况下,用户可能会提供限制螺纹结构的工厂。在这个池中的边界的影响(像所有其他)是不精确的。
     * 当线程取消注册时,总工作人员计数递减,而不是退出并且JVM和OS回收资源。所以同时活动的线程数量可能暂时超过界限。
     *
     * Common Pool
     * ===========
     *
     * The static common pool always exists after static
     * initialization.  Since it (or any other created pool) need
     * never be used, we minimize initial construction overhead and
     * footprint to the setup of about a dozen fields, with no nested
     * allocation. Most bootstrapping occurs within method
     * externalSubmit during the first submission to the pool.
     * 静态初始化后,静态公共池始终存在。由于它(或任何其他创建的池)不需要被使用,
     * 所以我们将最初的构建开销和占用空间最小化到大约十几个字段的设置,没有嵌套的分配。大多数引导在第一次提交到池时在externalSubmit方法中发生。
     *
     * When external threads submit to the common pool, they can
     * perform subtask processing (see externalHelpComplete and
     * related methods) upon joins.  This caller-helps policy makes it
     * sensible to set common pool parallelism level to one (or more)
     * less than the total number of available cores, or even zero for
     * pure caller-runs.  We do not need to record whether external
     * submissions are to the common pool -- if not, external help
     * methods return quickly. These submitters would otherwise be
     * blocked waiting for completion, so the extra effort (with
     * liberally sprinkled task status checks) in inapplicable cases
     * amounts to an odd form of limited spin-wait before blocking in
     * ForkJoinTask.join.
     * 
     * 当外部线程提交到公共池时,他们可以在连接时执行子任务处理(请参阅externalHelpComplete和相关方法)。
     * 这个调用者帮助策略使得将公共池并行性级别设置为小于可用内核总数的一个(或更多),
     * 或者对于纯粹的调用者运行甚至为零是明智的。我们不需要记录外部提交是否属于公共池,
     * 否则外部帮助方法会很快返回。否则这些提交者将被阻塞等待完成,所以在不适用的情况下额外的工作量(大量的任务状态检查)
     * 相当于在ForkJoinTask.join阻塞之前的一个奇怪形式的有限旋转等待。
     *
     * As a more appropriate default in managed environments, unless
     * overridden by system properties, we use workers of subclass
     * InnocuousForkJoinWorkerThread when there is a SecurityManager
     * present. These workers have no permissions set, do not belong
     * to any user-defined ThreadGroup, and erase all ThreadLocals
     * after executing any top-level task (see WorkQueue.runTask).
     * The associated mechanics (mainly in ForkJoinWorkerThread) may
     * be JVM-dependent and must access particular Thread class fields
     * to achieve this effect.
     * 
     * 作为托管环境中更合适的缺省值,除非被系统属性覆盖,否则当存在SecurityManager时,我们使用子类InnocuousForkJoinWorkerThread的工作者。
     * 这些工作人员没有权限设置,不属于任何用户定义的ThreadGroup,并在执行任何顶级任务(请参见WorkQueue.runTask)后清除所有ThreadLocals。
     * 相关的机制(主要在ForkJoinWorkerThread中)可能依赖于JVM,并且必须访问特定的Thread类字段才能达到这个效果。
     *
     * Style notes  样式笔记
     * ===========
     *
     * Memory ordering relies mainly on Unsafe intrinsics that carry
     * the further responsibility of explicitly performing null- and
     * bounds- checks otherwise carried out implicitly by JVMs.  This
     * can be awkward and ugly, but also reflects the need to control
     * outcomes across the unusual cases that arise in very racy code
     * with very few invariants. So these explicit checks would exist
     * in some form anyway.  All fields are read into locals before
     * use, and null-checked if they are references.  This is usually
     * done in a "C"-like style of listing declarations at the heads
     * of methods or blocks, and using inline assignments on first
     * encounter.  Array bounds-checks are usually performed by
     * masking with array.length-1, which relies on the invariant that
     * these arrays are created with positive lengths, which is itself
     * paranoically checked. Nearly all explicit checks lead to
     * bypass/return, not exception throws, because they may
     * legitimately arise due to cancellation/revocation during
     * shutdown.
     * 
     * 内存排序主要依赖于不安全的内部函数,这些内部函数承担进一步明确执行空白和边界检查的责任,否则由JVM隐式执行。
     * 这可能是尴尬和丑陋的,但也反映了需要控制的结果在非常激烈的代码中出现的不寻常的情况下,
     * 几乎不变的。所以这些明确的检查将以某种形式存在。所有的字段在使用之前都会被读入本地,如果它们是引用,
     * 则会进行空值检查。这通常在方法或块的头部以"C"类型的列表声明的方式完成,并且在首次遇到时使用内联分配。
     * 数组边界检查通常是通过使用array.length-1进行掩码来实现的,该数组依赖于这些数组以正长度创建的不变量,这本身是经过偏执检查的。
     * 几乎所有显式检查都会导致绕过/返回,而不是异常抛出,因为它们可能会因停机期间的取消/撤销而合法产生。
     *
     * There is a lot of representation-level coupling among classes
     * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
     * fields of WorkQueue maintain data structures managed by
     * ForkJoinPool, so are directly accessed.  There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. Several methods intrinsically sprawl because
     * they must accumulate sets of consistent reads of fields held in
     * local variables.  There are also other coding oddities
     * (including several unnecessary-looking hoisted null checks)
     * that help some methods perform reasonably even when interpreted
     * (not compiled).
     * 
     * 在类ForkJoinPool,ForkJoinWorkerThread和ForkJoinTask之间有很多表示级耦合。 WorkQueue的字段维护由ForkJoinPool管理的数据结构,
     * 因此可以直接访问。试图减少这一点是没有意义的,因为任何关联的未来表示变化都需要伴随着算法变化。有几种方法本质上是蔓延性的,
     * 因为它们必须累积本地变量中保持的字段的一致读取。还有其他的编码怪异(包括几个不必要的提升空检查),
     * 即使在解释(不编译)时也可以帮助某些方法合理地执行。
     *
     * The order of declarations in this file is (with a few exceptions): 这个文件中的声明的顺序是(有一些例外):
     * (1) Static utility functions 静态的效用函数
     * (2) Nested (static) classes  嵌套(静态)类
     * (3) Static fields            静态字段
     * (4) Fields, along with constants used when unpacking some of them 字段,以及在拆包时使用的常量
     * (5) Internal control methods  内部控制的方法
     * (6) Callbacks and other support for ForkJoinTask methods  回调和对ForkJoinTask方法的其他支持
     * (7) Exported methods  导出的方法
     * (8) Static block initializing statics in minimally dependent order  在最小依赖顺序中,静态块初始化静态块
     */

二、WorkQueue分析

1.WorkQueue构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
 /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     * 队列支持工作-窃取和外部任务提交。请参阅上面的描述和算法。大多数平台上的性能对工作队列和
     * 数组的实例放置非常敏感——我们绝对不希望多个工作队列实例或多个队列数组共享缓存线。@ Contended注释警告JVMs试图将实例分开。
     */
    @sun.misc.Contended
    static final class WorkQueue {

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields
        volatile int scanState;    // versioned, <0: inactive; odd:scanning
        int stackPred;             // pool stack (ctl) predecessor
        int nsteals;               // number of steals
        int hint;                  // randomization and stealer index hint
        int config;                // pool index and mode
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer

        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }
}       
  • WorkQueue类@sun.misc.Contended注解,控制缓存行处理

2.push 添加任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
      /**
       * Pushes a task. Call only by owner in unshared queues.  (The
       * shared-queue version is embedded in method externalPush.)
       *
       * @param task the task. Caller must ensure non-null.
       * @throws RejectedExecutionException if array cannot be resized
       */
      final void push(ForkJoinTask<?> task) {
          ForkJoinTask<?>[] a; ForkJoinPool p;
          int b = base, s = top, n;
          if ((a = array) != null) {    // ignore if queue removed
              int m = a.length - 1;     // fenced write for task visibility
              U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
              U.putOrderedInt(this, QTOP, s + 1);
              if ((n = s - b) <= 1) {   //任务数组内容小于1就唤醒或者创建线程
                  if ((p = pool) != null)
                      p.signalWork(p.workQueues, this);
              }
              else if (n >= m)
                  growArray(); //扩容
          }
      }

3.growArray 扩容

按2倍扩容

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
       /**
        * Initializes or doubles the capacity of array. Call either
        * by owner or with lock held -- it is OK for base, but not
        * top, to move while resizings are in progress.
        */
       final ForkJoinTask<?>[] growArray() {
           ForkJoinTask<?>[] oldA = array;
           int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; // 2倍扩容
           if (size > MAXIMUM_QUEUE_CAPACITY)
               throw new RejectedExecutionException("Queue capacity exceeded");
           int oldMask, t, b;
           ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
           if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
               (t = top) - (b = base) > 0) {
               int mask = size - 1;
               do { // emulate poll from old array, push to new array
                   ForkJoinTask<?> x;
                   int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                   int j    = ((b &    mask) << ASHIFT) + ABASE;
                   x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                   if (x != null &&
                       U.compareAndSwapObject(oldA, oldj, x, null))
                       U.putObjectVolatile(a, j, x);
               } while (++b != t);
           }
           return a;
       }

4.pop LIFO

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
        /**
         * Takes next task, if one exists, in LIFO order.  Call only
         * by owner in unshared queues.
         */
        final ForkJoinTask<?> pop() {
            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
            if ((a = array) != null && (m = a.length - 1) >= 0) {
                for (int s; (s = top - 1) - base >= 0;) {
                    long j = ((m & s) << ASHIFT) + ABASE;
                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                        break;
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        U.putOrderedInt(this, QTOP, s);
                        return t;
                    }
                }
            }
            return null;
        }
         /**
          * Takes a task in FIFO order if b is base of queue and a task
          * can be claimed without contention. Specialized versions
          * appear in ForkJoinPool methods scan and helpStealer.
          */
         final ForkJoinTask<?> pollAt(int b) {
             ForkJoinTask<?> t; ForkJoinTask<?>[] a;
             if ((a = array) != null) {
                 int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                 if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
                     base == b && U.compareAndSwapObject(a, j, t, null)) {
                     base = b + 1;
                     return t;
                 }
             }
             return null;
         }       

5.poll FIFO

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
       /**
        * Takes next task, if one exists, in FIFO order.
        */
       final ForkJoinTask<?> poll() {
           ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
           while ((b = base) - top < 0 && (a = array) != null) {
               int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
               t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
               if (base == b) {
                   if (t != null) {
                       if (U.compareAndSwapObject(a, j, t, null)) {
                           base = b + 1;
                           return t;
                       }
                   }
                   else if (b + 1 == top) // now empty
                       break;
               }
           }
           return null;
       }

6.runTask 执行窃取任务,在执行本地线程中任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
        /**
         * Executes the given task and any remaining local tasks.
         */
        final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                (currentSteal = task).doExec(); // 执行窃取任务
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();  //执行本地线程中任务
                ForkJoinWorkerThread thread = owner;
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }
 
        /**
          * Polls and runs tasks until empty.
          */
         final void pollAndExecAll() {
             for (ForkJoinTask<?> t; (t = poll()) != null;)
                 t.doExec();
         }
 
         /**
          * Removes and executes all local tasks. If LIFO, invokes
          * pollAndExecAll. Otherwise implements a specialized pop loop
          * to exec until empty.
          */
         final void execLocalTasks() {
             int b = base, m, s;
             ForkJoinTask<?>[] a = array;
             if (b - (s = top - 1) <= 0 && a != null &&
                 (m = a.length - 1) >= 0) {
                 if ((config & FIFO_QUEUE) == 0) {
                     for (ForkJoinTask<?> t;;) {
                         if ((t = (ForkJoinTask<?>)U.getAndSetObject
                              (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                             break;
                         U.putOrderedInt(this, QTOP, s);
                         t.doExec();
                         if (base - (s = top - 1) > 0)
                             break;
                     }
                 }
                 else
                     pollAndExecAll();
             }
         }       
        
        /**
         * Adds steal count to pool stealCounter if it exists, and resets.
         */
        final void transferStealCount(ForkJoinPool p) {
            AtomicLong sc;
            if (p != null && (sc = p.stealCounter) != null) {
                int s = nsteals;
                nsteals = 0;            // if negative, correct for overflow
                sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
            }
        }        

7.tryRemoveAndExec

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
     /**
        * If present, removes from queue and executes the given task,
        * or any other cancelled task. Used only by awaitJoin.
        * 如果存在,则从队列中删除并执行给定任务,或任何其他已取消的任务。, 仅由awaitJoin使用。
        *
        * @return true if queue empty and task not known to be done
        */
       final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
           ForkJoinTask<?>[] a; int m, s, b, n;
           if ((a = array) != null && (m = a.length - 1) >= 0 &&
               task != null) {
               while ((n = (s = top) - (b = base)) > 0) {
                   for (ForkJoinTask<?> t;;) {      // traverse from s to b
                       long j = ((--s & m) << ASHIFT) + ABASE;
                       if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                           return s + 1 == top;     // shorter than expected
                       else if (t == task) {                                  //查询相应task
                           boolean removed = false;
                           if (s + 1 == top) {      // pop                            //最后一个task任务队列;删除该任务,执行任务
                               if (U.compareAndSwapObject(a, j, task, null)) {
                                   U.putOrderedInt(this, QTOP, s);
                                   removed = true;
                               }
                           }
                           else if (base == b)      // replace with proxy           //任务替换成EmptyTask,执行任务
                               removed = U.compareAndSwapObject(
                                   a, j, task, new EmptyTask());
                           if (removed)
                               task.doExec();
                           break;
                       }
                       else if (t.status < 0 && s + 1 == top) {                 //最后任务已执行;删除该任务,
                           if (U.compareAndSwapObject(a, j, t, null))
                               U.putOrderedInt(this, QTOP, s);
                           break;                  // was cancelled
                       }
                       if (--n == 0)                                         //没找到该任务直接返回false
                           return false;
                   }
                   if (task.status < 0)                                    //查找任务已执行;返回false
                       return false;
               }
           }
           return true;
       }

三、ForkJoinPool

1.ForkJoinPoolsh构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53


    // Lower and upper word masks
    private static final long SP_MASK    = 0xffffffffL;
    private static final long UC_MASK    = ~SP_MASK;

    // Active counts
    private static final int  AC_SHIFT   = 48;
    private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
    private static final long AC_MASK    = 0xffffL << AC_SHIFT;

    // Total counts
    private static final int  TC_SHIFT   = 32;
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
    private static final int  RSLOCK     = 1;
    private static final int  RSIGNAL    = 1 << 1;
    private static final int  STARTED    = 1 << 2;
    private static final int  STOP       = 1 << 29;
    private static final int  TERMINATED = 1 << 30;
    private static final int  SHUTDOWN   = 1 << 31;

    // Instance fields
    volatile long ctl;                   // main pool control
    volatile int runState;               // lockable status
    final int config;                    // parallelism, mode
    int indexSeed;                       // to generate worker index
    volatile WorkQueue[] workQueues;     // main registry
    final ForkJoinWorkerThreadFactory factory;
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    final String workerNamePrefix;       // to create worker name string
    volatile AtomicLong stealCounter;  
    
   /**
    * Creates a {@code ForkJoinPool} with the given parameters, without
    * any security checks or parameter validation.  Invoked directly by
    * makeCommonPool.
    */
   private ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        int mode,
                        String workerNamePrefix) {
       this.workerNamePrefix = workerNamePrefix;
       this.factory = factory;
       this.ueh = handler;
       this.config = (parallelism & SMASK) | mode;
       long np = (long)(-parallelism); // offset ctl counts
       this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
   }     

2.runState

(1).lockRunState 获取锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
    /**
     * Acquires the runState lock; returns current (locked) runState.
     */
    private int lockRunState() {
        int rs;
        return ((((rs = runState) & RSLOCK) != 0 ||
                 !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
                awaitRunStateLock() : rs);
    }

    /**
     * Spins and/or blocks until runstate lock is available.  See
     * above for explanation.
     */
    private int awaitRunStateLock() {
        Object lock;
        boolean wasInterrupted = false;
        for (int spins = SPINS, r = 0, rs, ns;;) {
            if (((rs = runState) & RSLOCK) == 0) {
                if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
                    if (wasInterrupted) {
                        try {
                            Thread.currentThread().interrupt();
                        } catch (SecurityException ignore) {
                        }
                    }
                    return ns;
                }
            }
            else if (r == 0)
                r = ThreadLocalRandom.nextSecondarySeed();
            else if (spins > 0) {
                r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
                if (r >= 0)
                    --spins;
            }
            else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
                Thread.yield();   // initialization race
            else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {  //锁等待
                synchronized (lock) {
                    if ((runState & RSIGNAL) != 0) {
                        try {
                            lock.wait();
                        } catch (InterruptedException ie) {
                            if (!(Thread.currentThread() instanceof
                                  ForkJoinWorkerThread))
                                wasInterrupted = true;
                        }
                    }
                    else
                        lock.notifyAll();
                }
            }
        }
    }

(2).unlockRunState 解锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
      /**
       * Unlocks and sets runState to newRunState.
       *
       * @param oldRunState a value returned from lockRunState
       * @param newRunState the next value (must have lock bit clear).
       */
      private void unlockRunState(int oldRunState, int newRunState) {
          if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
              Object lock = stealCounter;
              runState = newRunState;              // clears RSIGNAL bit
              if (lock != null)
                  synchronized (lock) { lock.notifyAll(); }
          }
      }

3.submit 提交任务

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

   /**
    * Tries to add the given task to a submission queue at
    * submitter's current queue. Only the (vastly) most common path
    * is directly handled in this method, while screening for need
    * for externalSubmit.
    *
    * @param task the task. Caller must ensure non-null.
    */
   final void externalPush(ForkJoinTask<?> task) {
       WorkQueue[] ws; WorkQueue q; int m;
       int r = ThreadLocalRandom.getProbe(); //xorshift
       int rs = runState;
       if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
           (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
           U.compareAndSwapInt(q, QLOCK, 0, 1)) {  //   m & r & SQMASK 查找index为偶数
           ForkJoinTask<?>[] a; int am, n, s;
           if ((a = q.array) != null &&
               (am = a.length - 1) > (n = (s = q.top) - q.base)) {
               int j = ((am & s) << ASHIFT) + ABASE;
               U.putOrderedObject(a, j, task);
               U.putOrderedInt(q, QTOP, s + 1);
               U.putIntVolatile(q, QLOCK, 0);
               if (n <= 1)                   
                   signalWork(ws, q);
               return;
           }
           U.compareAndSwapInt(q, QLOCK, 1, 0);
       }
       externalSubmit(task);
   }
   /**
    * Full version of externalPush, handling uncommon cases, as well
    * as performing secondary initialization upon the first
    * submission of the first task to the pool.  It also detects
    * first submission by an external thread and creates a new shared
    * queue if the one at index if empty or contended.
    *
    * @param task the task. Caller must ensure non-null.
    */
   private void externalSubmit(ForkJoinTask<?> task) {
       int r;                                    // initialize caller's probe
       if ((r = ThreadLocalRandom.getProbe()) == 0) {
           ThreadLocalRandom.localInit();
           r = ThreadLocalRandom.getProbe();
       }
       for (;;) {
           WorkQueue[] ws; WorkQueue q; int rs, m, k;
           boolean move = false;
           if ((rs = runState) < 0) {
               tryTerminate(false, false);     // help terminate
               throw new RejectedExecutionException();
           }
           else if ((rs & STARTED) == 0 ||     // initialize        //初始化过程
                    ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
               int ns = 0;
               rs = lockRunState();
               try {
                   if ((rs & STARTED) == 0) {
                       U.compareAndSwapObject(this, STEALCOUNTER, null,
                                              new AtomicLong());
                       // create workQueues array with size a power of two
                       int p = config & SMASK; // ensure at least 2 slots
                       int n = (p > 1) ? p - 1 : 1;
                       n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                       n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                       workQueues = new WorkQueue[n];
                       ns = STARTED;
                   }
               } finally {
                   unlockRunState(rs, (rs & ~RSLOCK) | ns);
               }
           }
           else if ((q = ws[k = r & m & SQMASK]) != null) {
               if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //往WorkQueue添加任务
                   ForkJoinTask<?>[] a = q.array;
                   int s = q.top;
                   boolean submitted = false; // initial submission or resizing
                   try {                      // locked version of push
                       if ((a != null && a.length > s + 1 - q.base) ||
                           (a = q.growArray()) != null) {
                           int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                           U.putOrderedObject(a, j, task);
                           U.putOrderedInt(q, QTOP, s + 1);
                           submitted = true;
                       }
                   } finally {
                       U.compareAndSwapInt(q, QLOCK, 1, 0);
                   }
                   if (submitted) {
                       signalWork(ws, q);
                       return;
                   }
               }
               move = true;                   // move on failure  重新生成r
           }
           else if (((rs = runState) & RSLOCK) == 0) { // create new queue 重新创建WorkQueue
               q = new WorkQueue(this, null);
               q.hint = r;
               q.config = k | SHARED_QUEUE;
               q.scanState = INACTIVE;
               rs = lockRunState();           // publish index
               if (rs > 0 &&  (ws = workQueues) != null &&
                   k < ws.length && ws[k] == null)
                   ws[k] = q;                 // else terminated
               unlockRunState(rs, rs & ~RSLOCK);
           }
           else
               move = true;                   // move if busy
           if (move)
               r = ThreadLocalRandom.advanceProbe(r);
       }
   }    

4.signalWork 唤醒工作线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

   /**
    * Tries to create or activate a worker if too few are active.
    *
    * @param ws the worker array to use to find signallees
    * @param q a WorkQueue --if non-null, don't retry if now empty
    */
   final void signalWork(WorkQueue[] ws, WorkQueue q) {
       long c; int sp, i; WorkQueue v; Thread p;
       while ((c = ctl) < 0L) {                       // too few active
           if ((sp = (int)c) == 0) {                  // no idle workers 没有空闲线程
               if ((c & ADD_WORKER) != 0L)            // too few workers  ADD_WORKER=1 << 47 总启动线程数量最高一位;控制启动最多线程
                   tryAddWorker(c);
               break;
           }
           if (ws == null)                            // unstarted/terminated
               break;
           if (ws.length <= (i = sp & SMASK))         // terminated
               break;
           if ((v = ws[i]) == null)                   // terminating
               break;
           int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
           int d = sp - v.scanState;                  // screen CAS
           long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
           if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {  //修改ctl 唤醒线程
               v.scanState = vs;                      // activate v
               if ((p = v.parker) != null)
                   U.unpark(p);
               break;
           }
           if (q != null && q.base == q.top)          // no more work
               break;
       }
   }

5.tryAddWorker 创建工作线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

  /**
   * Tries to add one worker, incrementing ctl counts before doing
   * so, relying on createWorker to back out on failure.
   *
   * @param c incoming ctl value, with total count negative and no
   * idle workers.  On CAS failure, c is refreshed and retried if
   * this holds (otherwise, a new worker is not needed).
   */
  private void tryAddWorker(long c) {
      boolean add = false;
      do {
          long nc = ((AC_MASK & (c + AC_UNIT)) |
                     (TC_MASK & (c + TC_UNIT)));
          if (ctl == c) {
              int rs, stop;                 // check if terminating
              if ((stop = (rs = lockRunState()) & STOP) == 0)
                  add = U.compareAndSwapLong(this, CTL, c, nc);
              unlockRunState(rs, rs & ~RSLOCK);
              if (stop != 0)
                  break;
              if (add) {
                  createWorker();
                  break;
              }
          }
      } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
  }
  
      /**
       * Tries to construct and start one worker. Assumes that total
       * count has already been incremented as a reservation.  Invokes
       * deregisterWorker on any failure.
       *
       * @return true if successful
       */
      private boolean createWorker() {
          ForkJoinWorkerThreadFactory fac = factory;
          Throwable ex = null;
          ForkJoinWorkerThread wt = null;
          try {
              if (fac != null && (wt = fac.newThread(this)) != null) {
                  wt.start();
                  return true;
              }
          } catch (Throwable rex) {
              ex = rex;
          }
          deregisterWorker(wt, ex);
          return false;
      }



6.deregisterWorker 注销WorkQueue数组

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

  /**
   * Final callback from terminating worker, as well as upon failure
   * to construct or start a worker.  Removes record of worker from
   * array, and adjusts counts. If pool is shutting down, tries to
   * complete termination.
   *
   * @param wt the worker thread, or null if construction failed
   * @param ex the exception causing failure, or null if none
   */
  final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
      WorkQueue w = null;
      if (wt != null && (w = wt.workQueue) != null) {
          WorkQueue[] ws;                           // remove index from array
          int idx = w.config & SMASK;
          int rs = lockRunState();
          if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
              ws[idx] = null;
          unlockRunState(rs, rs & ~RSLOCK);
      }
      long c;                                       // decrement counts
      do {} while (!U.compareAndSwapLong
                   (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                         (TC_MASK & (c - TC_UNIT)) |
                                         (SP_MASK & c))));
      if (w != null) {
          w.qlock = -1;                             // ensure set
          w.transferStealCount(this);
          w.cancelAll();                            // cancel remaining tasks
      }
      for (;;) {                                    // possibly replace
          WorkQueue[] ws; int m, sp;
          if (tryTerminate(false, false) || w == null || w.array == null ||
              (runState & STOP) != 0 || (ws = workQueues) == null ||
              (m = ws.length - 1) < 0)              // already terminating
              break;
          if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
              if (tryRelease(c, ws[sp & m], AC_UNIT))
                  break;
          }
          else if (ex != null && (c & ADD_WORKER) != 0L) {
              tryAddWorker(c);                      // create replacement
              break;
          }
          else                                      // don't need replacement
              break;
      }
      if (ex == null)                               // help clean on way out
          ForkJoinTask.helpExpungeStaleExceptions();
      else                                          // rethrow
          ForkJoinTask.rethrow(ex);
  }

7.registerWorker 注册WorkQueue数组

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  /**
    * Callback from ForkJoinWorkerThread constructor to establish and
    * record its WorkQueue.
    *
    * @param wt the worker thread
    * @return the worker's queue
    */
   final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
       UncaughtExceptionHandler handler;
       wt.setDaemon(true);                           // configure thread
       if ((handler = ueh) != null)
           wt.setUncaughtExceptionHandler(handler);
       WorkQueue w = new WorkQueue(this, wt);
       int i = 0;                                    // assign a pool index
       int mode = config & MODE_MASK;
       int rs = lockRunState();
       try {
           WorkQueue[] ws; int n;                    // skip if no array
           if ((ws = workQueues) != null && (n = ws.length) > 0) {
               int s = indexSeed += SEED_INCREMENT;  // unlikely to collide  SEED_INCREMENT 在11.ThreadLocal源码分析.md具体说明 哈希码能均匀的分布在2的N次方的数组里
               int m = n - 1;
               i = ((s << 1) | 1) & m;               // odd-numbered indices  奇数
               if (ws[i] != null) {                  // collision
                   int probes = 0;                   // step by approx half n   大约一半
                   int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                   while (ws[i = (i + step) & m] != null) {
                       if (++probes >= n) {                   //扩容
                           workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                           m = n - 1;
                           probes = 0;
                       }
                   }
               }
               w.hint = s;                           // use as random seed
               w.config = i | mode;
               w.scanState = i;                      // publication fence
               ws[i] = w;
           }
       } finally {
           unlockRunState(rs, rs & ~RSLOCK);
       }
       wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
       return w;
   }

8.runWorker 执行工作线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
   /**
    * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
    */
   final void runWorker(WorkQueue w) {
       w.growArray();                   // allocate queue
       int seed = w.hint;               // initially holds randomization hint
       int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
       for (ForkJoinTask<?> t;;) {
           if ((t = scan(w, r)) != null)
               w.runTask(t);
           else if (!awaitWork(w, r))
               break;
           r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
       }
   }

9.scan 扫描任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
    /**
     * Scans for and tries to steal a top-level task. Scans start at a
     * random location, randomly moving on apparent contention,
     * otherwise continuing linearly until reaching two consecutive
     * empty passes over all queues with the same checksum (summing
     * each base index of each queue, that moves on each steal), at
     * which point the worker tries to inactivate and then re-scans,
     * attempting to re-activate (itself or some other worker) if
     * finding a task; otherwise returning null to await work.  Scans
     * otherwise touch as little memory as possible, to reduce
     * disruption on other scanning threads.
     *
     * @param w the worker (via its WorkQueue)
     * @param r a random seed
     * @return a task, or null if none found
     */
    private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
                if ((q = ws[k]) != null) {
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            q.base == b) {
                            if (ss >= 0) {
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    q.base = b + 1;
                                    if (n < -1)       // signal others
                                        signalWork(ws, q);
                                    return t;
                                }
                            }
                            else if (oldSum == 0 &&   // try to activate
                                     w.scanState < 0)
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;
                }
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break;
                        int ns = ss | INACTIVE;       // try to inactivate
                        long nc = ((SP_MASK & ns) |
                                   (UC_MASK & ((c = ctl) - AC_UNIT)));
                        w.stackPred = (int)c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns);
                        if (U.compareAndSwapLong(this, CTL, c, nc))
                            ss = ns;
                        else
                            w.scanState = ss;         // back out
                    }
                    checkSum = 0;
                }
            }
        }
        return null;
    }
   /**
    * Signals and releases worker v if it is top of idle worker
    * stack.  This performs a one-shot version of signalWork only if
    * there is (apparently) at least one idle worker.
    *
    * @param c incoming ctl value
    * @param v if non-null, a worker
    * @param inc the increment to active count (zero when compensating)
    * @return true if successful
    */
   private boolean tryRelease(long c, WorkQueue v, long inc) {
       int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
       if (v != null && v.scanState == sp) {          // v is at top of stack
           long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
           if (U.compareAndSwapLong(this, CTL, c, nc)) {
               v.scanState = vs;
               if ((p = v.parker) != null)
                   U.unpark(p);
               return true;
           }
       }
       return false;
   }     

10.awaitWork 无任务等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 
    /**
     * Possibly blocks worker w waiting for a task to steal, or
     * returns false if the worker should terminate.  If inactivating
     * w has caused the pool to become quiescent, checks for pool
     * termination, and, so long as this is not the only worker, waits
     * for up to a given duration.  On timeout, if ctl has not
     * changed, terminates the worker, which will in turn wake up
     * another worker to possibly repeat this process.
     *
     * @param w the calling worker
     * @param r a random seed (for spins)
     * @return false if the worker should terminate
     */
    private boolean awaitWork(WorkQueue w, int r) {
        if (w == null || w.qlock < 0)                 // w is terminating
            return false;
        for (int pred = w.stackPred, spins = SPINS, ss;;) {
            if ((ss = w.scanState) >= 0)
                break;
            else if (spins > 0) {
                r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
                if (r >= 0 && --spins == 0) {         // randomize spins 随机旋转,判断stackPred(前等待线程scanState)该等待线程的为空,在重新自选
                    WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                    if (pred != 0 && (ws = workQueues) != null &&
                        (j = pred & SMASK) < ws.length &&
                        (v = ws[j]) != null &&        // see if pred parking
                        (v.parker == null || v.scanState >= 0))
                        spins = SPINS;                // continue spinning
                }
            }
            else if (w.qlock < 0)                     // recheck after spins
                return false;
            else if (!Thread.interrupted()) {
                long c, prevctl, parkTime, deadline;
                int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
                if ((ac <= 0 && tryTerminate(false, false)) ||
                    (runState & STOP) != 0)           // pool terminating
                    return false;
                if (ac <= 0 && ss == (int)c) {        // is last waiter
                    prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                    int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                    if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                        return false;                 // else use timed wait
                    parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                }
                else
                    prevctl = parkTime = deadline = 0L;
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
                w.parker = wt;
                if (w.scanState < 0 && ctl == c)      // recheck before park
                    U.park(false, parkTime);
                U.putOrderedObject(w, QPARKER, null);
                U.putObject(wt, PARKBLOCKER, null);
                if (w.scanState >= 0)
                    break;
                if (parkTime != 0L && ctl == c &&
                    deadline - System.nanoTime() <= 0L &&
                    U.compareAndSwapLong(this, CTL, c, prevctl))
                    return false;                     // shrink pool
            }
        }
        return true;
    }

11.awaitJoin join操作

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167

  /**
   * Helps and/or blocks until the given task is done or timeout.
   *
   * @param w caller
   * @param task the task
   * @param deadline for timed waits, if nonzero
   * @return task status on exit
   */
  final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
      int s = 0;
      if (task != null && w != null) {
          ForkJoinTask<?> prevJoin = w.currentJoin;
          U.putOrderedObject(w, QCURRENTJOIN, task);
          CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
              (CountedCompleter<?>)task : null;
          for (;;) {
              if ((s = task.status) < 0)
                  break;
              if (cc != null)
                  helpComplete(w, cc, 0);
              else if (w.base == w.top || w.tryRemoveAndExec(task))
                  helpStealer(w, task);
              if ((s = task.status) < 0)
                  break;
              long ms, ns;
              if (deadline == 0L)
                  ms = 0L;
              else if ((ns = deadline - System.nanoTime()) <= 0L)
                  break;
              else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                  ms = 1L;
              if (tryCompensate(w)) {
                  task.internalWait(ms);
                  U.getAndAddLong(this, CTL, AC_UNIT);
              }
          }
          U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
      }
      return s;
  }
 /**
   * Tries to decrement active count (sometimes implicitly) and
   * possibly release or create a compensating worker in preparation
   * for blocking. Returns false (retryable by caller), on
   * contention, detected staleness, instability, or termination.
   *
   * @param w caller
   */
  private boolean tryCompensate(WorkQueue w) {
      boolean canBlock;
      WorkQueue[] ws; long c; int m, pc, sp;
      if (w == null || w.qlock < 0 ||           // caller terminating
          (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
          (pc = config & SMASK) == 0)           // parallelism disabled
          canBlock = false;
      else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
          canBlock = tryRelease(c, ws[sp & m], 0L);
      else {
          int ac = (int)(c >> AC_SHIFT) + pc;
          int tc = (short)(c >> TC_SHIFT) + pc;
          int nbusy = 0;                        // validate saturation
          for (int i = 0; i <= m; ++i) {        // two passes of odd indices
              WorkQueue v;
              if ((v = ws[((i << 1) | 1) & m]) != null) {
                  if ((v.scanState & SCANNING) != 0)
                      break;
                  ++nbusy;
              }
          }
          if (nbusy != (tc << 1) || ctl != c)
              canBlock = false;                 // unstable or stale
          else if (tc >= pc && ac > 1 && w.isEmpty()) {
              long nc = ((AC_MASK & (c - AC_UNIT)) |
                         (~AC_MASK & c));       // uncompensated
              canBlock = U.compareAndSwapLong(this, CTL, c, nc);
          }
          else if (tc >= MAX_CAP ||
                   (this == common && tc >= pc + commonMaxSpares))
              throw new RejectedExecutionException(
                  "Thread limit exceeded replacing blocked worker");
          else {                                // similar to tryAddWorker
              boolean add = false; int rs;      // CAS within lock
              long nc = ((AC_MASK & c) |
                         (TC_MASK & (c + TC_UNIT)));
              if (((rs = lockRunState()) & STOP) == 0)
                  add = U.compareAndSwapLong(this, CTL, c, nc);
              unlockRunState(rs, rs & ~RSLOCK);
              canBlock = add && createWorker(); // throws on exception
          }
      }
      return canBlock;
  }
  
 /**
   * Tries to locate and execute tasks for a stealer of the given
   * task, or in turn one of its stealers, Traces currentSteal ->
   * currentJoin links looking for a thread working on a descendant
   * of the given task and with a non-empty queue to steal back and
   * execute tasks from. The first call to this method upon a
   * waiting join will often entail scanning/search, (which is OK
   * because the joiner has nothing better to do), but this method
   * leaves hints in workers to speed up subsequent calls.
   *
   * @param w caller
   * @param task the task to join
   */
  private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
      WorkQueue[] ws = workQueues;
      int oldSum = 0, checkSum, m;
      if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
          task != null) {
          do {                                       // restart point
              checkSum = 0;                          // for stability check
              ForkJoinTask<?> subtask;
              WorkQueue j = w, v;                    // v is subtask stealer
              descent: for (subtask = task; subtask.status >= 0; ) {
                  for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                      if (k > m)                     // can't find stealer
                          break descent;
                      if ((v = ws[i = (h + k) & m]) != null) {
                          if (v.currentSteal == subtask) {
                              j.hint = i;
                              break;
                          }
                          checkSum += v.base;
                      }
                  }
                  for (;;) {                         // help v or descend
                      ForkJoinTask<?>[] a; int b;
                      checkSum += (b = v.base);
                      ForkJoinTask<?> next = v.currentJoin;
                      if (subtask.status < 0 || j.currentJoin != subtask ||
                          v.currentSteal != subtask) // stale
                          break descent;
                      if (b - v.top >= 0 || (a = v.array) == null) {
                          if ((subtask = next) == null)
                              break descent;
                          j = v;
                          break;
                      }
                      int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                      ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                           U.getObjectVolatile(a, i));
                      if (v.base == b) {
                          if (t == null)             // stale
                              break descent;
                          if (U.compareAndSwapObject(a, i, t, null)) {
                              v.base = b + 1;
                              ForkJoinTask<?> ps = w.currentSteal;
                              int top = w.top;
                              do {
                                  U.putOrderedObject(w, QCURRENTSTEAL, t);
                                  t.doExec();        // clear local tasks too
                              } while (task.status >= 0 &&
                                       w.top != top &&
                                       (t = w.pop()) != null);
                              U.putOrderedObject(w, QCURRENTSTEAL, ps);
                              if (w.base != w.top)
                                  return;            // can't further help
                          }
                      }
                  }
              }
          } while (task.status >= 0 && oldSum != (oldSum = checkSum));
      }
  }    

12.shutdown

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115

    /**
     * Possibly initiates an orderly shutdown in which previously
     * submitted tasks are executed, but no new tasks will be
     * accepted. Invocation has no effect on execution state if this
     * is the {@link #commonPool()}, and no additional effect if
     * already shut down.  Tasks that are in the process of being
     * submitted concurrently during the course of this method may or
     * may not be rejected.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public void shutdown() {
        checkPermission();
        tryTerminate(false, true);
    }
   /**
     * Possibly initiates and/or completes termination.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, enable shutdown when next possible
     * @return true if now terminating or terminated
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int rs;
        if (this == common)                       // cannot shut down
            return false;
        if ((rs = runState) >= 0) {
            if (!enable)
                return false;
            rs = lockRunState();                  // enter SHUTDOWN phase
            unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
        }

        if ((rs & STOP) == 0) {
            if (!now) {                           // check quiescence
                for (long oldSum = 0L;;) {        // repeat until stable
                    WorkQueue[] ws; WorkQueue w; int m, b; long c;
                    long checkSum = ctl;
                    if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
                        return false;             // still active workers
                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                        break;                    // check queues
                    for (int i = 0; i <= m; ++i) {
                        if ((w = ws[i]) != null) {
                            if ((b = w.base) != w.top || w.scanState >= 0 ||
                                w.currentSteal != null) {
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                                return false;     // arrange for recheck
                            }
                            checkSum += b;
                            if ((i & 1) == 0)
                                w.qlock = -1;     // try to disable external
                        }
                    }
                    if (oldSum == (oldSum = checkSum))
                        break;
                }
            }
            if ((runState & STOP) == 0) {
                rs = lockRunState();              // enter STOP phase
                unlockRunState(rs, (rs & ~RSLOCK) | STOP);
            }
        }

        int pass = 0;                             // 3 passes to help terminate
        for (long oldSum = 0L;;) {                // or until done or stable
            WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
            long checkSum = ctl;
            if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
                (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
                if ((runState & TERMINATED) == 0) {
                    rs = lockRunState();          // done
                    unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
                    synchronized (this) { notifyAll(); } // for awaitTermination
                }
                break;
            }
            for (int i = 0; i <= m; ++i) {
                if ((w = ws[i]) != null) {
                    checkSum += w.base;
                    w.qlock = -1;                 // try to disable
                    if (pass > 0) {
                        w.cancelAll();            // clear queue
                        if (pass > 1 && (wt = w.owner) != null) {
                            if (!wt.isInterrupted()) {
                                try {             // unblock join
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                }
                            }
                            if (w.scanState < 0)
                                U.unpark(wt);     // wake up
                        }
                    }
                }
            }
            if (checkSum != oldSum) {             // unstable
                oldSum = checkSum;
                pass = 0;
            }
            else if (pass > 3 && pass > m)        // can't further help
                break;
            else if (++pass > 1) {                // try to dequeue
                long c; int j = 0, sp;            // bound attempts
                while (j++ <= m && (sp = (int)(c = ctl)) != 0)
                    tryRelease(c, ws[sp & m], AC_UNIT);
            }
        }
        return true;
    }

四、ForkJoinWorkerThread线程

1.构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
 
   final ForkJoinPool pool;                // the pool this thread works in
   final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

   /**
    * Creates a ForkJoinWorkerThread operating in the given pool.
    *
    * @param pool the pool this thread works in
    * @throws NullPointerException if pool is null
    */
   protected ForkJoinWorkerThread(ForkJoinPool pool) {
       // Use a placeholder until a useful name can be set in registerWorker
       super("aForkJoinWorkerThread");
       this.pool = pool;
       this.workQueue = pool.registerWorker(this);
   }

2.run

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

   /**
    * This method is required to be public, but should never be
    * called explicitly. It performs the main run loop to execute
    * {@link ForkJoinTask}s.
    */
   public void run() {
       if (workQueue.array == null) { // only run once
           Throwable exception = null;
           try {
               onStart();
               pool.runWorker(workQueue);
           } catch (Throwable ex) {
               exception = ex;
           } finally {
               try {
                   onTermination(exception);
               } catch (Throwable ex) {
                   if (exception == null)
                       exception = ex;
               } finally {
                   pool.deregisterWorker(this, exception);
               }
           }
       }
   }

五、ForkJoinTask源码分析

ForkJoinTask抽象类实现Future接口

1.ForkJoinTask构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

    /*
     * See the internal documentation of class ForkJoinPool for a
     * general implementation overview.  ForkJoinTasks are mainly
     * responsible for maintaining their "status" field amidst relays
     * to methods in ForkJoinWorkerThread and ForkJoinPool.
     * 
     * 请参阅ForkJoinPool类的内部文档以获取常规实施概述。 
     * ForkJoinTasks主要负责在中继到ForkJoinWorkerThread和ForkJoinPool中的方法中维护其"status"字段。
     * 
     * 
     *
     * The methods of this class are more-or-less layered into
     * 这个类的方法或多或少分层
     * (1) basic status maintenance  基本的状态维护
     * (2) execution and awaiting completion  执行和等待完成
     * (3) user-level methods that additionally report results. 用户级别的方法,另外报告结果。
     * This is sometimes hard to see because this file orders exported
     * methods in a way that flows well in javadocs.
     * 有时候很难看到这个文件,因为这个文件在javadoc中以一种流畅的方式命令导出的方法。
     */

    /*
     * The status field holds run control status bits packed into a
     * single int to minimize footprint and to ensure atomicity (via
     * CAS).  Status is initially zero, and takes on nonnegative
     * values until completed, upon which status (anded with
     * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
     * undergoing blocking waits by other threads have the SIGNAL bit
     * set.  Completion of a stolen task with SIGNAL set awakens any
     * waiters via notifyAll. Even though suboptimal for some
     * purposes, we use basic builtin wait/notify to take advantage of
     * "monitor inflation" in JVMs that we would otherwise need to
     * emulate to avoid adding further per-task bookkeeping overhead.
     * We want these monitors to be "fat", i.e., not use biasing or
     * thin-lock techniques, so use some odd coding idioms that tend
     * to avoid them, mainly by arranging that every synchronized
     * block performs a wait, notifyAll or both.
     * 
     * status字段将运行控制状态位保存在一个int中,以最小化占用空间并确保原子性(通过CAS)。status初始为零,并且在完成之前呈现非负值,
     * 在此status(用DONE_MASK结束)保持NORMAL(正常),CANCELLED(取消)或EXCEPTIONAL(异常)值。
     * 由其他线程阻塞等待的任务将设置SIGNAL位。通过SIGNAL设置完成被盗任务,通过notifyAll唤醒任何waiters。尽管对于某些目的而言不是最理想的,
     * 但我们使用基本的内建wait/notify来利用JVM中的"monitor inflation",否则我们需要模拟这些优先级以避免增加每个任务的簿记开销。
     * 我们希望这些监视器是"fat"的,即不使用偏置或薄锁技术,所以使用一些奇怪的编码习惯用法来避免它们,主要是通过安排每个同步块执行等待,notifyAll或两者。
     *
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.
     * 这些控制位仅占用状态字段的上半部分(16位)(部分)。低位用于用户定义的标签。
     */

    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits 1111 0000 0000 0000 0000 0000 0000 0000
    static final int NORMAL      = 0xf0000000;  // must be negative             1111 0000 0000 0000 0000 0000 0000 0000
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL             1100 0000 0000 0000 0000 0000 0000 0000
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED          1000 0000 0000 0000 0000 0000 0000 0000
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16           0000 0000 0000 0001 0000 0000 0000 0000
    static final int SMASK       = 0x0000ffff;  // short bits for tags          0000 0000 0000 0000 1111 1111 1111 1111
    
    
    /**
     * Table of exceptions thrown by tasks, to enable reporting by
     * callers. Because exceptions are rare, we don't directly keep
     * them with task objects, but instead use a weak ref table.  Note
     * that cancellation exceptions don't appear in the table, but are
     * instead recorded as status values.
     * 由任务引发的异常表,以使调用者能够进行报告。由于异常很少,我们不直接使用任务对象,而是使用弱引用表。
     * 请注意,取消异常不会出现在表格中,而是记录为状态值。
     *
     * Note: These statics are initialized below in static block. 注意:这些静态块在静态块中被初始化。
     */
    private static final ExceptionNode[] exceptionTable;
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue<Object> exceptionTableRefQueue;

    /**
     * Fixed capacity for exceptionTable. 固定的异常表容量。
     */
    private static final int EXCEPTION_MAP_CAPACITY = 32;    
    
 }   
    
    

2. ExceptionNode 异常节点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    /**
     * Key-value nodes for exception table.  The chained hash table
     * uses identity comparisons, full locking, and weak references
     * for keys. The table has a fixed capacity because it only
     * maintains task exceptions long enough for joiners to access
     * them, so should never become very large for sustained
     * periods. However, since we do not know when the last joiner
     * completes, we must use weak references and expunge them. We do
     * so on each operation (hence full locking). Also, some thread in
     * any ForkJoinPool will call helpExpungeStaleExceptions when its
     * pool becomes isQuiescent.
     * 异常表的键值节点。链式哈希表使用标识比较,完全锁定和对引用的弱引用。
     * 该表具有固定的容量,因为它只保留足够长的任务异常来访问它们,所以永远不会变得非常大。
     * 但是,由于我们不知道最后一个细木工何时完成,所以我们必须使用弱引用并将其删除。我们这样做每个操作(因此完全锁定)。
     * 而且,任何ForkJoinPool中的某个线程在其池变为isQuiescent时将调用helpExpungeStaleExceptions。
     */
    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // use id not ref to avoid weak cycles 使用id不能避免弱循环
        final int hashCode;  // store task hashCode before weak ref disappears 在弱ref消失之前存储任务哈希代码
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
            super(task, exceptionTableRefQueue);
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task);
        }
    }

3. doExec 执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
    /**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     *
     * @return status on exit from this method
     */
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
    
  /**
   * Marks completion and wakes up threads waiting to join this
   * task.
   *
   * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
   * @return completion status on exit
   */
  private int setCompletion(int completion) {
      for (int s;;) {
          if ((s = status) < 0)
              return s;
          if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
              if ((s >>> 16) != 0)
                  synchronized (this) { notifyAll(); }
              return completion;
          }
      }
  }
  



  

4. setExceptionalCompletion 异常处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
   /**
    * Records exception and possibly propagates.
    *
    * @return status on exit
    */
   private int setExceptionalCompletion(Throwable ex) {
       int s = recordExceptionalCompletion(ex);
       if ((s & DONE_MASK) == EXCEPTIONAL)
           internalPropagateException(ex);
       return s;
   }
   
/**
     * Records exception and sets status.
     *
     * @return status on exit
     */
    final int recordExceptionalCompletion(Throwable ex) {
        int s;
        if ((s = status) >= 0) {
            int h = System.identityHashCode(this);
            final ReentrantLock lock = exceptionTableLock;
            lock.lock();
            try {
                expungeStaleExceptions();
                ExceptionNode[] t = exceptionTable;
                int i = h & (t.length - 1);
                for (ExceptionNode e = t[i]; ; e = e.next) {
                    if (e == null) {
                        t[i] = new ExceptionNode(this, ex, t[i]);
                        break;
                    }
                    if (e.get() == this) // already present
                        break;
                }
            } finally {
                lock.unlock();
            }
            s = setCompletion(EXCEPTIONAL);
        }
        return s;
    }
    
   /**
    * Poll stale refs and remove them. Call only while holding lock.
    */
   private static void expungeStaleExceptions() {
       for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
           if (x instanceof ExceptionNode) {
               int hashCode = ((ExceptionNode)x).hashCode;
               ExceptionNode[] t = exceptionTable;
               int i = hashCode & (t.length - 1);
               ExceptionNode e = t[i];
               ExceptionNode pred = null;
               while (e != null) {
                   ExceptionNode next = e.next;
                   if (e == x) {
                       if (pred == null)
                           t[i] = next;
                       else
                           pred.next = next;
                       break;
                   }
                   pred = e;
                   e = next;
               }
           }
       }
   }     

5. fork

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

6. join

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
    /**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    
  /**
   * Implementation for join, get, quietlyJoin. Directly handles
   * only cases of already-completed, external wait, and
   * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
   *
   * @return status upon completion
   */
  private int doJoin() {
      int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
      return (s = status) < 0 ? s :
          ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
          (w = (wt = (ForkJoinWorkerThread)t).workQueue).
          tryUnpush(this) && (s = doExec()) < 0 ? s :
          wt.pool.awaitJoin(w, this, 0L) :
          externalAwaitDone();
  }
  
  /**
   * Blocks a non-worker-thread until completion.
   * @return status upon completion
   */
  private int externalAwaitDone() {
      int s = ((this instanceof CountedCompleter) ? // try helping
               ForkJoinPool.common.externalHelpComplete(
                   (CountedCompleter<?>)this, 0) :
               ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
      if (s >= 0 && (s = status) >= 0) {
          boolean interrupted = false;
          do {
              if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                  synchronized (this) {
                      if (status >= 0) {
                          try {
                              wait(0L);
                          } catch (InterruptedException ie) {
                              interrupted = true;
                          }
                      }
                      else
                          notifyAll();
                  }
              }
          } while ((s = status) >= 0);
          if (interrupted)
              Thread.currentThread().interrupt();
      }
      return s;
  }