一、ByteToMessageDecoder

1.Cumulator累加器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
   /**
    * 累积ByteBuf
    */
   public interface Cumulator {
       /**
        * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
        * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
        * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
        */
       ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
   }

a.MERGE_CUMULATOR 合并累加器,累加到同一个ByteBuf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            final ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                // 当缓冲区中没有更多空间或者如果refCnt大于1时扩展累积(通过替换),当用户使用slice()。retain()或plicate()。retain()或如果只读。
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                //重新分配内存空间
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

b.COMPOSITE_CUMULATOR 利用CompositeByteBuf把ByteBuf组装,不用重新分配内存,相对性能好

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
       @Override
       public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
           ByteBuf buffer;
           if (cumulation.refCnt() > 1) {
               // 当refCnt大于1时扩展累积量(通过替换)(当用户使用slice()。retain()或plicate()。retain()时可能会发生)。
               //
               // See:
               // - https://github.com/netty/netty/issues/2327
               // - https://github.com/netty/netty/issues/1764
               buffer = expandCumulation(alloc, cumulation, in.readableBytes());
               buffer.writeBytes(in);
               in.release();
           } else {
               CompositeByteBuf composite;
               if (cumulation instanceof CompositeByteBuf) {
                   composite = (CompositeByteBuf) cumulation;
               } else {
                   composite = alloc.compositeBuffer(Integer.MAX_VALUE);
                   composite.addComponent(true, cumulation);
               }
               composite.addComponent(true, in);
               buffer = composite;
           }
           return buffer;
       }
   }; 

2.channelRead

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       if (msg instanceof ByteBuf) {
           //保存编码成对象的集合
           CodecOutputList out = CodecOutputList.newInstance();
           try {
               ByteBuf data = (ByteBuf) msg;
               first = cumulation == null;
               if (first) {
                   cumulation = data;
               } else {
                   cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
               }
               callDecode(ctx, cumulation, out);
           } catch (DecoderException e) {
               throw e;
           } catch (Exception e) {
               throw new DecoderException(e);
           } finally {
               if (cumulation != null && !cumulation.isReadable()) {
                   numReads = 0;
                   cumulation.release();
                   cumulation = null;
               } else if (++ numReads >= discardAfterReads) {
                   // 我们已经进行了足够的读取,并尝试丢弃一些字节,因此我们不会冒看到OOME的风险。
                   // See https://github.com/netty/netty/issues/4275
                   numReads = 0;
                   discardSomeReadBytes();
               }

               int size = out.size();
               decodeWasNull = !out.insertSinceRecycled();
               fireChannelRead(ctx, out, size);
               out.recycle();
           }
       } else {
           ctx.fireChannelRead(msg);
       }
   } 

a.callDecode 调用解码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    //循环调用ctx.fireChannelRead(out.getUnsafe(i));
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // 在继续解码之前,请检查是否已删除此处理程序。如果将其删除,则继续在缓冲区上操作是不安全的。
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                //最终调用decode抽象方法,按子类解码
                decodeRemovalReentryProtection(ctx, in, out);

                // 在继续循环之前,请检查是否已删除此处理程序。如果将其删除,则继续在缓冲区上操作是不安全的。
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
                
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(...);
                }
                 //是否只处理一次解码
                if (isSingleDecode()) {
                    break;
                }
            }
        }...
    }

b.decodeRemovalReentryProtection

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
           throws Exception {
       decodeState = STATE_CALLING_CHILD_DECODE;
       try {
           //抽象方法子类去处理解码
           decode(ctx, in, out);
       } finally {
           boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
           decodeState = STATE_INIT;
           if (removePending) {
               handlerRemoved(ctx);
           }
       }
   } 

c.handlerRemoved 移除ChannelHandler操作,做清除操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    @Override
    public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        if (decodeState == STATE_CALLING_CHILD_DECODE) {
            decodeState = STATE_HANDLER_REMOVED_PENDING;
            return;
        }
        ByteBuf buf = cumulation;
        if (buf != null) {
            // 直接将其设置为null,因此我们确定在此不再使用任何其他方法访问它
            cumulation = null;

            int readable = buf.readableBytes();
            if (readable > 0) {
                ByteBuf bytes = buf.readBytes(readable);
                buf.release();
                ctx.fireChannelRead(bytes);
            } else {
                buf.release();
            }

            numReads = 0;
            ctx.fireChannelReadComplete();
        }
        //钩子方法
        handlerRemoved0(ctx);
    }

3.channelReadComplete 读取完,做优化处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        numReads = 0;
        //cumulation.discardSomeReadBytes();释放一些内存
        discardSomeReadBytes();
        if (decodeWasNull) {
            decodeWasNull = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        ctx.fireChannelReadComplete();
    }

4.实现类

  • FixedLengthFrameDecoder ,基于固定长度消息进行粘包拆包处理的。
    • frameLength 指定长度
    • in.readRetainedSlice(frameLength)读取内容
  • LengthFieldBasedFrameDecoder ,基于消息头指定消息长度进行粘包拆包处理的。
  • LineBasedFrameDecoder ,基于换行来进行消息粘包拆包处理的。
  • DelimiterBasedFrameDecoder ,基于指定消息边界方式进行粘包拆包处理的。