【注意】最后更新于 April 10, 2019,文中内容可能已过时,请谨慎使用。
一、io.netty.util.concurrent.Future
Future的UML图

io.netty.util.concurrent.Future继承JDK Future
1.ChannelFuture 继承io.netty.util.concurrent.Future
异步channel的I/O操作的结果。
Netty中的所有I/O操作都是异步的。这意味着任何I/O调用将立即返回,不保证在请求结束时请求的I/O操作已经完成。
相反,您将返回一个ChannelFuture实例,为您提供有关I/O操作结果或状态的信息。
ChannelFuture要么是未完成的,要么是完成的。当I/O操作开始时,将创建一个新的future对象。新的未来一开始就没有完成——它既没有成功,也没有失败,也没有被取消,
因为I/O操作还没有完成。如果I/O操作成功地完成,失败或取消,那么未来将被标记为已完成的更具体的信息,例如失败的原因。请注意,即使是失败和取消也属于完成状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
* +---------------------------+
* | Completed successfully |
* +---------------------------+
* +----> isDone() = true |
* +--------------------------+ | | isSuccess() = true |
* | Uncompleted | | +===========================+
* +--------------------------+ | | Completed with failure |
* | isDone() = false | | +---------------------------+
* | isSuccess() = false |----+----> isDone() = true |
* | isCancelled() = false | | | cause() = non-null |
* | cause() = null | | +===========================+
* +--------------------------+ | | Completed by cancellation |
* | +---------------------------+
* +----> isDone() = true |
* | isCancelled() = true |
* +---------------------------+
|
提供了各种方法让您检查I/O操作是否已完成,等待完成,并检索I/O操作的结果。它还允许您添加ChannelFutureListeners,以便在I/O操作完成时得到通知。
首选 addListener(GenericFutureListener) 而不是 await()
建议优先使用addListener(GenericFutureListener)在I/O操作完成时尽可能通知await()并执行后续任务。
addListener(GenericFutureListener)是非阻塞的。它只是将指定的ChannelFutureListener添加到ChannelFuture,并且I/O线程将在与未来相关的I/O操作完成时
通知监听器。ChannelFutureListener产生最好的性能和资源利用率,因为它根本不会阻塞,但是如果你不习惯于事件驱动的编程,那么实现一个顺序逻辑可能会很棘手
相反,await()是一个阻塞操作。一旦被调用,调用者线程将阻塞,直到操作完成。用await()实现一个顺序逻辑比较容易,但是调用者线程不必要的阻塞,直到I/O操作完成,
并且线程间通知的代价相对较高。而且,在特定的情况下有可能会死锁。
ChannelFuture的uml图

2.AbstractFuture 源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
await();//等待获取值
Throwable cause = cause();
if (cause == null) {
return getNow();//返回成功值
}
//异常处理
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {//指定时间等待结果值
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();//超时异常
}
}
|
3.CompleteFuture抽象类 源码分析
Complete表示操作已完成,所以CompleteFuture表示一个异步操作已完成的结果;
1).构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public abstract class CompleteFuture<V> extends AbstractFuture<V> {
//执行器,执行Listener中定义的操作
private final EventExecutor executor;
/**
* Creates a new instance.
*
* @param executor the {@link EventExecutor} associated with this future
*/
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}
/**
* Return the {@link EventExecutor} which is used by this {@link CompleteFuture}.
*/
protected EventExecutor executor() {
return executor;
}
}
|
1).addListener 添加监听
DefaultPromise.notifyListener通知监听事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@Override
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
if (listener == null) {
throw new NullPointerException("listener");
}
DefaultPromise.notifyListener(executor(), this, listener);
return this;
}
@Override
public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
if (listeners == null) {
throw new NullPointerException("listeners");
}
for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
if (l == null) {
break;
}
DefaultPromise.notifyListener(executor(), this, l);
}
return this;
}
|
2).removeListener 移除监听
1
2
3
4
5
6
7
8
9
10
11
|
@Override
public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
// NOOP 已完成,监听正在通知或者已通知,不能删除监听
return this;
}
@Override
public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// NOOP
return this;
}
|
3).await 已完成,不用等待;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Override
public Future<V> await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
|
4).状态修改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Override
public boolean isDone() {
return true;
}
@Override
public boolean isCancellable() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
|
4.CompleteChannelFuture抽象类 源码分析
无结果返回channel完成操作
1).构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
private final Channel channel;
/**
* Creates a new instance.
*
* @param channel the {@link Channel} associated with this future
*/
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
super(executor);
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
@Override
protected EventExecutor executor() {
EventExecutor e = super.executor();
if (e == null) {
return channel().eventLoop();
} else {
return e;
}
}
}
|
2).getNow
1
2
3
4
5
|
@Override
public Void getNow() {
return null;
}
|
5.SucceededChannelFuture 源码简单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
final class SucceededChannelFuture extends CompleteChannelFuture {
/**
* Creates a new instance.
*
* @param channel the {@link Channel} associated with this future
*/
SucceededChannelFuture(Channel channel, EventExecutor executor) {
super(channel, executor);
}
@Override
public Throwable cause() {
return null;
}
@Override
public boolean isSuccess() {
return true;
}
}
|
6.FailedChannelFuture 源码简单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
final class FailedChannelFuture extends CompleteChannelFuture {
//异常属性
private final Throwable cause;
/**
* Creates a new instance.
*
* @param channel the {@link Channel} associated with this future
* @param cause the cause of failure
*/
FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
super(channel, executor);
if (cause == null) {
throw new NullPointerException("cause");
}
this.cause = cause;
}
@Override
public Throwable cause() {
return cause;
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public ChannelFuture sync() {
PlatformDependent.throwException(cause);
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
PlatformDependent.throwException(cause);
return this;
}
}
|
二、Promise
Promise的UML图

1.DefaultPromise
1).属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
@SuppressWarnings("rawtypes")
//CAS 修改result
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
//Signal 代表一个状态,成功时,返回结果为空,CAS修改result等于SUCCESS;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
//取消
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
//返回结构
private volatile Object result;
//执行listener操作的执行器
private final EventExecutor executor;
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
* 一个或多个listeners。可以是GenericFutureListener或DefaultFutureListeners。如果代码为null,则表示1)没有添加监听器,或2)所有监听器都被通知。
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
* 线程化 - synchronized(this)。当没有EventExecutor时,我们必须支持添加监听器。
*/
private Object listeners;
/**
* Threading - synchronized(this). We are required to hold the monitor to use Java's underlying wait()/notifyAll().
* 线程化 - synchronized(this)。我们需要让监视器使用Java的底层wait()/ notifyAll()。
*/
private short waiters;
/**
* Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification if the
* executor changes.
* 线程化 - synchronized(this)。如果执行程序改变,我们必须防止并发通知和FIFO监听程序通知。
*/
private boolean notifyingListeners;
}
|
2).构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/**
* Creates a new instance.
*
* It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
*
* @param executor
* the {@link EventExecutor} which is used to notify the promise once it is complete.
* It is assumed this executor will protect against {@link StackOverflowError} exceptions.
* The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
* depth exceeds a threshold.
*
*/
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
/**
* See {@link #executor()} for expectations of the executor.
*/
protected DefaultPromise() {
// only for subclasses
executor = null;
}
|
3).setSuccess 设置成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
notifyListeners(); //通知监听事件
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters(); //唤醒等待线程
return true;
}
return false;
}
private synchronized void checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
}
|
4).setFailure 设置失败
1
2
3
4
5
6
7
8
9
10
11
12
|
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
//把异常信息封装CauseHolder对象,CAS赋值result属性
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
|
5).setUncancellable 设置不可能被取消。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@Override
public boolean setUncancellable() {
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
return !isDone0(result) || !isCancelled0(result);
}
private static boolean isCancelled0(Object result) {//判断取消
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
private static boolean isDone0(Object result) { //判断是否操作完成
return result != null && result != UNCANCELLABLE;
}
|
6). addListener 添加监听事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
|
a.DefaultFutureListeners 原理
DefaultFutureListeners原理就是管理数组GenericFutureListener,初始化大小2,2倍扩容数组元素大小;有add和remove方法;具体不做分析
7). removeListener 移除监听事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@Override
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
removeListener0(listener);
}
return this;
}
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);//DefaultFutureListeners集合移除
} else if (listeners == listener) {
listeners = null;
}
}
|
8).notifyListeners 通知监听事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();//没搞懂???
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
|
a.notifyListenersNow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
//只有在有监听器通知的情况下才进行,我们还没有通知侦听器。
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
|
9).await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
@Override
public Promise<V> awaitUninterruptibly() {
if (isDone()) {
return this;
}
checkDeadLock();
boolean interrupted = false;
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} catch (InterruptedException e) {
// Interrupted while waiting.
interrupted = true;
} finally {
decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
|
a.checkDeadLock
1
2
3
4
5
6
|
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) { //验证EventLoop线程不能等待
throw new BlockingOperationException(toString());
}
}
|
b.incWaiters
1
2
3
4
5
6
|
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
|
c.decWaiters
1
2
3
4
|
private void decWaiters() {
--waiters;
}
|
10).sync
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public Promise<V> syncUninterruptibly() {
awaitUninterruptibly();
rethrowIfFailed();
return this;
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
|
11).await(long timeoutMillis)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
if (isDone()) {
return true;
}
if (timeoutNanos <= 0) {
return isDone();
}
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
long startTime = System.nanoTime();
long waitTime = timeoutNanos;
boolean interrupted = false;
try {
for (;;) {
synchronized (this) {
if (isDone()) {
return true;
}
incWaiters();
try {
wait(waitTime / 1000000, (int) (waitTime % 1000000));//时间等待
} catch (InterruptedException e) {
if (interruptable) {
throw e;
} else {
interrupted = true;
}
} finally {
decWaiters();
}
}
if (isDone()) {
return true;
} else {
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
|
DefaultChannelPromise、VoidChannelPromise实现都很简单,不具体分析;