ScheduledThreadPoolExecutor源码分析

简介

ScheduledThreadPoolExecutor是一种类似Timer的定时器或者说是调度器,和Timer比起来主要有几点好处:

  • 1.多线程的定时调度,timer是单线程的,每个timer实例只有一个工作线程。
  • 2.由于继承自ThreadPoolExecutor,更具有灵活性和伸缩性。
  • 3.没有timer那种线程泄露问题,timer调度的任务如果异常终止,那么整个timer都会被取消,无法执行其他任务

一、ScheduledThreadPoolExecutor 内部结构实现

ScheduledThreadPoolExecutor实现ThreadPoolExecutor,继承ThreadPoolExecutor接口

1.ThreadPoolExecutor接口说明

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface ScheduledExecutorService extends ExecutorService {
    /**
     * 创建并执行一个一次性任务,这个任务过了延迟时间就会被执行。
     */
    public ScheduledFuture<?> schedule(Runnable command,
                    long delay, TimeUnit unit);
    /**
     * 创建并执行一个一次性任务,这个任务过了延迟时间就会被执行。
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                    long delay, TimeUnit unit);
    /**
     * 创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一
     * 次被执行,然后会以给定的周期时间执行。
     * 如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。
     * 如果某次执行时长超过了周期时间,那么下一次任务会延迟启动,不会和当前
     * 任务并行执行。
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                       long initialDelay,
                       long period,
                       TimeUnit unit);
    /**
     * 创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一
     * 次被执行,接下来的任务会在上次任务执行完毕后,延迟给定的时间,
     * 然后再继续执行。
     * 如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                          long initialDelay,
                          long delay,
                          TimeUnit unit);
}

2.ThreadPoolExecutor构造

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    /*
     * This class specializes ThreadPoolExecutor implementation by
     * 这个类专门用ThreadPoolExecutor实现
     *
     * 1. Using a custom task type, ScheduledFutureTask for
     *    tasks, even those that don't require scheduling (i.e.,
     *    those submitted using ExecutorService execute, not
     *    ScheduledExecutorService methods) which are treated as
     *    delayed tasks with a delay of zero.
     *    使用自定义任务类型ScheduledFutureTask执行任务,
     *    即使那些不需要调度的任务(即,使用ExecutorService执行,而不是ScheduledExecutorService方法提交的任务)被视为延迟为零的延迟任务。
     *
     * 2. Using a custom queue (DelayedWorkQueue), a variant of
     *    unbounded DelayQueue. The lack of capacity constraint and
     *    the fact that corePoolSize and maximumPoolSize are
     *    effectively identical simplifies some execution mechanics
     *    (see delayedExecute) compared to ThreadPoolExecutor.
     *    使用自定义队列(DelayedWorkQueue),一个无界DelayQueue的变种。
     *    容量约束的缺乏以及corePoolSize和maximumPoolSize实际上相同这一事实,与ThreadPoolExecutor相比,简化了一些执行机制(请参见delayedExecute)。
     *
     * 3. Supporting optional run-after-shutdown parameters, which
     *    leads to overrides of shutdown methods to remove and cancel
     *    tasks that should NOT be run after shutdown, as well as
     *    different recheck logic when task (re)submission overlaps
     *    with a shutdown.
     *    支持可选的运行后关闭参数,这会导致覆盖关闭方法来取消和取消不应在关闭后运行的任务,以及任务(重新)提交与关闭重叠时的不同重新检查逻辑。
     *
     * 4. Task decoration methods to allow interception and
     *    instrumentation, which are needed because subclasses cannot
     *    otherwise override submit methods to get this effect. These
     *    don't have any impact on pool control logic though.
     *    任务装饰方法允许拦截和检测,这是因为子类不能以其他方式覆盖提交方法来获得这种效果。这些对池控制逻辑没有任何影响。
     */

    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     * 表示是否应该在关闭时取消或者终止周期性任务。
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic tasks on shutdown.
     * 表示是否应该在关闭时取消非周期性任务。
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     * True if ScheduledFutureTask.cancel should remove from queue
     * 如果ScheduledFutureTask.cancel应该从队列中移除,则为真
     */
    private volatile boolean removeOnCancel = false;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     * 这个序列号的作用是在并列调度(延迟值一样)的情况下保证先入先出的关系。
     */
    private static final AtomicLong sequencer = new AtomicLong(0);
 /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
}    

3.workQueue 任务队列用DelayedWorkQueue

(1).DelayedWorkQueue 内部实现

DelayedWorkQueue实现AbstractQueue抽象类,继承BlockingQueue接口。DelayedWorkQueue只有默认构造方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
     /**
       * Specialized delay queue. To mesh with TPE declarations, this
       * class must be declared as a BlockingQueue<Runnable> even though
       * it can only hold RunnableScheduledFutures.
       * 专门的延迟队列。为了与TPE声明网格化,这个类必须声明为BlockingQueue <Runnable>,即使它只能保存RunnableScheduledFutures。
       */
      static class DelayedWorkQueue extends AbstractQueue<Runnable>
          implements BlockingQueue<Runnable> {
  
          /*
           * A DelayedWorkQueue is based on a heap-based data structure
           * like those in DelayQueue and PriorityQueue, except that
           * every ScheduledFutureTask also records its index into the
           * heap array. This eliminates the need to find a task upon
           * cancellation, greatly speeding up removal (down from O(n)
           * to O(log n)), and reducing garbage retention that would
           * otherwise occur by waiting for the element to rise to top
           * before clearing. But because the queue may also hold
           * RunnableScheduledFutures that are not ScheduledFutureTasks,
           * we are not guaranteed to have such indices available, in
           * which case we fall back to linear search. (We expect that
           * most tasks will not be decorated, and that the faster cases
           * will be much more common.)
           * 除了每个ScheduledFutureTask还将其索引记录到堆数组之外,
           * DelayedWorkQueue基于基于堆的数据结构(如DelayQueue和PriorityQueue中的数据结构)。
           * 这消除了取消任务时的查找任务的需要,大大加快了从O(n)到O(log n)的删除速度,
           * 减少了在清除之前等待元素上升到最高的垃圾留存量。
           * 但是因为队列也可能持有不是ScheduledFutureTasks的RunnableScheduledFutures,所以我们不能保证有这样的索引可用,
           * 在这种情况下,我们回到线性搜索。 (我们期望大多数任务不会被装饰,而且更快的情况会更普遍。)
           *
           * All heap operations must record index changes -- mainly
           * within siftUp and siftDown. Upon removal, a task's
           * heapIndex is set to -1. Note that ScheduledFutureTasks can
           * appear at most once in the queue (this need not be true for
           * other kinds of tasks or work queues), so are uniquely
           * identified by heapIndex.
           * 所有堆操作都必须记录索引更改 - 主要在siftUp和siftDown中。
           * 删除后,任务的heapIndex被设置为-1。请注意,ScheduledFutureTasks最多只能在队列中出现一次
           * (对于其他类型的任务或工作队列,这不一定是正确的),所以heapIndex是唯一标识的。
           */
  
          private static final int INITIAL_CAPACITY = 16;
          //RunnableScheduledFuture 数组保存任务
          private RunnableScheduledFuture[] queue =
              new RunnableScheduledFuture[INITIAL_CAPACITY];
          //重入锁
          private final ReentrantLock lock = new ReentrantLock();
          //大小
          private int size = 0;
  
          /**
           * Thread designated to wait for the task at the head of the
           * queue.  This variant of the Leader-Follower pattern
           * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
           * minimize unnecessary timed waiting.  When a thread becomes
           * the leader, it waits only for the next delay to elapse, but
           * other threads await indefinitely.  The leader thread must
           * signal some other thread before returning from take() or
           * poll(...), unless some other thread becomes leader in the
           * interim.  Whenever the head of the queue is replaced with a
           * task with an earlier expiration time, the leader field is
           * invalidated by being reset to null, and some waiting
           * thread, but not necessarily the current leader, is
           * signalled.  So waiting threads must be prepared to acquire
           * and lose leadership while waiting.
           * 线程被指定为等待队列头部的任务。 Leader-Follower模式的这种变体(http://www.cs.wustl.edu/~schmidt/POSA/POSA2/)用于最小化不必要的定时等待。
           * 当一个线程成为领导者时,它只等待下一个延迟,但其他线程则无限期地等待。领导者线程必须在从take()或poll(...)返回之前发出其他线程的信号,
           * 除非其他线程在过渡期间成为领导者。每当队列的头部被具有较早的到期时间的任务替换时,领导者字段通过被重置为空而被无效,
           * 并且一些等待线程(但不一定是当前领导者)被发送信号。所以等待的线程必须准备好在获得和失去领导的同时等待
           */
          private Thread leader = null;
  
          /**
           * Condition signalled when a newer task becomes available at the
           * head of the queue or a new thread may need to become leader.
           * 当一个新的任务在队列的头部变得可用或者一个新的线程可能需要成为领导者时,条件发出信号。
           */
          private final Condition available = lock.newCondition();
}          

(2).ScheduledFutureTask实现RunnableScheduledFuture接口实现

RunnableScheduledFuture接口继承关系图如下:

ScheduledFutureTask继承FutureTask类

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** Sequence number to break ties FIFO */
        //连接FIFO的序列号
        private final long sequenceNumber;

        /** The time the task is enabled to execute in nanoTime units */
        //任务可以执行的时间,单位纳秒
        private long time;

        /**
         * Period in nanoseconds for repeating tasks.  A positive
         * value indicates fixed-rate execution.  A negative value
         * indicates fixed-delay execution.  A value of 0 indicates a
         * non-repeating task.
         * 
         *  周期性任务的周期时间,单位纳秒。
         * 正数表示固定频率执行,负数表示固定延迟执行,0表示一次性任务。
         */
        private final long period;

        /** The actual task to be re-enqueued by reExecutePeriodic */
        //由reExecutePeriodic重新入队的实际任务
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.
         */
        //索引进入延迟队列,支持更快的取消。
        int heapIndex;

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         * 使用给定的基于nanoTime的触发时间创建一次性操作。
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a periodic action with given nano time and period.
         * 用给定的纳秒时间和周期创建一个周期性的动作。
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger.
         * 使用给定的基于nanoTime的触发器创建一次性操作。
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
        
        //延期时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
        
        //比较
        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

        /**
         * Returns true if this is a periodic (not a one-shot) action.
         * 如果这是一个周期性(而不是一次性)操作,则返回true。
         * @return true if periodic
         */
        public boolean isPeriodic() {
            return period != 0;
        }

        /**
         * Sets the next time to run for a periodic task.
         * 设置下一次运行周期性任务。
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);//延迟动作的触发时间
        }

        //取消
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         * 覆盖FutureTask版本,以便定期重置/重新排序。
         */
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask); 
            }
        }
    }
   /**
     * Requeues a periodic task unless current run state precludes it.
     * Same idea as delayedExecute except drops task rather than rejecting.
     *  除非当前运行状态排除它,否则重新执行一个周期性任务。
     *  与delayedExecute相同的想法,除了放弃任务而不是拒绝。
     * @param task the task
     */
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);//重新添加新任务
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
    /**
     * Returns true if can run a task given current run state
     * and run-after-shutdown parameters.
     * 如果可以运行给定当前运行状态和run-after-shutdown参数的任务,则返回true。
     * @param periodic true if this task periodic, false if delayed
     */
    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }    
   /**
     * Returns the trigger time of a delayed action.
     * 返回延迟动作的触发时间。
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     * 返回延迟动作的触发时间。
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
  /**
     * Constrains the values of all delays in the queue to be within
     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
     * This may occur if a task is eligible to be dequeued, but has
     * not yet been, while some other task is added with a delay of
     * Long.MAX_VALUE.
     * 将队列中所有延迟的值限制在对方的Long.MAX_VALUE范围内,
     * 以避免compareTo溢出。如果某个任务有资格离队,
     * 但尚未完成,而其他任务的延迟时间为Long.MAX_VALUE,则可能会发生这种情况
     */
    private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }    
 

(3).入队

  • offer(Runnable x)
  • offer(Runnable x, long timeout, TimeUnit unit) 不支持等待时间
  • put(Runnable e) 调用offer 不支持等待
  • add(Runnable e) 调用offer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
         public boolean offer(Runnable x) {
              if (x == null)
                  throw new NullPointerException();
              RunnableScheduledFuture e = (RunnableScheduledFuture)x;
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  int i = size;
                  if (i >= queue.length)
                      grow();
                  size = i + 1;
                  if (i == 0) {
                      queue[0] = e;
                      setIndex(e, 0);
                  } else {
                      siftUp(i, e);
                  }
                  if (queue[0] == e) {
                      leader = null;
                      available.signal();
                  }
              } finally {
                  lock.unlock();
              }
              return true;
          }
         /**
           * Resize the heap array.  Call only when holding lock.
           * 扩容
           */
          private void grow() {
              int oldCapacity = queue.length;
              int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
              if (newCapacity < 0) // overflow
                  newCapacity = Integer.MAX_VALUE;
              queue = Arrays.copyOf(queue, newCapacity);
          }
       /**
         * Set f's heapIndex if it is a ScheduledFutureTask.
         * 如果它是一个ScheduledFutureTask,则设置f的heapIndex。
         */
        private void setIndex(RunnableScheduledFuture f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }

        /**
         * Sift element added at bottom up to its heap-ordered spot.
         * Call only when holding lock.
         * 在底部添加筛选元素,直到堆排序的位置。只有在保持锁定时才能call
         */
        private void siftUp(int k, RunnableScheduledFuture key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }
       
  
          public void put(Runnable e) {
              offer(e);
          }
  
          public boolean add(Runnable e) {
              return offer(e);
          }
  
          public boolean offer(Runnable e, long timeout, TimeUnit unit) {
              return offer(e);
          }

(3).出队

  1. take

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    
              public RunnableScheduledFuture take() throws InterruptedException {
                  final ReentrantLock lock = this.lock;
                  lock.lockInterruptibly();
                  try {
                      for (;;) {
                          RunnableScheduledFuture first = queue[0];
                          if (first == null)
                              available.await();  //等待
                          else {
                              long delay = first.getDelay(TimeUnit.NANOSECONDS);
                              if (delay <= 0)
                                  return finishPoll(first);
                              else if (leader != null)
                                  available.await();
                              else {
                                  Thread thisThread = Thread.currentThread();
                                  leader = thisThread;
                                  try {
                                      available.awaitNanos(delay);
                                  } finally {
                                      if (leader == thisThread)
                                          leader = null;
                                  }
                              }
                          }
                      }
                  } finally {
                      if (leader == null && queue[0] != null)
                          available.signal();
                      lock.unlock();
                  }
              }
            /**
             * Performs common bookkeeping for poll and take: Replaces
             * first element with last and sifts it down.  Call only when
             * holding lock.
             * @param f the task to remove and return
             */
            private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
                int s = --size;
                RunnableScheduledFuture x = queue[s];
                queue[s] = null;
                if (s != 0)
                    siftDown(0, x);
                setIndex(f, -1);
                return f;
            }
           /**
             * Sift element added at top down to its heap-ordered spot.
             * Call only when holding lock.
             */
            private void siftDown(int k, RunnableScheduledFuture key) {
                int half = size >>> 1;
                while (k < half) {
                    int child = (k << 1) + 1;// left
                    RunnableScheduledFuture c = queue[child];
                    int right = child + 1;   //right
                    if (right < size && c.compareTo(queue[right]) > 0)
                        c = queue[child = right];
                    if (key.compareTo(c) <= 0)
                        break;
                    queue[k] = c;
                    setIndex(c, k);
                    k = child;
                }
                queue[k] = key;
                setIndex(key, k);
            }        
    
  2. poll

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    
            public RunnableScheduledFuture poll() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture first = queue[0];
                    if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                        return null;
                    else
                        return finishPoll(first);
                } finally {
                    lock.unlock();
                }
            }
            public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
                throws InterruptedException {
                long nanos = unit.toNanos(timeout);
                final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();
                try {
                    for (;;) {
                        RunnableScheduledFuture first = queue[0];
                        if (first == null) {
                            if (nanos <= 0)
                                return null;
                            else
                                nanos = available.awaitNanos(nanos);
                        } else {
                            long delay = first.getDelay(TimeUnit.NANOSECONDS);
                            if (delay <= 0) //延期
                                return finishPoll(first);
                            if (nanos <= 0)
                                return null;
                            if (nanos < delay || leader != null)
                                nanos = available.awaitNanos(nanos);
                            else {
                                Thread thisThread = Thread.currentThread();
                                leader = thisThread;
                                try {
                                    long timeLeft = available.awaitNanos(delay);
                                    nanos -= delay - timeLeft;
                                } finally {
                                    if (leader == thisThread)
                                        leader = null;
                                }
                            }
                        }
                    }
                } finally {
                    if (leader == null && queue[0] != null)
                        available.signal();
                    lock.unlock();
                }
            }        
    

(3).移除

  1. remove

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    
                   public boolean remove(Object x) {
                       final ReentrantLock lock = this.lock;
                       lock.lock();
                       try {
                           int i = indexOf(x);
                           if (i < 0)
                               return false;
              
                           setIndex(queue[i], -1);
                           int s = --size;
                           RunnableScheduledFuture replacement = queue[s];
                           queue[s] = null;
                           if (s != i) {
                               siftDown(i, replacement); //往下 重组堆结构
                               if (queue[i] == replacement)
                                   siftUp(i, replacement); //往上 重组堆结构
                           }
                           return true;
                       } finally {
                           lock.unlock();
                       }
                   }
        /**
             * Find index of given object, or -1 if absent
             */
            private int indexOf(Object x) {
                if (x != null) {
                    if (x instanceof ScheduledFutureTask) {
                        int i = ((ScheduledFutureTask) x).heapIndex; //heapIndex 快速定位,删除
                        // Sanity check; x could conceivably be a
                        // ScheduledFutureTask from some other pool.
                        if (i >= 0 && i < size && queue[i] == x)
                            return i;
                    } else {
                        for (int i = 0; i < size; i++)
                            if (x.equals(queue[i]))
                                return i;
                    }
                }
                return -1;
            }               
    
  2. drainTo

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    
          /**
             * Return and remove first element only if it is expired.
             * Used only by drainTo.  Call only when holding lock.
             * 只有在第一个元素过期时才返回并移除。仅由drainTo使用。只有在保持锁定时才能call
             */
            private RunnableScheduledFuture pollExpired() {
                RunnableScheduledFuture first = queue[0];
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    return null;
                return finishPoll(first);
            }
       
            public int drainTo(Collection<? super Runnable> c) {
                if (c == null)
                    throw new NullPointerException();
                if (c == this)
                    throw new IllegalArgumentException();
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture first;
                    int n = 0;
                    while ((first = pollExpired()) != null) {
                        c.add(first);
                        ++n;
                    }
                    return n;
                } finally {
                    lock.unlock();
                }
            }
      public int drainTo(Collection<? super Runnable> c, int maxElements) {
                if (c == null)
                    throw new NullPointerException();
                if (c == this)
                    throw new IllegalArgumentException();
                if (maxElements <= 0)
                    return 0;
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                    RunnableScheduledFuture first;
                    int n = 0;
                    while (n < maxElements && (first = pollExpired()) != null) {
                        c.add(first);
                        ++n;
                    }
                    return n;
                } finally {
                    lock.unlock();
                }
            }        
    

(4). Iterator 迭代器实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  /**
         * Snapshot iterator that works off copy of underlying q array.
         */
        private class Itr implements Iterator<Runnable> {
            final RunnableScheduledFuture[] array;
            int cursor = 0;     // index of next element to return
            int lastRet = -1;   // index of last element, or -1 if no such

            Itr(RunnableScheduledFuture[] array) {
                this.array = array;
            }

            public boolean hasNext() {
                return cursor < array.length;
            }

            public Runnable next() {
                if (cursor >= array.length)
                    throw new NoSuchElementException();
                lastRet = cursor;
                return array[cursor++];
            }

            public void remove() {
                if (lastRet < 0)
                    throw new IllegalStateException();
                DelayedWorkQueue.this.remove(array[lastRet]);
                lastRet = -1;
            }
        }

二、 schedule 创建并执行一个一次性任务,这个任务过了延迟时间就会被执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
  /**
        * @throws RejectedExecutionException {@inheritDoc}
        * @throws NullPointerException       {@inheritDoc}
        */
       public ScheduledFuture<?> schedule(Runnable command,
                                          long delay,
                                          TimeUnit unit) {
           if (command == null || unit == null)
               throw new NullPointerException();
           RunnableScheduledFuture<?> t = decorateTask(command,
               new ScheduledFutureTask<Void>(command, null,
                                             triggerTime(delay, unit)));
           delayedExecute(t);
           return t;
       }
      /**
        * Modifies or replaces the task used to execute a runnable.
        * This method can be used to override the concrete
        * class used for managing internal tasks.
        * The default implementation simply returns the given task.
        *
        * @param runnable the submitted Runnable
        * @param task the task created to execute the runnable
        * @return a task that can execute the runnable
        * @since 1.6
        */
       protected <V> RunnableScheduledFuture<V> decorateTask(
           Runnable runnable, RunnableScheduledFuture<V> task) {
           return task;
       }
       /**
        * Main execution method for delayed or periodic tasks.  If pool
        * is shut down, rejects the task. Otherwise adds task to queue
        * and starts a thread, if necessary, to run it.  (We cannot
        * prestart the thread to run the task because the task (probably)
        * shouldn't be run yet,) If the pool is shut down while the task
        * is being added, cancel and remove it if required by state and
        * run-after-shutdown parameters.
        * 执行延迟或定期任务的主要方法。
        * 如果池被关闭,拒绝任务。
        * 否则,将任务添加到队列中,并在必要时启动一个线程来运行它。 
        * (我们不能预先启动线程来运行任务,因为任务(可能)不应该运行)。
        * 如果在添加任务时关闭了池,请取消并在需要时按状态进行删除并执行run-after-关闭参数。
        *
        * @param task the task
        */
       private void delayedExecute(RunnableScheduledFuture<?> task) {
           if (isShutdown())
               reject(task);
           else {
               super.getQueue().add(task);
               if (isShutdown() &&
                   !canRunInCurrentRunState(task.isPeriodic()) &&
                   remove(task))
                   task.cancel(false);
               else
                   ensurePrestart();
           }
       }
       /**
        * Same as prestartCoreThread except arranges that at least one
        * thread is started even if corePoolSize is 0.
        */
       void ensurePrestart() {
           int wc = workerCountOf(ctl.get());
           if (wc < corePoolSize)
               addWorker(null, true);
           else if (wc == 0)
               addWorker(null, false);
       }    

三、 scheduleAtFixedRate

  创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一次被执行,然后会以给定的周期时间执行。 如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。如果某次执行时长超过了周期时间, 那么下一次任务会延迟启动,不会和当前 任务并行执行。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

四、 scheduleAtFixedRate

  创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一次被执行,接下来的任务会在上次任务执行完毕后,延迟给定的时间, 然后再继续执行。如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

五、execute 执行任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  /**
     * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
     * Note that inspections of the queue and of the list returned by
     * {@code shutdownNow} will access the zero-delayed
     * {@link ScheduledFuture}, not the {@code command} itself.
     *
     * <p>A consequence of the use of {@code ScheduledFuture} objects is
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
     * called with a null second {@code Throwable} argument, even if the
     * {@code command} terminated abruptly.  Instead, the {@code Throwable}
     * thrown by such a task can be obtained via {@link Future#get}.
     *
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution because the
     *         executor has been shut down
     * @throws NullPointerException {@inheritDoc}
     */
    public void execute(Runnable command) {
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }
  

六、submit 提交任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result),
                        0, TimeUnit.NANOSECONDS);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    }

七、停止任务 shutdown和shutdownNow

调用ThreadPoolExecutor