[Note] Last updated on April 25, 2019. The content may be outdated; use it with caution.
I. Introduction
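When Netty manages 1,000,000+ connections, heartbeat detection with IdleStateHandler has problems:
- It inevitably occupies the I/O threads;
- Timeouts are imprecise. For example, NioEventLoop has an ioRatio setting (default 50): if processing I/O events takes 100 ms, running the tasks in the task queue is also limited to about 100 ms. Can the checks for 1,000,000+ connections really finish within that budget? Whatever is left over only runs in the next loop iteration, so timeouts fire late.
The solution is to introduce a timing wheel.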
1. Principle
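A timing wheel is a circular data structure. Picture it as a clock face divided into many slots, where each slot represents a span of time (the shorter the span, the more precise the Timer).
If tasks span long time ranges and are numerous, a single-level wheel gives tasks large round counts and makes the linked list in each slot long. In that case the wheel can be split into levels, like the three hands of a clock: seconds, minutes, and hours.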
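The slot and round arithmetic described above can be sketched in plain Java. This is a minimal illustration, not Netty code: the class `WheelMath`, its methods, the 512-slot wheel, and the 24-hour/60-minute/60-second layering are all hypothetical example choices. The single-level rounds formula mirrors the one HashedWheelTimer uses later in transferTimeoutsToBuckets.

```java
public class WheelMath {
    // Single-level wheel: returns {slot, rounds} for a timeout due delayTicks from now.
    static long[] singleLevel(long delayTicks, long currentTick, int wheelSize) {
        long slot = (currentTick + delayTicks) % wheelSize; // slot the pointer is on at the due tick
        long rounds = delayTicks / wheelSize;               // full laps to wait before that visit counts
        return new long[] {slot, rounds};
    }

    // Layered (clock-style) wheels: split a delay (within one day) into hour/minute/second slots.
    static int[] hourMinuteSecond(long delaySeconds) {
        int seconds = (int) (delaySeconds % 60);
        int minutes = (int) ((delaySeconds / 60) % 60);
        int hours = (int) ((delaySeconds / 3600) % 24);
        return new int[] {hours, minutes, seconds};
    }

    public static void main(String[] args) {
        long[] r = singleLevel(1000, 0, 512);
        System.out.println("slot=" + r[0] + " rounds=" + r[1]); // slot=488 rounds=1
        int[] hms = hourMinuteSecond(3725);
        System.out.println(hms[0] + "h " + hms[1] + "m " + hms[2] + "s"); // 1h 2m 5s
    }
}
```

With 512 slots, a delay of 1000 ticks lands in slot 488 after one full lap. The layered version avoids large round counts by letting each level cover a coarser unit.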
II. HashedWheelTimer
HashedWheelTimer implements a single-level timing wheel.
1. Data structures
```java
// Worker runnable: processes the bucket (slot) the pointer has moved to
private final Worker worker = new Worker();
private final Thread workerThread;
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
// Worker state: 0 - init, 1 - started, 2 - shut down
private volatile int workerState;
// Tick interval in nanoseconds, like a clock's second hand advancing once per second
private final long tickDuration;
// The wheel: a circular array of buckets
private final HashedWheelBucket[] wheel;
// Queue of newly submitted timeouts
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// Queue of cancelled timeouts
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
```
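2. Constructor
- Creates the circular HashedWheelBucket array (the wheel)
- Sets the tick duration
- Creates the worker thread, which processes whichever slot the tick has reached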
```java
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
        long maxPendingTimeouts) {
    ...
    // Create the circular HashedWheelBucket array; ticksPerWheel is rounded up to a power of two
    wheel = createWheel(ticksPerWheel);
    // Mask for fast index lookup: tick & mask == tick % wheel.length
    mask = wheel.length - 1;
    // Tick interval, converted to nanoseconds
    this.tickDuration = unit.toNanos(tickDuration);
    // Prevent overflow (tickDuration * wheel.length must fit in a long)
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(...);
    }
    // Create the worker thread
    workerThread = threadFactory.newThread(worker);
    // Resource-leak detection (always on when the worker thread is not a daemon)
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
    // Maximum number of pending timeouts
    this.maxPendingTimeouts = maxPendingTimeouts;
    // Log an error (once) when more than INSTANCE_COUNT_LIMIT (64) instances exist
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
```
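The power-of-two wheel length is what lets `tick & mask` replace `tick % wheel.length`. A minimal sketch; `WheelMask.normalize` is a hypothetical helper that rounds up to the next power of two, in the spirit of what Netty's createWheel does internally (up to 2^30 slots).

```java
public class WheelMask {
    // Round ticksPerWheel up to the next power of two (hypothetical helper).
    static int normalize(int ticksPerWheel) {
        if (ticksPerWheel <= 0 || ticksPerWheel > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + ticksPerWheel);
        }
        int n = 1;
        while (n < ticksPerWheel) {
            n <<= 1;
        }
        return n;
    }

    public static void main(String[] args) {
        int length = normalize(500); // 512
        int mask = length - 1;       // 511, i.e. nine 1-bits
        for (long tick : new long[] {0, 511, 512, 1000, 123456789L}) {
            // For a power-of-two length and tick >= 0, tick & mask equals tick % length.
            System.out.println(tick + " -> slot " + (tick & mask) + " (mod: " + (tick % length) + ")");
        }
    }
}
```

A bitwise AND is cheaper than a division, and the worker computes this index on every tick.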
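3. HashedWheelBucket: managing timeout tasks
A bucket manages timeout tasks (HashedWheelTimeout) as a doubly linked list of HashedWheelTimeout nodes, which supports fast removal.
a. Data structure
- head: first node; traversal starts here
- tail: last node; new timeouts are appended here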
```java
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
```
b. addTimeout: add a timeout task
```java
public void addTimeout(HashedWheelTimeout timeout) {
    assert timeout.bucket == null;
    timeout.bucket = this;
    if (head == null) {
        head = tail = timeout;
    } else {
        tail.next = timeout;
        timeout.prev = tail;
        tail = timeout;
    }
}
```
c. remove: delete a timeout
Standard doubly linked list removal.
```java
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
    HashedWheelTimeout next = timeout.next;
    // remove timeout that was either processed or cancelled by updating the linked-list
    if (timeout.prev != null) {
        timeout.prev.next = next;
    }
    if (timeout.next != null) {
        timeout.next.prev = timeout.prev;
    }
    if (timeout == head) {
        // if timeout is also the tail we need to adjust the entry too
        if (timeout == tail) {
            tail = null;
            head = null;
        } else {
            head = next;
        }
    } else if (timeout == tail) {
        // if the timeout is the tail modify the tail to be the prev node.
        tail = timeout.prev;
    }
    // Null out the links so the node can be garbage-collected
    timeout.prev = null;
    timeout.next = null;
    timeout.bucket = null;
    timeout.timer.pendingTimeouts.decrementAndGet();
    return next;
}
```
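A self-contained sketch of the bucket's doubly linked list, showing why removal is O(1): unlinking only touches the neighbors, with no traversal. `BucketSketch` and its `Node` type are hypothetical stand-ins for HashedWheelBucket and HashedWheelTimeout, without the timer bookkeeping (pendingTimeouts).

```java
public class BucketSketch {
    // Hypothetical node type standing in for HashedWheelTimeout.
    static final class Node {
        final String name;
        Node prev, next;
        BucketSketch bucket;
        Node(String name) { this.name = name; }
    }

    Node head, tail;

    // Append at the tail in O(1).
    void add(Node n) {
        assert n.bucket == null;
        n.bucket = this;
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    // Unlink in O(1); returns the successor so a caller can keep iterating.
    Node remove(Node n) {
        Node next = n.next;
        if (n.prev != null) n.prev.next = next;
        if (next != null) next.prev = n.prev;
        if (n == head) head = next;   // also covers head == tail (next is null then)
        if (n == tail) tail = n.prev;
        n.prev = null;
        n.next = null;
        n.bucket = null;
        return next;
    }

    String contents() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) sb.append(n.name);
        return sb.toString();
    }

    public static void main(String[] args) {
        BucketSketch b = new BucketSketch();
        Node a = new Node("a"), m = new Node("b"), c = new Node("c");
        b.add(a); b.add(m); b.add(c);
        b.remove(m);                        // middle node: no traversal needed
        System.out.println(b.contents());   // ac
        b.remove(a); b.remove(c);
        System.out.println(b.head == null && b.tail == null); // true
    }
}
```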
d. expireTimeouts
- Runs the tasks that are due
- Checks whether a task has been cancelled
- In both cases, removes the timeout from the bucket
```java
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;
    // process all timeouts
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // remainingRounds: how many more full laps of the wheel to wait.
        // It is derived from the deadline and decremented on every visit; once it
        // reaches 0 the task is due and runs.
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout); // remove
            if (timeout.deadline <= deadline) {
                // run the task
                timeout.expire();
            } else {
                // The timeout was placed in the wrong slot
                throw new IllegalStateException(...);
            }
        } else if (timeout.isCancelled()) { // cancelled
            next = remove(timeout); // remove
        } else {
            timeout.remainingRounds--;
        }
        timeout = next;
    }
}
```
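A simplified simulation of how remainingRounds counts down on each visit of the pointer to one bucket. Assumptions: `ExpireSketch.Entry` is a hypothetical stand-in that keeps only the round count and a cancelled flag, and the deadline sanity check from the real method is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ExpireSketch {
    // Hypothetical stand-in for HashedWheelTimeout: only what expireTimeouts inspects.
    static final class Entry {
        final String name;
        long remainingRounds;
        boolean cancelled;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    // One visit of the pointer to this bucket. Returns the names of the tasks that ran.
    static List<String> visit(List<Entry> bucket) {
        List<String> fired = new ArrayList<>();
        for (Iterator<Entry> it = bucket.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();                 // due: unlink first...
                if (!e.cancelled) {          // ...then run (a cancelled timeout's expire() CAS would fail)
                    fired.add(e.name);
                }
            } else if (e.cancelled) {
                it.remove();                 // cancelled: just unlink
            } else {
                e.remainingRounds--;         // not due yet: one lap fewer to wait
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Entry> bucket = new ArrayList<>(List.of(new Entry("now", 0), new Entry("later", 2)));
        for (int lap = 0; lap < 3; lap++) {
            System.out.println("lap " + lap + ": " + visit(bucket));
        }
        // lap 0: [now]
        // lap 1: []
        // lap 2: [later]
    }
}
```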
4. HashedWheelTimeout
a. Interface class diagram

b. Fields
```java
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
// Deadline, in nanoseconds relative to the timer's startTime
private final long deadline;
// State: 0 - init, 1 - cancelled, 2 - expired
private volatile int state = ST_INIT;
// Remaining laps of the wheel; set by Worker.transferTimeoutsToBuckets()
long remainingRounds;
// Links for the bucket's doubly linked list. Only workerThread touches them,
// so no synchronization or volatile is needed.
HashedWheelTimeout next;
HashedWheelTimeout prev;
private final HashedWheelTimer timer;
private final TimerTask task;
// The bucket to which the timeout was added
HashedWheelBucket bucket;
```
c. expire: run the task
```java
public void expire() {
    // Transition INIT -> EXPIRED; give up if the timeout was already cancelled or expired
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }
    try {
        // Run the task
        task.run(this);
    } catch (Throwable t) {
        ...
    }
}
```
d. cancel: cancel the timeout
```java
public boolean cancel() {
    // only update the state it will be removed from HashedWheelBucket on next tick.
    if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
        return false;
    }
    // A cancelled timeout goes onto a separate queue, which the worker drains on every tick.
    timer.cancelledTimeouts.add(this);
    return true;
}
```
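The INIT → EXPIRED and INIT → CANCELLED transitions are what make expire() and cancel() mutually exclusive: whichever CAS wins, the other fails. A minimal sketch using a per-object `AtomicInteger` in place of the `AtomicIntegerFieldUpdater` shown above (the updater avoids allocating an extra object per timeout); `TimeoutStateSketch` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TimeoutStateSketch {
    static final int ST_INIT = 0, ST_CANCELLED = 1, ST_EXPIRED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_INIT);
    private final Runnable task;

    TimeoutStateSketch(Runnable task) { this.task = task; }

    // Runs the task only if nobody cancelled (or expired) it first.
    boolean expire() {
        if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
            return false;
        }
        task.run();
        return true;
    }

    // Succeeds only while the timeout is still pending.
    boolean cancel() {
        return state.compareAndSet(ST_INIT, ST_CANCELLED);
    }

    public static void main(String[] args) {
        TimeoutStateSketch t1 = new TimeoutStateSketch(() -> System.out.println("ran t1"));
        System.out.println(t1.cancel()); // true
        System.out.println(t1.expire()); // false: the task never runs

        TimeoutStateSketch t2 = new TimeoutStateSketch(() -> System.out.println("ran t2"));
        System.out.println(t2.expire()); // prints "ran t2", then true
        System.out.println(t2.cancel()); // false: too late to cancel
    }
}
```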
5. Worker
a. run
```java
// Tick counter
private long tick;

@Override
public void run() {
    // Initialize startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // 0 is used as the "not yet initialized" marker, so make sure it is not 0 once initialized.
        startTime = 1;
    }
    // Notify the threads waiting in start() that initialization is done.
    startTimeInitialized.countDown();
    do {
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // Move newly submitted timeouts into their buckets
            transferTimeoutsToBuckets();
            // Expire the due timeouts in the current bucket
            bucket.expireTimeouts(deadline);
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    // Collect the unprocessed timeouts into unprocessedTimeouts so that stop() can return them.
    for (HashedWheelBucket bucket: wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
```
b. waitForNextTick
Computes the target nanoTime from startTime and the current tick count, then waits until that target is reached.
```java
private long waitForNextTick() {
    long deadline = tickDuration * (tick + 1);
    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }
        // On Windows, round sleepTime down to a multiple of 10 ms to work around
        // a bug that only affects the JVM when it runs on Windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}
```
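A worked example of the sleep computation, with hypothetical numbers (100 ms ticks, tick 4, 487.3 ms elapsed). Adding 999999 before dividing by 1000000 rounds the nanosecond gap up to whole milliseconds, so the computed sleep is never shorter than the remaining gap (before the Windows adjustment); if the worker wakes early anyway, the loop simply sleeps again. `SleepMathSketch` is a hypothetical class that extracts only the arithmetic.

```java
public class SleepMathSketch {
    // Ceiling division from nanoseconds to milliseconds, as in waitForNextTick.
    static long sleepTimeMs(long deadlineNanos, long currentNanos) {
        return (deadlineNanos - currentNanos + 999_999) / 1_000_000;
    }

    // The Windows workaround rounds down to a multiple of 10 ms.
    static long windowsRounded(long sleepTimeMs) {
        return sleepTimeMs / 10 * 10;
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L;          // 100 ms in nanoseconds (example value)
        long tick = 4;
        long deadline = tickDuration * (tick + 1); // 500 ms after startTime
        long now = 487_300_000L;                   // 487.3 ms elapsed (example value)
        long ms = sleepTimeMs(deadline, now);
        System.out.println(ms);                    // 13 (12.7 ms rounded up)
        System.out.println(windowsRounded(ms));    // 10
        System.out.println(sleepTimeMs(deadline, 500_000_000L)); // 0: the tick is due
    }
}
```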
c. processCancelledTasks: handle cancelled tasks
```java
private void processCancelledTasks() {
    for (;;) {
        HashedWheelTimeout timeout = cancelledTimeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        try {
            timeout.remove();
        } catch (Throwable t) {
            ...
        }
    }
}
```
d. transferTimeoutsToBuckets: move new timeouts into buckets
```java
private void transferTimeoutsToBuckets() {
    // Transfer at most 100000 timeouts per tick, so that a thread that keeps adding
    // new timeouts in a loop cannot starve workerThread.
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        long calculated = timeout.deadline / tickDuration;
        // Compute the remaining laps: the core of the whole algorithm
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for the past.
        int stopIndex = (int) (ticks & mask);
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}
```
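The placement math in transferTimeoutsToBuckets, extracted into a hypothetical pure function so it can be checked by hand. The example uses an 8-slot wheel and a tickDuration of 100 (arbitrary units); the overdue case shows what `Math.max(calculated, tick)` is for.

```java
public class TransferSketch {
    // Computes {remainingRounds, stopIndex} the way transferTimeoutsToBuckets does.
    static long[] place(long deadline, long tickDuration, long tick, int wheelLength) {
        int mask = wheelLength - 1;                    // wheelLength assumed to be a power of two
        long calculated = deadline / tickDuration;     // absolute tick at which the timeout is due
        long remainingRounds = (calculated - tick) / wheelLength;
        long ticks = Math.max(calculated, tick);       // never schedule into the past
        int stopIndex = (int) (ticks & mask);
        return new long[] {remainingRounds, stopIndex};
    }

    public static void main(String[] args) {
        long tickDuration = 100;  // arbitrary units for the example
        int wheelLength = 8;
        // Due at tick 21 while the wheel is at tick 3: 18 ticks away = 2 full laps + 2 slots.
        long[] a = place(2150, tickDuration, 3, wheelLength);
        System.out.println("rounds=" + a[0] + " index=" + a[1]); // rounds=2 index=5
        // Already overdue (due at tick 1, wheel at tick 3): goes into the current slot.
        long[] b = place(150, tickDuration, 3, wheelLength);
        System.out.println("rounds=" + b[0] + " index=" + b[1]); // rounds=0 index=3
    }
}
```

Because run() calls transferTimeoutsToBuckets() before expiring the bucket at `tick & mask`, an overdue timeout placed in the current slot with zero rounds fires on this same tick.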
6. start
- WORKER_STATE_INIT -> WORKER_STATE_STARTED
- workerThread.start
```java
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    // Wait until the worker has initialized startTime.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
        }
    }
}
```
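The lazy-start handshake (CAS on the state, then wait for the worker to publish startTime) reduced to a runnable sketch. `LazyStartSketch` is hypothetical; it uses an `AtomicInteger` instead of the field updater and, unlike the original, which silently ignores interrupts, restores the caller's interrupt flag after waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyStartSketch {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;

    final AtomicInteger state = new AtomicInteger(INIT);
    final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    volatile long startTime;                // 0 means "not yet initialized"
    final Thread worker = new Thread(() -> {
        long t = System.nanoTime();
        startTime = (t == 0) ? 1 : t;       // reserve 0 as the "uninitialized" marker
        startTimeInitialized.countDown();
        // ... the tick loop would run here ...
    });

    // Safe to call from many threads: only the CAS winner starts the worker.
    void start() {
        switch (state.get()) {
            case INIT:
                if (state.compareAndSet(INIT, STARTED)) {
                    worker.start();
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
        boolean interrupted = false;
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException e) {
                interrupted = true;         // keep waiting, remember the interrupt
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStartSketch s = new LazyStartSketch();
        s.start();
        s.start();                              // second call is a no-op
        System.out.println(s.startTime != 0);   // true
        s.worker.join();
    }
}
```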
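7. stop: shut down and return the unprocessed timeouts
- stop() must not be called from workerThread itself
- WORKER_STATE_STARTED -> WORKER_STATE_SHUTDOWN
- workerThread.interrupt(): interrupt the worker thread
- workerThread.join(100): wait, in 100 ms slices, for the worker to finish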
```java
@Override
public Set<Timeout> stop() {
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(...);
    }
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // workerState can be 0 or 2 at this moment - let it always be 2.
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return Collections.emptySet();
    }
    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt();
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts();
}
```
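The shutdown pattern in stop() (CAS to SHUTDOWN, then interrupt and join in 100 ms slices until the worker is gone) as a standalone sketch. `StopSketch` is hypothetical and its worker just sleeps. It also shows why stop() refuses to run on the worker thread: the join loop would wait for its own thread and never finish.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StopSketch {
    static final int STARTED = 1, SHUTDOWN = 2;
    final AtomicInteger state = new AtomicInteger(STARTED);

    final Thread worker = new Thread(() -> {
        while (state.get() == STARTED) {
            try {
                Thread.sleep(1_000);          // stands in for waitForNextTick()
            } catch (InterruptedException e) {
                // like waitForNextTick: only leave when shutdown was requested
                if (state.get() == SHUTDOWN) {
                    return;
                }
            }
        }
    });

    void stop() {
        if (Thread.currentThread() == worker) {
            throw new IllegalStateException("stop() must not be called from the worker thread");
        }
        if (!state.compareAndSet(STARTED, SHUTDOWN)) {
            return;                           // already stopped
        }
        boolean interrupted = false;
        while (worker.isAlive()) {
            worker.interrupt();               // wake the worker out of sleep()
            try {
                worker.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;           // remember, but keep waiting for the worker
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt flag
        }
    }

    public static void main(String[] args) {
        StopSketch s = new StopSketch();
        s.worker.start();
        s.stop();
        System.out.println(s.worker.isAlive()); // false
    }
}
```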
8. newTimeout: create a timeout
```java
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    ...
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException(...);
    }
    start();
    // Add the timeout to the timeouts queue, which is processed on the next tick.
    // During processing, all queued HashedWheelTimeouts are moved to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
```
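The admission check and deadline calculation in newTimeout, as a hypothetical standalone sketch. Assumptions: `ConcurrentLinkedQueue` stands in for Netty's MPSC queue, `startTime` is captured in the constructor rather than by a worker thread, and `Pending` is a made-up holder class.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class NewTimeoutSketch {
    // Hypothetical holder standing in for HashedWheelTimeout.
    static final class Pending {
        final Runnable task;
        final long deadline;                       // nanoseconds relative to startTime
        Pending(Runnable task, long deadline) {
            this.task = task;
            this.deadline = deadline;
        }
    }

    final AtomicLong pendingTimeouts = new AtomicLong();
    final long maxPendingTimeouts;                 // <= 0 means "no limit"
    final long startTime = System.nanoTime();      // assumption: set here, not by a worker
    final Queue<Pending> timeouts = new ConcurrentLinkedQueue<>();

    NewTimeoutSketch(long maxPendingTimeouts) {
        this.maxPendingTimeouts = maxPendingTimeouts;
    }

    Pending newTimeout(Runnable task, long delay, TimeUnit unit) {
        long count = pendingTimeouts.incrementAndGet();
        if (maxPendingTimeouts > 0 && count > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();     // undo the optimistic increment
            throw new RejectedExecutionException(
                    "pending timeouts (" + count + ") exceed the maximum (" + maxPendingTimeouts + ")");
        }
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        Pending p = new Pending(task, deadline);
        timeouts.add(p);                           // a worker would move it into a bucket on its next tick
        return p;
    }

    public static void main(String[] args) {
        NewTimeoutSketch timer = new NewTimeoutSketch(2);
        timer.newTimeout(() -> {}, 1, TimeUnit.SECONDS);
        timer.newTimeout(() -> {}, 2, TimeUnit.SECONDS);
        try {
            timer.newTimeout(() -> {}, 3, TimeUnit.SECONDS);
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(timer.timeouts.size()); // 2
    }
}
```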
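III. Summary
- Concurrency only arises when tasks are added to the timeouts queue; assigning each task to its bucket happens on workerThread alone, so that step has no concurrency.
- It is a single-level timing wheel.
- The worker sleeps between ticks; that idle time could be used to assign tasks to buckets and to process cancelled timeouts, which could improve performance. Personally, I don't think this is worth doing when the sleep interval is short.
- It uses the high-performance queues from JCTools.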
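The first summary point (concurrency only at the queue, single-threaded bucket assignment) as a runnable sketch. Assumptions: `ConcurrentLinkedQueue` stands in for the JCTools MPSC queue, integers stand in for timeouts, and `distribute` is a hypothetical helper with a fixed 8-slot wheel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MpscHandoffSketch {
    // Returns how many timeouts ended up in each of the 8 slots.
    static int[] distribute(int producerCount, int perProducer) throws InterruptedException {
        Queue<Integer> timeouts = new ConcurrentLinkedQueue<>(); // many producers, one consumer
        int[] slotCounts = new int[8];                           // touched only by the worker thread
        int expected = producerCount * perProducer;

        Thread worker = new Thread(() -> {
            int drained = 0;
            while (drained < expected) {
                Integer id = timeouts.poll();
                if (id == null) {
                    Thread.onSpinWait();   // nothing queued yet; spin briefly
                    continue;
                }
                slotCounts[id & 7]++;      // single writer: no locking needed
                drained++;
            }
        });
        worker.start();

        List<Thread> producers = new ArrayList<>();
        for (int p = 0; p < producerCount; p++) {
            final int base = p * perProducer;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    timeouts.add(base + i);   // the only concurrent step
                }
            });
            producers.add(t);
            t.start();
        }
        for (Thread t : producers) {
            t.join();
        }
        worker.join();                        // join() makes slotCounts visible to this thread
        return slotCounts;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(distribute(4, 1000))); // [500, 500, 500, 500, 500, 500, 500, 500]
    }
}
```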