1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
|
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* 主池控制状态ctl是包含两个概念字段的原子整数:
* workerCount,表示有效的线程数
* runState,指示是否运行,关闭等
* In order to pack them into one int, we limit workerCount to
* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
* billion) otherwise representable. If this is ever an issue in
* the future, the variable can be changed to be an AtomicLong,
* and the shift/mask constants below adjusted. But until the need
* arises, this code is a bit faster and simpler using an int.
* ctl控制状态是int,我们将workerCount限制为(2 ^ 29)-1(约5亿),而不是(2 ^ 31)-1(20亿)。
* 如果将来有这个问题,可以将变量改为AtomicLong,并调整下面的shift / mask常量。但是,直到需要出现,这个代码是更快,更简单的使用int
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* workerCount是已经被允许启动并且不被允许停止的工作线程的数量。该值可能与活动线程的实际数量暂时不同,
* 例如,当ThreadFactory在被询问时未能创建线程,以及退出线程在终止之前仍在执行簿记时。
* 用户可见的池大小被报告为工作者集合的当前大小。
*
* The runState provides the main lifecyle control, taking on values:
*
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* runState提供了主要的生命周期控制,取值为:
* RUNNING: 接受新任务并处理排队的任务
* SHUTDOWN: 不要接受新的任务,而是处理排队的任务
* STOP: 不要接受新的任务,不要处理排队的任务,并中断正在进行的任务
* TIDYING: 所有任务都已终止,workerCount为零,线程转换到状态TIDYING将运行terminate()挂接方法
* TERMINATED: terminated() 已经完成
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* 这些值之间的数字顺序很重要,以允许有序的比较。 runState随着时间的推移单调增加,但不需要击中每个状态。转换是:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize() 在调用shutdown()时,可能会隐式地在finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow() 在调用shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty 当队列和池都为空时
* STOP -> TIDYING
* When pool is empty 池为空时
* TIDYING -> TERMINATED
* When the terminated() hook method has completed 当terminate()钩子方法完成时
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* 当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
* 检测从SHUTDOWN到TIDYING的转换并不像你希望的那样简单,
* 因为在SHUTDOWN状态下,非空或反过来,队列可能变空,但是只有在看到它为空时,
* 我们才能看到workerCount是0(这有时需要重新检查 - 见下文)。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //高29位以上代表runState
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //workerCount最大容量(536870911) 0001 1111 1111 1111 1111 1111 1111 1111
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;//1110 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;//0000 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;//0010 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;//0100 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;//0110 0000 0000 0000 0000 0000 0000 0000
// Packing and unpacking ctl 包装和拆包 ctl
//获取 runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取 workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
//两个数取或
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempt to CAS-increment the workerCount field of ctl.
* CAS 添加 workerCount
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempt to CAS-decrement the workerCount field of ctl.
* CAS 减少 workerCount
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
* while循环调用compareAndDecrementWorkerCount方法 主要减少workerCount数量
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
* 任务队列,负责保存任务并将任务交给工作线程处理。我们不要求返回null的workQueue.poll()必然意味着workQueue.isEmpty(),
* 因此完全依赖于isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN转换为TIDYING时,我们必须执行此操作) 。
* 这容纳了特殊用途的队列,如DelayQueues,即使poll()允许其返回null,即使延迟过期后可能会返回非null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
* 设置包含池中的所有工作线程。只有在保持mainLock的情况下才能访问。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
* 等待状态支持等待终止
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
* 跟踪达到最大线程池大小。仅在mainLock下访问。
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
* 完成任务的计数器。仅在工作线程终止时更新。仅在mainLock下访问。
*/
private long completedTaskCount;
/*
* All user control parameters are declared as volatiles so that
* ongoing actions are based on freshest values, but without need
* for locking, since no internal invariants depend on them
* changing synchronously with respect to other actions.
*/
/**
* Factory for new threads. All threads are created using this
* factory (via method addWorker). All callers must be prepared
* for addWorker to fail, which may reflect a system or user's
* policy limiting the number of threads. Even though it is not
* treated as an error, failure to create threads may result in
* new tasks being rejected or existing ones remaining stuck in
* the queue.
*
* 工厂创建新线程。所有的线程都是使用这个工厂创建的(通过方法addWorker)。
* 所有的调用者都必须准备好让addWorker失败,这可能反映了系统或用户的策略限制了线程的数量。
* 即使它不被视为错误,创建线程失败可能会导致新任务被拒绝或现有的任务仍然滞留在队列中。
*
* We go further and preserve pool invariants even in the face of
* errors such as OutOfMemoryError, that might be thrown while
* trying to create threads. Such errors are rather common due to
* the need to allocate a native stack in Thread#start, and users
* will want to perform clean pool shutdown to clean up. There
* will likely be enough memory available for the cleanup code to
* complete without encountering yet another OutOfMemoryError.
*
* 即使面对诸如OutOfMemoryError之类的错误,我们也可以更进一步并保持池不变量,这可能会在尝试创建线程时抛出。
* 这样的错误是相当普遍的,因为需要在Thread#start中分配一个本地堆栈,并且用户需要执行清理池关闭来清理。
* 可能会有足够的内存用于清理代码,而不会遇到另一个OutOfMemoryError。
*/
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
* 在执行时调用saturated or shutdown 时的处理程序。
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*
* 空闲工作线程等待任务的超时时间,单位:纳秒。
* 当前线程数大于核心线程数时,超出的线程会使用这个超时时间。
* 如果设置了allowCoreThreadTimeOut,核心线程数也会使用这个
* 超时时间。否则,线程会一直等待新任务,不会超时。
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
* 如果为false(默认情况下),核心线程就算空闲也会一直存活。
* 如果为true,等待任务的核心线程会使用keepAliveTime作为
* 超时时间,如果超时,线程被回收。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
* 核心线程数量,只能在持有mainLock的情况下修改。
* volatile可以保证可见性。
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
* 最大线程数量。注意,实际的最大值是内部受CAPACITY限制的。
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
* 默认的拒绝执行处理程序
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
* Permission required for callers of shutdown and shutdownNow.
* We additionally require (see checkShutdownAccess) that callers
* have permission to actually interrupt threads in the worker set
* (as governed by Thread.interrupt, which relies on
* ThreadGroup.checkAccess, which in turn relies on
* SecurityManager.checkAccess). Shutdowns are attempted only if
* these checks pass.
*
* 对于调用线程池的shutdown(),shutdownNow()方法权限认证。
* 我们还需要(请参阅checkShutdownAccess)调用者有权实际中断工作集中的线程
* (由Thread.interrupt控制,而ThreadGroup.checkAccess依赖于SecurityManager.checkAccess)。
* 仅当这些检查通过时才试图关机。
*
* All actual invocations of Thread.interrupt (see
* interruptIdleWorkers and interruptWorkers) ignore
* SecurityExceptions, meaning that the attempted interrupts
* silently fail. In the case of shutdown, they should not fail
* unless the SecurityManager has inconsistent policies, sometimes
* allowing access to a thread and sometimes not. In such cases,
* failure to actually interrupt threads may disable or delay full
* termination. Other uses of interruptIdleWorkers are advisory,
* and failure to actually interrupt will merely delay response to
* configuration changes so is not handled exceptionally.
*
* Thread.interrupt的所有实际调用(请参阅interruptIdleWorkers和interruptWorkers)都将忽略SecurityExceptions,
* 这意味着尝试的中断将以静默方式失败。在关闭的情况下,除非SecurityManager具有不一致的策略,否则不应该失败,
* 有时允许访问线程,有时不允许访问。在这种情况下,实际中断线程的失败可能会导致或终止完全中断。
* interruptIdleWorkers的其他用途是建议性的,实际中断的失败只会延迟对配置更改的响应,因此不会异常处理。
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
|