ForkJoin Pattern Analysis

Introduction

For a good introduction to the Fork/Join framework, see the blog post by Fang Tengfei of Alibaba.

Fork/Join uses two classes to accomplish these two things:

  • ForkJoinTask: to use the Fork/Join framework, we must first create a ForkJoin task. This class provides the mechanism for performing fork() and join() operations within a task. Normally we do not extend ForkJoinTask directly but one of its subclasses; the framework provides two:

    • RecursiveAction: for tasks that do not return a result.
    • RecursiveTask: for tasks that return a result.
  • ForkJoinPool: a ForkJoinTask must be executed through a ForkJoinPool. Subtasks split off from a task are pushed onto the head of the deque maintained by the current worker thread. When a worker thread's own queue is temporarily empty, it randomly takes a task from the tail of another worker thread's queue.
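As a concrete illustration of the two classes working together, here is a divide-and-conquer sum written as a RecursiveTask. This is a minimal sketch; the class name SumTask and the THRESHOLD value are illustrative choices, not part of the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a long[] by recursively splitting the range in half.
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1000; // illustrative cutoff
    private final long[] array;
    private final int lo, hi;

    public SumTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {            // small enough: compute directly
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += array[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(array, lo, mid);
        SumTask right = new SumTask(array, mid, hi);
        left.fork();                           // push left half onto this worker's deque
        long rightResult = right.compute();    // work on the right half ourselves
        return left.join() + rightResult;      // join the (possibly stolen) left half
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        ForkJoinPool pool = new ForkJoinPool();
        long sum = pool.invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 5000050000 (= 100000 * 100001 / 2)
    }
}
```

Forking one half and computing the other directly (rather than forking both) keeps one subtask on the current thread and avoids an unnecessary deque round-trip.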

I. ForkJoinPool Source Code Analysis

ForkJoinPool extends the abstract class AbstractExecutorService.

1. ForkJoinPool Implementation Overview

 /*
     * Implementation Overview
     *
     * This class provides the central bookkeeping and control for a
     * set of worker threads: Submissions from non-FJ threads enter
     * into a submission queue. Workers take these tasks and typically
     * split them into subtasks that may be stolen by other workers.
     * Preference rules give first priority to processing tasks from
     * their own queues (LIFO or FIFO, depending on mode), then to
     * randomized FIFO steals of tasks in other worker queues, and
     * lastly to new submissions.
     *
     * The main throughput advantages of work-stealing stem from
     * decentralized control -- workers mostly take tasks from
     * themselves or each other. We cannot negate this in the
     * implementation of other management responsibilities. The main
     * tactic for avoiding bottlenecks is packing nearly all
     * essentially atomic control state into a single 64bit volatile
     * variable ("ctl"). This variable is read on the order of 10-100
     * times as often as it is modified (always via CAS). (There is
     * some additional control state, for example variable "shutdown"
     * for which we can cope with uncoordinated updates.)  This
     * streamlines synchronization and control at the expense of messy
     * constructions needed to repack status bits upon updates.
     * Updates tend not to contend with each other except during
     * bursts while submitted tasks begin or end.  In some cases when
     * they do contend, threads can instead do something else
     * (usually, scan for tasks) until contention subsides.
     *
     * To enable packing, we restrict maximum parallelism to (1<<15)-1
     * (which is far in excess of normal operating range) to allow
     * ids, counts, and their negations (used for thresholding) to fit
     * into 16bit fields.
     *
     * Recording Workers.  Workers are recorded in the "workers" array
     * that is created upon pool construction and expanded if (rarely)
     * necessary.  This is an array as opposed to some other data
     * structure to support index-based random steals by workers.
     * Updates to the array recording new workers and unrecording
     * terminated ones are protected from each other by a seqLock
     * (scanGuard) but the array is otherwise concurrently readable,
     * and accessed directly by workers. To simplify index-based
     * operations, the array size is always a power of two, and all
     * readers must tolerate null slots. To avoid flailing during
     * start-up, the array is presized to hold twice #parallelism
     * workers (which is unlikely to need further resizing during
     * execution). But to avoid dealing with so many null slots,
     * variable scanGuard includes a mask for the nearest power of two
     * that contains all current workers.  All worker thread creation
     * is on-demand, triggered by task submissions, replacement of
     * terminated workers, and/or compensation for blocked
     * workers. However, all other support code is set up to work with
     * other policies.  To ensure that we do not hold on to worker
     * references that would prevent GC, ALL accesses to workers are
     * via indices into the workers array (which is one source of some
     * of the messy code constructions here). In essence, the workers
     * array serves as a weak reference mechanism. Thus for example
     * the wait queue field of ctl stores worker indices, not worker
     * references.  Access to the workers in associated methods (for
     * example signalWork) must both index-check and null-check the
     * IDs. All such accesses ignore bad IDs by returning out early
     * from what they are doing, since this can only be associated
     * with termination, in which case it is OK to give up.
     *
     * All uses of the workers array, as well as queue arrays, check
     * that the array is non-null (even if previously non-null). This
     * allows nulling during termination, which is currently not
     * necessary, but remains an option for resource-revocation-based
     * shutdown schemes.
     *
     * Wait Queuing. Unlike HPC work-stealing frameworks, we cannot
     * let workers spin indefinitely scanning for tasks when none can
     * be found immediately, and we cannot start/resume workers unless
     * there appear to be tasks available.  On the other hand, we must
     * quickly prod them into action when new tasks are submitted or
     * generated.  We park/unpark workers after placing in an event
     * wait queue when they cannot find work. This "queue" is actually
     * a simple Treiber stack, headed by the "id" field of ctl, plus a
     * 15bit counter value to both wake up waiters (by advancing their
     * count) and avoid ABA effects. Successors are held in worker
     * field "nextWait".  Queuing deals with several intrinsic races,
     * mainly that a task-producing thread can miss seeing (and
     * signalling) another thread that gave up looking for work but
     * has not yet entered the wait queue. We solve this by requiring
     * a full sweep of all workers both before (in scan()) and after
     * (in tryAwaitWork()) a newly waiting worker is added to the wait
     * queue. During a rescan, the worker might release some other
     * queued worker rather than itself, which has the same net
     * effect. Because enqueued workers may actually be rescanning
     * rather than waiting, we set and clear the "parked" field of
     * ForkJoinWorkerThread to reduce unnecessary calls to unpark.
     * (Use of the parked field requires a secondary recheck to avoid
     * missed signals.)
     *
     * Signalling.  We create or wake up workers only when there
     * appears to be at least one task they might be able to find and
     * execute.  When a submission is added or another worker adds a
     * task to a queue that previously had two or fewer tasks, they
     * signal waiting workers (or trigger creation of new ones if
     * fewer than the given parallelism level -- see signalWork).
     * These primary signals are buttressed by signals during rescans
     * as well as those performed when a worker steals a task and
     * notices that there are more tasks too; together these cover the
     * signals needed in cases when more than two tasks are pushed
     * but untaken.
     *
     * Trimming workers. To release resources after periods of lack of
     * use, a worker starting to wait when the pool is quiescent will
     * time out and terminate if the pool has remained quiescent for
     * SHRINK_RATE nanosecs. This will slowly propagate, eventually
     * terminating all workers after long periods of non-use.
     * 
     * Submissions. External submissions are maintained in an
     * array-based queue that is structured identically to
     * ForkJoinWorkerThread queues except for the use of
     * submissionLock in method addSubmission. Unlike the case for
     * worker queues, multiple external threads can add new
     * submissions, so adding requires a lock.
     *
     * Compensation. Beyond work-stealing support and lifecycle
     * control, the main responsibility of this framework is to take
     * actions when one worker is waiting to join a task stolen (or
     * always held by) another.  Because we are multiplexing many
     * tasks on to a pool of workers, we can't just let them block (as
     * in Thread.join).  We also cannot just reassign the joiner's
     * run-time stack with another and replace it later, which would
     * be a form of "continuation", that even if possible is not
     * necessarily a good idea since we sometimes need both an
     * unblocked task and its continuation to progress. Instead we
     * combine two tactics:
     *
     *   Helping: Arranging for the joiner to execute some task that it
     *      would be running if the steal had not occurred.  Method
     *      ForkJoinWorkerThread.joinTask tracks joining->stealing
     *      links to try to find such a task.
     *
     *   Compensating: Unless there are already enough live threads,
     *      method tryPreBlock() may create or re-activate a spare
     *      thread to compensate for blocked joiners until they
     *      unblock.
     *
     * The ManagedBlocker extension API can't use helping so relies
     * only on compensation in method awaitBlocker.
     *
     * It is impossible to keep exactly the target parallelism number
     * of threads running at any given time.  Determining the
     * existence of conservatively safe helping targets, the
     * availability of already-created spares, and the apparent need
     * to create new spares are all racy and require heuristic
     * guidance, so we rely on multiple retries of each.  Currently,
     * in keeping with on-demand signalling policy, we compensate only
     * if blocking would leave less than one active (non-waiting,
     * non-blocked) worker. Additionally, to avoid some false alarms
     * due to GC, lagging counters, system activity, etc, compensated
     * blocking for joins is only attempted after rechecks stabilize
     * (retries are interspersed with Thread.yield, for good
     * citizenship).  The variable blockedCount, incremented before
     * blocking and decremented after, is sometimes needed to
     * distinguish cases of waiting for work vs blocking on joins or
     * other managed sync. Both cases are equivalent for most pool
     * control, so we can update non-atomically. (Additionally,
     * contention on blockedCount alleviates some contention on ctl).
     *
     * Shutdown and Termination. A call to shutdownNow atomically sets
     * the ctl stop bit and then (non-atomically) sets each workers
     * "terminate" status, cancels all unprocessed tasks, and wakes up
     * all waiting workers.  Detecting whether termination should
     * commence after a non-abrupt shutdown() call requires more work
     * and bookkeeping. We need consensus about quiesence (i.e., that
     * there is no more work) which is reflected in active counts so
     * long as there are no current blockers, as well as possible
     * re-evaluations during independent changes in blocking or
     * quiescing workers.
     *
     * Style notes: There is a lot of representation-level coupling
     * among classes ForkJoinPool, ForkJoinWorkerThread, and
     * ForkJoinTask.  Most fields of ForkJoinWorkerThread maintain
     * data structures managed by ForkJoinPool, so are directly
     * accessed.  Conversely we allow access to "workers" array by
     * workers, and direct access to ForkJoinTask.status by both
     * ForkJoinPool and ForkJoinWorkerThread.  There is little point
     * trying to reduce this, since any associated future changes in
     * representations will need to be accompanied by algorithmic
     * changes anyway. All together, these low-level implementation
     * choices produce as much as a factor of 4 performance
     * improvement compared to naive implementations, and enable the
     * processing of billions of tasks per second, at the expense of
     * some ugliness.
     *
     * Methods signalWork() and scan() are the main bottlenecks so are
     * especially heavily micro-optimized/mangled.  There are lots of
     * inline assignments (of form "while ((local = field) != 0)")
     * which are usually the simplest way to ensure the required read
     * orderings (which are sometimes critical). This leads to a
     * "C"-like style of listing declarations of these locals at the
     * heads of methods or blocks.  There are several occurrences of
     * the unusual "do {} while (!cas...)"  which is the simplest way
     * to force an update of a CAS'ed variable. There are also other
     * coding oddities that help some methods perform reasonably even
     * when interpreted (not compiled).
     *
     * The order of declarations in this file is: (1) declarations of
     * statics (2) fields (along with constants used when unpacking
     * some of them), listed in an order that tends to reduce
     * contention among them a bit under most JVMs.  (3) internal
     * control methods (4) callbacks and other support for
     * ForkJoinTask and ForkJoinWorkerThread classes, (5) exported
     * methods (plus a few little helpers). (6) static block
     * initializing all statics in a minimally dependent order.
     */
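The decentralized control described in the overview can be observed from outside the pool. The sketch below (class names Tiny and StealDemo are illustrative) forks a binary tree of tiny tasks into a pool whose parallelism is far below the (1 << 15) - 1 cap, so idle workers have plenty of tasks to steal:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class StealDemo {
    static class Tiny extends RecursiveAction {
        final int depth;
        Tiny(int depth) { this.depth = depth; }
        @Override protected void compute() {
            if (depth == 0) return;
            // children go onto this worker's own deque;
            // idle workers steal from the other end of it
            invokeAll(new Tiny(depth - 1), new Tiny(depth - 1));
        }
    }

    public static void main(String[] args) {
        // parallelism of 4 is well under the (1 << 15) - 1 hard cap
        ForkJoinPool pool = new ForkJoinPool(4);
        pool.invoke(new Tiny(12)); // 2^12 leaf tasks
        System.out.println(pool.getParallelism()); // 4
        System.out.println(pool.getStealCount());  // dynamic estimate; varies per run
        pool.shutdown();
    }
}
```

getStealCount() is only an estimate maintained from the same loosely synchronized counters the overview describes, so its value differs from run to run.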

2. ForkJoinWorkerThreadFactory in ForkJoinPool

The factory for creating new ForkJoinWorkerThread instances.

    /**
     * Factory for creating new {@link ForkJoinWorkerThread}s.
     * A {@code ForkJoinWorkerThreadFactory} must be defined and used
     * for {@code ForkJoinWorkerThread} subclasses that extend base
     * functionality or initialize threads with different contexts.
     */
    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }

    /**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread.
     */
    static class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }
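A custom factory is how you plug in a ForkJoinWorkerThread subclass, for example to give workers recognizable names. The sketch below is illustrative (the class and thread names are invented); it uses the four-argument ForkJoinPool constructor, where the last two arguments are the UncaughtExceptionHandler and the asyncMode flag:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedFactoryDemo {
    static class NamedWorkerThread extends ForkJoinWorkerThread {
        NamedWorkerThread(ForkJoinPool pool, String name) {
            super(pool);   // the protected constructor is the extension point
            setName(name);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger seq = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(
            2,
            p -> new NamedWorkerThread(p, "fj-worker-" + seq.incrementAndGet()),
            null,    // default UncaughtExceptionHandler
            false);  // LIFO (non-async) mode
        // the submitted task runs on a worker created by our factory
        String name = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name); // e.g. fj-worker-1
        pool.shutdown();
    }
}
```

Because ForkJoinWorkerThreadFactory has a single abstract method, a lambda can serve as the factory directly.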

3. ForkJoinWorkerThread: the worker thread

ForkJoinWorkerThread extends Thread.

(1). ForkJoinWorkerThread structure

public class ForkJoinWorkerThread extends Thread {
    /*
     * Overview:
     *
     * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
     * ForkJoinTasks. This class includes bookkeeping in support of
     * worker activation, suspension, and lifecycle control described
     * in more detail in the internal documentation of class
     * ForkJoinPool. And as described further below, this class also
     * includes special-cased support for some ForkJoinTask
     * methods. But the main mechanics involve work-stealing:
     *
     * Work-stealing queues are special forms of Deques that support
     * only three of the four possible end-operations -- push, pop,
     * and deq (aka steal), under the further constraints that push
     * and pop are called only from the owning thread, while deq may
     * be called from other threads.  (If you are unfamiliar with
     * them, you probably want to read Herlihy and Shavit's book "The
     * Art of Multiprocessor programming", chapter 16 describing these
     * in more detail before proceeding.)  The main work-stealing
     * queue design is roughly similar to those in the papers "Dynamic
     * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
     * (http://research.sun.com/scalable/pubs/index.html) and
     * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
     * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
     * The main differences ultimately stem from gc requirements that
     * we null out taken slots as soon as we can, to maintain as small
     * a footprint as possible even in programs generating huge
     * numbers of tasks. To accomplish this, we shift the CAS
     * arbitrating pop vs deq (steal) from being on the indices
     * ("queueBase" and "queueTop") to the slots themselves (mainly
     * via method "casSlotNull()"). So, both a successful pop and deq
     * mainly entail a CAS of a slot from non-null to null.  Because
     * we rely on CASes of references, we do not need tag bits on
     * queueBase or queueTop.  They are simple ints as used in any
     * circular array-based queue (see for example ArrayDeque).
     * Updates to the indices must still be ordered in a way that
     * guarantees that queueTop == queueBase means the queue is empty,
     * but otherwise may err on the side of possibly making the queue
     * appear nonempty when a push, pop, or deq have not fully
     * committed. Note that this means that the deq operation,
     * considered individually, is not wait-free. One thief cannot
     * successfully continue until another in-progress one (or, if
     * previously empty, a push) completes.  However, in the
     * aggregate, we ensure at least probabilistic non-blockingness.
     * If an attempted steal fails, a thief always chooses a different
     * random victim target to try next. So, in order for one thief to
     * progress, it suffices for any in-progress deq or new push on
     * any empty queue to complete.
     *
     * This approach also enables support for "async mode" where local
     * task processing is in FIFO, not LIFO order; simply by using a
     * version of deq rather than pop when locallyFifo is true (as set
     * by the ForkJoinPool).  This allows use in message-passing
     * frameworks in which tasks are never joined.  However neither
     * mode considers affinities, loads, cache localities, etc, so
     * rarely provide the best possible performance on a given
     * machine, but portably provide good throughput by averaging over
     * these factors.  (Further, even if we did try to use such
     * information, we do not usually have a basis for exploiting
     * it. For example, some sets of tasks profit from cache
     * affinities, but others are harmed by cache pollution effects.)
     *
     * When a worker would otherwise be blocked waiting to join a
     * task, it first tries a form of linear helping: Each worker
     * records (in field currentSteal) the most recent task it stole
     * from some other worker. Plus, it records (in field currentJoin)
     * the task it is currently actively joining. Method joinTask uses
     * these markers to try to find a worker to help (i.e., steal back
     * a task from and execute it) that could hasten completion of the
     * actively joined task. In essence, the joiner executes a task
     * that would be on its own local deque had the to-be-joined task
     * not been stolen. This may be seen as a conservative variant of
     * the approach in Wagner & Calder "Leapfrogging: a portable
     * technique for implementing efficient futures" SIGPLAN Notices,
     * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
     * in that: (1) We only maintain dependency links across workers
     * upon steals, rather than use per-task bookkeeping.  This may
     * require a linear scan of workers array to locate stealers, but
     * usually doesn't because stealers leave hints (that may become
     * stale/wrong) of where to locate them. This isolates cost to
     * when it is needed, rather than adding to per-task overhead.
     * (2) It is "shallow", ignoring nesting and potentially cyclic
     * mutual steals.  (3) It is intentionally racy: field currentJoin
     * is updated only while actively joining, which means that we
     * miss links in the chain during long-lived tasks, GC stalls etc
     * (which is OK since blocking in such cases is usually a good
     * idea).  (4) We bound the number of attempts to find work (see
     * MAX_HELP) and fall back to suspending the worker and if
     * necessary replacing it with another.
     *
     * Efficient implementation of these algorithms currently relies
     * on an uncomfortable amount of "Unsafe" mechanics. To maintain
     * correct orderings, reads and writes of variable queueBase
     * require volatile ordering.  Variable queueTop need not be
     * volatile because non-local reads always follow those of
     * queueBase.  Similarly, because they are protected by volatile
     * queueBase reads, reads of the queue array and its slots by
     * other threads do not need volatile load semantics, but writes
     * (in push) require store order and CASes (in pop and deq)
     * require (volatile) CAS semantics.  (Michael, Saraswat, and
     * Vechev's algorithm has similar properties, but without support
     * for nulling slots.)  Since these combinations aren't supported
     * using ordinary volatiles, the only way to accomplish these
     * efficiently is to use direct Unsafe calls. (Using external
     * AtomicIntegers and AtomicReferenceArrays for the indices and
     * array is significantly slower because of memory locality and
     * indirection effects.)
     *
     * Further, performance on most platforms is very sensitive to
     * placement and sizing of the (resizable) queue array.  Even
     * though these queues don't usually become all that big, the
     * initial size must be large enough to counteract cache
     * contention effects across multiple queues (especially in the
     * presence of GC cardmarking). Also, to improve thread-locality,
     * queues are initialized after starting.
     */
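The "three of four end-operations" described above can be modeled with a plain deque. This is a conceptual model only: the real queue is a circular array manipulated with Unsafe CAS and is safe for one owner plus concurrent thieves, whereas ArrayDeque is not thread-safe. The model just shows which end each operation uses:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Conceptual (single-threaded) model of a work-stealing deque.
public class WorkStealingModel {
    private final Deque<Integer> deque = new ArrayDeque<>();

    // owner thread only:
    void push(int task) { deque.addLast(task); }
    Integer pop()       { return deque.pollLast(); }  // LIFO for the owner

    // other (thief) threads:
    Integer steal()     { return deque.pollFirst(); } // FIFO from the base

    public static void main(String[] args) {
        WorkStealingModel q = new WorkStealingModel();
        q.push(1); q.push(2); q.push(3);
        System.out.println(q.pop());   // 3: the owner works newest-first
        System.out.println(q.steal()); // 1: a thief takes the oldest task
    }
}
```

Owners working newest-first keeps hot, recently split subtasks in cache, while thieves taking the oldest task tend to grab the largest remaining chunks of work.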

    /**
     * Mask for pool indices encoded as shorts
     */
    private static final int  SMASK  = 0xffff;
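SMASK illustrates the bit-field style used throughout the pool: a 16-bit worker index lives in the low bits and other state sits above it. The field layout below is illustrative only, not the actual ctl encoding:

```java
public class MaskDemo {
    static final int SMASK = 0xffff; // low 16 bits hold a worker index

    public static void main(String[] args) {
        int id = 42, count = 7;
        // pack an index and a counter into one int, ctl-style
        int packed = (count << 16) | (id & SMASK);
        System.out.println(packed & SMASK); // 42 (low 16 bits: the index)
        System.out.println(packed >>> 16);  // 7  (high bits: the counter)
    }
}
```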

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 8192

    /**
     * Maximum size for queue array. Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when started (as opposed to when constructed), to
     * improve memory locality.
     */
    ForkJoinTask<?>[] queue;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) queueBase.  Both queueTop
     * and queueBase are allowed to wrap around on overflow, but
     * (queueTop - queueBase) still estimates size.
     */
    int queueTop;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    volatile int queueBase;

    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    int stealHint;

    /**
     * Index of this worker in pool array. Set once by pool before
     * running, and accessed directly by pool to locate this worker in
     * its workers array.
     */
    final int poolIndex;

    /**
     * Encoded record for pool task waits. Usages are always
     * surrounded by volatile reads/writes
     */
    int nextWait;

    /**
     * Complement of poolIndex, offset by count of entries of task
     * waits. Accessed by ForkJoinPool to manage event waiters.
     */
    volatile int eventCount;

    /**
     * Seed for random number generator for choosing steal victims.
     * Uses Marsaglia xorshift. Must be initialized as nonzero.
     */
    int seed;

    /**
     * Number of steals. Directly accessed (and reset) by pool when
     * idle.
     */
    int stealCount;

    /**
     * True if this worker should or did terminate
     */
    volatile boolean terminate;

    /**
     * Set to true before LockSupport.park; false on return
     */
    volatile boolean parked;

    /**
     * True if use local fifo, not default lifo, for local polling.
     * Shadows value from ForkJoinPool.
     */
    final boolean locallyFifo;

    /**
     * The task most recently stolen from another worker (or
     * submission queue).  All uses are surrounded by enough volatile
     * reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentSteal;

    /**
     * The task currently being joined, set only when actively trying
     * to help other stealers in helpJoinTask. All uses are surrounded
     * by enough volatile reads/writes to maintain as non-volatile.
     */
    ForkJoinTask<?> currentJoin;

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool.nextWorkerName());
        this.pool = pool;
        int k = pool.registerWorker(this);
        poolIndex = k;
        eventCount = ~k & SMASK; // clear wait count 
        locallyFifo = pool.locallyFifo;
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
    }
    
    /**
      * CASes slot i of array q from t to null. Caller must ensure q is
      * non-null and index is in range.
      */
     private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
                                              ForkJoinTask<?> t) {
         return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
     }
 
     /**
      * Performs a volatile write of the given task at given slot of
      * array q.  Caller must ensure q is non-null and index is in
      * range. This method is used only during resets and backouts.
      */
     private static final void writeSlot(ForkJoinTask<?>[] q, int i,
                                         ForkJoinTask<?> t) {
         UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
     }   
 // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        int s;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class a = ForkJoinTask[].class;
            ABASE = UNSAFE.arrayBaseOffset(a);
            s = UNSAFE.arrayIndexScale(a);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }    
}    
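The static block above computes ASHIFT as log2 of the array element scale, so that Unsafe calls can turn a slot index into a byte offset via `(i << ASHIFT) + ABASE`. A minimal sketch of that arithmetic, without Unsafe (the scale value 4 below is an assumption matching a typical 64-bit JVM with compressed oops):

```java
// Sketch of the static initializer's shift derivation: ASHIFT = log2(scale),
// valid only when the element scale is a power of two.
public class AshiftDemo {
    // Mirrors: ASHIFT = 31 - Integer.numberOfLeadingZeros(s)
    static int ashift(int scale) {
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        return 31 - Integer.numberOfLeadingZeros(scale);
    }

    // Byte offset of slot i, as used by the Unsafe calls in the listing above.
    static long slotOffset(int i, long abase, int ashift) {
        return ((long) i << ashift) + abase;
    }

    public static void main(String[] args) {
        System.out.println(ashift(4));             // 2
        System.out.println(slotOffset(3, 16L, 2)); // 16 + 3*4 = 28
    }
}
```

The power-of-two check `(s & (s - 1)) != 0` is what guarantees the shift is exact, which is why the static block throws an Error otherwise.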

(2).run - the worker thread's main execution method

    /**
     * This method is required to be public, but should never be
     * called explicitly. It performs the main run loop to execute
     * {@link ForkJoinTask}s.
     */
    public void run() {
        Throwable exception = null;
        try {
            onStart();
            pool.work(this);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            onTermination(exception);
        }
    }
    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
        queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        int r = pool.workerSeedGenerator.nextInt();
        seed = (r == 0) ? 1 : r; //  must be nonzero
    }
    
    /**
     * Performs cleanup associated with termination of this worker
     * thread.  If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
        try {
            terminate = true;
            cancelTasks();
            pool.deregisterWorker(this, exception);
        } catch (Throwable ex) {        // Shouldn't ever happen
            if (exception == null)      // but if so, at least rethrown
                exception = ex;
        } finally {
            if (exception != null)
                UNSAFE.throwException(exception);
        }
    }    
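The run loop above exists only to execute ForkJoinTasks handed to the worker by the pool. For context, here is a minimal, hedged usage example of the framework these lifecycle hooks serve (the class name and threshold are illustrative, not from the source):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Minimal RecursiveTask example: sums 1..n by splitting the range in half.
public class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 1_000;
    final long lo, hi; // inclusive range [lo, hi]

    SumTask(long lo, long hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {        // small enough: compute directly
            long s = 0;
            for (long i = lo; i <= hi; i++) s += i;
            return s;
        }
        long mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(lo, mid);
        left.fork();                        // pushTask onto this worker's deque
        long right = new SumTask(mid + 1, hi).compute();
        return right + left.join();         // join may trigger joinTask/helpJoinTask
    }

    public static long sum(long n) {
        return new ForkJoinPool().invoke(new SumTask(1, n));
    }

    public static void main(String[] args) {
        System.out.println(sum(10_000)); // 50005000
    }
}
```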

(3).pushTask - push a task onto the deque

  /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();  // grow the queue
        }
    }

Resizing (growQueue):

    /**
     * Creates or doubles queue array.  Transfers elements by
     * emulating steals (deqs) from old array and placing, oldest
     * first, into new array.
     */
    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY; // double the capacity
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }
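Because capacities are powers of two, the slot of logical index b is simply `b & (length - 1)`; growQueue re-derives each element's slot under the doubled mask, oldest first. A simplified sketch of just that transfer (plain arrays, omitting the CAS and volatile writes of the real code):

```java
// Sketch of growQueue's transfer: slot for logical index b is (b & mask);
// doubling the array recomputes each element's slot with the new mask.
public class GrowDemo {
    static String[] grow(String[] oldQ, int base, int top) {
        String[] q = new String[oldQ.length << 1];
        int oldMask = oldQ.length - 1, mask = q.length - 1;
        for (int b = base; b != top; ++b)      // oldest first
            q[b & mask] = oldQ[b & oldMask];
        return q;
    }

    public static void main(String[] args) {
        String[] oldQ = new String[4];
        // queueBase = 3, queueTop = 6: indices 4 and 5 wrap in the old array
        oldQ[3 & 3] = "t3"; oldQ[4 & 3] = "t4"; oldQ[5 & 3] = "t5";
        String[] q = grow(oldQ, 3, 6);
        System.out.println(q[3] + " " + q[4] + " " + q[5]); // t3 t4 t5
    }
}
```

Note that in the doubled array the same logical indices no longer wrap, so the elements land at fresh slots without collisions.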

(4).deqTask - take a task from the head (base) of the queue

    /**
     * Tries to take a task from the base of the queue, failing if
     * empty or contended. Note: Specializations of this code appear
     * in locallyDeqTask and elsewhere.
     * @return a task, or null if none or contended
     */
    final ForkJoinTask<?> deqTask() {
        ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
        if (queueTop != (b = queueBase) &&
            (q = queue) != null && // must read q after b
            (i = (q.length - 1) & b) >= 0 &&
            (t = q[i]) != null && queueBase == b &&
            UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null)) {
            queueBase = b + 1;
            return t;
        }
        return null;
    }
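The steal protocol can be sketched with AtomicReferenceArray standing in for the Unsafe calls: read queueBase, read the slot, re-check queueBase, CAS the slot to null, and only then advance queueBase. This is an illustrative simplification (fixed capacity 8, single owner), not the real implementation:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Simplified steal: a failed CAS means another thief took the task first.
public class StealDemo {
    final AtomicReferenceArray<Runnable> queue = new AtomicReferenceArray<>(8);
    volatile int queueBase;
    int queueTop;

    void push(Runnable t) { queue.set(queueTop++ & 7, t); }

    Runnable deqTask() {
        int b;
        Runnable t;
        if (queueTop != (b = queueBase) &&
            (t = queue.get(b & 7)) != null &&
            queueBase == b &&                       // re-check: still the base?
            queue.compareAndSet(b & 7, t, null)) {  // claim the slot
            queueBase = b + 1;                      // publish the new base
            return t;
        }
        return null;                                // empty or contended
    }

    public static void main(String[] args) {
        StealDemo d = new StealDemo();
        Runnable r = () -> {};
        d.push(r);
        System.out.println(d.deqTask() == r); // true
        System.out.println(d.deqTask());      // null (empty)
    }
}
```

The ordering matters: queueBase is advanced only after the CAS succeeds, so a concurrent reader never sees the base pass an un-claimed slot.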

(5).locallyDeqTask - take a task from the head of this worker's own queue

    /**
     * Tries to take a task from the base of own queue.  Called only
     * by this thread.
     * @return a task, or null if none
     */
    final ForkJoinTask<?> locallyDeqTask() {
        ForkJoinTask<?> t; int m, b, i;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            while (queueTop != (b = queueBase)) {
                if ((t = q[i = m & b]) != null &&
                    queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
                                                t, null)) {
                    queueBase = b + 1;
                    return t;
                }
            }
        }
        return null;
    }

(6).popTask - pop a task from the tail (top) of the queue

    /**
     * Returns a popped task, or null if empty.
     * Called only by this thread.
     */
    private ForkJoinTask<?> popTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q != null && (m = q.length - 1) >= 0) {
            for (int s; (s = queueTop) != queueBase;) {
                int i = m & --s;
                long u = (i << ASHIFT) + ABASE; // raw offset
                ForkJoinTask<?> t = q[i];
                if (t == null)   // lost to stealer
                    break;
                if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    queueTop = s; // or putOrderedInt
                    return t;
                }
            }
        }
        return null;
    }

(7).unpushTask - pop only if the topmost element is the given task

    /**
     * Specialized version of popTask to pop only if topmost element
     * is the given task. Called only by this thread.
     * @param t the task. Caller must ensure non-null.
     */
    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;
        int s;
        if ((q = queue) != null && (s = queueTop) != queueBase &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            queueTop = s; // or putOrderedInt
            return true;
        }
        return false;
    }

(8).peekTask - return the next task without removing it

/**
     * Returns next task, or null if empty or contended.
     */
    final ForkJoinTask<?> peekTask() {
        int m;
        ForkJoinTask<?>[] q = queue;
        if (q == null || (m = q.length - 1) < 0)
            return null;
        int i = locallyFifo ? queueBase : (queueTop - 1);
        return q[i & m];
    }
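peekTask looks at queueBase in FIFO (async) mode and at queueTop - 1 in the default LIFO mode. The effect of the two modes on local execution order can be sketched with a plain ArrayDeque standing in for the worker's deque:

```java
import java.util.ArrayDeque;

// The owner pushes at the top; in the default mode it also pops from the top
// (LIFO), while in locallyFifo (asyncMode) it polls from the base (FIFO).
public class ModeDemo {
    public static String drain(boolean locallyFifo) {
        ArrayDeque<String> q = new ArrayDeque<>();
        q.addLast("a"); q.addLast("b"); q.addLast("c"); // three pushTask calls
        StringBuilder sb = new StringBuilder();
        while (!q.isEmpty())
            sb.append(locallyFifo ? q.pollFirst() : q.pollLast());
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(drain(false)); // cba  (default LIFO)
        System.out.println(drain(true));  // abc  (asyncMode FIFO)
    }
}
```

LIFO favors cache-warm, recently forked subtasks; FIFO matches submission order, which suits event-style (async) usage.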

(9). Support methods for ForkJoinPool

  /**
     * Runs the given task, plus any local tasks until queue is empty
     */
    final void execTask(ForkJoinTask<?> t) {
        currentSteal = t;
        for (;;) {
            if (t != null)
                t.doExec();
            if (queueTop == queueBase)
                break;
            t = locallyFifo ? locallyDeqTask() : popTask();
        }
        ++stealCount;
        currentSteal = null;
    }

    /**
     * Removes and cancels all tasks in queue.  Can be called from any
     * thread.
     */
    final void cancelTasks() {
        ForkJoinTask<?> cj = currentJoin; // try to cancel ongoing tasks
        if (cj != null && cj.status >= 0)
            cj.cancelIgnoringExceptions();
        ForkJoinTask<?> cs = currentSteal;
        if (cs != null && cs.status >= 0)
            cs.cancelIgnoringExceptions();
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null)
                t.cancelIgnoringExceptions();
        }
    }

    /**
     * Drains tasks to given collection c.
     * @return the number of tasks drained
     */
    final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
        int n = 0;
        while (queueBase != queueTop) {
            ForkJoinTask<?> t = deqTask();
            if (t != null) {
                c.add(t);
                ++n;
            }
        }
        return n;
    }
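The loop in execTask above (run the stolen task, then keep taking local tasks until the queue is empty) can be sketched without Unsafe; here an ArrayDeque stands in for the worker's array, and `pollLast` plays the role of popTask in the default LIFO mode:

```java
import java.util.ArrayDeque;

// Sketch of execTask's loop: execute the stolen task, then drain local tasks.
public class ExecLoopDemo {
    public static int execTask(Runnable stolen, ArrayDeque<Runnable> local) {
        int ran = 0;
        Runnable t = stolen;
        for (;;) {
            if (t != null) { t.run(); ran++; }
            if (local.isEmpty())        // queueTop == queueBase
                break;
            t = local.pollLast();       // popTask (default LIFO mode)
        }
        return ran;
    }

    public static void main(String[] args) {
        ArrayDeque<Runnable> q = new ArrayDeque<>();
        q.addLast(() -> {});
        q.addLast(() -> {});
        System.out.println(execTask(() -> {}, q)); // 3
    }
}
```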

(10). Support methods for ForkJoinTask

  • [1].pollTask - gets and removes a local or stolen task.
        /**
         * Gets and removes a local or stolen task.
         *
         * @return a task, if available
         */
        final ForkJoinTask<?> pollTask() {
            ForkJoinWorkerThread[] ws;
            ForkJoinTask<?> t = pollLocalTask();
            if (t != null || (ws = pool.workers) == null)
                return t;
            int n = ws.length; // cheap version of FJP.scan
            int steps = n << 1;
            int r = nextSeed();
            int i = 0;
            while (i < steps) {
                ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
                if (w != null && w.queueBase != w.queueTop && w.queue != null) {
                    if ((t = w.deqTask()) != null)
                        return t;
                    i = 0;
                }
            }
            return null;
        }
      /**
         * Computes next value for random victim probes and backoffs.
         * Scans don't require a very high quality generator, but also not
         * a crummy one.  Marsaglia xor-shift is cheap and works well
         * enough.  Note: This is manually inlined in FJP.scan() to avoid
         * writes inside busy loops.
         */
        private int nextSeed() {
            int r = seed;
            r ^= r << 13;
            r ^= r >>> 17;
            r ^= r << 5;
            return seed = r;
        }    
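The xorshift step in nextSeed is deterministic and never maps a nonzero seed to zero (it is a bijection on nonzero 32-bit states), which is why the constructor must seed it with a nonzero value. Extracted as a pure function:

```java
// Marsaglia xorshift, as in nextSeed(): a cheap PRNG for victim selection.
public class XorShift {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r; // nonzero whenever the input was nonzero
    }

    public static void main(String[] args) {
        int r = 1;
        for (int i = 0; i < 5; i++) {
            r = next(r);
            System.out.println(r);
        }
    }
}
```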
    
  • [2].joinTask - possibly runs tasks and/or blocks until joinMe is done.
     /**
         * The maximum stolen->joining link depth allowed in helpJoinTask,
         * as well as the maximum number of retries (allowing on average
         * one staleness retry per level) per attempt to instead try
         * compensation.  Depths for legitimate chains are unbounded, but
         * we use a fixed constant to avoid (otherwise unchecked) cycles
         * and bound staleness of traversal parameters at the expense of
         * sometimes blocking when we could be helping.
         */
        private static final int MAX_HELP = 16;
         
        /**
         * Possibly runs some tasks and/or blocks, until joinMe is done.
         * @param joinMe the task to join
         * @return completion status on exit
         */
        final int joinTask(ForkJoinTask<?> joinMe) {
            ForkJoinTask<?> prevJoin = currentJoin;
            currentJoin = joinMe;
            for (int s, retries = MAX_HELP;;) {
                if ((s = joinMe.status) < 0) {
                    currentJoin = prevJoin;
                    return s;
                }
                if (retries > 0) {
                    if (queueTop != queueBase) {
                        if (!localHelpJoinTask(joinMe))
                            retries = 0;           // cannot help
                    }
                    else if (retries == MAX_HELP >>> 1) {
                        --retries;                 // check uncommon case
                        if (tryDeqAndExec(joinMe) >= 0)
                            Thread.yield();        // for politeness
                    }
                    else
                        retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
                }
                else {
                    retries = MAX_HELP;           // restart if not done
                    pool.tryAwaitJoin(joinMe);
                }
            }
        }
         
    
/**
 * If present, pops and executes the given task, or any other
 * cancelled task
 * @return false if any other non-cancelled task exists in local queue
 */
private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
    int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
    if ((s = queueTop) != queueBase && (q = queue) != null &&
        (i = (q.length - 1) & --s) >= 0 &&
        (t = q[i]) != null) {
        if (t != joinMe && t.status >= 0)
            return false;
        if (UNSAFE.compareAndSwapObject
            (q, (i << ASHIFT) + ABASE, t, null)) {
            queueTop = s;           // or putOrderedInt
            t.doExec();
        }
    }
    return true;
}

/**
 * Tries to locate and execute tasks for a stealer of the given
 * task, or in turn one of its stealers, Traces
 * currentSteal->currentJoin links looking for a thread working on
 * a descendant of the given task and with a non-empty queue to
 * steal back and execute tasks from.  The implementation is very
 * branchy to cope with potential inconsistencies or loops
 * encountering chains that are stale, unknown, or of length
 * greater than MAX_HELP links.  All of these cases are dealt with
 * by just retrying by caller.
 *
 * @param joinMe the task to join
 * @return true if ran a task
 */
private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
    boolean helped = false;
    int m = pool.scanGuard & SMASK;
    ForkJoinWorkerThread[] ws = pool.workers;
    if (ws != null && ws.length > m && joinMe.status >= 0) {
        int levels = MAX_HELP;              // remaining chain length
        ForkJoinTask<?> task = joinMe;      // base of chain
        outer:for (ForkJoinWorkerThread thread = this;;) {
            // Try to find v, the stealer of task, by first using hint
            ForkJoinWorkerThread v = ws[thread.stealHint & m];
            if (v == null || v.currentSteal != task) {
                for (int j = 0; ;) {        // search array
                    if ((v = ws[j]) != null && v.currentSteal == task) {
                        thread.stealHint = j;
                        break;              // save hint for next time
                    }
                    if (++j > m)
                        break outer;        // can't find stealer
                }
            }
            // Try to help v, using specialized form of deqTask
            for (;;) {
                ForkJoinTask<?>[] q; int b, i;
                if (joinMe.status < 0)
                    break outer;
                if ((b = v.queueBase) == v.queueTop ||
                    (q = v.queue) == null ||
                    (i = (q.length-1) & b) < 0)
                    break;                  // empty
                long u = (i << ASHIFT) + ABASE;
                ForkJoinTask<?> t = q[i];
                if (task.status < 0)
                    break outer;            // stale
                if (t != null && v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    v.queueBase = b + 1;
                    v.stealHint = poolIndex;
                    ForkJoinTask<?> ps = currentSteal;
                    currentSteal = t;
                    t.doExec();
                    currentSteal = ps;
                    helped = true;
                }
            }
            // Try to descend to find v's stealer
            ForkJoinTask<?> next = v.currentJoin;
            if (--levels > 0 && task.status >= 0 &&
                next != null && next != task) {
                task = next;
                thread = v;
            }
            else
                break;  // max levels, stale, dead-end, or cyclic
        }
    }
    return helped;
}

/**
 * Performs an uncommon case for joinTask: If task t is at base of
 * some workers queue, steals and executes it.
 *
 * @param t the task
 * @return t's status
 */
private int tryDeqAndExec(ForkJoinTask<?> t) {
    int m = pool.scanGuard & SMASK;
    ForkJoinWorkerThread[] ws = pool.workers;
    if (ws != null && ws.length > m && t.status >= 0) {
        for (int j = 0; j <= m; ++j) {
            ForkJoinTask<?>[] q; int b, i;
            ForkJoinWorkerThread v = ws[j];
            if (v != null &&
                (b = v.queueBase) != v.queueTop &&
                (q = v.queue) != null &&
                (i = (q.length - 1) & b) >= 0 &&
                q[i] ==  t) {
                long u = (i << ASHIFT) + ABASE;
                if (v.queueBase == b &&
                    UNSAFE.compareAndSwapObject(q, u, t, null)) {
                    v.queueBase = b + 1;
                    v.stealHint = poolIndex;
                    ForkJoinTask<?> ps = currentSteal;
                    currentSteal = t;
                    t.doExec();
                    currentSteal = ps;
                }
                break;
            }
        }
    }
    return t.status;
}    
  • [3].helpQuiescePool - runs tasks until the pool is quiescent.
        /**
         * Runs tasks until {@code pool.isQuiescent()}. We piggyback on
         * pool's active count ctl maintenance, but rather than blocking
         * when tasks cannot be found, we rescan until all others cannot
         * find tasks either. The bracketing by pool quiescerCounts
         * updates suppresses pool auto-shutdown mechanics that could
         * otherwise prematurely terminate the pool because all threads
         * appear to be inactive.
         */
        final void helpQuiescePool() {
            boolean active = true;
            ForkJoinTask<?> ps = currentSteal; // to restore below
            ForkJoinPool p = pool;
            p.addQuiescerCount(1);
            for (;;) {
                ForkJoinWorkerThread[] ws = p.workers;
                ForkJoinWorkerThread v = null;
                int n;
                if (queueTop != queueBase)
                    v = this;
                else if (ws != null && (n = ws.length) > 1) {
                    ForkJoinWorkerThread w;
                    int r = nextSeed(); // cheap version of FJP.scan
                    int steps = n << 1;
                    for (int i = 0; i < steps; ++i) {
                        if ((w = ws[(i + r) & (n - 1)]) != null &&
                            w.queueBase != w.queueTop) {
                            v = w;
                            break;
                        }
                    }
                }
                if (v != null) {
                    ForkJoinTask<?> t;
                    if (!active) {
                        active = true;
                        p.addActiveCount(1);
                    }
                    if ((t = (v != this) ? v.deqTask() :
                         locallyFifo ? locallyDeqTask() : popTask()) != null) {
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                    }
                }
                else {
                    if (active) {
                        active = false;
                        p.addActiveCount(-1);
                    }
                    if (p.isQuiescent()) {
                        p.addActiveCount(1);
                        p.addQuiescerCount(-1);
                        break;
                    }
                }
            }
        }
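helpQuiescePool is what backs quiescence waiting at the API level: rather than blocking when no task is found, it rescans until every worker is idle. A small usage sketch on a modern JDK (ForkJoinPool.awaitQuiescence, available since Java 8; the task count and timeout here are illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Submit fire-and-forget tasks, then wait until no worker can find more work.
public class QuiesceDemo {
    public static int runAll(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++)
            pool.execute(done::incrementAndGet);
        pool.awaitQuiescence(10, TimeUnit.SECONDS); // rescan until all idle
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100)); // 100
    }
}
```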
    

4.ForkJoinPool construction

    /**
     * Permission required for callers of methods that may start or
     * kill threads.
     */
    private static final RuntimePermission modifyThreadPermission;

    /**
     * If there is a security manager, makes sure caller has
     * permission to modify threads.
     */
    private static void checkPermission() {
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(modifyThreadPermission);
    }
 /**
  * Generator for assigning sequence numbers as pool names.
  */
 private static final AtomicInteger poolNumberGenerator;

 /**
  * Generator for initial random seeds for worker victim
  * selection. This is used only to create initial seeds. Random
  * steals use a cheaper xorshift generator per steal attempt. We
  * don't expect much contention on seedGenerator, so just use a
  * plain Random.
  */
 static final Random workerSeedGenerator;

 /**
  * Array holding all worker threads in the pool.  Initialized upon
  * construction. Array size must be a power of two.  Updates and
  * replacements are protected by scanGuard, but the array is
  * always kept in a consistent enough state to be randomly
  * accessed without locking by workers performing work-stealing,
  * as well as other traversal-based methods in this class, so long
  * as reads memory-acquire by first reading ctl. All readers must
  * tolerate that some array slots may be null.
  */
 ForkJoinWorkerThread[] workers;

 /**
  * Initial size for submission queue array. Must be a power of
  * two.  In many applications, these always stay small so we use a
  * small initial cap.
  */
 private static final int INITIAL_QUEUE_CAPACITY = 8;

 /**
  * Maximum size for submission queue array. Must be a power of two
  * less than or equal to 1 << (31 - width of array entry) to
  * ensure lack of index wraparound, but is capped at a lower
  * value to help users trap runaway computations.
  */
 private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

 /**
  * Array serving as submission queue. Initialized upon construction.
  */
 private ForkJoinTask<?>[] submissionQueue;

 /**
  * Lock protecting submissions array for addSubmission
  */
 private final ReentrantLock submissionLock;

 /**
  * Condition for awaitTermination, using submissionLock for
  * convenience.
  */
 private final Condition termination;

 /**
  * Creation factory for worker threads.
  */
 private final ForkJoinWorkerThreadFactory factory;

 /**
  * The uncaught exception handler used when any worker abruptly
  * terminates.
  */
 final Thread.UncaughtExceptionHandler ueh;

 /**
  * Prefix for assigning names to worker threads
  */
 private final String workerNamePrefix;

 /**
  * Sum of per-thread steal counts, updated only when threads are
  * idle or terminating.
  */
 private volatile long stealCount;

 /**
  * Main pool control -- a long packed with:
  * AC: Number of active running workers minus target parallelism (16 bits)
  * TC: Number of total workers minus target parallelism (16 bits)
  * ST: true if pool is terminating (1 bit)
  * EC: the wait count of top waiting thread (15 bits)
  * ID: ~poolIndex of top of Treiber stack of waiting threads (16 bits)
  *
  * When convenient, we can extract the upper 32 bits of counts and
  * the lower 32 bits of queue state, u = (int)(ctl >>> 32) and e =
  * (int)ctl.  The ec field is never accessed alone, but always
  * together with id and st. The offsets of counts by the target
  * parallelism and the positionings of fields makes it possible to
  * perform the most common checks via sign tests of fields: When
  * ac is negative, there are not enough active workers, when tc is
  * negative, there are not enough total workers, when id is
  * negative, there is at least one waiting worker, and when e is
  * negative, the pool is terminating.  To deal with these possibly
  * negative fields, we use casts in and out of "short" and/or
  * signed shifts to maintain signedness.
  */
 volatile long ctl;

 // bit positions/shifts for fields
 private static final int  AC_SHIFT   = 48;
 private static final int  TC_SHIFT   = 32;
 private static final int  ST_SHIFT   = 31;
 private static final int  EC_SHIFT   = 16;

 // bounds
 private static final int  MAX_ID     = 0x7fff;  // max poolIndex    0111 1111 1111 1111
 private static final int  SMASK      = 0xffff;  // mask short bits  1111 1111 1111 1111
 private static final int  SHORT_SIGN = 1 << 15; //  1000 0000 0000 0000
 private static final int  INT_SIGN   = 1 << 31; //  1000 0000 0000 0000 0000 0000 0000 0000

 // masks
 private static final long STOP_BIT   = 0x0001L << ST_SHIFT;//         1000 0000 0000 0000 0000 0000 0000 0000
 private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
 private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;

 // units for incrementing and decrementing 单位递增和递减
 private static final long TC_UNIT    = 1L << TC_SHIFT;
 private static final long AC_UNIT    = 1L << AC_SHIFT;

 // masks and units for dealing with u = (int)(ctl >>> 32)
 private static final int  UAC_SHIFT  = AC_SHIFT - 32; //16
 private static final int  UTC_SHIFT  = TC_SHIFT - 32; //0 
 private static final int  UAC_MASK   = SMASK << UAC_SHIFT; 
 private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
 private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
 private static final int  UTC_UNIT   = 1 << UTC_SHIFT;

 // masks and units for dealing with e = (int)ctl
 private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT  0111 1111 1111 1111 1111 1111 1111 1111
 private static final int  EC_UNIT    = 1 << EC_SHIFT;

 /**
  * The target parallelism level.
  */
 final int parallelism;

 /**
  * Index (mod submission queue length) of next element to take
  * from submission queue. Usage is identical to that for
  * per-worker queues -- see ForkJoinWorkerThread internal
  * documentation.
  */
 volatile int queueBase;

 /**
  * Index (mod submission queue length) of next element to add
  * in submission queue. Usage is identical to that for
  * per-worker queues -- see ForkJoinWorkerThread internal
  * documentation.
  */
 int queueTop;

 /**
  * True when shutdown() has been called.
  */
 volatile boolean shutdown;

 /**
  * True if use local fifo, not default lifo, for local polling
  * Read by, and replicated by ForkJoinWorkerThreads
  */
 final boolean locallyFifo;

 /**
  * The number of threads in ForkJoinWorkerThreads.helpQuiescePool.
  * When non-zero, suppresses automatic shutdown when active
  * counts become zero.
  */
 volatile int quiescerCount;

 /**
  * The number of threads blocked in join.
  */
 volatile int blockedCount;

 /**
  * Counter for worker Thread names (unrelated to their poolIndex)
  */
 private volatile int nextWorkerNumber;

 /**
  * The index for the next created worker. Accessed under scanGuard.
  */
 private int nextWorkerIndex;

 /**
  * SeqLock and index masking for updates to workers array.  Locked
  * when SG_UNIT is set. Unlocking clears bit by adding
  * SG_UNIT. Staleness of read-only operations can be checked by
  * comparing scanGuard to value before the reads. The low 16 bits
  * (i.e., anding with SMASK) hold the smallest power of two
  * covering all worker indices, minus one, and are used to avoid
  * dealing with large numbers of null slots when the workers array
  * is overallocated.
  */
 volatile int scanGuard;

 private static final int SG_UNIT = 1 << 16;

 /**
  * The wakeup interval (in nanoseconds) for a worker waiting for a
  * task when the pool is quiescent to instead try to shrink the
  * number of workers.  The exact value does not matter too
  * much. It must be short enough to release resources during
  * sustained periods of idleness, but not so short that threads
  * are continually re-created.
  */
 private static final long SHRINK_RATE =
     4L * 1000L * 1000L * 1000L; // 4 seconds
 // Constructors

    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

    /**
     * Creates a {@code ForkJoinPool} with the given parameters.
     *
     * @param parallelism the parallelism level. For default value,
     * use {@link java.lang.Runtime#availableProcessors}.
     * @param factory the factory for creating new threads. For default value,
     * use {@link #defaultForkJoinWorkerThreadFactory}.
     * @param handler the handler for internal worker threads that
     * terminate due to unrecoverable errors encountered while executing
     * tasks. For default value, use {@code null}.
     * @param asyncMode if true,
     * establishes local first-in-first-out scheduling mode for forked
     * tasks that are never joined. This mode may be more appropriate
     * than default locally stack-based mode in applications in which
     * worker threads only process event-style asynchronous tasks.
     * For default value, use {@code false}.
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        checkPermission();
        if (factory == null)
            throw new NullPointerException();
        if (parallelism <= 0 || parallelism > MAX_ID)
            throw new IllegalArgumentException();
        this.parallelism = parallelism;
        this.factory = factory;
        this.ueh = handler;
        this.locallyFifo = asyncMode; // async mode uses local FIFO polling
        /*
         * As described in the ctl documentation above, both the AC and
         * TC fields are stored offset by -parallelism.
         */
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        // initialize the submission queue
        this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // initialize workers array with room for 2*parallelism if possible
        /*
         * Size the workers array from the parallelism level. The size
         * must be a power of two: the smallest power of two at least
         * 2 * parallelism, capped at MAX_ID + 1 (1 << 15).
         */
        int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;
        else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        workers = new ForkJoinWorkerThread[n + 1];
        this.submissionLock = new ReentrantLock();
        this.termination = submissionLock.newCondition();
        StringBuilder sb = new StringBuilder("ForkJoinPool-");
        sb.append(poolNumberGenerator.incrementAndGet());
        sb.append("-worker-");
        this.workerNamePrefix = sb.toString();
    }
    
   // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long ctlOffset;
    private static final long stealCountOffset;
    private static final long blockedCountOffset;
    private static final long quiescerCountOffset;
    private static final long scanGuardOffset;
    private static final long nextWorkerNumberOffset;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        poolNumberGenerator = new AtomicInteger();
        workerSeedGenerator = new Random();
        modifyThreadPermission = new RuntimePermission("modifyThread");
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        int s;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = ForkJoinPool.class;
            ctlOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("ctl"));
            stealCountOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("stealCount"));
            blockedCountOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("blockedCount"));
            quiescerCountOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("quiescerCount"));
            scanGuardOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("scanGuard"));
            nextWorkerNumberOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("nextWorkerNumber"));
            Class a = ForkJoinTask[].class;
            ABASE = UNSAFE.arrayBaseOffset(a);
            s = UNSAFE.arrayIndexScale(a);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
    }    
}     
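
The ctl bit-packing described above can be exercised on its own. The sketch below (a hypothetical helper class, not part of the JDK) packs the offset AC/TC counts exactly as the constructor does and recovers them with signed shifts, showing why "too few workers" reduces to a sign test:

```java
// Minimal sketch of the ctl packing (assumption: only AC/TC modeled here;
// the real ctl also carries ST, EC and ID in the low 32 bits).
public class CtlDemo {
    static final int AC_SHIFT = 48;
    static final int TC_SHIFT = 32;
    static final long SMASK   = 0xffffL;
    static final long AC_MASK = SMASK << AC_SHIFT;
    static final long TC_MASK = SMASK << TC_SHIFT;

    // pack (count - parallelism) into the high fields, like the constructor's
    // ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK)
    static long pack(int active, int total, int parallelism) {
        long a = active - parallelism, t = total - parallelism;
        return ((a << AC_SHIFT) & AC_MASK) | ((t << TC_SHIFT) & TC_MASK);
    }

    // recover the offset counts; the (short) cast restores the sign
    static int acOffset(long ctl) { return (short) (ctl >>> AC_SHIFT); }
    static int tcOffset(long ctl) { return (short) (ctl >>> TC_SHIFT); }

    public static void main(String[] args) {
        long c = pack(0, 0, 4);   // freshly constructed pool, parallelism 4
        // both offsets negative: not enough active or total workers yet
        System.out.println(acOffset(c) < 0 && tcOffset(c) < 0); // true
        long c2 = pack(4, 4, 4);  // fully staffed pool
        System.out.println(acOffset(c2)); // 0
    }
}
```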

5.ForkJoinPool.execute executes a task

    /**
     * Arranges for (asynchronous) execution of the given task.
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
    }
    /**
     * Unless terminating, forks task if within an ongoing FJ
     * computation in the current pool, else submits as external task.
     */
    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }    
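
forkOrSubmit is the split point between the two submission paths: an external caller's task goes through addSubmission into the shared queue, while fork() from inside a worker pushes onto that worker's own deque. A minimal usage sketch against the public API (standard java.util.concurrent classes; the task and bounds are illustrative):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Summing a range: the initial invoke() is an external submission,
// while the fork() calls inside compute() take the worker-deque path.
public class SumTask extends RecursiveTask<Long> {
    final long lo, hi;
    SumTask(long lo, long hi) { this.lo = lo; this.hi = hi; }

    @Override protected Long compute() {
        if (hi - lo <= 1000) {                 // small enough: compute directly
            long s = 0;
            for (long i = lo; i < hi; i++) s += i;
            return s;
        }
        long mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(lo, mid);
        left.fork();                            // pushed onto this worker's deque
        long right = new SumTask(mid, hi).compute();
        return right + left.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(); // parallelism = #processors
        long sum = pool.invoke(new SumTask(0, 100_000)); // external submission
        System.out.println(sum);                // 4999950000
    }
}
```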

addSubmission enqueues the submitted task in submissionQueue. It works like ForkJoinWorkerThread.pushTask, except that it takes submissionLock.

    /**
     * Enqueues the given task in the submissionQueue.  Same idea as
     * ForkJoinWorkerThread.pushTask except for use of submissionLock.
     * @param t the task
     */
    private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {    // ignore if queue removed
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                if (s - queueBase == m) // queue full: grow
                    growSubmissionQueue();
            }
        } finally {
            lock.unlock();
        }
        signalWork();
    }

growSubmissionQueue doubles the capacity of the submission queue

    /**
     * Creates or doubles submissionQueue array.
     * Basically identical to ForkJoinWorkerThread version.
     */
    private void growSubmissionQueue() {
        ForkJoinTask<?>[] oldQ = submissionQueue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY; // double the capacity
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }
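
growSubmissionQueue relies on the power-of-two invariant: queueBase and queueTop grow without bound, and `index & (length - 1)` maps them into slots, so doubling just re-masks each live element. A standalone sketch of that indexing scheme (hypothetical class, single-threaded for clarity, no Unsafe):

```java
// Sketch of the queueBase/queueTop ring-buffer scheme (not JDK code).
public class RingDemo {
    Object[] q = new Object[8];   // must be a power of two
    int base, top;                // monotonically increasing counters

    void push(Object x) {
        q[top & (q.length - 1)] = x;
        top++;
        if (top - base == q.length)           // full: double, like growSubmissionQueue
            grow();
    }

    Object poll() {
        if (base == top) return null;         // empty
        int i = base & (q.length - 1);
        Object x = q[i];
        q[i] = null;
        base++;
        return x;
    }

    void grow() {
        Object[] nq = new Object[q.length << 1];
        for (int b = base; b != top; ++b)     // re-mask each live element
            nq[b & (nq.length - 1)] = q[b & (q.length - 1)];
        q = nq;
    }

    public static void main(String[] args) {
        RingDemo r = new RingDemo();
        for (int i = 0; i < 20; i++) r.push(i);   // forces doublings 8 -> 16 -> 32
        System.out.println(r.poll());             // 0 (FIFO from base)
    }
}
```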

signalWork wakes up or creates a worker

  /**
     * Wakes up or creates a worker.
     */
    final void signalWork() {
        /*
         * The while condition is true if: (there is are too few total
         * workers OR there is at least one waiter) AND (there are too
         * few active workers OR the pool is terminating).  The value
         * of e distinguishes the remaining cases: zero (no waiters)
         * for create, negative if terminating (in which case do
         * nothing), else release a waiter. The secondary checks for
         * release (non-null array etc) can fail if the pool begins
         * terminating after the test, and don't impose any added cost
         * because JVMs must perform null and bounds checks anyway.
         */
        long c; int e, u;
        while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
                (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
            if (e > 0) {                         // release a waiting worker
                int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
                if ((ws = workers) == null ||
                    (i = ~e & SMASK) >= ws.length ||
                    (w = ws[i]) == null)
                    break;
                long nc = (((long)(w.nextWait & E_MASK)) |
                           ((long)(u + UAC_UNIT) << 32));
                if (w.eventCount == e &&
                    UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                    w.eventCount = (e + EC_UNIT) & E_MASK;
                    if (w.parked)
                        UNSAFE.unpark(w);
                    break;
                }
            }
            else if (UNSAFE.compareAndSwapLong
                     (this, ctlOffset, c,
                      (long)(((u + UTC_UNIT) & UTC_MASK) |
                             ((u + UAC_UNIT) & UAC_MASK)) << 32)) {
                addWorker();
                break;
            }
        }
    }
    
 /**
     * Tries to create and start a worker; minimally rolls back counts
     * on failure.
     */
    private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }
  

The ForkJoinWorkerThread constructor calls back into pool.registerWorker, which records the worker and returns its poolIndex.

    /**
     * Callback from ForkJoinWorkerThread constructor to
     * determine its poolIndex and record in workers array.
     * @param w the worker
     * @return the worker's pool index
     */
    final int registerWorker(ForkJoinWorkerThread w) {
        /*
         * In the typical case, a new worker acquires the lock, uses
         * next available index and returns quickly.  Since we should
         * not block callers (ultimately from signalWork or
         * tryPreBlock) waiting for the lock needed to do this, we
         * instead help release other workers while waiting for the
         * lock.
         */
        for (int g;;) {
            ForkJoinWorkerThread[] ws;
            if (((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g | SG_UNIT)) {
                int k = nextWorkerIndex;
                try {
                    if ((ws = workers) != null) { // ignore on shutdown
                        int n = ws.length;
                        if (k < 0 || k >= n || ws[k] != null) {
                            for (k = 0; k < n && ws[k] != null; ++k)
                                ;
                            if (k == n)  // all slots taken: grow the array
                                ws = workers = Arrays.copyOf(ws, n << 1);
                        }
                        ws[k] = w;
                        nextWorkerIndex = k + 1;
                        int m = g & SMASK;
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
                    }
                } finally {
                    scanGuard = g;
                }
                return k;
            }
            else if ((ws = workers) != null) { // help release others
                for (ForkJoinWorkerThread u : ws) {
                    if (u != null && u.queueBase != u.queueTop) {
                        if (tryReleaseWaiter())
                            break;
                    }
                }
            }
        }
    }
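
registerWorker's CAS loop is the writer side of the scanGuard seqlock: set the SG_UNIT bit to lock, mutate, then publish a value that clears the bit and bumps the sequence; readers detect a concurrent write by re-reading the guard. A minimal sketch of the idea (hypothetical class, not the JDK's code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Seqlock sketch in the style of scanGuard (assumption: one int field
// stands in for the data the pool protects with this pattern).
public class SeqLockDemo {
    static final int SG_UNIT = 1 << 16;      // same role as in the pool
    final AtomicInteger guard = new AtomicInteger();
    int data;                                 // protected by the seqlock

    boolean tryWrite(int v) {
        int g = guard.get();
        if ((g & SG_UNIT) != 0 || !guard.compareAndSet(g, g | SG_UNIT))
            return false;                     // lost the race to another writer
        data = v;
        // "unlocking clears bit by adding SG_UNIT": the carry clears the
        // lock bit and bumps the sequence in the higher bits
        guard.set((g | SG_UNIT) + SG_UNIT);
        return true;
    }

    Integer tryRead() {
        int g = guard.get();
        if ((g & SG_UNIT) != 0) return null;  // write in progress
        int v = data;
        return guard.get() == g ? v : null;   // discard if a write intervened
    }

    public static void main(String[] args) {
        SeqLockDemo s = new SeqLockDemo();
        System.out.println(s.tryWrite(42));   // true
        System.out.println(s.tryRead());      // 42
    }
}
```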

tryReleaseWaiter releases one waiting worker thread

      /**
       * Variant of signalWork to help release waiters on rescans.
       * Tries once to release a waiter if active count < 0.
       * 
       * @return false if failed due to contention, else true
       */
      private boolean tryReleaseWaiter() {
          long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
          if ((e = (int)(c = ctl)) > 0 &&  // there is at least one waiting worker
              (int)(c >> AC_SHIFT) < 0 &&  // and the active count is below target parallelism
              (ws = workers) != null &&
              (i = ~e & SMASK) < ws.length &&    // validate the waiter index recorded in ctl
              (w = ws[i]) != null) {
              long nc = ((long)(w.nextWait & E_MASK) |
                         ((c + AC_UNIT) & (AC_MASK|TC_MASK))); // new ctl: bump the active count and pop the next waiter off the Treiber stack
              if (w.eventCount != e ||
                  !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) 
                  return false;
              w.eventCount = (e + EC_UNIT) & E_MASK; // advance w's eventCount
              if (w.parked)
                  UNSAFE.unpark(w);  // wake up w
          }
          return true;
      }  

ForkJoinWorkerThread.run drives pool.work

    /**
     * Top-level loop for worker threads: On each step: if the
     * previous step swept through all queues and found no tasks, or
     * there are excess threads, then possibly blocks. Otherwise,
     * scans for and, if found, executes a task. Returns when pool
     * and/or worker terminate.
     * @param w the worker
     */
    final void work(ForkJoinWorkerThread w) {
        boolean swept = false;                // true on empty scans
        long c;
        while (!w.terminate && (int)(c = ctl) >= 0) {
            int a;                            // active count
            if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
                swept = scan(w, a);
            else if (tryAwaitWork(w, c))
                swept = false;
        }
    }

scan scans for a task to steal

   // Scanning for tasks

    /**
     * Scans for and, if found, executes one task. Scans start at a
     * random index of workers array, and randomly select the first
     * (2*#workers)-1 probes, and then, if all empty, resort to 2
     * circular sweeps, which is necessary to check quiescence. and
     * taking a submission only if no stealable tasks were found.  The
     * steal code inside the loop is a specialized form of
     * ForkJoinWorkerThread.deqTask, followed bookkeeping to support
     * helpJoinTask and signal propagation. The code for submission
     * queues is almost identical. On each steal, the worker completes
     * not only the task, but also all local tasks that this task may
     * have generated. On detecting staleness or contention when
     * trying to take a task, this method returns without finishing
     * sweep, which allows global state rechecks before retry.
     * @param w the worker
     * @param a the number of active workers
     * @return true if swept all queues without finding a task
     */
        private boolean scan(ForkJoinWorkerThread w, int a) {
            int g = scanGuard;
            /*
             * If this is the only worker (and nothing is blocked), set m
             * to 0 to avoid useless scanning; otherwise take the index
             * mask from the guard value.
             */
            int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
            ForkJoinWorkerThread[] ws = workers;
            if (ws == null || ws.length <= m)         // staleness check
                return false;
            
            for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
                ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
                // randomly pick a victim worker
                ForkJoinWorkerThread v = ws[k & m];
                // consistency checks...
                if (v != null && (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
                    // the victim's queue is non-empty; try to steal its base task
                    long u = (i << ASHIFT) + ABASE;
                    if ((t = q[i]) != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        // steal succeeded: advance the victim's queueBase
                        int d = (v.queueBase = b + 1) - v.queueTop;
                        // record this worker's pool index as the victim's stealHint
                        v.stealHint = w.poolIndex;
                        if (d != 0)
                            signalWork();             // the victim still has tasks; wake (or create) another worker
                        w.execTask(t); // run the stolen task
                    }
                    // compute the next random seed
                    r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
                    return false;                     // not an empty sweep
                }
                // first 2*m probes: random scan
                else if (j < 0) {                     // xorshift
                    r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
                }
                // last 2*m probes: circular sweep
                else
                    ++k;
            }
            if (scanGuard != g)                       // staleness check
                return false;
            else {
                // no stealable task found; try to take one from the pool's submission queue
                ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
                if ((b = queueBase) != queueTop &&
                    (q = submissionQueue) != null &&
                    (i = (q.length - 1) & b) >= 0) {
                    long u = (i << ASHIFT) + ABASE;
                    if ((t = q[i]) != null && queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        queueBase = b + 1;
                        w.execTask(t);
                    }
                    return false;
                }
                return true;                         // all queues (worker deques and the submission queue) were empty
            }
        }
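
The three-shift update in scan (`r ^= r << 13; r ^= r >>> 17; r ^= r << 5`) is Marsaglia's 32-bit xorshift generator, which is cheap enough to run once per steal attempt. Isolated for illustration (hypothetical class):

```java
// The victim-selection PRNG used by scan(), in isolation.
// Zero is a fixed point, which is why worker seeds must be nonzero.
public class XorShiftDemo {
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    public static void main(String[] args) {
        int r = 1;
        for (int i = 0; i < 5; i++) {
            r = next(r);
            System.out.println(r & 7);  // e.g. pick among 8 workers by masking
        }
        System.out.println(next(0));    // 0 -- a zero seed would never advance
    }
}
```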

tryAwaitWork enqueues the worker in the wait queue and parks it until new work arrives

  /**
     * Tries to enqueue worker w in wait queue and await change in
     * worker's eventCount.  If the pool is quiescent and there is
     * more than one worker, possibly terminates worker upon exit.
     * Otherwise, before blocking, rescans queues to avoid missed
     * signals.  Upon finding work, releases at least one worker
     * (which may be the current worker). Rescans restart upon
     * detected staleness or failure to release due to
     * contention. Note the unusual conventions about Thread.interrupt
     * here and elsewhere: Because interrupts are used solely to alert
     * threads to check termination, which is checked here anyway, we
     * clear status (using Thread.interrupted) before any call to
     * park, so that park does not immediately return due to status
     * being set via some other unrelated call to interrupt in user
     * code.
     * @param w the calling worker
     * @param c the ctl value on entry
     * @return true if waited or another thread was released upon enq
     */
       private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
           int v = w.eventCount;
           //w的nextWait保存的是等待之前Pool的控制信息。
           w.nextWait = (int)c;
           //这里是将当前线程的ID信息(下标取反)记录到Pool控制信息上
           //同时将控制信息上的活动工作线程计数减1。
           long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
           if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
               long d = ctl; 
               // 如果和另外的一个窃取线程竞争并失败,这里返回true,work方法中会继续扫描。
               return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
           }
           for (int sc = w.stealCount; sc != 0;) {   
               //将工作线程上的stealCount原子累加到Pool的stealCount上面。
               long s = stealCount;
               if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
                   sc = w.stealCount = 0;
               else if (w.eventCount != v)
                   return true;                      //如果eventCount发生变化,重试。 
           }
           if ((!shutdown || !tryTerminate(false)) &&
               (int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
               blockedCount == 0 && quiescerCount == 0)
                // If these conditions hold the pool has gone quiescent, so call idleAwaitWork.
                // The conditions: the pool is not terminating, worker threads exist, the
                // active worker count has dropped to zero (AC is stored biased by -parallelism),
                // no worker is blocked in a join, and no worker is in helpQuiescePool.
               idleAwaitWork(w, nc, c, v);           // quiescent
           for (boolean rescanned = false;;) {
               if (w.eventCount != v)
                    return true; // eventCount changed: go back and rescan.
               if (!rescanned) {
                   int g = scanGuard, m = g & SMASK;
                   ForkJoinWorkerThread[] ws = workers;
                   if (ws != null && m < ws.length) {
                       rescanned = true;
                        // Rescan: if another worker's queue has tasks, try to release a waiting worker.
                       for (int i = 0; i <= m; ++i) {
                           ForkJoinWorkerThread u = ws[i];
                           if (u != null) {
                               if (u.queueBase != u.queueTop &&
                                   !tryReleaseWaiter())
                                    rescanned = false; // contention: scan again.
                               if (w.eventCount != v)
                                   return true;
                           }
                       }
                   }
                    if (scanGuard != g ||              // scanGuard changed
                        (queueBase != queueTop && !tryReleaseWaiter())) // or the pool submission queue has tasks
                        rescanned = false; // scan again
                    if (!rescanned)
                        Thread.yield();                // yield the cpu to reduce contention.
                    else
                        Thread.interrupted();          // clear interrupt status before parking.
               }
               else {
                    w.parked = true;                   // set park flag
                    if (w.eventCount != v) {           // recheck before parking.
                       w.parked = false;
                       return true;
                   }
                   LockSupport.park(this);
                   rescanned = w.parked = false;
               }
           }
       }
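The clear-interrupt-then-park idiom the javadoc above describes can be sketched in isolation. ParkDemo below is a hypothetical illustration, not JDK code:

```java
import java.util.concurrent.locks.LockSupport;

// Sketch of the park idiom used by tryAwaitWork: clear any stale
// interrupt status first, otherwise LockSupport.park() would return
// immediately because of an unrelated earlier interrupt.
public class ParkDemo {
    static volatile boolean parked;

    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            Thread.currentThread().interrupt(); // simulate a stale interrupt
            Thread.interrupted();               // clear it before parking
            parked = true;
            LockSupport.park();                 // now actually blocks until unpark
            parked = false;
        });
        waiter.start();
        while (!parked) Thread.yield();         // wait until the thread is about to park
        LockSupport.unpark(waiter);             // wake it, as tryReleaseWaiter does
        waiter.join();
        System.out.println("woken: parked=" + parked);
    }
}
```

If the stale interrupt were not cleared, the park would fall through and the worker would spin back into the rescan loop for no reason.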

Waiting (tryAwaitJoin)

    /**
     * Possibly blocks waiting for the given task to complete, or
     * cancels the task if terminating.  Fails to wait if contended.
     *
     * @param joinMe the task
     */
    final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
        int s;
        Thread.interrupted(); // clear interrupts before checking termination
        if (joinMe.status >= 0) {
            if (tryPreBlock()) {
                joinMe.tryAwaitDone(0L);
                postBlock();
            }
            else if ((ctl & STOP_BIT) != 0L)
                joinMe.cancelIgnoringExceptions();
        }
    }
    /**
     * Tries to increment blockedCount, decrement active count
     * (sometimes implicitly) and possibly release or create a
     * compensating worker in preparation for blocking. Fails
     * on contention or termination.
     *
     * @return true if the caller can block, else should recheck and retry
     */
    private boolean tryPreBlock() {
        int b = blockedCount;
        if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
            int pc = parallelism;
            do {
                ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
                int e, ac, tc, rc, i;
                long c = ctl;
                int u = (int)(c >>> 32);
                if ((e = (int)c) < 0) {
                                                 // skip -- terminating
                }
                else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
                         (ws = workers) != null &&
                         (i = ~e & SMASK) < ws.length &&
                         (w = ws[i]) != null) {
                    long nc = ((long)(w.nextWait & E_MASK) |
                               (c & (AC_MASK|TC_MASK)));
                    if (w.eventCount == e &&
                        UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                        w.eventCount = (e + EC_UNIT) & E_MASK;
                        if (w.parked)
                            UNSAFE.unpark(w);
                        return true;             // release an idle worker
                    }
                }
                else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
                    long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
                        return true;             // no compensation needed
                }
                else if (tc + pc < MAX_ID) {
                    long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                        addWorker();
                        return true;            // create a replacement
                    }
                }
                // try to back out on any failure and let caller retry
            } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
                                               b = blockedCount, b - 1));
        }
        return false;
    }
    /**
     * Decrements blockedCount and increments active count
     */
    private void postBlock() {
        long c;
        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // no mask
                                                c = ctl, c + AC_UNIT));
        int b;
        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
                                               b = blockedCount, b - 1));
    }    
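tryPreBlock/postBlock are the internals behind the public ForkJoinPool.managedBlock API: wrapping a blocking call in a ManagedBlocker lets the pool release or create a compensating worker before the thread blocks. A minimal sketch (SemaphoreBlocker is a hypothetical helper, not a JDK class):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;

// Hypothetical ManagedBlocker wrapping Semaphore.acquire(): before the
// calling worker blocks in block(), the pool runs the compensation
// logic shown above (release an idle worker or add a replacement).
public class SemaphoreBlocker implements ForkJoinPool.ManagedBlocker {
    private final Semaphore sem;
    private boolean acquired;

    public SemaphoreBlocker(Semaphore sem) { this.sem = sem; }

    @Override
    public boolean block() throws InterruptedException {
        if (!acquired) {
            sem.acquire();       // potentially blocks this worker
            acquired = true;
        }
        return true;             // no further blocking needed
    }

    @Override
    public boolean isReleasable() {
        // Try a non-blocking acquire first so we skip compensation if possible.
        return acquired || (acquired = sem.tryAcquire());
    }
}
```

Inside a pool computation one would call `ForkJoinPool.managedBlock(new SemaphoreBlocker(sem))` instead of `sem.acquire()` directly, so parallelism is preserved while the worker waits.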

二、ForkJoinTask Source Code Analysis

The abstract class ForkJoinTask implements the Future interface.

1. ForkJoinTask structure

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

    /*
     * See the internal documentation of class ForkJoinPool for a
     * general implementation overview.  ForkJoinTasks are mainly
     * responsible for maintaining their "status" field amidst relays
     * to methods in ForkJoinWorkerThread and ForkJoinPool. The
     * methods of this class are more-or-less layered into (1) basic
     * status maintenance (2) execution and awaiting completion (3)
     * user-level methods that additionally report results. This is
     * sometimes hard to see because this file orders exported methods
     * in a way that flows well in javadocs.
     */

    /*
     * The status field holds run control status bits packed into a
     * single int to minimize footprint and to ensure atomicity (via
     * CAS).  Status is initially zero, and takes on nonnegative
     * values until completed, upon which status holds value
     * NORMAL, CANCELLED, or EXCEPTIONAL. Tasks undergoing blocking
     * waits by other threads have the SIGNAL bit set.  Completion of
     * a stolen task with SIGNAL set awakens any waiters via
     * notifyAll. Even though suboptimal for some purposes, we use
     * basic builtin wait/notify to take advantage of "monitor
     * inflation" in JVMs that we would otherwise need to emulate to
     * avoid adding further per-task bookkeeping overhead.  We want
     * these monitors to be "fat", i.e., not use biasing or thin-lock
     * techniques, so use some odd coding idioms that tend to avoid
     * them.
     */

    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    private static final int NORMAL      = -1;
    private static final int CANCELLED   = -2;
    private static final int EXCEPTIONAL = -3;
    private static final int SIGNAL      =  1;
    
        /**
         * Marks completion and wakes up threads waiting to join this task,
         * also clearing signal request bits.
         *
         * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
         * @return completion status on exit
         */
        private int setCompletion(int completion) {
            for (int s;;) {
                if ((s = status) < 0)
                    return s;
                if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
                    if (s != 0)
                        synchronized (this) { notifyAll(); }
                    return completion;
                }
            }
        }
    
 }   
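The status transitions (0 while running, then NORMAL, CANCELLED, or EXCEPTIONAL) are observable through the public API. A small sketch, with StatusDemo as a hypothetical example class:

```java
import java.util.concurrent.RecursiveTask;

// Observing completion states from outside: a task cancelled before it
// runs ends with status == CANCELLED, which setCompletion records and
// isDone()/isCancelled() report.
public class StatusDemo {
    static class Idle extends RecursiveTask<Integer> {
        @Override protected Integer compute() { return 42; }
    }

    public static void main(String[] args) {
        Idle t = new Idle();
        t.cancel(false);                      // setCompletion(CANCELLED)
        System.out.println(t.isDone());       // completed: status < 0
        System.out.println(t.isCancelled());  // status == CANCELLED
    }
}
```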
    
    

2. ExceptionNode

    /**
     * Key-value nodes for exception table.  The chained hash table
     * uses identity comparisons, full locking, and weak references
     * for keys. The table has a fixed capacity because it only
     * maintains task exceptions long enough for joiners to access
     * them, so should never become very large for sustained
     * periods. However, since we do not know when the last joiner
     * completes, we must use weak references and expunge them. We do
     * so on each operation (hence full locking). Also, some thread in
     * any ForkJoinPool will call helpExpungeStaleExceptions when its
     * pool becomes isQuiescent.
     */
    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // use id, not ref, to avoid weak cycles
        final int hashCode;  // store task hashCode before weak ref disappears
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
            super(task, exceptionTableRefQueue);
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task);
        }
    }
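The effect of the exception table can be seen from the caller's side: an exception thrown inside compute() is recorded under the task's identity and rethrown when the task is joined. A sketch (ExceptionDemo and FailingTask are hypothetical):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// An exception thrown inside compute() is stored in the exception table
// keyed by the task, and join() rethrows it to the joining thread.
public class ExceptionDemo {
    static class FailingTask extends RecursiveTask<Integer> {
        @Override protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FailingTask task = new FailingTask();
        pool.submit(task);
        try {
            task.join();                 // rethrows the recorded exception
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
        System.out.println(task.isCompletedAbnormally());
    }
}
```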

3. doExec

    /**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     */
    final void doExec() {
        if (status >= 0) {
            boolean completed;
            try {
                completed = exec();
            } catch (Throwable rex) {
                setExceptionalCompletion(rex);
                return;
            }
            if (completed)
                setCompletion(NORMAL); // must be outside try block
        }
    }
    
     /**
      * Immediately performs the base action of this task.  This method
      * is designed to support extensions, and should not in general be
      * called otherwise. The return value controls whether this task
      * is considered to be done normally. It may return false in
      * asynchronous actions that require explicit invocations of
      * {@link #complete} to become joinable. It may also throw an
      * (unchecked) exception to indicate abnormal exit.
      *
      * @return {@code true} if completed normally
      */
     protected abstract boolean exec();   
     
    /**
     * Records exception and sets exceptional completion.
     *
     * @return status on exit
     */
    private int setExceptionalCompletion(Throwable ex) {
        int h = System.identityHashCode(this);
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e == null) {
                    t[i] = new ExceptionNode(this, ex, t[i]);
                    break;
                }
                if (e.get() == this) // already present
                    break;
            }
        } finally {
            lock.unlock();
        }
        return setCompletion(EXCEPTIONAL);
    }
    

  /**
   * Poll stale refs and remove them. Call only while holding lock.
   */
  private static void expungeStaleExceptions() {
      for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
          if (x instanceof ExceptionNode) {
              ForkJoinTask<?> key = ((ExceptionNode)x).get();
              ExceptionNode[] t = exceptionTable;
              int i = System.identityHashCode(key) & (t.length - 1);
              ExceptionNode e = t[i];
              ExceptionNode pred = null;
              while (e != null) {
                  ExceptionNode next = e.next;
                  if (e == x) {
                      if (pred == null)
                          t[i] = next;
                      else
                          pred.next = next;
                      break;
                  }
                  pred = e;
                  e = next;
              }
          }
      }
  }    
     

4. fork

    /**
     * Arranges to asynchronously execute this task.  While it is not
     * necessarily enforced, it is a usage error to fork a task more
     * than once unless it has completed and been reinitialized.
     * Subsequent modifications to the state of this task or any data
     * it operates on are not necessarily consistently observable by
     * any thread other than the one executing it unless preceded by a
     * call to {@link #join} or related methods, or a call to {@link
     * #isDone} returning {@code true}.
     *
     * <p>This method may be invoked only from within {@code
     * ForkJoinPool} computations (as may be determined using method
     * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
     * result in exceptions or errors, possibly including {@code
     * ClassCastException}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }
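The classic usage pattern built on pushTask: fork the left half onto the current worker's deque, compute the right half directly, then join. SumTask below is a hypothetical illustration:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical divide-and-conquer task: fork() pushes the left half onto
// the current worker's deque (pushTask); join() later waits for it, or
// runs it directly if it is still at the top of the deque.
public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long from, to;                   // inclusive range

    public SumTask(long from, long to) { this.from = from; this.to = to; }

    @Override
    protected Long compute() {
        if (to - from < THRESHOLD) {               // small enough: sum directly
            long sum = 0;
            for (long i = from; i <= to; i++) sum += i;
            return sum;
        }
        long mid = (from + to) >>> 1;
        SumTask left = new SumTask(from, mid);
        left.fork();                               // pushTask onto the deque
        long rightSum = new SumTask(mid + 1, to).compute();
        return rightSum + left.join();             // join the queued/stolen half
    }

    public static void main(String[] args) {
        System.out.println(new ForkJoinPool().invoke(new SumTask(1, 1_000_000)));
    }
}
```

Computing the right half in the current thread before joining the left keeps the worker busy instead of blocking immediately, which is why fork-then-compute-then-join beats fork-fork-join-join.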

5. join


  /**
   * Returns the result of the computation when it {@link #isDone is
   * done}.  This method differs from {@link #get()} in that
   * abnormal completion results in {@code RuntimeException} or
   * {@code Error}, not {@code ExecutionException}, and that
   * interrupts of the calling thread do <em>not</em> cause the
   * method to abruptly return by throwing {@code
   * InterruptedException}.
   *
   * @return the computed result
   */
  public final V join() {
      if (doJoin() != NORMAL)
          return reportResult();
      else
          return getRawResult(); // normal completion: return the raw result
  }
  

  /**
   * Primary mechanics for join, get, quietlyJoin.
   * @return status upon completion
   */
  private int doJoin() {
      Thread t; ForkJoinWorkerThread w; int s; boolean completed;
      if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
          if ((s = status) < 0)
              return s;
          if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
              try {
                  completed = exec();
              } catch (Throwable rex) {
                  return setExceptionalCompletion(rex);
              }
              if (completed)
                  return setCompletion(NORMAL);
          }
          return w.joinTask(this);
      }
      else
          return externalAwaitDone();
  } 
  

  /**
   * Blocks a non-worker-thread until completion.
   * @return status upon completion
   */
  private int externalAwaitDone() {
      int s;
      if ((s = status) >= 0) {
          boolean interrupted = false;
          synchronized (this) {
              while ((s = status) >= 0) {
                  if (s == 0)
                      UNSAFE.compareAndSwapInt(this, statusOffset,
                                               0, SIGNAL);
                  else {
                      try {
                          wait();
                      } catch (InterruptedException ie) {
                          interrupted = true;
                      }
                  }
              }
          }
          if (interrupted)
              Thread.currentThread().interrupt();
      }
      return s;
  }
    /**
     * Report the result of invoke or join; called only upon
     * non-normal return of internal versions.
     */
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }

  /**
   * Tries to block a worker thread until completed or timed out.
   * Uses Object.wait time argument conventions.
   * May fail on contention or interrupt.
   *
   * @param millis if > 0, wait time.
   */
  final void tryAwaitDone(long millis) {
      int s;
      try {
          if (((s = status) > 0 ||
               (s == 0 &&
                UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
              status > 0) {
              synchronized (this) {
                  if (status > 0)
                      wait(millis);
              }
          }
      } catch (InterruptedException ie) {
          // caller must check termination
      }
  }
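When join() is called from a thread that is not a ForkJoinWorkerThread, doJoin() takes the externalAwaitDone() path, blocking on the task's monitor via wait/notifyAll rather than helping to run tasks. A sketch of that external-caller case (ExternalJoinDemo is a hypothetical example class):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Joining from a non-worker thread: main is not a ForkJoinWorkerThread,
// so join() blocks in externalAwaitDone() until a worker completes the
// task and setCompletion's notifyAll wakes the waiter.
public class ExternalJoinDemo {
    static class Answer extends RecursiveTask<Integer> {
        @Override protected Integer compute() { return 42; }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        Answer task = new Answer();
        pool.submit(task);          // runs on a pool worker thread
        int result = task.join();   // main thread blocks in externalAwaitDone()
        System.out.println(result);
    }
}
```

Note that, unlike get(), join() here does not throw InterruptedException: externalAwaitDone() remembers the interrupt and reasserts it after the task completes.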