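I. Introduction

The heartbeat mechanism provided by Netty

When a connection has been idle (no reads or no writes) for too long, an IdleStateEvent is triggered. You can handle it by overriding the userEventTriggered method in your ChannelInboundHandler.

If no data is read within the specified time, ReadTimeoutHandler raises a ReadTimeoutException; if a write does not complete within the specified time, WriteTimeoutHandler raises a WriteTimeoutException. In both cases the connection is closed automatically, and you can handle the exception in the exceptionCaught method.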

II. IdleStateHandler

1. Data structure

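// Whether slow outbound consumption is taken into account. Defaults to false.
// If you run into OOM while writer-idle events rarely fire (with observeOutput set to true),
// check whether the outbound bandwidth is too low.
private final boolean observeOutput;
// Reader idle time; a value <= 0 disables the event
private final long readerIdleTimeNanos;
// Writer idle time; a value <= 0 disables the event
private final long writerIdleTimeNanos;
// Idle time with neither reads nor writes; a value <= 0 disables the event
private final long allIdleTimeNanos;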
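
The fields above hold nanoseconds, while callers normally pass a value plus a TimeUnit. The following is a minimal sketch of that normalization, not the Netty constructor itself: `IdleTimes` and `toIdleNanos` are hypothetical names, and the 1 ms floor is an assumption about how very small positive timeouts are clamped.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Netty: normalizes a user-supplied idle time to nanoseconds.
final class IdleTimes {
    // Assumed floor so that a tiny positive timeout does not reschedule in a tight loop.
    static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    private IdleTimes() { }

    /** Returns 0 (event disabled) for non-positive input, otherwise nanoseconds, at least 1 ms. */
    static long toIdleNanos(long idleTime, TimeUnit unit) {
        if (idleTime <= 0) {
            return 0;
        }
        return Math.max(unit.toNanos(idleTime), MIN_TIMEOUT_NANOS);
    }
}
```

With this sketch, `toIdleNanos(5, TimeUnit.SECONDS)` yields 5,000,000,000, and both 0 and negative inputs yield 0, which matches the "<= 0 disables the event" rule in the field comments.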

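2. Timeouts need a scheduler to fire them

The handler schedules its tasks on its own I/O thread, typically the channel's EventLoop, obtained through ctx.executor().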

    ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }
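
One consequence of scheduling on the handler's own executor is that timeout tasks run on the same thread as the channel's other events, so fields such as lastReadTime can be read and written without locks. The sketch below illustrates only that property with plain JDK classes; a single-threaded ScheduledExecutorService stands in for the EventLoop (an assumption for illustration), and `SingleThreadScheduling` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only stand-in for an EventLoop: one thread runs both "I/O" work and timers.
final class SingleThreadScheduling {
    private SingleThreadScheduling() { }

    /** Returns true if a delayed task ran on the same thread as a directly submitted one. */
    static boolean timerSharesIoThread() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<Thread> whoAmI = Thread::currentThread;
            // Thread that handles work submitted directly to the loop.
            Thread ioThread = loop.submit(whoAmI).get();
            // Thread that runs a delayed task, analogous to an idle-timeout task.
            Thread timerThread = loop.schedule(whoAmI, 10, TimeUnit.MILLISECONDS).get();
            return ioThread == timerThread;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }
}
```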

3. channelActive: initialize when the Channel becomes active

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }

a.initialize

    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

b. ReaderIdleTimeoutTask: the task that handles reader idleness

      @Override
      protected void run(ChannelHandlerContext ctx) {
          long nextDelay = readerIdleTimeNanos;
          // reading is true while a read is still in progress (channelRead was called
          // but channelReadComplete was not); elapsed time is only subtracted otherwise
          if (!reading) {
              nextDelay -= ticksInNanos() - lastReadTime;
          }

          if (nextDelay <= 0) {
              // Reader is idle: reschedule the task with the full timeout
              readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

              boolean first = firstReaderIdleEvent;
              firstReaderIdleEvent = false;

              try {
                  IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                  // By default this calls ctx.fireUserEventTriggered(event);
                  // ReadTimeoutHandler overrides this method to close the channel
                  channelIdle(ctx, event);
              } catch (Throwable t) {
                  ctx.fireExceptionCaught(t);
              }
          } else {
              // Not idle yet: reschedule the task for the remaining time
              readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
          }
      }
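
The delay arithmetic in run() is easy to misread, so here it is isolated as a pure function. This is a sketch under the assumption that only the arithmetic matters; `ReaderIdleMath` and `nextDelay` are hypothetical names, and the current time is passed in instead of being read from ticksInNanos().

```java
// Hypothetical helper mirroring the arithmetic at the top of ReaderIdleTimeoutTask.run().
final class ReaderIdleMath {
    private ReaderIdleMath() { }

    /**
     * Remaining nanoseconds before the reader counts as idle.
     * A result <= 0 means "idle now": fire the event and restart the full timeout.
     */
    static long nextDelay(long readerIdleTimeNanos, long nowNanos, long lastReadTimeNanos, boolean reading) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            // Elapsed time is only subtracted when no read is in progress.
            nextDelay -= nowNanos - lastReadTimeNanos;
        }
        return nextDelay;
    }
}
```

For example, with a 5 s timeout and the last read at t = 2 s, a task running at t = 5 s gets a remaining delay of 2 s and is rescheduled for that long. If it runs at t = 7 s with no read in between, the result is 0 and the idle event fires. While a read is in progress the full 5 s is returned regardless of elapsed time.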

c. WriterIdleTimeoutTask and AllIdleTimeoutTask are similar; they only operate on different variables.

d. The hasOutputChanged method, called from WriterIdleTimeoutTask

  • Helps detect situations that can lead to OOM
  • The check is based on ChannelOutboundBuffer, the write buffer:
    • System.identityHashCode(buf.current());
    • buf.totalPendingWriteBytes();
    • If both values are the same as at the previous check, the output has made no progress; pending writes may be piling up, which is how an OOM can occur

    private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
        if (observeOutput) {

            if (lastChangeCheckTimeStamp != lastWriteTime) {
                lastChangeCheckTimeStamp = lastWriteTime;

                // But this applies only if it's the non-first call.
                if (!first) {
                    return true;
                }
            }

            Channel channel = ctx.channel();
            Unsafe unsafe = channel.unsafe();
            ChannelOutboundBuffer buf = unsafe.outboundBuffer();

            if (buf != null) {
                int messageHashCode = System.identityHashCode(buf.current());
                long pendingWriteBytes = buf.totalPendingWriteBytes();

                if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                    lastMessageHashCode = messageHashCode;
                    lastPendingWriteBytes = pendingWriteBytes;

                    if (!first) {
                        return true;
                    }
                }
            }
        }

        return false;
    }
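
The buffer comparison can be modelled without Netty. The sketch below keeps only the second half of the method (the hash code and pending-bytes comparison) and leaves out the lastWriteTime check; `OutputChangeDetector` is a hypothetical class, and its two parameters stand in for buf.current() and buf.totalPendingWriteBytes().

```java
// JDK-only model of the progress check on the outbound buffer.
final class OutputChangeDetector {
    private int lastMessageHashCode;
    private long lastPendingWriteBytes;

    /**
     * @param currentMessage    head of the outbound queue (may be null)
     * @param pendingWriteBytes bytes still waiting to be written
     * @param first             true on the first check after a completed write
     * @return true if the output made progress since the previous check
     */
    boolean hasOutputChanged(Object currentMessage, long pendingWriteBytes, boolean first) {
        int messageHashCode = System.identityHashCode(currentMessage);
        if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
            lastMessageHashCode = messageHashCode;
            lastPendingWriteBytes = pendingWriteBytes;
            // The first call only records a baseline and never reports progress.
            return !first;
        }
        return false;
    }
}
```

Calling it with the same message and 100 pending bytes (first = true) records the baseline, a later call with 60 pending bytes reports progress, and a further call with 60 again reports a stall.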

4. Updating lastReadTime, which ultimately happens in channelReadComplete

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

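5. Updating lastWriteTime

  • The time is updated by a listener attached to the ChannelPromise of each write;
    • write() only places the message in the outbound buffer without sending it; the data is sent on flush, so the listener runs once the write has actually completed
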
   @Override
   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
       // Allow writing with void promise if handler is only configured for read timeout events.
       if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
           ctx.write(msg, promise.unvoid()).addListener(writeListener);
       } else {
           ctx.write(msg, promise);
       }
   }
   private final ChannelFutureListener writeListener = new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
           lastWriteTime = ticksInNanos();
           firstWriterIdleEvent = firstAllIdleEvent = true;
       }
   };
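
The key point is that the timestamp moves when the write completes, not when write() is called. A JDK-only sketch of that behaviour follows; CompletableFuture stands in for ChannelPromise (an assumption for illustration), and `WriteTimeTracker` is a hypothetical class.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only model: the completion callback plays the role of writeListener.
final class WriteTimeTracker {
    private volatile boolean writeCompleted;
    private volatile long lastWriteTimeNanos;

    /** Registers the listener; nothing is recorded until the promise completes. */
    CompletableFuture<Void> write(CompletableFuture<Void> promise) {
        promise.whenComplete((ignored, error) -> {
            // Runs on success and on failure alike, like operationComplete().
            lastWriteTimeNanos = System.nanoTime();
            writeCompleted = true;
        });
        return promise;
    }

    boolean writeCompleted() { return writeCompleted; }

    long lastWriteTimeNanos() { return lastWriteTimeNanos; }
}
```

After `write(promise)` the tracker still reports no completed write; only `promise.complete(null)` (the analogue of the flush finishing) updates the timestamp.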

6. channelInactive: clean up when the Channel becomes inactive

   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       destroy();
       super.channelInactive(ctx);
   }
   private void destroy() {
       state = 2;

       if (readerIdleTimeout != null) {
           readerIdleTimeout.cancel(false);
           readerIdleTimeout = null;
       }
       if (writerIdleTimeout != null) {
           writerIdleTimeout.cancel(false);
           writerIdleTimeout = null;
       }
       if (allIdleTimeout != null) {
           allIdleTimeout.cancel(false);
           allIdleTimeout = null;
       }
   } 
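
The state field ties initialize() and destroy() together: once destroy() has set it to 2, a late initialize() must not schedule anything. Below is a small JDK-only model of that guard, assuming the meanings 0 = not initialized, 1 = initialized, 2 = destroyed; `IdleLifecycle` and `scheduleCount` are hypothetical names used only to make the effect observable.

```java
// JDK-only model of the state guard shared by initialize() and destroy().
final class IdleLifecycle {
    private byte state;          // 0 = none, 1 = initialized, 2 = destroyed
    private int scheduleCount;   // how many times timers would have been created

    void initialize() {
        switch (state) {
        case 1:
        case 2:
            return;              // already initialized, or destroyed before initialization
        default:
            break;
        }
        state = 1;
        scheduleCount++;         // stands in for scheduling the three timeout tasks
    }

    void destroy() {
        state = 2;               // the real handler also cancels the scheduled futures here
    }

    int scheduleCount() { return scheduleCount; }
}
```

Calling initialize() twice schedules once, and calling destroy() before initialize() schedules nothing, which is the case the comment in initialize() refers to.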

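7. ReadTimeoutHandler extends IdleStateHandler

  • It overrides channelIdle to fire a ReadTimeoutException and close the Channel

8. WriteTimeoutHandler does not extend IdleStateHandler; its implementation differs slightly

a. write

  • A timeout task is scheduled for every write
  • When the task runs, it checks the ChannelPromise to see whether the write has completed
    • If it has not, Channel.close() is called;
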
   @Override
   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
       if (timeoutNanos > 0) {
           promise = promise.unvoid();
           scheduleTimeout(ctx, promise);
       }
       ctx.write(msg, promise);
   }

   private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
       // Schedule a timeout.
       final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
       task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);

       if (!task.scheduledFuture.isDone()) {
           addWriteTimeoutTask(task);

           // Cancel the scheduled timeout if the flush promise is complete.
           promise.addListener(task);
       }
   }
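
The per-write pattern (arm a timer for each write, cancel it when the promise completes, close if the timer wins) can be sketched with JDK classes alone. This is an illustration under stated assumptions, not the handler's code: CompletableFuture stands in for ChannelPromise, a `closed` flag stands in for closing the channel, and `PerWriteTimeout` is a hypothetical class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only model of a per-write timeout.
final class PerWriteTimeout {
    private final ScheduledExecutorService executor;
    private final long timeoutNanos;
    private final AtomicBoolean closed = new AtomicBoolean();

    PerWriteTimeout(ScheduledExecutorService executor, long timeout, TimeUnit unit) {
        this.executor = executor;
        this.timeoutNanos = unit.toNanos(timeout);
    }

    /** Arms one timer per write; the timer is cancelled as soon as the write completes. */
    void write(CompletableFuture<Void> promise) {
        ScheduledFuture<?> timer = executor.schedule(() -> {
            if (!promise.isDone()) {
                closed.set(true);   // write still pending after the timeout
            }
        }, timeoutNanos, TimeUnit.NANOSECONDS);
        // Completion (success or failure) cancels the pending timer.
        promise.whenComplete((ignored, error) -> timer.cancel(false));
    }

    boolean isClosed() { return closed.get(); }
}
```

A write whose promise completes before the timeout leaves `isClosed()` false; a write whose promise never completes flips it to true once the timer fires. Unlike the idle tasks in IdleStateHandler, nothing is rescheduled here, since each timer belongs to exactly one write.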