一、介绍

Channelpipeline是 Channelhandler的容器,它负责 Channelhandler的管理和事件拦截与调度。

事件如何在流水线中流动如图下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

*                                                 I/O Request
*                                            via {@link Channel} or
*                                        {@link ChannelHandlerContext}
*                                                      |
*  +---------------------------------------------------+---------------+
*  |                           ChannelPipeline         |               |
*  |                                                  \|/              |
*  |    +---------------------+            +-----------+----------+    |
*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  .               |
*  |               .                                   .               |
*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
*  |        [ method call]                       [method call]         |
*  |               .                                   .               |
*  |               .                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  +---------------+-----------------------------------+---------------+
*                  |                                  \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
* 

消息的读取和发送处理全流程描述如下:

  • 1)底层的Socket.read()方法读取ByteBuf,触发Channelread事件,由IO线程 Nioeventloop调用 Channelpipeline的fireChannelread(Object msg)方法,将消息 (ByteBuf)传输到 Channelpipeline中,将消息依次被 Inbound Handler 1、 Inbound Handler 2、 Inbound Handler N拦截和处理,在这个过程中, 任何Channelhandler都可以中断当前的流程,结束消息的传递。

  • 2)调用ChannelHandlerContext的wrte方法发送消息,将消息依次被Outbound Handler 1,Outbound Handler 2 ,Outbound Handler M 拦截和处理,最终交给Socket.write()发送消息;

将事件转发给下一个处理程序

正如上图所看到的那样,处理程序必须调用ChannelHandlerContext中的事件传播方法才能将事件转发到其下一个处理程序。这些方法包括:

  • Inbound 事件传播方法:

  • ChannelHandlerContext.fireChannelRegistered() Channel注册事件

  • ChannelHandlerContext.fireChannelActive() TCP链路建立成功, Channel激活事件;

  • ChannelHandlerContext.fireChannelRead(Object) 读事件

  • ChannelHandlerContext.fireChannelReadComplete() 读操作完成通知事件;

  • ChannelHandlerContext.fireExceptionCaught(Throwable) 异常通知事件;

  • ChannelHandlerContext.fireUserEventTriggered(Object) 用户自定义事件

  • ChannelHandlerContext.fireChannelWritabilityChanged() Channel的可写状态变化通知事件;

  • ChannelHandlerContext.fireChannelInactive() TCP连接关闭,链路不可用通知事件。

  • ChannelHandlerContext.fireChannelUnregistered() Channel从EventLoop中注销

  • Outbound 事件传播方法:

  • ChannelOutboundInvoker.bind(SocketAddress, ChannelPromise)

  • ChannelOutboundInvoker.connect(SocketAddress, SocketAddress, ChannelPromise)

  • ChannelOutboundInvoker.write(Object, ChannelPromise)

  • ChannelHandlerContext.flush()

  • ChannelHandlerContext.read()

  • ChannelOutboundInvoker.disconnect(ChannelPromise)

  • ChannelOutboundInvoker.close(ChannelPromise)

  • ChannelOutboundInvoker.deregister(ChannelPromise)

线程安全:

Channelpipeline是线程安全的,这意味着N个业务线程可以并发地操作Channelpipeline而不存在多线程并发问题。但是, Channelhandler却不是线程安全的,这意味着尽管 Channelpipeline是线程安全的,但是用户仍然需要自己保证Channelhandler的线程安全,ChannelHandler可以随时添加或删除;

uml关系如图下:

二、 DefaultChannelPipeline源码分析

1.数据结构

  • 以双链表保存ChannelHandler的数据
  • head和tail变量,控制往下处理或者往上处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DefaultChannelPipeline implements ChannelPipeline {


    private static final String HEAD_NAME = generateName0(HeadContext.class);
    private static final String TAIL_NAME = generateName0(TailContext.class);

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =  
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() throws Exception {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
    //AbstractChannelHandlerContext中有prev,next属性形成双链表;Inbound和Outbound事件都在在这个抽象类实现;
    final AbstractChannelHandlerContext head;  //头节点,
    final AbstractChannelHandlerContext tail;  //尾节点 

    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();
    //线程池中的线程映射,记住这个映射是为了保证执行任务时使用同一个线程
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle; //负责估计消息的大小
    private final boolean firstRegistration = true;//对应Channel首次注册到EventLoop 
    //未注册,添加或者删除ChannelHandler预备存放地方,单链表存储
    private PendingHandlerCallback pendingHandlerCallbackHead;
    //Channel被注册,就设置为true。一旦设置为true,值将不会改变。
    private boolean registered;
}    

2.构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);//头节点 
        head = new HeadContext(this);//尾节点 
        //头节点和尾节点相互链接
        head.next = tail;
        tail.prev = head;
    }

3.addFirst 添加ChannelHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    @Override
    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 1.验证未被@Sharable注解时,一个实例是否重复使用
            checkMultiplicity(handler);
            // 2.名称重复样子,以及name为空,自动生成名字,生成名字规则按类名
            name = filterName(name, handler);
            // 3.创建AbstractChannelHandlerContext
            newCtx = newContext(group, name, handler);
            // 4.添加双链表中
            addFirst0(newCtx);

            // 5.未注册,PendingHandlerCallback任务添加到pendingHandlerCallbackHead链表中;
            //   分添加(PendingHandlerCallback)或者删除任务(PendingHandlerRemovedTask)
            if (!registered) {
                newCtx.setAddPending();  //设置AbstractChannelHandlerContext中状态,INIT改成ADD_PENDING
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 6.当前线程不是EventLoop线程,是直接callHandlerAdded0;不是添加任务执行callHandlerAdded0
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) { //当前线程不是EventLoop线程
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

(1).checkMultiplicity 验证@Sharable注解,未被@Sharable注解时,指定ChannelHandler对象ChannelPipeline只能添加一次;被@Sharable注解时,可以添加多次;

  • handler继承ChannelHandlerAdapter,才能做下面判断
    • 类注解@Sharable,一个实例添加多次添加
    • 没有类注解@Sharable,一个实例只能添加一次,以added做验证
1
2
3
4
5
6
7
8
9
 private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) { //验证ChannelHandler对象未被@Sharable注解时,ChannelPipeline只能添加一次;
                throw new ChannelPipelineException(...);
            }
            h.added = true;
        }
    }

(2).filterName name为空自动生成名称,name不为空,验证名称是否重复

  • 参数name为NULL,自动生成名字
    • 按类名称自动生成名称,有相同名字后面添加#1
  • 检查重复名称
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
   private String filterName(String name, ChannelHandler handler) {
        if (name == null) {  //名称为空,自动生成名称,
            return generateName(handler);
        }
        checkDuplicateName(name); //检查重复名称
        return name;
    }
    
    //按handler类自动生成名字
     private String generateName(ChannelHandler handler) {
          Map<Class<?>, String> cache = nameCaches.get();
          Class<?> handlerType = handler.getClass();
          String name = cache.get(handlerType);/先从缓存中获取名称
          if (name == null) {
              name = generateName0(handlerType); //按类名称自动生成名称
              cache.put(handlerType, name);      //保存在缓存中
          }
          //用户不太可能放置多个相同类型的处理程序,但要确保避免任何名称冲突。请注意,我们不会缓存这里生成的名称。
          if (context0(name) != null) {
              String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
              //有相同名称,baseName把后面#0的0累加生成新名称;
              for (int i = 1;; i ++) {
                  String newName = baseName + i;
                  if (context0(newName) == null) {
                      name = newName;   
                      break;
                  }
              }
          }
          return name;
      }  
      //生成名称规则 handlerType=com.zp.HelloChannelHandler 形成名称=HelloChannelHandler#0
      private static String generateName0(Class<?> handlerType) {
          return StringUtil.simpleClassName(handlerType) + "#0";
      } 

(3).newContext 创建AbstractChannelHandlerContext,这个类在后面具体分析;

  • ChannelHandler包装成ChannelHandlerContext;
1
2
3
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

a.childExecutor(group)

  • 根据ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP配置,按线程组是否公用相同的线程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
   private EventExecutor childExecutor(EventExecutorGroup group) {
       if (group == null) {
           return null;
       }
       //单独执行任务
       Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
       if (pinEventExecutor != null && !pinEventExecutor) {
           return group.next();
       }
       Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
       if (childExecutors == null) {
           childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
       }
       EventExecutor childExecutor = childExecutors.get(group);
       if (childExecutor == null) {
           childExecutor = group.next();
           childExecutors.put(group, childExecutor);
       }
       return childExecutor;
   } 

(4).addFirst0 拼接到head后面

链表添加

1
2
3
4
5
6
7
  private void addFirst0(AbstractChannelHandlerContext newCtx) {
         AbstractChannelHandlerContext nextCtx = head.next;
         newCtx.prev = head;
         newCtx.next = nextCtx;
         head.next = newCtx;
         nextCtx.prev = newCtx;
     }

(5).callHandlerCallbackLater registered为false,把回调任务添加到pendingHandlerCallbackHead链表中;

  • pendingHandlerCallbackHead添加任务,单链表结构
  • 分两种类型PendingHandlerCallbackPendingHandlerCallback和PendingHandlerRemovedTask
    • 调用callHandlerAdded0方法以及callHandlerAdded0
  • Channel注册调用channelRegistered方法,处理回调任务链表集合;处理方法invokeHandlerAddedIfNeeded
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerCallbackPendingHandlerCallback(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

a.PendingHandlerAddedTask 和PendingHandlerRemovedTask源码分析

PendingHandlerCallback抽象类是PendingHandlerAddedTask 和PendingHandlerRemovedTask父类;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
   private abstract static class PendingHandlerCallback implements Runnable {
        final AbstractChannelHandlerContext ctx;
        PendingHandlerCallback next; //单链表实现;保存集合

        PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        abstract void execute();
    }

PendingHandlerAddedTask 等待添加任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 private final class PendingHandlerAddedTask extends PendingHandlerCallback {

        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    ...
                    remove0(ctx);
                    ctx.setRemoved();
                }
            }
        }
    }

PendingHandlerRemovedTask 等待删除任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    private final class PendingHandlerRemovedTask extends PendingHandlerCallback {

        PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        public void run() {
            callHandlerRemoved0(ctx);
        }

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerRemoved0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                   ...
                    ctx.setRemoved();
                }
            }
        }
    }

b.invokeHandlerAddedIfNeeded 处理理回调任务链表pendingHandlerCallbackHead集合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            //我们现在注册到EventLoop。现在是时候调用ChannelHandlers的回调了,这是在注册完成之前添加的。
            callHandlerAddedForAllHandlers();
        }
    }
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
            registered = true;

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute(); //执行任务
            task = task.next;
        }
    }    

(6).callHandlerAdded0 通知添加完成处理两个方法ChannelHandler.handlerAdded和AbstractChannelHandlerContext.setAddComplete

  • 通知ChannelHandler.handlerAdded方法
  • 通知AbstractChannelHandlerContext.setAddComplete方法,ADD_COMPLETE
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx); //通知ChannelHandler.handlerAdded方法
            ctx.setAddComplete();            //通知AbstractChannelHandlerContext.setAddComplete方法,ADD_COMPLETE
        } catch (Throwable t) { //通知异常处理
            boolean removed = false;
            try {
                remove0(ctx);
                try {
                    ctx.handler().handlerRemoved(ctx);//通知ChannelHandler.handlerRemoved方法
                } finally {
                    ctx.setRemoved();                  //通知ChannelHandler.setRemoved方法 REMOVE_COMPLETE
                }
                removed = true;
            } catch (Throwable t2) {
                ...
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(...));//异常通知事件
            } else {
                fireExceptionCaught(new ChannelPipelineException(...));//异常通知事件
            }
        }
    }

a.remove0 移除AbstractChannelHandlerContext链表中

1
2
3
4
5
6
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

(7).callHandlerRemoved0 删除完成通知

  • remove调用方法
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
       // Notify the complete removal.
       try {
           try {
               ctx.handler().handlerRemoved(ctx);//通知ChannelHandler.handlerRemoved方法
           } finally {
               ctx.setRemoved();                 //通知ChannelHandler.setRemoved方法
           }
       } catch (Throwable t) {
           fireExceptionCaught(new ChannelPipelineException(...));
       }
   }

4.addLast,addBefore都类同,双链表操作

5.remove 删除指定AbstractChannelHandlerContext

6.replace 替换指定AbstractChannelHandlerContext

6.destroy pipeline中的所有节点销毁,顺序由尾部向头部并触发handlerRemoved()

7.Inbound 事件方法

  • head触发,判断事件执行器是否当前线程,是立即执行head的事件,不是,提交任务执行;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

    @Override
    public final ChannelPipeline fireChannelUnregistered() {
        AbstractChannelHandlerContext.invokeChannelUnregistered(head);
        return this;
    }

8.Outbound 事件方法

tail执行,tail是Inbound事件;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  @Override
    public final ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }

    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }

 

tail就是TailContext对象,TailContext实现AbstractChannelHandlerContext抽象类;具有AbstractChannelHandlerContext特性;

三、 AbstractChannelHandlerContext源码分析

AbstractChannelHandlerContext抽象类继承ChannelHandlerContext,具有ChannelHandlerContext.Inbound和ChannelHandlerContext.Outbound事件特性;

AbstractChannelHandlerContext的UML图

1.数据结构

  • 双链表结构
  • handlerState 状态控制是否执行
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    //双向链表
    volatile AbstractChannelHandlerContext next;//下节点
    volatile AbstractChannelHandlerContext prev;//前节点
    //CAS 修改handlerState属性
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    //handlerState 几个状态标示

    //等待添加状态
    private static final int ADD_PENDING = 1;

    //添加状态
    private static final int ADD_COMPLETE = 2;

    //删除状态
    private static final int REMOVE_COMPLETE = 3;

    //初始化状态
    private static final int INIT = 0;

    private final boolean inbound;//inbound事件状态
    private final boolean outbound;//outbound事件状态
    private final DefaultChannelPipeline pipeline;
    private final String name;  //ChannelHandler名称
    //如果由EventLoop或给定的EventExecutor的是OrderedEventExecutor的实例,则它是有序的。
    private final boolean ordered;
    
    //如果不使用子执行程序,将被设置为null,否则将被设置为子执行程序。
    final EventExecutor executor;
    private ChannelFuture succeededFuture;
    
    //状态,初始化 INIT
    private final int handlerState = INIT;
}    

2.构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        //如果由EventLoop或给定的EventExecutor的是OrderedEventExecutor的实例,则它是有序的。
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

3.executor 获取事件执行器(就是个线程)

没有指定事件执行器就用Channel的事件执行去

1
2
3
4
5
6
7
8
   @Override
   public EventExecutor executor() {
       if (executor == null) {
           return channel().eventLoop();
       } else {
           return executor;
       }
   }

4.findContextInbound 查找Inbound事件的ChannelHandlerContext

往下个节点查询,inbound为true的ChannelHandlerContext;

1
2
3
4
5
6
7
   private AbstractChannelHandlerContext findContextInbound() {
       AbstractChannelHandlerContext ctx = this;
       do {
           ctx = ctx.next;
       } while (!ctx.inbound);
       return ctx;
   } 

5.Inbound事件

(1).fireChannelRegistered() 触发next的注册事件

分三个步骤执行:

  • 查找next的ChannelHandlerContext
  • 调用next个ChannelHandlerContext#invokeChannelRegistered方法
  • 执行ChannelInboundHandler#channelRegistered
1
2
3
4
5
 @Override
 public ChannelHandlerContext fireChannelRegistered() {
     invokeChannelRegistered(findContextInbound());
     return this;
 } 

a.findContextInbound 查找next的ChannelHandlerContext

往下个节点查询,inbound为true的ChannelHandlerContext;

1
2
3
4
5
6
7
   private AbstractChannelHandlerContext findContextInbound() {
       AbstractChannelHandlerContext ctx = this;
       do {
           ctx = ctx.next;
       } while (!ctx.inbound);
       return ctx;
   } 

b.invokeChannelRegistered 调用next个ChannelHandlerContext#invokeChannelRegistered方法

  • 获取next的事件执行器,是相同线程就立即执行invokeChannelRegistered,不是提交任务执行;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        //获取next的事件执行器,是相同线程就立即执行invokeChannelRegistered,不是提交任务执行;
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    } 

c.invokeChannelRegistered执行ChannelInboundHandler#channelRegistered

  • 验证当前状态,完成或者等待添加执,执行ChannelInboundHandler#channelRegistered
    • 发送异常,判断异常栈中没有方法名称等于exceptionCaught,就要调用ChannelInboundHandler.exceptionCaught
      • 等于exceptionCaught,就是异常处理发送异常,不做处理
  • 否,则往next查找执行
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    private void invokeChannelRegistered() {
        // 验证当前状态,完成或者等待添加执,执行ChannelInboundHandler#channelRegistered
        // handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);//调用ChannelInboundHandler.channelRegistered
            } catch (Throwable t) {
                //判断异常栈中没有发现方法名称等于exceptionCaught,就要调用ChannelInboundHandler.exceptionCaught
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();//往下查找
        }
    }

(2).fireChannelActive() 触发next的Channel激活(TCP链路建立成功)事件;

处理步骤跟findContextInbound一样,只是调用方法不同

(3).fireChannelRead(Object) 触发next的读事件

处理步骤跟findContextInbound一样,只是调用方法不同;

1
2
3
4
5
 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
     //用于对象泄露,以及msg对null验证
     final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
     ...
 }

(4).fireChannelReadComplete() 触发next的读操作完成通知事件;

处理步骤跟findContextInbound一样,只是调用方法不同;

(5).fireUserEventTriggered(Object) 用户自定义事件

处理步骤跟findContextInbound一样,只是调用方法不同;

1
2
3
4
5
   static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
       //验证事件内容不能为NULL
       ObjectUtil.checkNotNull(event, "event");
       ...
   }

(6).fireChannelWritabilityChanged() Channel的可写状态变化通知事件;

处理步骤跟findContextInbound一样,只是调用方法不同;

(7).fireChannelInactive() TCP连接关闭,链路不可用通知事件。

处理步骤跟findContextInbound一样,只是调用方法不同;

(8).fireChannelUnregistered() 注销事件

处理步骤跟findContextInbound一样,只是调用方法不同;

(9).fireExceptionCaught(Throwable) 异常通知事件;

处理步骤跟findContextInbound一样,只是调用方法不同;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(next, cause);
        return this;
    }

    static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
      //验证异常内容不能为NULL
       ObjectUtil.checkNotNull(cause, "cause");
        ...
    }

6.Outbound事件

除flush和read方法,参数都有ChannelPromise,因为实现异步,所以类似回调方式,就引用ChannelPromise; 调用到Channel#Unsafe,成功或者失败都是异步处理ChannelPromise类;

(1).bind(SocketAddress, ChannelPromise) 绑定

执行步骤

  • 验证ChannelPromise是否有效
  • 往上查找Outbound事件的ChannelHandlerContext
  • 获取事件执行器,是当前线程立即执行invokeBind,否,往事件执行器提交任务执行invokeBind;
  • invokeBind方法就是调用ChannelOutboundHandler#bind
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        //验证ChannelPromise
        if (isNotValidPromise(promise, false)) {//验证promise
            // cancelled
            return promise;
        }
        //往上查找Outbound事件
        final AbstractChannelHandlerContext next = findContextOutbound();
        //获取事件执行器,是当前线程立即执行invokeBind,否,往事件执行器提交任务执行invokeBind;
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

a.isNotValidPromise 验证ChannelPromise是否有效

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
        if (promise == null) {
            throw new NullPointerException("promise");
        }

        if (promise.isDone()) {
            // 检查promise是否已取消,如果已取消,则表明不应执行该操作的处理。
            //
            // See https://github.com/netty/netty/issues/2349
            if (promise.isCancelled()) {
                return true;
            }
            throw new IllegalArgumentException("promise already done: " + promise);
        }

        if (promise.channel() != channel()) {
            throw new IllegalArgumentException(String.format(
                    "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
        }

        if (promise.getClass() == DefaultChannelPromise.class) {
            return false;
        }

        if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
            throw new IllegalArgumentException(
                    StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
        }

        if (promise instanceof AbstractChannel.CloseFuture) {
            throw new IllegalArgumentException(
                    StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
        }
        return false;
    }

b.findContextOutbound 往上查找Outbound事件的ChannelHandlerContext

1
2
3
4
5
6
7
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

d.invokeBind方法就是调用ChannelOutboundHandler#bind

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {//上面Inbound事件说明过,判断状态是否添加或者等待添加
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);//调用ChannelOutboundHandler.bind
            } catch (Throwable t) {
                //promise#tryFailure
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);//往上找
        }
    } 

(2).connect(SocketAddress, SocketAddress, ChannelPromise)

处理步骤跟bind一样,只是调用方法不同;

(3).write

  • write(Object msg)最终调用下面方法,自动生成DefaultChannelPromise对象
    1
    2
    3
    4
    
    @Override
        public ChannelFuture write(Object msg) {
            return write(msg, newPromise());//DefaultChannelPromise
        }
    
  • write(final Object msg, final ChannelPromise promise)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

   @Override
   public ChannelFuture write(final Object msg, final ChannelPromise promise) {
      //验证消息写入消息不能为NULL
       if (msg == null) {
           throw new NullPointerException("msg");
       }

       try {
           if (isNotValidPromise(promise, true)) {//验证promise
               ReferenceCountUtil.release(msg); //释放msg
               // cancelled
               return promise;
           }
       } catch (RuntimeException e) {
           //释放msg
           ReferenceCountUtil.release(msg);
           throw e;
       }
       write(msg, false, promise);
       return promise;
   }
   

a.write(Object msg, boolean flush, ChannelPromise promise) flush为true做刷新数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();//往上查找
        //内存泄漏检测
        final Object m = pipeline.touch(msg, next);
         //获取事件执行器,是当前线程立即执行invokeWriteAndFlush或者invokeWrite,否,往事件执行器提交任务执行invokeWriteAndFlush或者invokeWrite;
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) { 
                //调用ChannelOutboundHandler#write
                //调用ChannelOutboundHandler#flush
                next.invokeWriteAndFlush(m, promise);
            } else {
                //调用ChannelOutboundHandler#write
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            //发送异常,调用ReferenceCountUtil.release
            safeExecute(executor, task, promise, m);//自己定义executor执行任务
        }
    }

b.自定义Execute执行WriteAndFlushTask和WriteTask源码

WriteAndFlushTask和WriteTask实现AbstractWriteTask抽象类;

1).AbstractWriteTask源码分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
 abstract static class AbstractWriteTask implements Runnable {
        //是否估算内容大小
        private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
                SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);

        // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
        //假设有一个64位的JVM, 16个字节的对象头,3个引用字段和一个int字段,加上对齐。
        private static final int WRITE_TASK_OVERHEAD =
                SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);

        private final Recycler.Handle<AbstractWriteTask> handle;//使用对象池
        private AbstractChannelHandlerContext ctx;
        private Object msg;
        private ChannelPromise promise;
        private int size; //大小

        @SuppressWarnings("unchecked")
        private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
            this.handle = handle;
        }

        protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
                                   Object msg, ChannelPromise promise) {
            task.ctx = ctx;
            task.msg = msg;
            task.promise = promise;

            if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
                task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
                ctx.pipeline.incrementPendingOutboundBytes(task.size);
            } else {
                task.size = 0;
            }
        }

        @Override
        public final void run() {
            try {
                // Check for null as it may be set to null if the channel is closed already
                //如果通道已经关闭,则检查null,因为它可能被设置为null。
                if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
                    ctx.pipeline.decrementPendingOutboundBytes(size);
                }
                write(ctx, msg, promise);
            } finally {
                // Set to null so the GC can collect them directly
                ctx = null;
                msg = null;
                promise = null;
                handle.recycle(this);
            }
        }

        protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            ctx.invokeWrite(msg, promise);
        }
    }

2). WriteAndFlushTask源码分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  static final class WriteAndFlushTask extends AbstractWriteTask {

        private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
            @Override
            protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
                return new WriteAndFlushTask(handle);
            }
        }; //使用对象池

        private static WriteAndFlushTask newInstance(
                AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
            WriteAndFlushTask task = RECYCLER.get();
            init(task, ctx, msg, promise);
            return task;
        }

        private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
            super(handle);
        }

        @Override
        public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            super.write(ctx, msg, promise);
            ctx.invokeFlush();//调用刷新
        }
    }

3). WriteTask源码分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
 static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {

        private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
            @Override
            protected WriteTask newObject(Handle<WriteTask> handle) {
                return new WriteTask(handle);
            }
        };//使用对象池

        private static WriteTask newInstance(
                AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            WriteTask task = RECYCLER.get();
            init(task, ctx, msg, promise);
            return task;
        }

        private WriteTask(Recycler.Handle<WriteTask> handle) {
            super(handle);
        }
    }

(4).writeAndFlush(Object, ChannelPromise)

处理步骤跟write一样,Flush设置为true

(5).flush()

处理步骤跟bind一样,只是调用方法不同,不用处理ChannelPromise

(6).read()

处理步骤跟bind一样,只是调用方法不同,不用处理ChannelPromise

(7).disconnect(ChannelPromise)

处理步骤跟bind一样,只是调用方法不同

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 @Override
    public ChannelFuture disconnect() {
        return disconnect(newPromise());
    }

  @Override
   public ChannelFuture disconnect(final ChannelPromise promise) {
       if (isNotValidPromise(promise, false)) {
           // cancelled
           return promise;
       }

       final AbstractChannelHandlerContext next = findContextOutbound();
       EventExecutor executor = next.executor();
       if (executor.inEventLoop()) {
           // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
           // So far, UDP/IP is the only transport that has such behavior.
           //如果通道没有断开连接 - 重新连接的概念,则将断开连接转换为关闭状态。 到目前为止,UDP / IP是唯一具有这种行为的传输方式。
           if (!channel().metadata().hasDisconnect()) {
               next.invokeClose(promise);
           } else {
               next.invokeDisconnect(promise);
           }
       } else {
           safeExecute(executor, new Runnable() {
               @Override
               public void run() {
                   if (!channel().metadata().hasDisconnect()) {
                       next.invokeClose(promise);
                   } else {
                       next.invokeDisconnect(promise);
                   }
               }
           }, promise, null);
       }
       return promise;
   }

(8).close(ChannelPromise)

处理步骤跟bind一样,只是调用方法不同

(9).deregister(ChannelPromise)

处理步骤跟bind一样,只是调用方法不同

4.状态修改,CAS修改

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21

   final void setRemoved() {
       handlerState = REMOVE_COMPLETE;
   }

   final void setAddComplete() {
       for (;;) {
           int oldState = handlerState;
           // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
           // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
           // exposing ordering guarantees.
           if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
               return;
           }
       }
   }

   final void setAddPending() {
       boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
       assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
   }

5.DefaultChannelHandlerContext源码分析

ChannelPipeline添加ChannelHandler任务处理时,创建DefaultChannelHandlerContext拼接;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

   private final ChannelHandler handler; //添加ChannelHandler属性

   DefaultChannelHandlerContext(
           DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
       super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
       if (handler == null) {
           throw new NullPointerException("handler");
       }
       this.handler = handler;
   }

   @Override
   public ChannelHandler handler() {
       return handler;
   }

   private static boolean isInbound(ChannelHandler handler) { 
       return handler instanceof ChannelInboundHandler;
   }

   private static boolean isOutbound(ChannelHandler handler) {
       return handler instanceof ChannelOutboundHandler;
   }
}

6.HeadContext源码分析

  • HeadContext中Outbound事件底层都是Unsafe对象去调用;
    • read调用beginRead
  • Inbound事件都都调用next,这里触发点;
  • Inbound事件特殊处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
   
        @Override
           public void read(ChannelHandlerContext ctx) {
               unsafe.beginRead();
           }

        ...

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();//触发等待添加任务
            ctx.fireChannelRegistered();
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelUnregistered();
            //如果通道关闭且未注册,则按顺序删除所有处理程序。调用destroy
            if (!channel.isOpen()) {
                destroy();
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            readIfIsAutoRead();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelReadComplete();
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }
       ...
    }

7.TailContext源码分析

TailContext只实现Inbound事件,主要作用在于资源释放;channelRead

四、ChannelHandler 分析

1.ChannelHandler接口源码分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface ChannelHandler {
    
    //ChannelPipeline添加ChannelHandler成功,回调通知方法
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    
    //ChannelPipeline添加ChannelHandler删除,回调通知方法
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    //过时,把这个方法移到ChannelInboundHandler接口中
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * 相同实例可以多次添加到一个或多个{ChannelPipeline},而不存在竞争条件。
     */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

ChannelHandler被注解为@Sharable,全局只有一个handler实例,它会被多个Channel的Pipeline共享,在被多线程并发时,不存在竞争条件;

2.ChannelHandlerAdapter抽象类源码分析

(1).属性

1
2
3
4
5
public abstract class ChannelHandlerAdapter implements ChannelHandler {

    // Not using volatile because it's used only for a sanity check.
    //不要使用volatile,因为它只用于完整性检查。
    boolean added;

(2).isSharable 判断类注解@Sharable

  • 用本地缓存保存缓存Class是否共享注解;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 
    public boolean isSharable() {
        
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);//缓存InternalThreadLocalMap中;
        }
        return sharable;
    }

3.ChannelOutboundHandlerAdapter类源码分析

ChannelOutboundHandlerAdapter类的UML

ChannelOutboundHandler的Outbound事件方法调用ChannelHandlerContext方法;

4.ChannelInboundHandlerAdapter类源码分析

ChannelInboundHandlerAdapter类的UML

ChannelInboundHandlerAdapter的Inbound事件调用ChannelHandlerContext方法;

5.SimpleChannelInboundHandler 允许显式只处理特定类型的消息

SimpleChannelInboundHandler类的UML

(1).属性

1
2
3
4
5
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    private final TypeParameterMatcher matcher;//类型参数匹配器
    private final boolean autoRelease; //自动释放资源
}

(2).channelRead

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      boolean release = true;
      try {
          if (acceptInboundMessage(msg)) { //类型判断
              @SuppressWarnings("unchecked")
              I imsg = (I) msg;
              channelRead0(ctx, imsg);//抽象方法,子类实现
          } else {
              release = false;
              ctx.fireChannelRead(msg); //手动触发查找next的channelRead
          }
      } finally {
          if (autoRelease && release) {
              ReferenceCountUtil.release(msg);
          }
      }
  }

五、总结

1.Channelpipeline是线程安全?

ChannelPipeline执行ChannelHandler连时,先判断AbstractChannelHandlerContext中事件执行器是否当前线程,是立即执行,不是提交任务执行ChannelHandler; 导致只有单线程执行ChannelHandler;保障线程是安全;

head没有制定事件执行器,还是用channel的事件执行器,说明在Outbound事件时,channel的事件执行器,不存在多线程处理;线程安全;

2.Channelpipeline执行流程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

*                                                 I/O Request
*                                            Channel.writeAndFlush  
*                                                      |
*  +---------------------------------------------------+--------------------+
*  |                           ChannelPipeline         |writeAndFlush       |
*  |                                                  \|/                   |
*  |    +---------------------------+     +------------+---------------+    |
*  |    | Inbound channelRead tail  |     | Outbound writeAndFlush tail|    |
*  |    +----------+----------------+     +------------+---------------+    |
*  |              /|\                                  | writeAndFlush      |
*  |               |fireChannelRead                   \|/                   |
*  |    +----------+----------------+     +------------+---------------+    |
*  |    | Inbound channelRead  next |     | Outbound writeAndFlush prev|    |
*  |    +----------+----------------+     +------------+---------------+    |
*  |              /|\                                  . writeAndFlush      |
*  |               .                                   .                    |
*  |               .                                   .                    |
*  |               . fireChannelRead                  \|/                   |
*  |    +----------+-----------------+    +------------+---------------+    |
*  |    | Inbound channelRead  next  |    | Outbound writeAndFlush prev|    |
*  |    +----------+-----------------+    +------------+---------------+    |
*  |              /|\                                  | writeAndFlush      |
*  |               | fireChannelRead                  \|/                   |
*  |    +----------+-----------------+    +------------+------------ --+    |
*  |    | Inbound channelRead  head  |    | Outbound writeAndFlush prev|    |
*  |    +----------+-----------------+    +------------+---------------+    |
*  |              /|\                                  | flush              |
*  +---------------+-----------------------------------+--------------------+
*                  |fireChannelRead                   \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
* 

写ChannelHandler自己手动触发next(fire开头名称)Inbound事件或者prev(write,bind…)Outbound事件