I. Introduction

Channel is Netty's abstraction for network I/O reads and writes. Why did Netty build its own Channel instead of reusing the JDK NIO channels? The main reasons are:

  • (1) The JDK's SocketChannel and ServerSocketChannel share no unified Channel interface for application developers; without a unified API they are inconvenient to use.

  • (2) Netty's Channel has to integrate with the rest of Netty's architecture, e.g. the I/O model, the ChannelPipeline-based customization model, and metadata-driven configuration of TCP parameters. The JDK's SocketChannel and ServerSocketChannel provide none of this, so a new wrapper was needed.

  • (3) A custom Channel keeps the implementation flexible.

For these reasons, Netty redesigned the Channel interface and supplied many different implementations. The design principle is fairly simple, but the functionality is extensive. The main design ideas are as follows.

  • (1) At the Channel interface level, the Facade pattern provides a unified wrapper: network I/O operations and their related operations are encapsulated and exposed through a single interface.

  • (2) The Channel interface is deliberately broad, giving SocketChannel and ServerSocketChannel one common contract. Subclasses implement the differing behavior while shared functionality lives in abstract parent classes, maximizing reuse of both functionality and interfaces.

  • (3) The implementation uses aggregation rather than containment: the related helper classes are aggregated inside the Channel, which alone is responsible for allocating and dispatching them, keeping the implementation flexible.

II. The Channel Interface

Figure: methods of the Channel interface (an abbreviated sketch appears below).

The Channel interface also declares an inner Unsafe interface. Unsafe is in effect Channel's helper interface and should never be called from user code; the actual I/O reads and writes are carried out through the Unsafe implementation.
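
For orientation, here is an abbreviated sketch of io.netty.channel.Channel as it looks in Netty 4.1; the many outbound methods inherited from ChannelOutboundInvoker (bind, connect, write, close, ...) are omitted.

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();
    EventLoop eventLoop();
    Channel parent();
    ChannelConfig config();
    boolean isOpen();
    boolean isRegistered();
    boolean isActive();
    ChannelMetadata metadata();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    ChannelFuture closeFuture();
    boolean isWritable();
    Unsafe unsafe();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
    @Override
    Channel read();
    @Override
    Channel flush();

    interface Unsafe {
        // performs the real I/O: register, bind, connect, disconnect,
        // close, beginRead, write, flush, ...
    }
}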

Channel has many implementing subclasses; this article focuses on two of them, NioServerSocketChannel and NioSocketChannel.

Figure: NioServerSocketChannel UML class diagram

Figure: NioSocketChannel UML class diagram

III. AbstractChannel Source Analysis

1. Fields

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    // Parent channel; for NIO, a NioSocketChannel created by a server-side accept
    // gets the NioServerSocketChannel as its parent
    private final Channel parent;
    private final ChannelId id; // globally unique id, generated the default way
    private final Unsafe unsafe; // the Unsafe that performs the actual I/O
    private final DefaultChannelPipeline pipeline; // the DefaultChannelPipeline bound to this channel
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;  // local address
    private volatile SocketAddress remoteAddress; // remote address
    private volatile EventLoop eventLoop;         // the EventLoop this channel is registered with
    private volatile boolean registered;          // registration state
    private boolean closeInitiated;               // whether a close has been initiated

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
}

2. Constructors

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();  // abstract method, implemented by subclasses
        pipeline = newChannelPipeline();
    }
    protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
  • Their main job is to create the DefaultChannelPipeline.

3. bind, connect, disconnect, close, read, write and friends are all delegated to the pipeline, as the excerpt below shows.
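
A representative excerpt from AbstractChannel (Netty 4.1): every outbound operation simply forwards to the pipeline.

    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }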

4. localAddress and remoteAddress are resolved through the Unsafe instance, as sketched below.
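
Condensed from AbstractChannel.localAddress() (remoteAddress() is symmetric): the address is lazily fetched from Unsafe and cached.

    @Override
    public SocketAddress localAddress() {
        SocketAddress localAddress = this.localAddress;
        if (localAddress == null) {
            try {
                this.localAddress = localAddress = unsafe().localAddress(); // cache the address from Unsafe
            } catch (Throwable t) {
                // can fail on a closed socket; report the address as unknown
                return null;
            }
        }
        return localAddress;
    }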

5. AbstractUnsafe Source Analysis

(1). Fields

 protected abstract class AbstractUnsafe implements Unsafe {

        private final ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        private RecvByteBufAllocator.Handle recvHandle;
        private boolean inFlush0;
        /** true if the channel has never been registered, false otherwise */
        private boolean neverRegistered = true; // not final: register0 flips it to false
}

(2). register: as soon as a Channel is created, the first thing to do is call register.

The main steps (see the sketch after this list):

  • Assign the EventLoop; the registration itself is submitted to that EventLoop and the outcome is reported asynchronously through a ChannelPromise
  • Call doRegister(), an empty hook method
    • the subclass AbstractNioChannel overrides it
  • pipeline.invokeHandlerAddedIfNeeded() runs any pending handlerAdded callbacks
  • pipeline.fireChannelRegistered()
  • If the channel is already active, pipeline.fireChannelActive()
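
A condensed sketch of AbstractUnsafe.register0(...) from Netty 4.1; the promise-cancellation and ensureOpen checks at the top are trimmed.

    private void register0(ChannelPromise promise) {
        try {
            boolean firstRegistration = neverRegistered;
            doRegister(); // hook: AbstractNioChannel registers on the Selector here
            neverRegistered = false;
            registered = true;
            // run handlerAdded callbacks before the registration is announced
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead(); // re-registered with auto-read on: resume reading
                }
            }
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }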

(3).beginRead

  • pipeline.fireChannelActive() eventually runs:
if (channel.config().isAutoRead()) {
    channel.read(); // eventually reaches beginRead
}
  • doBeginRead is an abstract method; subclasses register the NIO interest op
    • NioSocketChannel: OP_READ
    • NioServerSocketChannel: OP_ACCEPT

(4). bind: binds the server to a port; the abstract doBind method is implemented by subclasses. A condensed sketch follows.
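
Condensed from AbstractUnsafe.bind(...) in Netty 4.1; argument validation is trimmed.

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            boolean wasActive = isActive();
            try {
                doBind(localAddress); // abstract hook, e.g. NioServerSocketChannel binds the JDK channel
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // the channel became active as a result of the bind
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }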

(5). disconnect

  • doDisconnect is an abstract method
    • NioSocketChannel implements it by calling doClose
    • NioServerSocketChannel does not support it
  • pipeline.fireChannelInactive()

(6).close

closeExecutor schedules the close; the abstract doClose() method does the real work.

(7).deregister

  • abstract doDeregister
  • pipeline.fireChannelInactive()
  • pipeline.fireChannelUnregistered()

(8).write

  • Appends the message to the ChannelOutboundBuffer
   @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) { // the channel is closing; writes are rejected

                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg); // release now to prevent a resource leak
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg); // filter/convert the outgoing message
                // estimate the message size
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            outboundBuffer.addMessage(msg, size, promise); // append the message
        }

(9). flush: actually sends the messages to the network

The abstract doWrite method is implemented by subclasses.

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) { // the channel was closed
                return;
            }

            outboundBuffer.addFlush(); // mark the pending entries as flushed
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer); //抽象方法
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); // a write IOException closes the channel
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }

6. Summary

  • Encapsulates the common flow and exception handling, exposing abstract hook methods so subclasses can implement the transport-specific behavior
  • ChannelOutboundBuffer backs the write path
  • connect and read are left unimplemented here; subclasses provide them

IV. AbstractNioChannel Source Analysis

1. Fields

public abstract class AbstractNioChannel extends AbstractChannel {

    private final SelectableChannel ch;  // the wrapped JDK channel
    protected final int readInterestOp;  // the read interest op: OP_ACCEPT for servers, OP_READ otherwise
    volatile SelectionKey selectionKey;  // the corresponding JDK SelectionKey
    boolean readPending;                 // marks that a low-level read is pending
    // clears the readPending flag
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };
}

2.doRegister

Registers on the Selector and obtains the JDK SelectionKey:

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // register with 0 ops; interest ops are added later in doBeginRead
                // ... (exception handling elided)
       }
    }

3. doDeregister

    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey()); // the event loop cancels the SelectionKey
    }

4. doBeginRead: registers the interest op to listen for

 @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) { // the key was cancelled and is no longer valid

            return;
        }

        readPending = true; // mark that a low-level read is pending

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // add the read interest op
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

5. doClose

Does nothing here; subclasses override it.

6. AbstractNioUnsafe Source Analysis

 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe

(1). Implements the NioUnsafe interface

  /**
     * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
     */
    public interface NioUnsafe extends Unsafe {
        /**
         * Return underlying {@link SelectableChannel}
         */
        SelectableChannel ch();

        /**
         * Called once the connection attempt to the server has completed
         */
        void finishConnect();

        /**
         * Read from underlying {@link SelectableChannel}
         */
        void read();
        // force an immediate flush
        void forceFlush();
    }

(2). connect: connects to the server (a condensed sketch follows the list)

  • doConnect is an abstract method
  • pipeline().fireChannelActive();
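
A condensed sketch of AbstractNioUnsafe.connect(...) from Netty 4.1; the connect-timeout scheduling and extra promise bookkeeping are trimmed.

        @Override
        public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
                                  final ChannelPromise promise) {
            try {
                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    // connected immediately; fires channelActive() if the channel just became active
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    // non-blocking connect still in progress: remember the promise;
                    // the event loop calls finishConnect() when OP_CONNECT fires
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }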

(3).finishConnect

  • doFinishConnect() is an abstract method
  • pipeline().fireChannelActive();

(4).flush0

        @Override
        protected final void flush0() {
            // Flush immediately only when there is no pending flush. If a flush is already
            // pending, the event loop will call forceFlush() later, so no need to call it now.
            if (isFlushPending()) {
                return;
            }
            super.flush0();
        }
       private boolean isFlushPending() {
           SelectionKey selectionKey = selectionKey();
           // OP_WRITE set means a previous write could not complete, i.e. a flush is already pending
           return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
       }
         

(5). forceFlush: flush immediately

    @Override
        public final void forceFlush() {
            // flush immediately, bypassing the isFlushPending() check
            super.flush0();
        }  

(6). removeReadOp

       protected final void removeReadOp() {
           SelectionKey key = selectionKey();
           // Check first if the key is still valid as it may be canceled as part of the deregistration
           // from the EventLoop
           // See https://github.com/netty/netty/issues/2104
           if (!key.isValid()) {
               return;
           }
           int interestOps = key.interestOps();
           if ((interestOps & readInterestOp) != 0) {
               // only remove readInterestOp if needed
               key.interestOps(interestOps & ~readInterestOp);
           }
       }

V. AbstractNioMessageChannel Source Analysis

AbstractNioMessageChannel is the message-oriented base class used on the server side (NioServerSocketChannel extends it).

1. doWrite

  • doWriteMessage abstract method (see below)
    • NioServerSocketChannel does not support writes; its job is to accept client connections
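
NioServerSocketChannel's implementation makes this explicit (actual Netty 4.1 code):

    @Override
    protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
        throw new UnsupportedOperationException();
    }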

2. NioMessageUnsafe Source Analysis

(1). Fields

    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>(); // doReadMessages fills this list
}

(2). read: reading data (a condensed sketch follows the list)

  • doReadMessages abstract method
    • NioServerSocketChannel's implementation produces the Channel of each successfully accepted client connection
  • uses a RecvByteBufAllocator handle to decide when to stop reading
  • pipeline.fireChannelRead(readBuf.get(i)): fires a ChannelRead event per message
  • pipeline.fireChannelReadComplete();
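
A condensed sketch of NioMessageUnsafe.read() from Netty 4.1; the close-on-read handling at the end is trimmed.

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config); // reset the per-read statistics

            boolean closed = false;
            Throwable exception = null;
            try {
                do {
                    // for NioServerSocketChannel: accept() and wrap the socket in a NioSocketChannel
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i)); // e.g. ServerBootstrap's acceptor registers the child here
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                pipeline.fireExceptionCaught(exception);
            }
            // close-on-read handling elided
        }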

VI. NioServerSocketChannel Source Analysis

1. Constructor

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

    private final ServerSocketChannelConfig config;

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
}

2.doBind

  • backlog: the maximum pending-connection queue length
   @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

3. doReadMessages: creates a NioSocketChannel for each accepted connection

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            // log the failure and close the accepted socket (handling elided)
        }
        return 0;
    }

4.doClose

   @Override
   protected void doClose() throws Exception {
       javaChannel().close();
   }

VII. AbstractNioByteChannel Source Analysis

1. Constructor

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ); // byte channels always use OP_READ as their read interest op
    }

2.doWrite

  • overridden by NioSocketChannel
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
       // write spin count: max write attempts per flush, 16 by default
        int writeSpinCount = config().getWriteSpinCount();
        do {
            Object msg = in.current();
            if (msg == null) {  
                // Wrote all messages.
                clearOpWrite(); // all messages written: clear SelectionKey.OP_WRITE
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

  protected final void incompleteWrite(boolean setOpWrite) {
        // the socket send buffer is full: set SelectionKey.OP_WRITE so the selector resumes us
        if (setOpWrite) {
            setOpWrite();
        } else {  // spin count exhausted but data remains: hand the rest back to the event loop
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();   // AbstractChannel.flush() goes through pipeline.flush()
                    }
                };
            }
            // submit the flush task to the event loop
            eventLoop().execute(flushTask);
        }
    }    

a. doWriteInternal: dispatches the write by message type

  • a ByteBuf goes through the abstract doWriteBytes method
  • a FileRegion goes through doWriteFileRegion
  • bytes written are reported via ChannelOutboundBuffer#progress
  • a fully written message is removed via ChannelOutboundBuffer#remove
  private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
       if (msg instanceof ByteBuf) {  
           ByteBuf buf = (ByteBuf) msg;
           if (!buf.isReadable()) {
               in.remove();
               return 0;
           }

           final int localFlushedAmount = doWriteBytes(buf); // abstract method
           if (localFlushedAmount > 0) {
               in.progress(localFlushedAmount); // notify the current message's ChannelPromise of the write progress
               if (!buf.isReadable()) {
                   in.remove();
               }
               return 1;
           }
       } else if (msg instanceof FileRegion) { // FileRegion wraps an NIO FileChannel for file transfers
           FileRegion region = (FileRegion) msg;
           if (region.transferred() >= region.count()) {
               in.remove();
               return 0;
           }

           long localFlushedAmount = doWriteFileRegion(region); // template method
           if (localFlushedAmount > 0) {
               in.progress(localFlushedAmount);
               if (region.transferred() >= region.count()) {
                   in.remove();
               }
               return 1;
           }
       } else {
           // Should not reach here.
           throw new Error();
       }
       return WRITE_STATUS_SNDBUF_FULL;
   } 

3. doWrite0: simply delegates to doWriteInternal


   /**
    * Write objects to the OS.
    * @param in the collection which contains objects to write.
    * @return The value that should be decremented from the write quantum which starts at
    * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
    * <ul>
    *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
    *     is encountered</li>
    *     <li>1 - if a single call to write data was made to the OS</li>
    *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
    *     data was accepted</li>
    * </ul>
    * @throws Exception if an I/O exception occurs during write.
    */
   protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
       Object msg = in.current();
       if (msg == null) {
           // Directly return here so incompleteWrite(...) is not called.
           return 0;
       }
       return doWriteInternal(in, in.current());
   }

4. NioByteUnsafe Source Analysis

(1).read

  • the RecvByteBufAllocator implementation, AdaptiveRecvByteBufAllocator, builds a receive buffer of a near-optimal size
  • doReadBytes abstract method
  • pipeline.fireChannelRead(byteBuf)
  • pipeline.fireChannelReadComplete();
        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // lazily created via config().getRecvByteBufAllocator().newHandle()
            // reset the per-read statistics
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // allocate a buffer of the predicted size
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf)); // doReadBytes is abstract
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release(); // release the buffer
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf); // fire channelRead for user handlers

                    byteBuf = null;
                } while (allocHandle.continueReading()); // keep reading?
                // recompute the size of the next read
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
        private void closeOnRead(ChannelPipeline pipeline) {
            if (!isInputShutdown0()) { // false by default
                if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
                    shutdownInput();  // abstract method
                    pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                    close(voidPromise());
                }
            } else {
                pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            }
        }

        private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                RecvByteBufAllocator.Handle allocHandle) {
            if (byteBuf != null) {
                if (byteBuf.isReadable()) {
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                } else {
                    byteBuf.release();
                }
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            pipeline.fireExceptionCaught(cause);
            if (close || cause instanceof IOException) {
                closeOnRead(pipeline);
            }
        }
       

VIII. NioSocketChannel Source Analysis

1. Constructors

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

    private final SocketChannelConfig config;

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }
}

2. doConnect: connecting

  @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                // non-blocking connect still in progress: register interest in OP_CONNECT
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

3. doFinishConnect

   protected void doFinishConnect() throws Exception {
         if (!javaChannel().finishConnect()) {
             throw new Error();
         }
     }

4. doReadBytes

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }

5. doWriteBytes

    @Override
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    } 

6. doWriteFileRegion: zero-copy via transferTo

    @Override
    protected long doWriteFileRegion(FileRegion region) throws Exception {
        final long position = region.transferred();
        return region.transferTo(javaChannel(), position);
    } 

7. doWrite

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        // write spin count, 16 by default
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always use nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write (e.g. a FileRegion),
                    // so fall back to the normal write path.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // adjust the max bytes per gathering write based on what was actually written
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so the cast is safe.
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    // adjust the max bytes per gathering write
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

a. adjustMaxBytesPerGatheringWrite: adapting the per-write byte limit

Benefits:

  • adapts as OS behavior (e.g. dynamic SO_SNDBUF changes) varies
  • assembles large writes into a single batch rather than many small ones, which helps performance

Logic:

  • if every attempted byte was written, double the limit
  • if the attempted size exceeds 4096 and less than half of it was actually written, halve the limit
   private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
       // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
       // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
       // make a best effort to adjust as OS behavior changes.
       // everything attempted was written: try doubling the limit for the next gathering write
       if (attempted == written) {
           if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
               ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
           }
        // attempted > 4096 and less than half of it was written: halve the limit
       } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
           ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
       }
   } 

IX. ChannelOutboundBuffer

The write buffer: calling write only appends the outgoing message to the ChannelOutboundBuffer; calling flush actually sends it.

1. Internal data structure

 private Entry flushedEntry;   // first node of the flushed region (Entry forms a linked list)
 private Entry unflushedEntry; // first node that has been added but is not yet ready to be consumed
 private Entry tailEntry;      // last node; new entries are appended here

How the three pointers work: when write calls addMessage, an Entry is created and appended after tailEntry, and if unflushedEntry is null the new entry becomes unflushedEntry. When flush calls addFlush, the unflushedEntry reference is handed over to flushedEntry and unflushedEntry is set to null. The sketch below illustrates the transitions.
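
A small illustration of the pointer states (hypothetical entries e1..e4):

// After write(e1), write(e2), write(e3):
//   unflushedEntry -> e1 -> e2 -> e3,  tailEntry -> e3,  flushedEntry = null
//
// After addFlush():
//   flushedEntry -> e1 -> e2 -> e3,  unflushedEntry = null,  tailEntry -> e3
//
// A subsequent write(e4) appends after tailEntry and sets unflushedEntry -> e4:
//   flushedEntry -> e1 -> e2 -> e3 -> e4,  unflushedEntry -> e4,  tailEntry -> e4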

2.addMessage

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

(1). If writes keep coming and flush is never called, you eventually run out of memory (OOM).

The unwritable field drives the fireChannelWritabilityChanged event; users implement this callback themselves to throttle their write rate. A hypothetical handler sketch follows.
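
A hypothetical user handler reacting to the writability signal (the class name and the pause/resume hooks are illustrative, not part of Netty):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BackpressureHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            // pending bytes fell below the low water mark: resume producing data
        } else {
            // pending bytes exceeded the high water mark: pause writes until writable again
        }
        ctx.fireChannelWritabilityChanged();
    }
}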

a. incrementPendingOutboundBytes: once the pending bytes exceed the write buffer's high water mark, the unwritable flag is updated

  • getWriteBufferHighWaterMark: the write buffer high water mark, 64 KB by default
  private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
      if (size == 0) {
          return;
      }
      long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
      if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
          setUnwritable(invokeLater);
      }
  }  

(2). Once the buffered bytes drain back down, is there a notification to speed writes up again? Yes: that is the write buffer's low water mark, which also triggers fireChannelWritabilityChanged.

Removing write-buffer entries (remove) calls decrementPendingOutboundBytes, which enforces the low water mark:

   private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
       if (size == 0) {
           return;
       }

       long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
       if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
           setWritable(invokeLater);
       }
   } 

3.addFlush

Its job is to hand unflushedEntry over to flushedEntry. DefaultChannelPipeline performs an inEventLoop check, so there are no concurrent writers.

   public void addFlush() {
       // There is no need to process all entries if there was already a flush before and no new messages
       // where added in the meantime.
       //
       // See https://github.com/netty/netty/issues/2577
       Entry entry = unflushedEntry;
       if (entry != null) {
           // writes and flushes may interleave: if the previous batch is not fully written yet,
           // that's fine, Entry is a linked list and we keep walking it
           if (flushedEntry == null) {
               // there is no flushedEntry yet, so start with the entry
               flushedEntry = entry;
           }
           do {
               // check whether the entry was cancelled
               flushed ++;
               if (!entry.promise.setUncancellable()) {
                   // Was cancelled so make sure we free up memory and notify about the freed bytes
                   int pending = entry.cancel();
                   decrementPendingOutboundBytes(pending, false, true);
               }
               entry = entry.next;
           } while (entry != null);

           // All flushed so reset unflushedEntry
           unflushedEntry = null;
       }
   } 

4. The Entry structure

Entries come from a Recycler object pool.

   static final class Entry {
       private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
           @Override
           protected Entry newObject(Handle<Entry> handle) {
               return new Entry(handle);
           }
       };

       private final Handle<Entry> handle;
       Entry next;
       Object msg;
       // cached NIO ByteBuffer view(s) of msg, filled in by nioBuffers(...)
       ByteBuffer[] bufs;
       ByteBuffer buf;
       ChannelPromise promise;
       long progress;
       long total;
       int pendingSize;
       int count = -1;
       boolean cancelled;
} 

(1). Creation

  static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
       Entry entry = RECYCLER.get();
       entry.msg = msg;
       // CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = 96, the estimated per-entry overhead (see below)
       entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
       entry.total = total;
       entry.promise = promise;
       return entry;
   } 

a. pendingSize adds 96. What is 96? The estimated in-memory overhead of one Entry:

   // Assuming a 64-bit JVM:
   //  - 16 bytes object header
   //  - 8 reference fields   (note: the Entry above actually has 6 reference fields)
   //  - 2 long fields
   //  - 2 int fields
   //  - 1 boolean field      (counted as 4 bytes)
   //  - padding

5. nioBuffers: needed because an NIO write requires converting ByteBuf into NIO ByteBuffer

   public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
       assert maxCount > 0;
       assert maxBytes > 0;
       long nioBufferSize = 0;
       int nioBufferCount = 0;
       final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
       ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
       Entry entry = flushedEntry;
       while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
           if (!entry.cancelled) {
               ByteBuf buf = (ByteBuf) entry.msg;
               final int readerIndex = buf.readerIndex();
               final int readableBytes = buf.writerIndex() - readerIndex;

               if (readableBytes > 0) {
                   if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                       // If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
                       // we stop populate the ByteBuffer array. This is done for 2 reasons:
                       // 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
                       // and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
                       // on the architecture and kernel but to be safe we also enforce the limit here.
                       // 2. There is no sense in putting more data in the array than is likely to be accepted by the
                       // OS.
                       // See also:
                       // - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
                       // - http://linux.die.net/man/2/writev
                       break;
                   }
                   nioBufferSize += readableBytes;
                   int count = entry.count;
                   if (count == -1) {
                       //noinspection ConstantValueVariableUse
                       entry.count = count = buf.nioBufferCount();
                   }
                   int neededSpace = min(maxCount, nioBufferCount + count);
                   if (neededSpace > nioBuffers.length) {
                        // grow the array; the grown array is stored back in the thread-local cache for reuse
                       nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                       NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                   }
                   if (count == 1) {
                       ByteBuffer nioBuf = entry.buf;
                       if (nioBuf == null) {
                            // cache the ByteBuffer: for a derived buffer, internalNioBuffer(...) may
                            // otherwise create a new instance each time (e.g. DirectByteBuffer#duplicate)
                           entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                       }
                       nioBuffers[nioBufferCount++] = nioBuf;
                   } else {
                       ByteBuffer[] nioBufs = entry.bufs;
                       if (nioBufs == null) {
                           // cached ByteBuffers as they may be expensive to create in terms
                           // of Object allocation
                           entry.bufs = nioBufs = buf.nioBuffers();
                       }
                       for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
                           ByteBuffer nioBuf = nioBufs[i];
                           if (nioBuf == null) {
                               break;
                           } else if (!nioBuf.hasRemaining()) {
                               continue;
                           }
                           nioBuffers[nioBufferCount++] = nioBuf;
                       }
                   }
                   if (nioBufferCount == maxCount) {
                       break;
                   }
               }
           }
           entry = entry.next;
       }
       this.nioBufferCount = nioBufferCount;
       this.nioBufferSize = nioBufferSize;

       return nioBuffers;
   }

a. InternalThreadLocalMap caches the ByteBuffer array per thread

6. removeBytes: removes entries by written byte count

   public void removeBytes(long writtenBytes) {
       for (;;) {
           Object msg = current();
           if (!(msg instanceof ByteBuf)) {
               assert writtenBytes == 0;
               break;
           }

           final ByteBuf buf = (ByteBuf) msg;
           final int readerIndex = buf.readerIndex();
           final int readableBytes = buf.writerIndex() - readerIndex;

           if (readableBytes <= writtenBytes) {
               if (writtenBytes != 0) {
                    // report progress (ChannelProgressivePromise)
                   progress(readableBytes);
                   writtenBytes -= readableBytes;
               }
               // remove the entry; its ChannelPromise is completed successfully (async)
               remove();
           } else { // readableBytes > writtenBytes
               if (writtenBytes != 0) {
                   buf.readerIndex(readerIndex + (int) writtenBytes);
                   progress(writtenBytes);
               }
               break;
           }
       }
       // reset nioBufferCount to zero and null out the cached thread-local buffer slots
       clearNioBuffers();
   } 

a. When writtenBytes is smaller than the entry's readable bytes, the entry still has data to send: removeBytes just advances the ByteBuf reader index by the partial amount and reports progress; no further reset is needed here.

X. RecvByteBufAllocator

Allocates a receive buffer whose capacity is probably large enough to read all inbound data yet small enough not to waste space.

NioSocketChannel reads use AdaptiveRecvByteBufAllocator by default.

1. The RecvByteBufAllocator interface (class diagram)

  • guess: predicts how large the next receive buffer should be
  • reset: resets the per-read statistics
  • the other APIs do what their names suggest; an abbreviated sketch of the Handle follows
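
An abbreviated sketch of RecvByteBufAllocator.Handle as defined in Netty 4.1:

public interface Handle {
    ByteBuf allocate(ByteBufAllocator alloc); // allocate a buffer sized by guess()
    int guess();                              // predict the capacity of the next receive buffer
    void reset(ChannelConfig config);         // reset counters before a read loop
    void incMessagesRead(int numMessages);    // record messages read in this loop
    void lastBytesRead(int bytes);            // record bytes read by the last operation
    int lastBytesRead();
    void attemptedBytesRead(int bytes);       // record how many bytes the last read attempted
    int attemptedBytesRead();
    boolean continueReading();                // should the read loop continue?
    void readComplete();                      // the read loop finished; adjust the prediction
}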

2.AdaptiveRecvByteBufAllocator

Automatically grows and shrinks the predicted buffer size based on feedback.

If the previous read completely filled the allocated buffer, it gradually increases the expected number of readable bytes. If the read fails to fill the buffer a couple of times in a row, it gradually decreases the expected number of readable bytes. Otherwise it keeps returning the same prediction.

a. Class diagram

b. newHandle directly creates a HandleImpl

3.AdaptiveRecvByteBufAllocator#HandleImpl

a. Class structure

b.MaxMessageHandle

  • tracks bytes-read statistics
  • decides whether to keep reading (continueReading)
   public abstract class MaxMessageHandle implements ExtendedHandle {
       private ChannelConfig config;
       // max messages to read per read loop (1 by default for most channels)
       private int maxMessagePerRead;
       private int totalMessages;
       private int totalBytesRead;
       private int attemptedBytesRead; // how many bytes the next read attempt expects
       private int lastBytesRead;
       // whether to respect the "maybe more data" heuristic
       private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
       private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
           @Override
           public boolean get() {
               return attemptedBytesRead == lastBytesRead;
           }
       }; 

(1). reset: resets the statistics

  • resets the total message count and total bytes read
     @Override
      public void reset(ChannelConfig config) {
          this.config = config;
          maxMessagePerRead = maxMessagesPerRead();
          totalMessages = totalBytesRead = 0;
      }  

(2). continueReading: should we keep reading?

       @Override
       public boolean continueReading() {
           return continueReading(defaultMaybeMoreSupplier);
       }

       @Override
       public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
           return config.isAutoRead() &&
                  (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                  totalMessages < maxMessagePerRead &&
                  totalBytesRead > 0;
       } 
  • when respectMaybeMoreData is false, the attempted-vs-actual byte check is skipped

c. Data fields

 // what are these three indices for? (see the SIZE_TABLE discussion below)
 private final int minIndex;
 private final int maxIndex;
 private int index;
 // the capacity to use for the next receive buffer
 private int nextReceiveBufferSize;
 // set when a shrink is pending (the size only shrinks after two small reads in a row)
 private boolean decreaseNow;

d. AdaptiveRecvByteBufAllocator's static table

    private static final int[] SIZE_TABLE;

    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

SIZE_TABLE caches buffer sizes, partitioned by region much like a pooled memory allocator:

  • [16, 512): steps of 16
  • [512, Integer.MAX_VALUE): doubling

So what are the three indices above for? They are slots into this table; their whole purpose is looking up size values. An example of the resulting table follows.
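
The loops above produce a table like this (a worked illustration of the code, not a Netty constant):

// index:  0   1   2   3  ...  29   30   31    32    33    34  ...
// size : 16  32  48  64  ... 480  496  512  1024  2048  4096  ... (doubling until int overflow)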

(1). Given a size, how is its table slot found? The size may not match a cached value exactly, so should the nearest smaller or nearest larger entry be used? The getSizeTableIndex method decides:

  • a standard binary search
  • when size is not an exact table value, the index of the nearest larger value is returned
 private static int getSizeTableIndex(final int size) {
       for (int low = 0, high = SIZE_TABLE.length - 1;;) {
           if (high < low) {
               return low;
           }
           if (high == low) {
               return high;
           }

           int mid = low + high >>> 1;
           int a = SIZE_TABLE[mid];
           int b = SIZE_TABLE[mid + 1];
           if (size > b) {
               low = mid + 1;
           } else if (size < a) {
               high = mid - 1;
           } else if (size == a) {
               return mid;
           } else {
               return mid + 1;
           }
       }
   } 

e. lastBytesRead: records how much was read

       @Override
       public void lastBytesRead(int bytes) {
           // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
           // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
           // the selector to check for more data. Going back to the selector can add significant latency for large
           // data transfers.
           if (bytes == attemptedBytesRead()) {
               record(bytes);
           }
           super.lastBytesRead(bytes);
       } 

f. record: computes the size of the next read

  • if the actual bytes read are at most SIZE_TABLE[index - 2] (i.e. SIZE_TABLE[index - INDEX_DECREMENT - 1])
    • nextReceiveBufferSize becomes SIZE_TABLE[index - 1], bounded below by minIndex
  • if the actual bytes read are at least nextReceiveBufferSize
    • nextReceiveBufferSize becomes SIZE_TABLE[index + 4], bounded above by maxIndex
  • shrinking is deferred via decreaseNow: the size only shrinks on the second consecutive small read
  • minIndex and maxIndex bound the range; by default they correspond to 64 bytes and 65536 (64 KB), looked up via getSizeTableIndex
  • index starts at the slot for the default initial size, 1024 bytes
    private static final int INDEX_INCREMENT = 4;
    private static final int INDEX_DECREMENT = 1;
    private void record(int actualReadBytes) {
           if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
               if (decreaseNow) {
                   index = max(index - INDEX_DECREMENT, minIndex);
                   nextReceiveBufferSize = SIZE_TABLE[index];
                   decreaseNow = false;
               } else {
                   decreaseNow = true;
               }
           } else if (actualReadBytes >= nextReceiveBufferSize) {
               index = min(index + INDEX_INCREMENT, maxIndex);
               nextReceiveBufferSize = SIZE_TABLE[index];
               decreaseNow = false;
           }
       } 

g. readComplete: calls record to compute the next read size

    @Override
       public void readComplete() {
           record(totalBytesRead());
       } 

h. allocate: obtains the buffer

MaxMessageHandle#allocate

  @Override
  public ByteBuf allocate(ByteBufAllocator alloc) {
      return alloc.ioBuffer(guess());
  }

HandleImpl#guess

 @Override
 public int guess() {
     return nextReceiveBufferSize;
 } 

In short: allocation simply uses the predicted next buffer size; the real sizing logic is delegated to record, which is invoked from lastBytesRead and readComplete.