FutureTask源码分析

简介

一、FutureTask 内部实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: earlier versions of this class were built on
     * AbstractQueuedSynchronizer. This version is not, mainly so that
     * users are not surprised by interrupt status being retained
     * during cancellation races. Synchronization in the current design
     * relies on a "state" field updated via CAS to track completion,
     * together with a simple Treiber stack that holds waiting threads.
     *
     * Style note: as usual, the overhead of AtomicXFieldUpdaters is
     * bypassed in favour of calling Unsafe intrinsics directly.
     */

    /**
     * Run state of this task; starts out as NEW. A terminal state is
     * reached only through set, setException and cancel. While a task
     * is completing, the state may briefly hold COMPLETING (the outcome
     * is being written) or INTERRUPTING (the runner is being interrupted
     * on behalf of cancel(true)). The step from one of those transient
     * values to the final value uses a cheaper ordered/lazy write,
     * because the values are unique and cannot be modified further.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;

    /** The wrapped callable; set to null once the task has run. */
    private Callable<V> callable;

    /**
     * Value handed back by get(), or the exception get() should throw.
     * Deliberately non-volatile: it is protected by state reads/writes.
     */
    private Object outcome;

    /** Thread executing the callable; installed via CAS during run(). */
    private volatile Thread runner;

    /** Head of the Treiber stack of threads waiting for completion. */
    private volatile WaitNode waiters;

    /**
     * Minimal linked-list node recording one waiting thread in the
     * Treiber stack. See other classes such as Phaser and
     * SynchronousQueue for a more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        /** Captures the thread that creates the node. */
        WaitNode() {
            this.thread = Thread.currentThread();
        }
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return
     * the given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        final Callable<V> adapted = Executors.callable(runnable, result);
        this.callable = adapted;
        this.state = NEW;       // ensure visibility of callable
    }

    // Unsafe mechanics: field offsets used by the CAS / ordered-write calls.
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            final Class<?> self = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(self.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset(self.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset(self.getDeclaredField("waiters"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }
}
  • FutureTask实现了RunnableFuture接口;如下图所示,RunnableFuture接口同时继承了Runnable接口和Future接口。

二、执行run方法

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
 /**
  * Runs the underlying callable on the calling thread. Does nothing if
  * the state is no longer NEW or if another thread has already been
  * installed as the runner. A normal return value is handed to
  * {@link #set}; a throwable from the callable is handed to
  * {@link #setException}.
  */
 public void run() {
        // Bail out when the task has left NEW, or when the CAS that installs
        // the current thread as runner fails because another thread won.
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // call() threw: attempts NEW -> COMPLETING -> EXCEPTIONAL
                }
                if (ran)
                    set(result); // call() returned: attempts NEW -> COMPLETING -> NORMAL
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

  /**
     * Makes this future report an {@link ExecutionException} whose
     * cause is the given throwable. Has no effect if this future has
     * already been set or has been cancelled.
     *
     * <p>The {@link #run} method calls this internally when the
     * computation fails.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        // Only the caller that wins the NEW -> COMPLETING transition may
        // publish an outcome; everyone else leaves the future untouched.
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            return;
        }
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }

  /**
   * Stores the given value as the result of this future. Has no effect
   * if this future has already been set or has been cancelled.
   *
   * <p>The {@link #run} method calls this internally when the
   * computation completes successfully.
   *
   * @param v the value
   */
  protected void set(V v) {
      // Only the caller that wins the NEW -> COMPLETING transition may
      // publish an outcome; everyone else leaves the future untouched.
      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          return;
      }
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
  }

  /**
   * Removes and signals all waiting threads, invokes done(), and
   * nulls out callable.
   */
  private void finishCompletion() {
      // assert state > COMPLETING;
      // Retry until the whole waiter stack has been detached in one CAS
      // (waiters: q -> null) or the stack is observed to be empty.
      for (WaitNode q; (q = waiters) != null;) {
          if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
              // Walk the detached list, waking every thread still recorded.
              for (;;) {
                  Thread t = q.thread;
                  if (t != null) {
                      q.thread = null;
                      LockSupport.unpark(t);
                  }
                  WaitNode next = q.next;
                  if (next == null)
                      break;
                  q.next = null; // unlink to help gc
                  q = next;
              }
              break;
          }
      }

      // Completion hook; a no-op unless a subclass overrides it.
      done();

      callable = null;        // to reduce footprint
  }

  /**
   * Protected method invoked when this task transitions to state
   * {@code isDone} (whether normally or via cancellation). The
   * default implementation does nothing.  Subclasses may override
   * this method to invoke completion callbacks or perform
   * bookkeeping. Note that you can query status inside the
   * implementation of this method to determine whether this task
   * has been cancelled.
   */
  protected void done() { }
  
      /**
       * Makes sure an interrupt issued by a possible cancel(true) reaches
       * the task only while it is still inside run or runAndReset.
       *
       * @param s the state value read by the caller after clearing runner
       */
      private void handlePossibleCancellationInterrupt(int s) {
          // The cancelling thread may stall before it gets a chance to
          // interrupt us, so spin politely until it has moved the state
          // past INTERRUPTING.
          if (s == INTERRUPTING) {
              while (state == INTERRUPTING) {
                  Thread.yield(); // wait out pending interrupt
              }
          }

          // assert state == INTERRUPTED;

          // Ideally any interrupt received from cancel(true) would be
          // cleared here. Interrupts are, however, also a legitimate
          // independent channel between a task and its caller, and there
          // is no way to clear only the cancellation interrupt, so the
          // call below is intentionally left disabled.
          //
          // Thread.interrupted();
      }
    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        // Refuse to run when the task has left NEW, or when another thread
        // already holds the runner slot (the CAS from null fails).
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // True only when call() returned normally and the state re-read
        // in the finally block is still NEW.
        return ran && s == NEW;
    }

二、get 获取返回结果值

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
    /**
     * Returns the task's result, first blocking without a timeout if
     * the task has not yet gone past the COMPLETING state.
     *
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        final int observed = state;
        // Only wait when the outcome has not been fully published yet.
        final int settled = (observed <= COMPLETING)
            ? awaitDone(false, 0L)
            : observed;
        return report(settled);
    }
   /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * <p>Non-positive timeouts are treated as zero when the deadline is
     * computed. Without that clamp, a hugely negative value such as
     * {@code Long.MIN_VALUE} makes {@code deadline - System.nanoTime()}
     * wrap around to a large positive number, and the caller would park
     * almost indefinitely instead of timing out.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion, or the state observed when the
     *         timeout elapsed
     * @throws InterruptedException if the calling thread is found to be
     *         interrupted at the top of a wait iteration
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // Clamp to zero so the remaining-time subtraction below cannot underflow.
        final long deadline = timed ? System.nanoTime() + Math.max(nanos, 0L) : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                // Done: detach this thread from its node so it is not unparked later.
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                // Push the node onto the waiter stack; retried on the next
                // iteration if the CAS loses a race.
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to remove; a null node is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // A null thread marks the node as removable for the sweep below.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;  // unsplice q: link predecessor straight to successor
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // No live predecessor was seen, so q is unlinked by
                    // swinging the stack head from q to its successor.
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    /**
     * Translates a completed state into what get() should produce: the
     * stored value for NORMAL, a CancellationException for CANCELLED or
     * any higher state, and otherwise an ExecutionException wrapping the
     * stored throwable.
     *
     * @param s completed state value
     * @return the stored outcome, when {@code s} is NORMAL
     * @throws ExecutionException if the task completed with an exception
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        final Object result = outcome;
        if (s >= CANCELLED) {
            // CANCELLED, INTERRUPTING and INTERRUPTED all report cancellation.
            throw new CancellationException();
        }
        if (s != NORMAL) {
            // The task failed; the outcome holds the throwable.
            throw new ExecutionException((Throwable) result);
        }
        return (V) result;
    }
    /**
     * Returns the task's result, waiting at most the given time for the
     * task to go past the COMPLETING state.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of {@code timeout}
     * @throws CancellationException {@inheritDoc}
     * @throws NullPointerException if {@code unit} is null
     * @throws TimeoutException if the task has still not completed when
     *         the wait ends
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null) {
            throw new NullPointerException();
        }
        int s = state;
        if (s <= COMPLETING) {
            s = awaitDone(true, unit.toNanos(timeout));
            if (s <= COMPLETING) {
                throw new TimeoutException();
            }
        }
        return report(s);
    }

三、cancel 取消

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    /**
     * Attempts to cancel this task. Succeeds only if the task is still
     * in the NEW state when the state CAS is performed.
     *
     * <p>State transitions: {@code NEW -> CANCELLED} when
     * {@code mayInterruptIfRunning} is false, and
     * {@code NEW -> INTERRUPTING -> INTERRUPTED} when it is true.
     *
     * <p>The final state write and the call to finishCompletion() sit in
     * {@code finally} blocks. If {@code Thread.interrupt()} throws (for
     * example a {@code SecurityException}), the task must still leave the
     * transient INTERRUPTING state and release its waiters. Otherwise the
     * runner would spin forever waiting for INTERRUPTED and blocked
     * callers of get() would never be woken.
     *
     * @param mayInterruptIfRunning true if the thread currently recorded
     *        as runner should be interrupted
     * @return false if the task had already left the NEW state; true
     *         otherwise
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
            return false;
        try {    // in case the call to interrupt throws an exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }