ThreadPoolExecutor源码分析

简介

  ThreadPoolExecutor是java.util.concurrent包中提供的线程池。

一、ThreadPoolExecutor 数据结构实现

1.ThreadPoolExecutor 实现AbstractExecutorService抽象类

依赖关系图如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public interface ExecutorService extends Executor {
    /**
     * 关闭当前执行器,关闭之前提交的任务仍然会被执行,但不
     * 会接收新的任务。如果当前执行服务已经关闭,那么调用这
     * 个方法不会产生额外的影响。
     */
    void shutdown();
    /**
     * 尝试停止所有正在执行的任务,停止处理正在等待的任务,
     * 并返回等待执行任务列表。
     *
     * 并不能确保一定可以停止正在执行的任务。比如,通常的实现
     * 方式是中断执行任务的线程,但如果任务执行过程中并不响应
     * 中断,那就无法停止这个任务。
     */
    List<Runnable> shutdownNow();
    /**
     * 判断当前执行器是否已经关闭。
     */
    boolean isShutdown();
    /**
     * 判断在执行器关闭后,是否所有的任务都执行完毕。
     */
    boolean isTerminated();
    /**
     * 在一个关闭请求后,阻塞等待,直到所有任务都执行完毕,或者
     * 超时,或者当前线程被中断。
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 提交一个有返回结果的任务,并返回一个Future。  
     */
    <T> Future<T> submit(Callable<T> task);
    /**
     * 提交一个Runnable任务和返回结果,并返回一个Future。
     */
    <T> Future<T> submit(Runnable task, T result);
    /**
     * 提交一个Runnable任务和返回结果,并返回一个Future。
     * 如果任务执行完毕,调用Future的get方法返回null。
     */
    Future<?> submit(Runnable task);
    /**
     * 执行一批任务,当所有任务都执行完毕后,以Future集合
     * 集合的形式返回结果。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    /**
     * 执行一批任务,当所有任务都执行完毕或者超时,以Future集合
     * 集合的形式返回结果。注意如果超时的话,没有执行完的任务会
     * 被取消。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 执行一批任务,如果其中有一个任务完成,就返回结果。
     * 其他没有完成的任务会被取消。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    /**
     * 执行一批任务,如果其中有一个任务完成或者超时,就返回结果。
     * 其他没有完成的任务会被取消。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

2.ThreadPoolExecutor 内部结构实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *  主池控制状态ctl是包含两个概念字段的原子整数:
     *    workerCount,表示有效的线程数
     *    runState,指示是否运行,关闭等
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     * ctl控制状态是int,我们将workerCount限制为(2 ^ 29)-1(约5亿),而不是(2 ^ 31)-1(20亿)。
     * 如果将来有这个问题,可以将变量改为AtomicLong,并调整下面的shift / mask常量。但是,直到需要出现,这个代码是更快,更简单的使用int
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop.  The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     * 
     * workerCount是已经被允许启动并且不被允许停止的工作线程的数量。该值可能与活动线程的实际数量暂时不同,
     * 例如,当ThreadFactory在被询问时未能创建线程,以及退出线程在终止之前仍在执行簿记时。
     * 用户可见的池大小被报告为工作者集合的当前大小。
     *
     * The runState provides the main lifecyle control, taking on values:
     * 
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *   
     * runState提供了主要的生命周期控制,取值为:
     *   RUNNING: 接受新任务并处理排队的任务
     *   SHUTDOWN: 不要接受新的任务,而是处理排队的任务
     *   STOP:     不要接受新的任务,不要处理排队的任务,并中断正在进行的任务
     *   TIDYING:  所有任务都已终止,workerCount为零,线程转换到状态TIDYING将运行terminate()挂接方法
     *   TERMINATED: terminated() 已经完成
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * 这些值之间的数字顺序很重要,以允许有序的比较。 runState随着时间的推移单调增加,但不需要击中每个状态。转换是:
     * 
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize() 在调用shutdown()时,可能会隐式地在finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()  在调用shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty 当队列和池都为空时
     * STOP -> TIDYING
     *    When pool is empty    池为空时
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed  当terminate()钩子方法完成时
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     * 
     * 当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     * 检测从SHUTDOWN到TIDYING的转换并不像你希望的那样简单,
     * 因为在SHUTDOWN状态下,非空或反过来,队列可能变空,但是只有在看到它为空时,
     * 我们才能看到workerCount是0(这有时需要重新检查 - 见下文)。
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3; //高29位以上代表runState
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //workerCount最大容量(536870911) 0001 1111 1111 1111 1111 1111 1111 1111

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;//1110 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//0000 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;//0010 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;//0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;//0110 0000 0000 0000 0000 0000 0000 0000 

    // Packing and unpacking ctl  包装和拆包 ctl
    //获取 runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取 workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //两个数取或
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     * 
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempt to CAS-increment the workerCount field of ctl.
     * CAS 添加 workerCount
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempt to CAS-decrement the workerCount field of ctl.
     * CAS 减少 workerCount
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     *  while循环调用compareAndDecrementWorkerCount方法 主要减少workerCount数量
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     * 任务队列,负责保存任务并将任务交给工作线程处理。我们不要求返回null的workQueue.poll()必然意味着workQueue.isEmpty(),
     * 因此完全依赖于isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN转换为TIDYING时,我们必须执行此操作) 。
     * 这容纳了特殊用途的队列,如DelayQueues,即使poll()允许其返回null,即使延迟过期后可能会返回非null。
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     * 
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     * 设置包含池中的所有工作线程。只有在保持mainLock的情况下才能访问。
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     * 等待状态支持等待终止
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     * 跟踪达到最大线程池大小。仅在mainLock下访问。
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     * 完成任务的计数器。仅在工作线程终止时更新。仅在mainLock下访问。
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     * 
     * 工厂创建新线程。所有的线程都是使用这个工厂创建的(通过方法addWorker)。
     * 所有的调用者都必须准备好让addWorker失败,这可能反映了系统或用户的策略限制了线程的数量。
     * 即使它不被视为错误,创建线程失败可能会导致新任务被拒绝或现有的任务仍然滞留在队列中。
     * 
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread#start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     * 
     * 即使面对诸如OutOfMemoryError之类的错误,我们也可以更进一步并保持池不变量,这可能会在尝试创建线程时抛出。
     * 这样的错误是相当普遍的,因为需要在Thread#start中分配一个本地堆栈,并且用户需要执行清理池关闭来清理。
     * 可能会有足够的内存用于清理代码,而不会遇到另一个OutOfMemoryError。
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     * 在执行时调用saturated or shutdown 时的处理程序。
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     * 
     * 空闲工作线程等待任务的超时时间,单位:纳秒。
     * 当前线程数大于核心线程数时,超出的线程会使用这个超时时间。
     * 如果设置了allowCoreThreadTimeOut,核心线程数也会使用这个
    * 超时时间。否则,线程会一直等待新任务,不会超时。
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     * 如果为false(默认情况下),核心线程就算空闲也会一直存活。
     * 如果为true,等待任务的核心线程会使用keepAliveTime作为
     * 超时时间,如果超时,线程被回收。
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     *  核心线程数量,只能在持有mainLock的情况下修改。
     * volatile可以保证可见性。
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     * 最大线程数量。注意,实际的最大值是内部受CAPACITY限制的。
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     * 默认的拒绝执行处理程序
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *  
     *  对于调用线程池的shutdown(),shutdownNow()方法权限认证。
     *  我们还需要(请参阅checkShutdownAccess)调用者有权实际中断工作集中的线程
     *  (由Thread.interrupt控制,而ThreadGroup.checkAccess依赖于SecurityManager.checkAccess)。
     *  仅当这些检查通过时才试图关机。
     *  
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     * 
     * Thread.interrupt的所有实际调用(请参阅interruptIdleWorkers和interruptWorkers)都将忽略SecurityExceptions,
     * 这意味着尝试的中断将以静默方式失败。在关闭的情况下,除非SecurityManager具有不一致的策略,否则不应该失败,
     * 有时允许访问线程,有时不允许访问。在这种情况下,实际中断线程的失败可能会导致或终止完全中断。 
     * interruptIdleWorkers的其他用途是建议性的,实际中断的失败只会延迟对配置更改的响应,因此不会异常处理。
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

  /**
   * Creates a new {@code ThreadPoolExecutor} with the given initial
   * parameters and default thread factory and rejected execution handler.
   * It may be more convenient to use one of the {@link Executors} factory
   * methods instead of this general purpose constructor.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *        pool
   * @param keepAliveTime when the number of threads is greater than
   *        the core, this is the maximum time that excess idle threads
   *        will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *        executed.  This queue will hold only the {@code Runnable}
   *        tasks submitted by the {@code execute} method.
   * @throws IllegalArgumentException if one of the following holds:<br>
   *         {@code corePoolSize < 0}<br>
   *         {@code keepAliveTime < 0}<br>
   *         {@code maximumPoolSize <= 0}<br>
   *         {@code maximumPoolSize < corePoolSize}
   * @throws NullPointerException if {@code workQueue} is null
   */
  public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue) {
      this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
           Executors.defaultThreadFactory(), defaultHandler);
  }

  /**
   * Creates a new {@code ThreadPoolExecutor} with the given initial
   * parameters and default rejected execution handler.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *        pool
   * @param keepAliveTime when the number of threads is greater than
   *        the core, this is the maximum time that excess idle threads
   *        will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *        executed.  This queue will hold only the {@code Runnable}
   *        tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *        creates a new thread
   * @throws IllegalArgumentException if one of the following holds:<br>
   *         {@code corePoolSize < 0}<br>
   *         {@code keepAliveTime < 0}<br>
   *         {@code maximumPoolSize <= 0}<br>
   *         {@code maximumPoolSize < corePoolSize}
   * @throws NullPointerException if {@code workQueue}
   *         or {@code threadFactory} is null
   */
  public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory) {
      this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
           threadFactory, defaultHandler);
  }

  /**
   * Creates a new {@code ThreadPoolExecutor} with the given initial
   * parameters and default thread factory.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *        pool
   * @param keepAliveTime when the number of threads is greater than
   *        the core, this is the maximum time that excess idle threads
   *        will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *        executed.  This queue will hold only the {@code Runnable}
   *        tasks submitted by the {@code execute} method.
   * @param handler the handler to use when execution is blocked
   *        because the thread bounds and queue capacities are reached
   * @throws IllegalArgumentException if one of the following holds:<br>
   *         {@code corePoolSize < 0}<br>
   *         {@code keepAliveTime < 0}<br>
   *         {@code maximumPoolSize <= 0}<br>
   *         {@code maximumPoolSize < corePoolSize}
   * @throws NullPointerException if {@code workQueue}
   *         or {@code handler} is null
   */
  public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler) {
      this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
           Executors.defaultThreadFactory(), handler);
  }

  /**
   * Creates a new {@code ThreadPoolExecutor} with the given initial
   * parameters.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *        pool
   * @param keepAliveTime when the number of threads is greater than
   *        the core, this is the maximum time that excess idle threads
   *        will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *        executed.  This queue will hold only the {@code Runnable}
   *        tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *        creates a new thread
   * @param handler the handler to use when execution is blocked
   *        because the thread bounds and queue capacities are reached
   * @throws IllegalArgumentException if one of the following holds:<br>
   *         {@code corePoolSize < 0}<br>
   *         {@code keepAliveTime < 0}<br>
   *         {@code maximumPoolSize <= 0}<br>
   *         {@code maximumPoolSize < corePoolSize}
   * @throws NullPointerException if {@code workQueue}
   *         or {@code threadFactory} or {@code handler} is null
   */
  public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
      if (corePoolSize < 0 ||
          maximumPoolSize <= 0 ||
          maximumPoolSize < corePoolSize ||
          keepAliveTime < 0)
          throw new IllegalArgumentException();
      if (workQueue == null || threadFactory == null || handler == null)
          throw new NullPointerException();
      this.corePoolSize = corePoolSize;
      this.maximumPoolSize = maximumPoolSize;
      this.workQueue = workQueue;
      this.keepAliveTime = unit.toNanos(keepAliveTime);
      this.threadFactory = threadFactory;
      this.handler = handler;
  }
}

(1).构造ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

参数名 作用
corePoolSize 核心线程池大小  
maximumPoolSize 最大线程池大小  
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列(ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue,SynchronousQueue,LinkedTransferQueue和LinkedBlockingDeque等)
threadFactory 新建线程工厂(DefaultThreadFactory,PrivilegedThreadFactory)
RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理,AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy
  • corePoolSize,maximumPoolSize,workQueue之间关系。

    • 1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
    • 2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
    • 3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
    • 4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
    • 5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
    • 6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
  • RejectedExecutionHandler 四种拒绝策略说明

    • 1.AbortPolicy 抛出RejectedExecutionException的被拒绝任务的处理程序。
    • 2.CallerRunsPolicy 直接在执行方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下任务将被丢弃。
    • 3.DiscardOledestPolicy 丢弃最旧的未处理的请求,然后重试{execute},除非执行程序关闭,在这种情况下任务将被丢弃。
    • 4.DiscardPolicy 默默的丢弃无法处理的任务,不予任何处理。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
    * A handler for rejected tasks that runs the rejected task
    * directly in the calling thread of the {@code execute} method,
    * unless the executor has been shut down, in which case the task
    * is discarded.
    */
   public static class CallerRunsPolicy implements RejectedExecutionHandler {
       /**
        * Creates a {@code CallerRunsPolicy}.
        */
       public CallerRunsPolicy() { }

       /**
        * Executes task r in the caller's thread, unless the executor
        * has been shut down, in which case the task is discarded.
        *
        * @param r the runnable task requested to be executed
        * @param e the executor attempting to execute this task
        */
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               r.run();
           }
       }
   }

   /**
    * A handler for rejected tasks that throws a
    * {@code RejectedExecutionException}.
    */
   public static class AbortPolicy implements RejectedExecutionHandler {
       /**
        * Creates an {@code AbortPolicy}.
        */
       public AbortPolicy() { }

       /**
        * Always throws RejectedExecutionException.
        *
        * @param r the runnable task requested to be executed
        * @param e the executor attempting to execute this task
        * @throws RejectedExecutionException always.
        */
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           throw new RejectedExecutionException("Task " + r.toString() +
                                                " rejected from " +
                                                e.toString());
       }
   }

   /**
    * A handler for rejected tasks that silently discards the
    * rejected task.
    */
   public static class DiscardPolicy implements RejectedExecutionHandler {
       /**
        * Creates a {@code DiscardPolicy}.
        */
       public DiscardPolicy() { }

       /**
        * Does nothing, which has the effect of discarding task r.
        *
        * @param r the runnable task requested to be executed
        * @param e the executor attempting to execute this task
        */
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
       }
   }

   /**
    * A handler for rejected tasks that discards the oldest unhandled
    * request and then retries {@code execute}, unless the executor
    * is shut down, in which case the task is discarded.
    */
   public static class DiscardOldestPolicy implements RejectedExecutionHandler {
       /**
        * Creates a {@code DiscardOldestPolicy} for the given executor.
        */
       public DiscardOldestPolicy() { }

       /**
        * Obtains and ignores the next task that the executor
        * would otherwise execute, if one is immediately available,
        * and then retries execution of task r, unless the executor
        * is shut down, in which case task r is instead discarded.
        *
        * @param r the runnable task requested to be executed
        * @param e the executor attempting to execute this task
        */
       public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               e.getQueue().poll();
               e.execute(r);
           }
       }
   }
  • threadFactory 默认调用 Executors.defaultThreadFactory()
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    
        /**
         * The default thread factory
         */
        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1); // 统计连接池数量
            private final ThreadGroup group;  
            private final AtomicInteger threadNumber = new AtomicInteger(1); //线程数量
            private final String namePrefix;
        
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
        
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    

(2).Worker 工作线程实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     * Class Worker主要维护执行任务的线程的中断控制状态,以及其他小规模的簿记。
     * 该类机会性地扩展AbstractQueuedSynchronizer以简化每个任务执行周围的获取和释放锁。
     * 这可以防止打算唤醒等待任务的工作线程的中断,而不是中断正在运行的任务。
     * 我们实现了一个简单的非重入互斥锁,而不是使用ReentrantLock,
     * 因为我们不希望工作任务在调用像setCorePoolSize这样的池控制方法时能够重新获取锁。
     * 此外,为了抑制中断,直到线程真正开始运行任务,我们将锁定状态初始化为负值,并在启动时(在runWorker中)清除它。
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        //运行当前worker的线程。只有线程在被创建时才能设置到这个域上,行为类似final域。
        final Thread thread;
        /** Initial task to run.  Possibly null. */
         //在Worker进入主循环后首先要执行的初始任务,可能为null。
        Runnable firstTask;
        /** Per-thread task counter */
       // 记录单个线程完成的任务数量,当前worker终止时会累计到线程池中的总完成任务数量上。
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    
    
   /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     * 主要任务线程运行循环。反复从队列中获取任务并执行它们,同时处理一些问题:
     * 
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     * 我们可能会从最初的任务开始,在这种情况下,我们不需要获得第一个任务。
     * 否则,只要池正在运行,我们就从getTask获得任务。
     * 如果它返回null,则由于更改池状态或配置参数而导致worker退出。
     * 其他退出的结果是在外部代码中抛出的异常,在这种情况下completeAbruptly成立,
     * 这通常会导致processWorkerExit来取代这个线程。
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and
     * clearInterruptsForTaskRun called to ensure that unless pool is
     * stopping, this thread does not have its interrupt set.
     * 在运行任何任务之前,获取锁以防止任务执行期间出现其他线程池中断,
     * 并调用clearInterruptsForTaskRun确保除非线程池正在停止,否则此线程没有设置其中断。
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * 每个任务运行之前都会调用beforeExecute,这可能会引发一个异常,
     * 在这种情况下,我们会导致线程死亡(断开循环completeAbruptly为true),而不处理任务。
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to
     * afterExecute. We separately handle RuntimeException, Error
     * (both of which the specs guarantee that we trap) and arbitrary
     * Throwables.  Because we cannot rethrow Throwables within
     * Runnable.run, we wrap them within Errors on the way out (to the
     * thread's UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     * 假设beforeExecute正常完成,我们运行任务,收集任何抛出的异常发送到afterExecute。
     * 我们分别处理RuntimeException,Error(这两个规范保证我们陷阱)和任意的Throwables。
     * 因为我们不能在Runnable.run中重新抛出Throwables,所以我们把它们包裹在错误中(到线程的UncaughtExceptionHandler)。
     * 任何抛出的异常也保守地导致线程死亡。
     * 
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     * task.run完成后,我们调用afterExecute,这也可能会抛出一个异常,
     * 这也会导致线程死亡。根据JLS Sec 14.20,即使task.run抛出,这个异常也是有效的。
     * 
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     * 异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler拥有关于用户代码遇到的任何问题的准确信息。
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果池停止,确保线程被中断;如果没有,确保线程没有被中断。
                //这需要在第二种情况下进行重新检查,以便在清除中断时处理shutdownNow竞争
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//执行前钩子
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown); //执行后钩子
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
 /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 根据当前的配置设置,执行阻塞或定时等待任务,或者如果由于以下任一情况而必须退出,则返回null:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     *    有超过maximumPoolSize工作线程(由于调用setMaximumPoolSize)。
     * 2. The pool is stopped.
     *    线程池停止工作
     * 3. The pool is shutdown and the queue is empty.
     *    线程池是关闭的,队列是空的。
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *    此工作者超时等待任务,超时工作线程在定时等待之前和之后都会被终止(即,allowCoreThreadTimeOut || workerCount> corePoolSize})。
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out? 做最后一个 poll()超时?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取线程池 runState

            // Check if queue empty only if necessary.  检查队列是否为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount(); // ctl 减1
                return null;
            }

            boolean timed;      // Are workers subject to culling? 工作线程会被淘汰吗?

            for (;;) {
                int wc = workerCountOf(c); // 获取线程池中正在运行线程数量
                timed = allowCoreThreadTimeOut || wc > corePoolSize; // allowCoreThreadTimeOut 为false时 大于核心线程数量线程的空闲线程,超出设置keepAliveTime等待超时时间。线程被淘汰
                                                                     // allowCoreThreadTimeOut 为true时 所有线程,超出设置等待超时时间,线程被淘汰。

                if (wc <= maximumPoolSize && ! (timedOut && timed)) // 满足这个要求,就要获取任务
                    break;
                if (compareAndDecrementWorkerCount(c)) // 线程要被淘汰 CAS ctl减1 减少线程数量
                    return null;
                c = ctl.get();  // Re-read ctl 重新获取ctl值 在做判断,判断runStateOf 不一致,说明线程池runStateOf被修改,重新循环retry点
                if (runStateOf(c) != rs) 
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take(); //获取任务
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }    
 
    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *  工作线程退出处理
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);// 工作线程集合删除
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    
    /**
       * Transitions to TERMINATED state if either (SHUTDOWN and pool
       * and queue empty) or (STOP and pool empty).  If otherwise
       * eligible to terminate but workerCount is nonzero, interrupts an
       * idle worker to ensure that shutdown signals propagate. This
       * method must be called following any action that might make
       * termination possible -- reducing worker count or removing tasks
       * from the queue during shutdown. The method is non-private to
       * allow access from ScheduledThreadPoolExecutor.
       * 如果(SHUTDOWN和池和队列为空)或(STOP和池为空)转换到TERMINATED状态。
       * 如果否则有资格终止,但workerCount不为零,将中断一个空闲的工作,以确保关闭信号传播。
       * 必须在执行任何可能会终止的操作之后调用此方法 - 在关闭期间减少工作人员数量或从队列中删除任务。该方法是非专用的,
       * 允许从ScheduledThreadPoolExecutor进行访问。
       */
      final void tryTerminate() {
          for (;;) {
              int c = ctl.get();
              if (isRunning(c) ||
                  runStateAtLeast(c, TIDYING) ||
                  (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                  return;
              if (workerCountOf(c) != 0) { // Eligible to terminate
                  interruptIdleWorkers(ONLY_ONE);
                  return;
              }
  
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                      try {
                          terminated();
                      } finally {
                          ctl.set(ctlOf(TERMINATED, 0));
                          termination.signalAll();
                      }
                      return;
                  }
              } finally {
                  mainLock.unlock();
              }
              // else retry on failed CAS
          }
      }
      
 /**
      * Interrupts threads that might be waiting for tasks (as
      * indicated by not being locked) so they can check for
      * termination or configuration changes. Ignores
      * SecurityExceptions (in which case some threads may remain
      * uninterrupted).
      *
      * @param onlyOne If true, interrupt at most one worker. This is
      * called only from tryTerminate when termination is otherwise
      * enabled but there are still other workers.  In this case, at
      * most one waiting worker is interrupted to propagate shutdown
      * signals in case all threads are currently waiting.
      * Interrupting any arbitrary thread ensures that newly arriving
      * workers since shutdown began will also eventually exit.
      * To guarantee eventual termination, it suffices to always
      * interrupt only one idle worker, but shutdown() interrupts all
      * idle workers so that redundant workers exit promptly, not
      * waiting for a straggler task to finish.
      */
     private void interruptIdleWorkers(boolean onlyOne) {
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             for (Worker w : workers) {
                 Thread t = w.thread;
                 if (!t.isInterrupted() && w.tryLock()) {
                     try {
                         t.interrupt();
                     } catch (SecurityException ignore) {
                     } finally {
                         w.unlock();
                     }
                 }
                 if (onlyOne)
                     break;
             }
         } finally {
             mainLock.unlock();
         }
     }
 
     /**
      * Common form of interruptIdleWorkers, to avoid having to
      * remember what the boolean argument means.
      */
     private void interruptIdleWorkers() {
         interruptIdleWorkers(false);
     }
 
     private static final boolean ONLY_ONE = true;     
   

二 、execute(Runnable command) 执行给定的任务

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *  在将来执行给定的任务。任务可以在新的线程或现有的池中执行。
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     * 如果任务不能被提交执行,或者因为这个执行器已经关闭或者因为它的容量已经到达,
     * 那么任务由当前的RejectedExecutionHandler来处理。
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *  
         *  如果少于corePoolSize线程正在运行,请尝试使用给定命令启动一个新线程作为其第一个任务。
         *  对addWorker的调用以原子方式检查runState和workerCount,从而防止在不应该的时候通过返回false来添加线程的错误警报。
         *  
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 
         *  如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有的线程自上次检查以来已经死掉了)
         *  或者在进入此方法后关闭了这个线程池。所以我们重新检查状态,如果有必要的话,退出排队,如果没有的话,开始一个新的线程。
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 
         * 如果我们不能排队任务,那么我们尝试添加一个新的线程。
         * 如果失败,我们知道我们正在关闭或饱和,所以拒绝任务。
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//offer不堵塞,
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) 
                reject(command);   //拒绝任务
            else if (workerCountOf(recheck) == 0) //分两种情况:1.未设置core线程数;2.设置core线程数,也是设置allowCoreThreadTimeOut为TURE
                addWorker(null, false);      //创建工作线程
        }
        else if (!addWorker(command, false))
            reject(command);            // 拒绝任务
    }
    

   /**
    * Checks if a new worker can be added with respect to current
    * pool state and the given bound (either core or maximum). If so,
    * the worker count is adjusted accordingly, and, if possible, a
    * new worker is created and started, running firstTask as its
    * first task. This method returns false if the pool is stopped or
    * eligible to shut down. It also returns false if the thread
    * factory fails to create a thread when asked.  If the thread
    * creation fails, either due to the thread factory returning
    * null, or due to an exception (typically OutOfMemoryError in
    * Thread#start), we roll back cleanly.
    * 
    * 检查是否可以根据当前池状态和给定的界限(核心或最大)添加新的工作者。
    * 如果是这样的话,工作线程数量会相应地调整,如果可能的话,创建一个新工作线程并开始运行第一个任务。
    * 如果线程池停止或有资格关闭,则此方法返回false。
    * 如果线程工厂在被询问时未能创建线程,它也返回false。如果线程创建失败,或者由于线程工厂返回null,或者由于异常
    * (通常Thread#start中的OutOfMemoryError),我们会回滚清楚干净。
    *
    *
    * @param firstTask the task the new thread should run first (or
    * null if none). Workers are created with an initial first task
    * (in method execute()) to bypass queuing when there are fewer
    * than corePoolSize threads (in which case we always start one),
    * or when the queue is full (in which case we must bypass queue).
    * Initially idle threads are usually created via
    * prestartCoreThread or to replace other dying workers.
    *
    * @param core if true use corePoolSize as bound, else
    * maximumPoolSize. (A boolean indicator is used here rather than a
    * value to ensure reads of fresh values after checking other pool
    * state).
    * @return true if successful
    */
   private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);

           // Check if queue empty only if necessary. 检查队列是否为空。
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

           for (;;) {
               int wc = workerCountOf(c); // 获取工作线程数量
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize)) //工作线程数量大于最大容量,core为ture 判断核心线程 为false,判断设置最大线程数量
                   return false;
               if (compareAndIncrementWorkerCount(c))  // 添加工作线程数量 CAS 添加成功终止for循环,失败,重新for循环
                   break retry;
               c = ctl.get();  // Re-read ctl 
               if (runStateOf(c) != rs) // 判断线程池状态是否更改
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
       }

       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           final ReentrantLock mainLock = this.mainLock;
           w = new Worker(firstTask);
           final Thread t = w.thread;
           if (t != null) {
               mainLock.lock();
               try {
                   // Recheck while holding lock.
                   // Back out on ThreadFactory failure or if
                   // shut down before lock acquired.
                   int c = ctl.get();
                   int rs = runStateOf(c);

                   if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w);
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                   }
               } finally {
                   mainLock.unlock();
               }
               if (workerAdded) {
                   t.start(); //启动线程
                   workerStarted = true;
               }
           }
       } finally {
           if (! workerStarted) // 添加失败回滚工作线程创建
               addWorkerFailed(w);
       }
       return workerStarted;
   }     

   /**
    * Rolls back the worker thread creation.
    * 回滚工作线程创建。
    * - removes worker from workers, if present
    * - decrements worker count
    * - rechecks for termination, in case the existence of this
    *   worker was holding up termination
    */
   private void addWorkerFailed(Worker w) {
       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           if (w != null)
               workers.remove(w);
           decrementWorkerCount();
           tryTerminate();
       } finally {
           mainLock.unlock();
       }
   }
    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p> This method may be useful as one part of a cancellation
     * scheme.  It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using {@code submit} might be
     * converted into a form that maintains {@code Future} status.
     * However, in such cases, method {@link #purge} may be used to
     * remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return true if the task was removed
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }   

三 、submit 提交指定的任务

提交任务有三个重载方法

  • Future submit(Runnable task, T result); 提交一个Runnable任务和返回结果,并返回一个Future。
  • Future<?> submit(Runnable task); 提交一个Runnable任务,并返回一个Future。
  • Future submit(Callable task); 提交一个有返回结果的任务,并返回一个Future。

这三个方法AbstractExecutorService抽象类实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
 /**
     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a <tt>RunnableFuture</tt> which when run will run the
     * underlying runnable and which, as a <tt>Future</tt>, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a <tt>RunnableFuture</tt> for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a <tt>RunnableFuture</tt> which when run will call the
     * underlying callable and which, as a <tt>Future</tt>, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
   

最终都是创建FutureTask,执行指定任务。

四 、invokeAll和invokeAny执行批量任务

invokeAll和invokeAny方法是由AbstractExecutorService抽象类实现

1. invokeAll

  • List<Future> invokeAll(Collection<? extends Callable> tasks) 执行一批任务,当所有任务都执行完毕后,以Future集合 集合的形式返回结果。
  • List<Future> invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit) 执行一批任务,当所有任务都执行完毕或者超时, 以Future集合集合的形式返回结果。注意如果超时的话,没有执行完的任务会 被取消。

AbstractExecutorService源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) { //循环处理submit 提交任务
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get(); // 一直等待处理
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
   public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS); //设置超时处理
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }    

2. invokeAny

  • T invokeAny(Collection<? extends Callable> tasks) 执行一批任务,如果其中有一个任务完成,就返回结果。 其他没有完成的任务会被取消。
  • T invokeAny(Collection<? extends Callable> tasks,long timeout, TimeUnit unit) 执行一批任务,如果其中有一个任务完成或者超时,就返回结果。 其他没有完成的任务会被取消。

AbstractExecutorService源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

 /**
  * the main mechanics of invokeAny.
  */
 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                         boolean timed, long nanos)
     throws InterruptedException, ExecutionException, TimeoutException {
     if (tasks == null)
         throw new NullPointerException();
     int ntasks = tasks.size();
     if (ntasks == 0)
         throw new IllegalArgumentException();
     List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
     ExecutorCompletionService<T> ecs =
         new ExecutorCompletionService<T>(this);

     // For efficiency, especially in executors with limited
     // parallelism, check to see if previously submitted tasks are
     // done before submitting more of them. This interleaving
     // plus the exception mechanics account for messiness of main
     // loop.
      // 处于性能考虑,尤其在并行有限的情况下(比如单核处理器),
     // 下面的程序会放一个任务到ecs,然后查看这个任务是否完成,
     // 如果没完成,再放一个。。。(而不是一起放进去)。
     // 这个过程还会记录异常,如果都没有执行成功,会抛出最后
     // 记录的异常。最后会把没执行完的任务取消掉。

     try {
         // Record exceptions so that if we fail to obtain any
         // result, we can throw the last exception we got.
         ExecutionException ee = null;
         long lastTime = timed ? System.nanoTime() : 0;
         Iterator<? extends Callable<T>> it = tasks.iterator();

         // Start one task for sure; the rest incrementally
         futures.add(ecs.submit(it.next()));
         --ntasks;
         int active = 1;

         for (;;) {
             Future<T> f = ecs.poll();
             if (f == null) {
                 if (ntasks > 0) {
                     --ntasks;
                     futures.add(ecs.submit(it.next()));
                     ++active;
                 }
                 else if (active == 0)
                     break;
                 else if (timed) {
                     f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                     if (f == null)
                         throw new TimeoutException();
                     long now = System.nanoTime();
                     nanos -= now - lastTime;
                     lastTime = now;
                 }
                 else
                     f = ecs.take();
             }
             if (f != null) {
                 --active;
                 try {
                     return f.get();
                 } catch (ExecutionException eex) {
                     ee = eex;
                 } catch (RuntimeException rex) {
                     ee = new ExecutionException(rex);
                 }
             }
         }

         if (ee == null)
             ee = new ExecutionException();
         throw ee;

     } finally {
         for (Future<T> f : futures)
             f.cancel(true);
     }
 }    

停止任务 shutdown和shutdownNow

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *  启动先前提交的任务被执行的有序关闭,但不接受新的任务。如果已关闭,则调用没有其他影响。
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor onShutdown钩子
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread. This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     * 如果有安全管理器,请确保调用者有权关闭一般的线程(请参阅shutdownPerm)。
     * 如果通过,另外确保调用者被允许中断每个工作者线程。即使第一次检查通过,这可能不是真的,如果SecurityManager专门处理一些线程。
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }
    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }    
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * 尝试停止所有正在执行的任务,停止等待任务的处理,并返回正在等待执行的任务的列表。从这个方法返回后,这些任务从任务队列中排出(移除)。
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    
    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }    
    /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }