public class ForkJoinWorkerThread extends Thread {
/*
* Overview:
*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. This class includes bookkeeping in support of
* worker activation, suspension, and lifecycle control described
* in more detail in the internal documentation of class
* ForkJoinPool. And as described further below, this class also
* includes special-cased support for some ForkJoinTask
* methods. But the main mechanics involve work-stealing:
*
* Work-stealing queues are special forms of Deques that support
* only three of the four possible end-operations -- push, pop,
* and deq (aka steal), under the further constraints that push
* and pop are called only from the owning thread, while deq may
* be called from other threads. (If you are unfamiliar with
* them, you probably want to read Herlihy and Shavit's book "The
* Art of Multiprocessor programming", chapter 16 describing these
* in more detail before proceeding.) The main work-stealing
* queue design is roughly similar to those in the papers "Dynamic
* Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
* The main differences ultimately stem from gc requirements that
* we null out taken slots as soon as we can, to maintain as small
* a footprint as possible even in programs generating huge
* numbers of tasks. To accomplish this, we shift the CAS
* arbitrating pop vs deq (steal) from being on the indices
* ("queueBase" and "queueTop") to the slots themselves (mainly
* via method "casSlotNull()"). So, both a successful pop and deq
* mainly entail a CAS of a slot from non-null to null. Because
* we rely on CASes of references, we do not need tag bits on
* queueBase or queueTop. They are simple ints as used in any
* circular array-based queue (see for example ArrayDeque).
* Updates to the indices must still be ordered in a way that
* guarantees that queueTop == queueBase means the queue is empty,
* but otherwise may err on the side of possibly making the queue
* appear nonempty when a push, pop, or deq have not fully
* committed. Note that this means that the deq operation,
* considered individually, is not wait-free. One thief cannot
* successfully continue until another in-progress one (or, if
* previously empty, a push) completes. However, in the
* aggregate, we ensure at least probabilistic non-blockingness.
* If an attempted steal fails, a thief always chooses a different
* random victim target to try next. So, in order for one thief to
* progress, it suffices for any in-progress deq or new push on
* any empty queue to complete.
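*
* The slot-CAS scheme described above can be illustrated with a minimal,
* fixed-capacity sketch. SlotCasDeque is a hypothetical class, not part of
* this file. It uses AtomicReferenceArray instead of the Unsafe slot calls
* (portable, though the text below notes it is slower) and never grows its
* array. For simplicity it also makes both indices volatile, even though the
* real queueTop is not. Both pop and steal claim an element only by CASing
* its slot from non-null to null.
*
```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch: fixed capacity, no resizing, both indices volatile.
final class SlotCasDeque<T> {
    private final AtomicReferenceArray<T> slots;
    private final int mask;
    private volatile int base; // next index to steal from (like queueBase)
    private volatile int top;  // next index to push to (like queueTop); owner writes only

    SlotCasDeque(int capacity) {
        if (capacity <= 0 || (capacity & (capacity - 1)) != 0)
            throw new IllegalArgumentException("capacity must be a power of two");
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    // Owner only. Returns false when full; this sketch never grows the array.
    boolean push(T t) {
        if (t == null)
            throw new NullPointerException("null task");
        int s = top;
        if (s - base > mask)
            return false;
        slots.lazySet(s & mask, t); // ordered store, standing in for putOrderedObject
        top = s + 1;
        return true;
    }

    // Owner only: LIFO end. Claims the slot by CASing it from t to null.
    T pop() {
        for (int s; (s = top) != base; ) {
            int i = --s & mask;
            T t = slots.get(i);
            if (t == null)
                break;                   // a thief took it first
            if (slots.compareAndSet(i, t, null)) {
                top = s;
                return t;
            }
        }
        return null;
    }

    // Any thread: FIFO end ("deq"). The same slot CAS arbitrates against pop.
    T steal() {
        int b = base;
        if (b == top)
            return null;                 // empty
        int i = b & mask;
        T t = slots.get(i);
        if (t != null && base == b && slots.compareAndSet(i, t, null)) {
            base = b + 1;
            return t;
        }
        return null;                     // lost a race; caller tries another victim
    }
}
```
*
* In this sketch a failed steal simply returns null; as described above, a
* thief would then pick a different random victim rather than retry the
* same queue.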
*
* This approach also enables support for "async mode" where local
* task processing is in FIFO rather than LIFO order, achieved simply by
* using a version of deq rather than pop when locallyFifo is true (as
* set by the ForkJoinPool). This allows use in message-passing
* frameworks in which tasks are never joined. However, neither
* mode considers affinities, loads, cache localities, etc., so they
* rarely provide the best possible performance on a given
* machine, but portably provide good throughput by averaging over
* these factors. (Further, even if we did try to use such
* information, we do not usually have a basis for exploiting
* it. For example, some sets of tasks profit from cache
* affinities, but others are harmed by cache pollution effects.)
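*
* The difference between the two local orders can be shown with a small
* sketch that substitutes java.util.ArrayDeque for the work-stealing queue
* (LocalOrderSketch is hypothetical). Tasks are pushed at the top. With
* locallyFifo false the owner takes from the top, as pop does; with
* locallyFifo true it takes from the base, as deq does. In the public API
* this mode is selected with the asyncMode argument of the four-argument
* ForkJoinPool constructor.
*
```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the owner's local run order in each mode.
final class LocalOrderSketch {
    // Pushes tasks at the top (tail), then drains them the way the owner would:
    // from the top (LIFO, like pop) or from the base (FIFO, like deq).
    static List<String> localRunOrder(List<String> pushed, boolean locallyFifo) {
        ArrayDeque<String> deque = new ArrayDeque<>();
        for (String t : pushed)
            deque.addLast(t);
        List<String> order = new ArrayList<>();
        while (!deque.isEmpty())
            order.add(locallyFifo ? deque.pollFirst() : deque.pollLast());
        return order;
    }
}
```
*
* Thieves take from the base in both modes, so in FIFO mode the owner and
* its thieves consume tasks from the same end of the queue.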
*
* When a worker would otherwise be blocked waiting to join a
* task, it first tries a form of linear helping: Each worker
* records (in field currentSteal) the most recent task it stole
* from some other worker. Plus, it records (in field currentJoin)
* the task it is currently actively joining. Method joinTask uses
* these markers to try to find a worker to help (i.e., steal back
* a task from and execute it) that could hasten completion of the
* actively joined task. In essence, the joiner executes a task
* that would be on its own local deque had the to-be-joined task
* not been stolen. This may be seen as a conservative variant of
* the approach in Wagner & Calder "Leapfrogging: a portable
* technique for implementing efficient futures" SIGPLAN Notices,
* 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
* in that: (1) We only maintain dependency links across workers
* upon steals, rather than use per-task bookkeeping. This may
* require a linear scan of workers array to locate stealers, but
* usually doesn't because stealers leave hints (that may become
* stale/wrong) of where to locate them. This isolates cost to
* when it is needed, rather than adding to per-task overhead.
* (2) It is "shallow", ignoring nesting and potentially cyclic
* mutual steals. (3) It is intentionally racy: field currentJoin
* is updated only while actively joining, which means that we
* miss links in the chain during long-lived tasks, GC stalls etc
* (which is OK since blocking in such cases is usually a good
* idea). (4) We bound the number of attempts to find work (see
* MAX_HELP) and fall back to suspending the worker and if
* necessary replacing it with another.
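*
* A single-threaded toy model of this linear helping, under stated
* assumptions. HelpJoinSketch and its Task, Worker, steal, locateStealer,
* and helpJoin members are hypothetical. A parent task counts as done once
* all of its subtasks have run, which stands in for real completion. Only
* one level of stealer is followed, in line with the "shallow" point above,
* and MAX_HELP = 16 is an illustrative bound, not necessarily the pool's
* value. The joiner checks its stealHint first, falls back to a linear scan
* for the worker whose currentSteal is the task being joined, and then
* steals back from that worker's base.
*
```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model; no real threads and no blocking.
final class HelpJoinSketch {
    static final int MAX_HELP = 16; // illustrative bound on helping attempts

    static final class Task {
        final Task parent;
        int pending;   // subtasks still to run (toy completion rule)
        boolean done;
        Task(Task parent) {
            this.parent = parent;
            if (parent != null)
                parent.pending++;
        }
        void run() {
            done = true;
            if (parent != null && --parent.pending == 0)
                parent.done = true;
        }
    }

    static final class Worker {
        final Deque<Task> deque = new ArrayDeque<>(); // first = base, last = top
        Task currentSteal;  // most recent task this worker stole
        Task currentJoin;   // task this worker is actively joining
        int stealHint = -1; // index of the worker that last stole from this one
    }

    // Thief takes the oldest task from the victim and records the links.
    static Task steal(Worker[] ws, int thief, int victim) {
        Task t = ws[victim].deque.pollFirst();
        if (t != null) {
            ws[thief].currentSteal = t;
            ws[victim].stealHint = thief;
        }
        return t;
    }

    static int locateStealer(Worker[] ws, int hint, Task joined) {
        if (hint >= 0 && hint < ws.length && ws[hint].currentSteal == joined)
            return hint;                       // hint is still accurate
        for (int i = 0; i < ws.length; i++)    // stale hint: linear scan
            if (ws[i].currentSteal == joined)
                return i;
        return -1;
    }

    // Returns true if `joined` completed while helping; false means the real
    // worker would fall back to suspending (not modeled here).
    static boolean helpJoin(Worker[] ws, int self, Task joined) {
        Worker me = ws[self];
        me.currentJoin = joined;
        try {
            for (int n = 0; !joined.done && n < MAX_HELP; n++) {
                int v = locateStealer(ws, me.stealHint, joined);
                if (v < 0)
                    break;                         // stealer not found
                Task t = ws[v].deque.pollFirst();  // steal back from its base
                if (t == null)
                    break;                         // nothing left to help with
                t.run();
            }
            return joined.done;
        } finally {
            me.currentJoin = null;                 // only set while actively joining
        }
    }
}
```
*
* Clearing currentJoin in the finally block mirrors point (3): the link is
* visible only while the worker is actively helping.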
*
* Efficient implementation of these algorithms currently relies
* on an uncomfortable amount of "Unsafe" mechanics. To maintain
* correct orderings, reads and writes of variable queueBase
* require volatile ordering. Variable queueTop need not be
* volatile because non-local reads always follow those of
* queueBase. Similarly, because they are protected by volatile
* queueBase reads, reads of the queue array and its slots by
* other threads do not need volatile load semantics, but writes
* (in push) require store order and CASes (in pop and deq)
* require (volatile) CAS semantics. (Michael, Saraswat, and
* Vechev's algorithm has similar properties, but without support
* for nulling slots.) Since these combinations aren't supported
* using ordinary volatiles, the only way to accomplish these
* efficiently is to use direct Unsafe calls. (Using external
* AtomicIntegers and AtomicReferenceArrays for the indices and
* array is significantly slower because of memory locality and
* indirection effects.)
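*
* On Java 9 and later, java.lang.invoke.VarHandle exposes the same access
* modes on array elements without Unsafe: setRelease for the ordered store
* in push, setVolatile for writeSlot, and compareAndSet for casSlotNull.
* The sketch below (VarHandleSlotOps is a hypothetical name) maps those
* operations onto an Object[] queue. This JDK 7 era code does not use it.
*
```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical mapping of the slot operations onto VarHandle access modes.
final class VarHandleSlotOps {
    private static final VarHandle SLOT =
        MethodHandles.arrayElementVarHandle(Object[].class);

    // Ordered (release) store, the role putOrderedObject plays in push.
    static void orderedWrite(Object[] q, int i, Object t) {
        SLOT.setRelease(q, i, t);
    }

    // Volatile store, the role writeSlot plays.
    static void volatileWrite(Object[] q, int i, Object t) {
        SLOT.setVolatile(q, i, t);
    }

    // CAS slot i from t to null, the role casSlotNull plays.
    static boolean casSlotNull(Object[] q, int i, Object t) {
        return SLOT.compareAndSet(q, i, t, null);
    }

    // Plain read; safe only after a volatile read of the base index.
    static Object plainRead(Object[] q, int i) {
        return q[i];
    }
}
```
*
* As with the Unsafe version, plain slot reads rely on the preceding
* volatile read of queueBase for ordering.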
*
* Further, performance on most platforms is very sensitive to
* placement and sizing of the (resizable) queue array. Even
* though these queues don't usually become all that big, the
* initial size must be large enough to counteract cache
* contention effects across multiple queues (especially in the
* presence of GC cardmarking). Also, to improve thread-locality,
* queues are initialized after starting.
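*
* Growing a power-of-two circular array keeps each live element at the same
* logical index, so an element moves from slot (b & oldMask) to slot
* (b & newMask). The sketch below is single-threaded. A concurrent version
* would also need to claim each old slot (for example by CASing it to null)
* so that a thief cannot take an element that has already been copied.
* GrowQueueSketch is hypothetical, and reporting overflow with
* RejectedExecutionException is an assumption. The capacity constants are
* copied from the fields declared below.
*
```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical single-threaded sketch of doubling a circular queue array.
final class GrowQueueSketch {
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 8192
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    // Returns a new array holding the elements at logical indices
    // base, base+1, ..., top-1, each stored at (index & (length - 1)).
    // The caller must ensure no concurrent pops or steals.
    static Object[] grow(Object[] oldQ, int base, int top) {
        int size = (oldQ != null) ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        Object[] q = new Object[size];
        if (oldQ != null) {
            int oldMask = oldQ.length - 1;
            int mask = size - 1;
            for (int b = base; b != top; ++b) // != tolerates int wraparound
                q[b & mask] = oldQ[b & oldMask];
        }
        return q;
    }
}
```
*
* The loop tests b != top rather than b < top so that it still terminates
* correctly after the indices wrap past Integer.MAX_VALUE.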
*/
/**
* Mask for pool indices encoded as shorts
*/
private static final int SMASK = 0xffff;
/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two. Initial size must be at least 4, but is
* padded to minimize cache effects.
*/
private static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 8192
/**
* Maximum size for queue array. Must be a power of two
* less than or equal to 1 << (31 - width of array entry) to
* ensure lack of index wraparound, but is capped at a lower
* value to help users trap runaway computations.
*/
private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M
/**
* The work-stealing queue array. Size must be a power of two.
* Initialized when started (as opposed to when constructed), to
* improve memory locality.
*/
ForkJoinTask<?>[] queue;
/**
* The pool this thread works in. Accessed directly by ForkJoinTask.
*/
final ForkJoinPool pool;
/**
* Index (mod queue.length) of next queue slot to push to or pop
* from. It is written only by owner thread, and accessed by other
* threads only after reading (volatile) queueBase. Both queueTop
* and queueBase are allowed to wrap around on overflow, but
* (queueTop - queueBase) still estimates size.
*/
int queueTop;
/**
* Index (mod queue.length) of least valid queue slot, which is
* always the next position to steal from if nonempty.
*/
volatile int queueBase;
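/*
* The index arithmetic described for queueTop and queueBase can be checked
* in isolation. QueueIndexSketch is a hypothetical helper. Masking with
* (length - 1) gives the slot for any int index, including negative ones
* after overflow (where % would return a negative value), and plain int
* subtraction still yields the size when only queueTop has wrapped.
*
```java
// Hypothetical helper illustrating the index arithmetic; not part of the pool.
final class QueueIndexSketch {
    // Slot for a logical index: the floor modulus of index by length,
    // valid because length is a power of two.
    static int slotFor(int index, int length) {
        return index & (length - 1);
    }

    // Estimated number of queued tasks; two's-complement subtraction gives
    // the right answer even when queueTop has wrapped and queueBase has not.
    static int estimatedSize(int queueTop, int queueBase) {
        return queueTop - queueBase;
    }
}
```
*/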
/**
* The index of most recent stealer, used as a hint to avoid
* traversal in method helpJoinTask. This is only a hint because a
* worker might have had multiple steals and this only holds one
* of them (usually the most current). Declared non-volatile,
* relying on other prevailing sync to keep reasonably current.
*/
int stealHint;
/**
* Index of this worker in pool array. Set once by pool before
* running, and accessed directly by pool to locate this worker in
* its workers array.
*/
final int poolIndex;
/**
* Encoded record for pool task waits. Usages are always
* surrounded by volatile reads/writes.
*/
int nextWait;
/**
* Complement of poolIndex, offset by count of entries of task
* waits. Accessed by ForkJoinPool to manage event waiters.
*/
volatile int eventCount;
/**
* Seed for random number generator for choosing steal victims.
* Uses Marsaglia xorshift. Must be initialized as nonzero.
*/
int seed;
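/*
* A sketch of how a nonzero xorshift seed can drive random victim choice.
* XorShiftVictimSketch is hypothetical. The shift triple (13, 17, 5) is one
* of Marsaglia's published 32-bit triples, but whether the pool uses that
* exact triple, or this way of skipping the thief's own index, is not shown
* in this file. The nonzero requirement matters because xorshift maps a zero
* state to zero forever.
*
```java
// Hypothetical sketch of xorshift-based victim selection.
final class XorShiftVictimSketch {
    private int seed;

    XorShiftVictimSketch(int seed) {
        if (seed == 0)
            throw new IllegalArgumentException("seed must be nonzero");
        this.seed = seed;
    }

    // One step of Marsaglia's 32-bit xorshift with shifts (13, 17, 5).
    // A nonzero state never becomes zero.
    int nextSeed() {
        int r = seed;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return seed = r;
    }

    // Picks a random victim index in [0, n) other than self; requires n >= 2.
    int randomVictim(int self, int n) {
        int k = Math.floorMod(nextSeed(), n - 1);
        return (k >= self) ? k + 1 : k;
    }
}
```
*/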
/**
* Number of steals. Directly accessed (and reset) by pool when
* idle.
*/
int stealCount;
/**
* True if this worker should or did terminate.
*/
volatile boolean terminate;
/**
* Set to true before LockSupport.park; false on return
*/
volatile boolean parked;
/**
* True if this worker uses local FIFO, not the default LIFO, order
* for local polling. Shadows the value from ForkJoinPool.
*/
final boolean locallyFifo;
/**
* The task most recently stolen from another worker (or
* submission queue). All uses are surrounded by enough volatile
* reads/writes to maintain as non-volatile.
*/
ForkJoinTask<?> currentSteal;
/**
* The task currently being joined, set only when actively trying
* to help other stealers in helpJoinTask. All uses are surrounded
* by enough volatile reads/writes to maintain as non-volatile.
*/
ForkJoinTask<?> currentJoin;
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super(pool.nextWorkerName());
this.pool = pool;
int k = pool.registerWorker(this);
poolIndex = k;
eventCount = ~k & SMASK; // clear wait count
locallyFifo = pool.locallyFifo;
Thread.UncaughtExceptionHandler ueh = pool.ueh;
if (ueh != null)
setUncaughtExceptionHandler(ueh);
setDaemon(true);
}
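/*
* The constructor initializes eventCount to ~k & SMASK. The arithmetic
* below (EventCountSketch is hypothetical) shows that the low 16 bits then
* hold the complement of the pool index, and that applying the same
* operation recovers k. It assumes, beyond what this file shows, that the
* wait count mentioned in the eventCount comment lives above those 16 bits.
*
```java
// Hypothetical sketch of the low-16-bit index encoding used for eventCount.
final class EventCountSketch {
    static final int SMASK = 0xffff; // same mask as in ForkJoinWorkerThread

    // Value the constructor stores for a worker registered at index k.
    static int initialEventCount(int k) {
        return ~k & SMASK;
    }

    // Recovers the pool index from the low 16 bits, ignoring higher bits.
    static int poolIndexOf(int eventCount) {
        return ~eventCount & SMASK;
    }
}
```
*/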
/**
* CASes slot i of array q from t to null. Caller must ensure q is
* non-null and index is in range.
*/
private static final boolean casSlotNull(ForkJoinTask<?>[] q, int i,
ForkJoinTask<?> t) {
return UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE, t, null);
}
/**
* Performs a volatile write of the given task at given slot of
* array q. Caller must ensure q is non-null and index is in
* range. This method is used only during resets and backouts.
*/
private static final void writeSlot(ForkJoinTask<?>[] q, int i,
ForkJoinTask<?> t) {
UNSAFE.putObjectVolatile(q, (i << ASHIFT) + ABASE, t);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long ABASE;
private static final int ASHIFT;
static {
int s;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> a = ForkJoinTask[].class;
ABASE = UNSAFE.arrayBaseOffset(a);
s = UNSAFE.arrayIndexScale(a);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}
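/*
* casSlotNull and writeSlot address slot i at byte offset
* (i << ASHIFT) + ABASE, where ASHIFT is log2 of the array's element scale.
* SlotOffsetSketch (hypothetical) reproduces that arithmetic without Unsafe.
* Real scale and base values are JVM-dependent, so any concrete numbers used
* with it are only examples. It widens to long before shifting, whereas the
* original shifts an int; that is safe there because the queue capacity is
* capped at 1 << 24.
*
```java
// Hypothetical sketch of the element-offset arithmetic; no Unsafe involved.
final class SlotOffsetSketch {
    // log2 of the element scale, computed as in the static initializer.
    static int shiftFor(int scale) {
        if (scale <= 0 || (scale & (scale - 1)) != 0)
            throw new IllegalArgumentException("data type scale not a power of two");
        return 31 - Integer.numberOfLeadingZeros(scale);
    }

    // Byte offset of element i: a shift replaces multiplying by the scale.
    static long offsetOf(int i, int shift, long base) {
        return ((long) i << shift) + base;
    }
}
```
*/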
}