[Note] Last updated on April 20, 2019. The content may be out of date, so use it with caution.
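I. Introduction
Netty's built-in heartbeat mechanism
When a connection has been idle for too long (no reads, no writes, or neither), IdleStateHandler fires an IdleStateEvent.
You can handle that event by overriding the userEventTriggered method in your ChannelInboundHandler.
Separately, if no read (or write) completes within the specified time, ReadTimeoutHandler (or WriteTimeoutHandler) raises a ReadTimeoutException (or WriteTimeoutException)
and closes the connection automatically. You can handle that exception in the exceptionCaught method.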
II. IdleStateHandler
1. Data structure
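    // Whether slow outbound traffic is taken into account. Defaults to false.
    // If you hit OOM while writer-idle events almost never fire (with observeOutput set to true),
    // check whether data is being sent too slowly for the available bandwidth.
    private final boolean observeOutput;
    // Read idle time; a value <= 0 disables the event
    private final long readerIdleTimeNanos;
    // Write idle time; a value <= 0 disables the event
    private final long writerIdleTimeNanos;
    // Read-or-write idle time; a value <= 0 disables the event
    private final long allIdleTimeNanos;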
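To make the "<= 0 disables the event" rule concrete, here is a minimal sketch of how constructor arguments could be normalized into these nanosecond fields. It is a simplified model, not Netty's code: the class name `IdleTimes`, the method `toIdleNanos`, and the 1 ms floor are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;

// Simplified model of the normalization step; names and the floor value are assumptions.
final class IdleTimes {
    // Assumed floor: positive timeouts shorter than 1 ms are rounded up to 1 ms.
    static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    /** Returns 0 (event disabled) for non-positive input, otherwise the timeout in nanoseconds. */
    static long toIdleNanos(long time, TimeUnit unit) {
        if (time <= 0) {
            return 0;
        }
        return Math.max(unit.toNanos(time), MIN_TIMEOUT_NANOS);
    }

    public static void main(String[] args) {
        System.out.println(toIdleNanos(5, TimeUnit.SECONDS));
        System.out.println(toIdleNanos(0, TimeUnit.SECONDS));
        System.out.println(toIdleNanos(-1, TimeUnit.SECONDS));
    }
}
```

A stored value of 0 is then what the `> 0` checks in the later steps test before scheduling anything.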
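2. A timeout needs a scheduler to fire it
IdleStateHandler reuses the channel's own I/O thread: tasks are scheduled on ctx.executor(), which is typically the channel's EventLoop.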
    ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }
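Scheduling on the I/O thread means the timer tasks and the read/write callbacks run on the same thread, so fields such as lastReadTime need no locking. The sketch below illustrates this with plain JDK classes. A single-threaded ScheduledExecutorService stands in for the EventLoop, and the class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only stand-in for an EventLoop; not Netty code.
final class SameThreadTimerDemo {
    /** Returns true if a delayed task runs on the same thread as ordinary submitted work. */
    static boolean timerSharesLoopThread() {
        // One thread runs both immediate tasks and timers, like an event loop.
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            Callable<Thread> whoAmI = Thread::currentThread;
            Thread ioThread = loop.submit(whoAmI).get();
            Thread timerThread = loop.schedule(whoAmI, 10, TimeUnit.MILLISECONDS).get();
            return ioThread == timerThread;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("timer shares loop thread: " + timerSharesLoopThread());
    }
}
```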
3. channelActive: initialization when the Channel becomes active
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired. If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }
a. initialize
    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        // state: 0 = none, 1 = initialized, 2 = destroyed
        switch (state) {
        case 1:
        case 2:
            return;
        }
        state = 1;
        initOutputChanged(ctx);
        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
b. ReaderIdleTimeoutTask: the read-idle task
    @Override
    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        // reading is true while a read is in progress and has not completed;
        // in that case the elapsed time is not subtracted
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;
        }
        if (nextDelay <= 0) {
            // Reader is idle: reschedule the task with the full timeout
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;
            try {
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                // By default this calls ctx.fireUserEventTriggered(event).
                // ReadTimeoutHandler overrides this method to close the channel.
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // A read happened within the timeout: reschedule with the remaining delay
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
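The delay arithmetic is easier to follow with the clock passed in explicitly. The sketch below extracts it into two pure functions. The class `ReadIdleCheck` and its method names are hypothetical, and time values are plain longs rather than readings from ticksInNanos().

```java
// Pure-function model of the read-idle arithmetic; not Netty code.
final class ReadIdleCheck {
    /** Remaining time before the reader counts as idle; a result <= 0 means the timeout has elapsed. */
    static long nextDelay(long readerIdleTimeNanos, long nowNanos, long lastReadTimeNanos, boolean reading) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            // Subtract the time that has passed since the last completed read.
            nextDelay -= nowNanos - lastReadTimeNanos;
        }
        return nextDelay;
    }

    /** Delay for the next run of the task: the full period after firing, the remainder otherwise. */
    static long rescheduleDelay(long readerIdleTimeNanos, long nextDelay) {
        return nextDelay <= 0 ? readerIdleTimeNanos : nextDelay;
    }

    public static void main(String[] args) {
        long remaining = nextDelay(5_000, 7_000, 4_000, false);
        System.out.println("remaining=" + remaining + ", reschedule in " + rescheduleDelay(5_000, remaining));
    }
}
```

With a 5000-unit timeout, a last read at 4000, and the task running at 7000, 3000 units have elapsed, so the task reschedules itself for the remaining 2000 instead of firing. While a read is in progress the full timeout is always returned, so the event cannot fire.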
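c. WriterIdleTimeoutTask and AllIdleTimeoutTask work the same way; only the fields they read and update differ.
d. hasOutputChanged, called from WriterIdleTimeoutTask
- Guards against the slow-send case that can end in OOM
- The check is based on ChannelOutboundBuffer, the outbound write buffer, using two values:
- System.identityHashCode(buf.current());
- buf.totalPendingWriteBytes();
- If both values are unchanged since the previous check, no data has left the buffer; if either one changed, data is still being written, just slowly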
    private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
        if (observeOutput) {
            if (lastChangeCheckTimeStamp != lastWriteTime) {
                lastChangeCheckTimeStamp = lastWriteTime;
                // But this applies only if it's the non-first call.
                if (!first) {
                    return true;
                }
            }
            Channel channel = ctx.channel();
            Unsafe unsafe = channel.unsafe();
            ChannelOutboundBuffer buf = unsafe.outboundBuffer();
            if (buf != null) {
                int messageHashCode = System.identityHashCode(buf.current());
                long pendingWriteBytes = buf.totalPendingWriteBytes();
                if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                    lastMessageHashCode = messageHashCode;
                    lastPendingWriteBytes = pendingWriteBytes;
                    if (!first) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
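The change-detection logic can be tried out without a Channel. The following sketch is a simplified model in which the head message, the pending byte count, and the last write time are passed in as arguments instead of being read from ChannelOutboundBuffer. The class name `OutputObserver` is hypothetical, and the observeOutput flag is assumed to be true.

```java
// Stand-alone model of hasOutputChanged with observeOutput assumed true; not Netty code.
final class OutputObserver {
    private long lastChangeCheckTimeStamp;
    private int lastMessageHashCode;
    private long lastPendingWriteBytes;

    /**
     * @param current           message at the head of the outbound queue (may be null)
     * @param pendingWriteBytes bytes still waiting to be written
     * @param lastWriteTime     time of the last completed write
     * @param first             whether this is the first idle check since the last write
     */
    boolean hasOutputChanged(Object current, long pendingWriteBytes, long lastWriteTime, boolean first) {
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            // A write completed since the previous check.
            lastChangeCheckTimeStamp = lastWriteTime;
            if (!first) {
                return true;
            }
        }
        int messageHashCode = System.identityHashCode(current);
        if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
            // The head message or the amount of queued data moved since the previous check.
            lastMessageHashCode = messageHashCode;
            lastPendingWriteBytes = pendingWriteBytes;
            if (!first) {
                return true;
            }
        }
        return false;
    }
}
```

The first call only records a baseline and reports no change. Later calls report a change whenever a write completed or the queued byte count moved.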
4. Updating lastReadTime, which ultimately happens in channelReadComplete
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }
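The split between the two callbacks means the idle clock restarts once per batch of reads rather than once per message. Below is a small model of that bookkeeping. The class `ReadTracker` is hypothetical, and the current time is supplied by the caller.

```java
// Model of the reading flag and lastReadTime bookkeeping; not Netty code.
final class ReadTracker {
    private boolean reading;
    private long lastReadTime;

    /** One message arrived; more may follow before the read loop finishes. */
    void onRead() {
        reading = true;
    }

    /** The read loop finished; only now is the idle clock restarted. */
    void onReadComplete(long nowNanos) {
        if (reading) {
            lastReadTime = nowNanos;
            reading = false;
        }
    }

    boolean isReading() {
        return reading;
    }

    long lastReadTime() {
        return lastReadTime;
    }
}
```

A completion callback that was not preceded by any read leaves lastReadTime untouched, which matches the `&& reading` condition in the handler.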
5. Updating lastWriteTime
- It is updated by a listener registered on the ChannelPromise of each write;
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Allow writing with void promise if handler is only configured for read timeout events.
        if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            ctx.write(msg, promise.unvoid()).addListener(writeListener);
        } else {
            ctx.write(msg, promise);
        }
    }

    private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };
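The point of the listener is that lastWriteTime records when a write finishes, not when write() is called. The sketch below shows the same idea with a JDK CompletableFuture standing in for the ChannelPromise. The class `WriteTracker` and the injectable clock are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

// CompletableFuture stands in for ChannelPromise; not Netty code.
final class WriteTracker {
    private final LongSupplier clock; // injectable so the example is deterministic
    private long lastWriteTime;

    WriteTracker(LongSupplier clock) {
        this.clock = clock;
        this.lastWriteTime = clock.getAsLong();
    }

    /** Registers a completion callback on the write's future, similar to addListener(writeListener). */
    void track(CompletableFuture<Void> writeFuture) {
        // Runs on success and on failure, whenever the write finishes.
        writeFuture.whenComplete((result, error) -> lastWriteTime = clock.getAsLong());
    }

    long lastWriteTime() {
        return lastWriteTime;
    }
}
```

Until the future completes, the tracker keeps reporting the old timestamp, so a write that is stuck in the outbound buffer does not reset the write-idle timer.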
6. channelInactive: cleanup when the Channel becomes inactive
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        destroy();
        super.channelInactive(ctx);
    }

    private void destroy() {
        state = 2;
        if (readerIdleTimeout != null) {
            readerIdleTimeout.cancel(false);
            readerIdleTimeout = null;
        }
        if (writerIdleTimeout != null) {
            writerIdleTimeout.cancel(false);
            writerIdleTimeout = null;
        }
        if (allIdleTimeout != null) {
            allIdleTimeout.cancel(false);
            allIdleTimeout = null;
        }
    }
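Together with the state check in initialize, destroy guarantees that no timer is scheduled once the handler has been torn down, even if initialization arrives late. The sketch below models that lifecycle with one timer and a JDK scheduler. The class `IdleTimerLifecycle` and its helper methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// One-timer model of the initialize/destroy state guard; not Netty code.
final class IdleTimerLifecycle {
    private byte state; // 0 = none, 1 = initialized, 2 = destroyed
    private ScheduledFuture<?> readerIdleTimeout;

    void initialize(ScheduledExecutorService loop, long delayMillis) {
        if (state == 1 || state == 2) {
            return; // already initialized, or destroyed before initialization
        }
        state = 1;
        readerIdleTimeout = loop.schedule(() -> { }, delayMillis, TimeUnit.MILLISECONDS);
    }

    void destroy() {
        state = 2;
        if (readerIdleTimeout != null) {
            readerIdleTimeout.cancel(false);
            readerIdleTimeout = null;
        }
    }

    /** Initializes, destroys, and reports whether the pending timer was cancelled and cleared. */
    static boolean destroyCancelsTimeout() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            IdleTimerLifecycle timer = new IdleTimerLifecycle();
            timer.initialize(loop, 60_000);
            ScheduledFuture<?> pending = timer.readerIdleTimeout;
            timer.destroy();
            return pending.isCancelled() && timer.readerIdleTimeout == null;
        } finally {
            loop.shutdownNow();
        }
    }

    /** Destroys first, then initializes, and reports whether scheduling was skipped. */
    static boolean initializeAfterDestroyIsIgnored() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            IdleTimerLifecycle timer = new IdleTimerLifecycle();
            timer.destroy();
            timer.initialize(loop, 60_000);
            return timer.readerIdleTimeout == null;
        } finally {
            loop.shutdownNow();
        }
    }
}
```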
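7. ReadTimeoutHandler extends IdleStateHandler
- It overrides channelIdle to close the Channel
8. WriteTimeoutHandler does not extend IdleStateHandler, and its implementation is somewhat different
a. write
- A timeout task is scheduled for every write
- When the task runs, it checks the ChannelPromise to decide whether the write has completed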
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (timeoutNanos > 0) {
            promise = promise.unvoid();
            scheduleTimeout(ctx, promise);
        }
        ctx.write(msg, promise);
    }

    private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
        // Schedule a timeout.
        final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
        if (!task.scheduledFuture.isDone()) {
            addWriteTimeoutTask(task);
            // Cancel the scheduled timeout if the flush promise is complete.
            promise.addListener(task);
        }
    }
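The per-write timeout pattern can be sketched with JDK classes alone: one timer per write, cancelled when the write's promise completes, and firing only if the promise is still pending. In the sketch below, CompletableFuture stands in for ChannelPromise and a Runnable callback stands in for whatever the handler does on timeout. The class `PerWriteTimeout` and its helper methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only model of a per-write timeout; not Netty code.
final class PerWriteTimeout {
    /** Schedules a timeout for one write and returns the scheduled timer. */
    static ScheduledFuture<?> scheduleTimeout(ScheduledExecutorService loop,
                                              CompletableFuture<Void> writePromise,
                                              long timeoutMillis,
                                              Runnable onTimeout) {
        ScheduledFuture<?> scheduled = loop.schedule(() -> {
            // Fire only if the write is still pending when the timer expires.
            if (!writePromise.isDone()) {
                onTimeout.run();
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Cancel the timer as soon as the write finishes, successfully or not.
        writePromise.whenComplete((result, error) -> scheduled.cancel(false));
        return scheduled;
    }

    /** A write that completes in time cancels its timer and never reports a timeout. */
    static boolean completedWriteCancelsTimer() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicBoolean timedOut = new AtomicBoolean();
            CompletableFuture<Void> promise = new CompletableFuture<>();
            ScheduledFuture<?> timer = scheduleTimeout(loop, promise, 60_000, () -> timedOut.set(true));
            promise.complete(null);
            return timer.isCancelled() && !timedOut.get();
        } finally {
            loop.shutdownNow();
        }
    }

    /** A write that never completes triggers the timeout callback. */
    static boolean stalledWriteTimesOut() {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicBoolean timedOut = new AtomicBoolean();
            CompletableFuture<Void> promise = new CompletableFuture<>(); // never completed
            scheduleTimeout(loop, promise, 20, () -> timedOut.set(true)).get(5, TimeUnit.SECONDS);
            return timedOut.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }
}
```

The trade-off compared with IdleStateHandler is one scheduled task per write instead of one recurring task per channel, in exchange for a deadline that applies to each individual write.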