ThreadPoolExecutor
文章目录
【注意】最后更新于 April 25, 2017,文中内容可能已过时,请谨慎使用。
ScheduledThreadPoolExecutor源码分析
简介
ScheduledThreadPoolExecutor是一种类似Timer的定时器或者说是调度器,和Timer比起来主要有几点好处:
- 1.多线程的定时调度,timer是单线程的,每个timer实例只有一个工作线程。
- 2.由于继承自ThreadPoolExecutor,更具有灵活性和伸缩性。
- 3.没有timer那种线程泄露问题,timer调度的任务如果异常终止,那么整个timer都会被取消,无法执行其他任务
一、ScheduledThreadPoolExecutor 内部结构实现
ScheduledThreadPoolExecutor实现ThreadPoolExecutor,继承ThreadPoolExecutor接口
1.ThreadPoolExecutor接口说明
|
|
2.ThreadPoolExecutor构造
|
|
3.workQueue 任务队列用DelayedWorkQueue
(1).DelayedWorkQueue 内部实现
DelayedWorkQueue实现AbstractQueue抽象类,继承BlockingQueue接口。DelayedWorkQueue只有默认构造方法
|
|
(2).ScheduledFutureTask实现RunnableScheduledFuture接口实现
RunnableScheduledFuture接口继承关系图如下:
ScheduledFutureTask继承FutureTask类
|
|
(3).入队
- offer(Runnable x)
- offer(Runnable x, long timeout, TimeUnit unit) 不支持等待时间
- put(Runnable e) 调用offer 不支持等待
- add(Runnable e) 调用offer
|
|
(3).出队
-
take
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture first = queue[0]; if (first == null) available.await(); //等待 else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return finishPoll(first); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } /** * Performs common bookkeeping for poll and take: Replaces * first element with last and sifts it down. Call only when * holding lock. * @param f the task to remove and return */ private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { int s = --size; RunnableScheduledFuture x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; } /** * Sift element added at top down to its heap-ordered spot. * Call only when holding lock. */ private void siftDown(int k, RunnableScheduledFuture key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1;// left RunnableScheduledFuture c = queue[child]; int right = child + 1; //right if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); }
-
poll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
public RunnableScheduledFuture poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } public RunnableScheduledFuture poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) //延期 return finishPoll(first); if (nanos <= 0) return null; if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
(3).移除
-
remove
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(x); if (i < 0) return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement); //往下 重组堆结构 if (queue[i] == replacement) siftUp(i, replacement); //往上 重组堆结构 } return true; } finally { lock.unlock(); } } /** * Find index of given object, or -1 if absent */ private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { int i = ((ScheduledFutureTask) x).heapIndex; //heapIndex 快速定位,删除 // Sanity check; x could conceivably be a // ScheduledFutureTask from some other pool. if (i >= 0 && i < size && queue[i] == x) return i; } else { for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; }
-
drainTo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
/** * Return and remove first element only if it is expired. * Used only by drainTo. Call only when holding lock. * 只有在第一个元素过期时才返回并移除。仅由drainTo使用。只有在保持锁定时才能call */ private RunnableScheduledFuture pollExpired() { RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; return finishPoll(first); } public int drainTo(Collection<? super Runnable> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first; int n = 0; while ((first = pollExpired()) != null) { c.add(first); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection<? super Runnable> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first; int n = 0; while (n < maxElements && (first = pollExpired()) != null) { c.add(first); ++n; } return n; } finally { lock.unlock(); } }
(4). Iterator 迭代器实现
|
|
二、 schedule 创建并执行一个一次性任务,这个任务过了延迟时间就会被执行。
|
|
三、 scheduleAtFixedRate
创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一次被执行,然后会以给定的周期时间执行。 如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。如果某次执行时长超过了周期时间, 那么下一次任务会延迟启动,不会和当前 任务并行执行。
|
|
四、 scheduleAtFixedRate
创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一次被执行,接下来的任务会在上次任务执行完毕后,延迟给定的时间, 然后再继续执行。如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。
|
|
五、execute 执行任务
|
|
六、submit 提交任务
|
|
七、停止任务 shutdown和shutdownNow
调用ThreadPoolExecutor