I. io.netty.util.concurrent.Future

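UML diagram of Future

io.netty.util.concurrent.Future extends the JDK Future.

1.ChannelFuture extends io.netty.util.concurrent.Future

The result of an asynchronous Channel I/O operation.

All I/O operations in Netty are asynchronous. This means that any I/O call returns immediately, with no guarantee that the requested I/O operation has completed by the time the call returns. Instead, you get back a ChannelFuture instance that gives you information about the result or status of the I/O operation.

A ChannelFuture is either uncompleted or completed. When an I/O operation begins, a new future object is created. The new future is uncompleted initially: it is neither succeeded, failed, nor cancelled, because the I/O operation has not finished yet. If the I/O operation finishes successfully, with failure, or by cancellation, the future is marked as completed with more specific information, such as the cause of the failure. Note that even failure and cancellation belong to the completed state.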

 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+

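Various methods are provided to let you check whether the I/O operation has completed, wait for completion, and retrieve the result of the I/O operation. ChannelFuture also lets you add ChannelFutureListeners so that you are notified when the I/O operation completes.

Prefer addListener(GenericFutureListener) to await()

It is recommended to prefer addListener(GenericFutureListener) to await() wherever possible to get notified when an I/O operation is done and to run any follow-up tasks.

addListener(GenericFutureListener) is non-blocking. It simply adds the specified ChannelFutureListener to the ChannelFuture, and the I/O thread notifies the listeners when the I/O operation associated with the future is done. ChannelFutureListener yields the best performance and resource utilization because it does not block at all, but implementing sequential logic can be tricky if you are not used to event-driven programming.

By contrast, await() is a blocking operation. Once called, the caller thread blocks until the operation is done. Sequential logic is easier to implement with await(), but the caller thread blocks unnecessarily until the I/O operation is done, and inter-thread notification is relatively expensive. Moreover, a deadlock is possible in certain circumstances.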
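
To make the contrast concrete, here is a minimal JDK-only sketch. It is not Netty code: java.util.concurrent.CompletableFuture is used as a stand-in for ChannelFuture, whenComplete plays the role of addListener, and join plays the role of await. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only stand-in: CompletableFuture plays the role of ChannelFuture here.
class ListenerVersusAwait {

    // Callback style: register follow-up work and return immediately.
    static List<String> callbackStyle() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        future.whenComplete((value, cause) -> log.add("listener saw " + value));
        log.add("caller continues");
        future.complete("done"); // the completing thread runs the callback
        return log;
    }

    // Blocking style: the caller waits until the result is available.
    static String blockingStyle() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> future.complete("done")).start();
        return future.join(); // blocks the calling thread
    }

    public static void main(String[] args) {
        System.out.println(callbackStyle());
        System.out.println(blockingStyle());
    }
}
```

In the callback version the caller never parks a thread; in the blocking version one thread sits idle until another thread completes the future, which is the cost the text above warns about.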

UML diagram of ChannelFuture

2.AbstractFuture source analysis

  • get: blocks until the value is available
public abstract class AbstractFuture<V> implements Future<V> {

    @Override
    public V get() throws InterruptedException, ExecutionException {
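        await(); // block until the future completes

        Throwable cause = cause();
        if (cause == null) {
            return getNow(); // success: return the result
        }
        // otherwise rethrow the cancellation or wrap the failure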
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (await(timeout, unit)) { // wait for the result for at most the given time
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }
        throw new TimeoutException(); // not completed within the timeout
    }
}
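
The get() logic above is a template method: it only needs await(), cause() and getNow() from the subclass. The following JDK-only miniature is a sketch of that pattern under the assumption that the future is already complete; MiniFuture, completed and describe are hypothetical names, not Netty classes.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

// Hypothetical miniature of the pattern; not Netty's actual class.
abstract class MiniFuture<V> {
    abstract void await() throws InterruptedException; // block until done
    abstract Throwable cause();                         // null on success
    abstract V getNow();                                // result without blocking

    V get() throws InterruptedException, ExecutionException {
        await();
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    // An already-completed future, so await() has nothing to wait for.
    static <V> MiniFuture<V> completed(V value, Throwable cause) {
        return new MiniFuture<V>() {
            @Override void await() { }
            @Override Throwable cause() { return cause; }
            @Override V getNow() { return value; }
        };
    }

    // Demo helper: describes the outcome of get() as a string.
    static String describe(MiniFuture<?> f) {
        try {
            return "value=" + f.get();
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

A cancellation is rethrown as is, while any other failure is wrapped in ExecutionException, which matches the contract of the JDK Future.get().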

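3.CompleteFuture abstract class source analysis

"Complete" means the operation has already finished, so a CompleteFuture represents the result of an asynchronous operation that is already complete.

1). Constructor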

public abstract class CompleteFuture<V> extends AbstractFuture<V> {
    // executor that runs the operations defined in the listeners
    private final EventExecutor executor;
    /**
     * Creates a new instance.
     *
     * @param executor the {@link EventExecutor} associated with this future
     */
    protected CompleteFuture(EventExecutor executor) {
        this.executor = executor;
    }
  
    /**
     * Return the {@link EventExecutor} which is used by this {@link CompleteFuture}.
     */
    protected EventExecutor executor() {
        return executor;
    }      
}

1). addListener: add a listener

Because the future is already complete, DefaultPromise.notifyListener notifies the listener right away.

   @Override
   public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
       if (listener == null) {
           throw new NullPointerException("listener");
       }
       DefaultPromise.notifyListener(executor(), this, listener);
       return this;
   }

   @Override
   public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
       if (listeners == null) {
           throw new NullPointerException("listeners");
       }
       for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
           if (l == null) {
               break;
           }
           DefaultPromise.notifyListener(executor(), this, l);
       }
       return this;
   }

2). removeListener: remove a listener

  @Override
    public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // NOOP: the future is already complete, so listeners are being notified or have been notified and cannot be removed
        return this;
    }

    @Override
    public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
        // NOOP
        return this;
    }

3). await: already complete, so there is nothing to wait for

   @Override
   public Future<V> await() throws InterruptedException {
       if (Thread.interrupted()) {
           throw new InterruptedException();
       }
       return this;
   }

   @Override
   public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
       if (Thread.interrupted()) {
           throw new InterruptedException();
       }
       return true;
   }

4). State methods

    @Override
    public boolean isDone() {
        return true;
    }

    @Override
    public boolean isCancellable() {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

4.CompleteChannelFuture abstract class source analysis

A completed Channel operation that carries no result value.

1). Constructor

abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
    
    private final Channel channel;

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     */
    protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
        super(executor);
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;
    }

    @Override
    protected EventExecutor executor() {
        EventExecutor e = super.executor();
        if (e == null) {
            return channel().eventLoop();
        } else {
            return e;
        }
    }
}    

2).getNow


   @Override
   public Void getNow() {
       return null;
   }

5.SucceededChannelFuture: the source is straightforward

final class SucceededChannelFuture extends CompleteChannelFuture {

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     */
    SucceededChannelFuture(Channel channel, EventExecutor executor) {
        super(channel, executor);
    }

    @Override
    public Throwable cause() {
        return null;
    }

    @Override
    public boolean isSuccess() {
        return true;
    }
}

6.FailedChannelFuture: the source is straightforward

final class FailedChannelFuture extends CompleteChannelFuture {
    
    // the cause of the failure
    private final Throwable cause;

    /**
     * Creates a new instance.
     *
     * @param channel the {@link Channel} associated with this future
     * @param cause   the cause of failure
     */
    FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
        super(channel, executor);
        if (cause == null) {
            throw new NullPointerException("cause");
        }
        this.cause = cause;
    }

    @Override
    public Throwable cause() {
        return cause;
    }

    @Override
    public boolean isSuccess() {
        return false;
    }

    @Override
    public ChannelFuture sync() {
        PlatformDependent.throwException(cause);
        return this;
    }

    @Override
    public ChannelFuture syncUninterruptibly() {
        PlatformDependent.throwException(cause);
        return this;
    }
}

II. Promise

UML diagram of Promise

1.DefaultPromise

1). Fields

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    // updater used to CAS the result field
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // Signal marks a state: when the operation succeeds with a null result, result is CAS-set to SUCCESS
    private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
    private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
    // holder for the cancellation cause
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));
    // the result of the operation
    private volatile Object result;
    // executor that runs the listener callbacks
    private final EventExecutor executor;
    /**
     * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
     * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     *
     * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
     */
    private Object listeners;
    /**
     * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
     */
    private short waiters;

    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
     * executor changes.
     */
    private boolean notifyingListeners;
}

2). Constructors

  /**
     * Creates a new instance.
     *
     * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
     *
     * @param executor
     *        the {@link EventExecutor} which is used to notify the promise once it is complete.
     *        It is assumed this executor will protect against {@link StackOverflowError} exceptions.
     *        The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
     *        depth exceeds a threshold.
     *
     */
    public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }

    /**
     * See {@link #executor()} for expectations of the executor.
     */
    protected DefaultPromise() {
        // only for subclasses
        executor = null;
    }

3). setSuccess: mark the promise as succeeded

   @Override
   public Promise<V> setSuccess(V result) {
       if (setSuccess0(result)) {
           notifyListeners(); // notify the registered listeners
           return this;
       }
       throw new IllegalStateException("complete already: " + this);
   }
   private boolean setSuccess0(V result) {
       return setValue0(result == null ? SUCCESS : result);
   }
   private boolean setValue0(Object objResult) {
       if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
           RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
           checkNotifyWaiters(); // wake up threads blocked in await()
           return true;
       }
       return false;
   } 
   private synchronized void checkNotifyWaiters() {
       if (waiters > 0) {
           notifyAll();
       }
   }    
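
The completion step hinges on one compare-and-set: only the caller that moves result away from null (or from UNCANCELLABLE) wins, and a sentinel stands in for a null result so that "completed with null" stays distinguishable from "not completed". Below is a minimal JDK-only sketch of that idea. MiniPromise, markUncancellable and trySuccess are hypothetical names, plain Object sentinels replace Netty's Signal, and markUncancellable is deliberately simpler than the real setUncancellable.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical miniature; names mirror the excerpt but this is not Netty code.
class MiniPromise {
    private static final AtomicReferenceFieldUpdater<MiniPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(MiniPromise.class, Object.class, "result");
    private static final Object SUCCESS = new Object();       // stands in for a null result
    private static final Object UNCANCELLABLE = new Object();  // running, can no longer be cancelled

    private volatile Object result; // null means "not completed yet"

    // Simplified: succeeds only if nothing has been stored yet.
    boolean markUncancellable() {
        return RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE);
    }

    // Returns true only for the single caller that wins the race.
    boolean trySuccess(Object value) {
        Object stored = (value == null) ? SUCCESS : value;
        return RESULT_UPDATER.compareAndSet(this, null, stored)
                || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, stored);
    }

    boolean isDone() {
        Object r = result;
        return r != null && r != UNCANCELLABLE;
    }

    // The sentinels are internal markers, so callers see null for them.
    Object getNow() {
        Object r = result;
        return (r == SUCCESS || r == UNCANCELLABLE) ? null : r;
    }
}
```

Because the transition is a CAS rather than a lock, a second attempt to complete the promise simply fails, which is why setSuccess in the excerpt can throw IllegalStateException when setSuccess0 returns false.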

4). setFailure: mark the promise as failed

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }
    // wrap the exception in a CauseHolder and CAS it into the result field
    private boolean setFailure0(Throwable cause) {
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }    

5). setUncancellable: mark the promise so that it can no longer be cancelled

  @Override
  public boolean setUncancellable() {
      if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
          return true;
      }
      Object result = this.result;
      return !isDone0(result) || !isCancelled0(result);
  }
    private static boolean isCancelled0(Object result) { // whether the promise was cancelled
        return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
    }

    private static boolean isDone0(Object result) { // whether the operation has completed
        return result != null && result != UNCANCELLABLE;
    }  

6). addListener: add a listener

 @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
   private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
       if (listeners == null) {
           listeners = listener;
       } else if (listeners instanceof DefaultFutureListeners) {
           ((DefaultFutureListeners) listeners).add(listener);
       } else {
           listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
       }
   }     

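a. How DefaultFutureListeners works

DefaultFutureListeners simply manages an array of GenericFutureListener. The array starts with a capacity of 2 and doubles whenever it fills up. The class offers add and remove methods; it is not analyzed in detail here.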
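
Since the class itself is skipped here, the following JDK-only sketch shows what such an array-backed listener list could look like. It is written from the description above (capacity 2, doubling on growth), not copied from Netty; MiniListeners is a hypothetical name and Runnable stands in for GenericFutureListener.

```java
import java.util.Arrays;

// Hypothetical miniature of an array-backed listener list; not Netty's class.
class MiniListeners {
    private Runnable[] listeners = new Runnable[2]; // initial capacity 2
    private int size;

    void add(Runnable l) {
        if (size == listeners.length) {
            listeners = Arrays.copyOf(listeners, size << 1); // double the capacity
        }
        listeners[size++] = l;
    }

    void remove(Runnable l) {
        for (int i = 0; i < size; i++) {
            if (listeners[i] == l) {
                // Shift the tail left by one and clear the vacated slot.
                System.arraycopy(listeners, i + 1, listeners, i, size - i - 1);
                listeners[--size] = null;
                return;
            }
        }
    }

    int size() { return size; }
    int capacity() { return listeners.length; }
}
```

Keeping a single listener in a plain field and switching to the array only when a second one arrives, as addListener0 does, avoids allocating the array for the common one-listener case.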

7). removeListener: remove a listener

    @Override
    public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            removeListener0(listener);
        }

        return this;
    }
    
   private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).remove(listener); // remove from the DefaultFutureListeners collection
        } else if (listeners == listener) {
            listeners = null;
        }
    }    

8). notifyListeners: notify the listeners

  private void notifyListeners() {
      EventExecutor executor = executor();
      if (executor.inEventLoop()) {
          final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
          final int stackDepth = threadLocals.futureListenerStackDepth(); // how deeply listener notifications are nested on this thread; capped to avoid StackOverflowError
          if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
              threadLocals.setFutureListenerStackDepth(stackDepth + 1);
              try {
                  notifyListenersNow();
              } finally {
                  threadLocals.setFutureListenerStackDepth(stackDepth);
              }
              return;
          }
      }

      safeExecute(executor, new Runnable() {
          @Override
          public void run() {
              notifyListenersNow();
          }
      });
  }
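
The stack-depth check is easier to follow in isolation. When a listener completes another promise, whose listener completes yet another one, each notification nests inside the previous call frame. A per-thread counter caps that nesting, and anything deeper is handed to the executor so it starts from a fresh stack. Here is a single-threaded, JDK-only sketch of the idea; DepthLimitedNotifier and runChain are hypothetical names, and a simple queue stands in for the EventExecutor.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical miniature: a task queue stands in for the EventExecutor.
class DepthLimitedNotifier {
    static final int MAX_DEPTH = 8;
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    int maxObservedDepth;
    int deferredCount;

    void notifyListener(Runnable listener) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            DEPTH.set(depth + 1);
            maxObservedDepth = Math.max(maxObservedDepth, depth + 1);
            try {
                listener.run();      // may call notifyListener again (nesting)
            } finally {
                DEPTH.set(depth);    // restore the depth on the way out
            }
            return;
        }
        deferredCount++;
        deferred.add(listener);      // too deep: run later from a shallow stack
    }

    void runDeferred() {
        Runnable task;
        while ((task = deferred.poll()) != null) {
            notifyListener(task);
        }
    }

    // Builds a chain in which every listener triggers the next notification.
    static DepthLimitedNotifier runChain(int length) {
        DepthLimitedNotifier n = new DepthLimitedNotifier();
        int[] remaining = { length };
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            if (--remaining[0] > 0) {
                n.notifyListener(self[0]);
            }
        };
        n.notifyListener(self[0]);
        n.runDeferred();
        return n;
    }
}
```

However long the chain is, the inline nesting never exceeds MAX_DEPTH; the rest of the work is resumed from the queue, which is the role safeExecute plays in the excerpt.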

a.notifyListenersNow


   private void notifyListenersNow() {
       Object listeners;
       synchronized (this) {
           // Only proceed if there are listeners to notify and we are not already notifying listeners.
           if (notifyingListeners || this.listeners == null) {
               return;
           }
           notifyingListeners = true;
           listeners = this.listeners;
           this.listeners = null;
       }
       for (;;) {
           if (listeners instanceof DefaultFutureListeners) {
               notifyListeners0((DefaultFutureListeners) listeners);
           } else {
               notifyListener0(this, (GenericFutureListener<?>) listeners);
           }
           synchronized (this) {
               if (this.listeners == null) {
                   // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                   // need to be in a finally block.
                   notifyingListeners = false;
                   return;
               }
               listeners = this.listeners;
               this.listeners = null;
           }
       }
   }

   private void notifyListeners0(DefaultFutureListeners listeners) {
       GenericFutureListener<?>[] a = listeners.listeners();
       int size = listeners.size();
       for (int i = 0; i < size; i ++) {
           notifyListener0(this, a[i]);
       }
   }

   @SuppressWarnings({ "unchecked", "rawtypes" })
   private static void notifyListener0(Future future, GenericFutureListener l) {
       try {
           l.operationComplete(future);
       } catch (Throwable t) {
           logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
       }
   }

9).await

 @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock();

        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }

        checkDeadLock();

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    decWaiters();
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

a.checkDeadLock

   protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) { // the EventLoop thread must not block waiting on the promise
            throw new BlockingOperationException(toString());
        }
    }

b.incWaiters

    private void incWaiters() {
        if (waiters == Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters: " + this);
        }
        ++waiters;
    }

c.decWaiters

   private void decWaiters() {
        --waiters;
    }

10).sync

    @Override
    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        awaitUninterruptibly();
        rethrowIfFailed();
        return this;
    }

   private void rethrowIfFailed() {
       Throwable cause = cause();
       if (cause == null) {
           return;
       }

       PlatformDependent.throwException(cause);
   }     

11).await(long timeoutMillis)

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }

   private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
       if (isDone()) {
           return true;
       }

       if (timeoutNanos <= 0) {
           return isDone();
       }

       if (interruptable && Thread.interrupted()) {
           throw new InterruptedException(toString());
       }

       checkDeadLock();

       long startTime = System.nanoTime();
       long waitTime = timeoutNanos;
       boolean interrupted = false;
       try {
           for (;;) {
               synchronized (this) {
                   if (isDone()) {
                       return true;
                   }
                   incWaiters();
                   try {
                       wait(waitTime / 1000000, (int) (waitTime % 1000000)); // timed wait for the remaining time
                   } catch (InterruptedException e) {
                       if (interruptable) {
                           throw e;
                       } else {
                           interrupted = true;
                       }
                   } finally {
                       decWaiters();
                   }
               }
               if (isDone()) {
                   return true;
               } else {
                   waitTime = timeoutNanos - (System.nanoTime() - startTime);
                   if (waitTime <= 0) {
                       return isDone();
                   }
               }
           }
       } finally {
           if (interrupted) {
               Thread.currentThread().interrupt();
           }
       }
   }
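
The core of await0 is a loop that waits on the monitor and then recomputes how much of the time budget is left, because a wakeup may arrive early. The following JDK-only sketch isolates that loop. TimedLatch and awaitBriefly are hypothetical names, and interruption handling is reduced to the interruptible case only.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of a timed wait on a monitor; not Netty's class.
class TimedLatch {
    private boolean done;

    synchronized void complete() {
        done = true;
        notifyAll(); // wake up every thread blocked in await()
    }

    // Returns true if completed within the timeout, false otherwise.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        long timeoutNanos = unit.toNanos(timeout);
        long start = System.nanoTime();
        long waitNanos = timeoutNanos;
        synchronized (this) {
            while (!done) {
                if (waitNanos <= 0) {
                    return false;
                }
                // Object.wait takes whole milliseconds plus leftover nanoseconds.
                wait(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
                // A wakeup can be spurious or early, so recompute the budget.
                waitNanos = timeoutNanos - (System.nanoTime() - start);
            }
            return true;
        }
    }

    // Demo helper: waits on a latch that is or is not completed by another thread.
    static boolean awaitBriefly(boolean completeFromOtherThread) {
        TimedLatch latch = new TimedLatch();
        if (completeFromOtherThread) {
            new Thread(latch::complete).start();
        }
        try {
            long millis = completeFromOtherThread ? 5_000 : 50;
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Measuring elapsed time with System.nanoTime() rather than counting wakeups keeps the total wait bounded by the requested timeout no matter how many times the thread is woken.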
    

DefaultChannelPromise and VoidChannelPromise are both simple implementations and are not analyzed in detail here.