I. Introduction

When Netty manages 1,000,000+ connections, heartbeat detection based on IdleStateHandler has problems:

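  • It inevitably occupies the I/O thread, because each connection's idle check is a task scheduled on its EventLoop;
  • Timeouts are imprecise. For example, NioEventLoop has an ioRatio setting (default 50): if handling I/O events takes 100 ms, running the queued tasks is likewise capped at about 100 ms. Can 1,000,000+ idle checks really finish in that window? Whatever does not finish is pushed to the next loop iteration, which delays those timeouts;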

Solution: introduce a timing wheel.

1. How it works

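A timing wheel is essentially a circular data structure. Picture a clock face divided into many slots, each representing a fixed span of time (the shorter the span, the more precise the timer).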
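To make the clock analogy concrete, here is a minimal sketch (not Netty code) that maps a delay to a slot, assuming a 100 ms tick and an 8-slot wheel; the class and method names are hypothetical.

```java
// Hypothetical illustration of the clock analogy; not part of Netty.
final class ClockWheel {
    private ClockWheel() {}

    /** Slot a task lands in when time 0 is at slot 0: whole ticks elapsed, modulo wheel size. */
    static int slotFor(long delayMs, long tickMs, int wheelSize) {
        long ticks = delayMs / tickMs;      // precision is one tick: 250 ms and 299 ms share a slot
        return (int) (ticks % wheelSize);   // wrap around, like a clock hand passing 12
    }

    public static void main(String[] args) {
        long tickMs = 100;   // each slot covers 100 ms
        int wheelSize = 8;   // 8 slots, so one revolution covers 800 ms
        for (long delay : new long[] {250, 700, 900}) {
            System.out.println(delay + " ms -> slot " + slotFor(delay, tickMs, wheelSize));
        }
    }
}
```

Note that 900 ms lands in slot 1, the same slot as a 100 ms task; it may only fire on the hand's second pass, which is exactly what the per-task round count in the next paragraph tracks.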

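If tasks span a long time range and are numerous, a single-level wheel gives each task a large round count and makes the linked list in each slot long. In that case the wheel can be split into layers, like the three hands of a clock: seconds, minutes, and hours.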
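A worked example of the trade-off, assuming a 1-second tick and a 60-slot wheel (values chosen for illustration): a delay of 3725 s in a single-level wheel sits in slot 5 and is passed over 62 times before it fires, whereas the clock-style layout decomposes it into hour 1, minute 2, second 5. The sketch only computes where the task would sit; it does not implement moving (cascading) tasks from the hour wheel down to the lower wheels. Names are hypothetical.

```java
// Hypothetical comparison of the two layouts; neither method is Netty API.
final class WheelLayers {
    private WheelLayers() {}

    /** Single-level wheel: returns {slot, rounds} for a delay measured in ticks. */
    static long[] singleLevel(long delayTicks, int wheelSize) {
        return new long[] {delayTicks % wheelSize, delayTicks / wheelSize};
    }

    /** Clock-style three-level wheel with a 1 s tick: returns {hour, minute, second} slots.
     *  Assumes delaySeconds < 24 h so that a 24-slot hour wheel does not wrap. */
    static long[] threeLevel(long delaySeconds) {
        long hours = delaySeconds / 3600;
        long minutes = (delaySeconds % 3600) / 60;
        long seconds = delaySeconds % 60;
        return new long[] {hours, minutes, seconds};
    }

    public static void main(String[] args) {
        long delay = 3725; // 1 h 2 min 5 s, with a 1-second tick
        long[] single = singleLevel(delay, 60);
        long[] layered = threeLevel(delay);
        System.out.println("single-level: slot " + single[0] + ", rounds " + single[1]);
        System.out.println("three-level: " + layered[0] + "h " + layered[1] + "m " + layered[2] + "s");
    }
}
```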

II. HashedWheelTimer

HashedWheelTimer implements a single-level timing wheel.

1. Data structures

    // Worker runnable: advances the pointer tick by tick and processes the slot it lands on
    private final Worker worker = new Worker();
    private final Thread workerThread;

    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;
    private volatile int workerState; // 0 - init, 1 - started, 2 - shut down

    // Tick duration in nanoseconds, like a clock's second hand advancing once per second
    private final long tickDuration;
    // The circular array of buckets (the wheel itself)
    private final HashedWheelBucket[] wheel;
    // Queue of newly submitted timeouts
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    // Queue of cancelled timeouts
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

2. Constructor

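  • Create the circular array of HashedWheelBucket
  • Set the tick duration
  • Create the worker thread, which on every tick processes the slot the pointer has reached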
   public HashedWheelTimer(
           ThreadFactory threadFactory,
           long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
           long maxPendingTimeouts) {

       ...

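       // Create the circular HashedWheelBucket array; ticksPerWheel is rounded up to a power of two
       wheel = createWheel(ticksPerWheel);
       // Mask for fast index lookup: tick & mask == tick % wheel.length
       mask = wheel.length - 1;

       // Tick duration converted to nanoseconds, like a clock's second hand moving once per second
       this.tickDuration = unit.toNanos(tickDuration);

       // Prevent overflow.
       if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
           throw new IllegalArgumentException(...);
       }
       // Create the worker thread
       workerThread = threadFactory.newThread(worker);
       // Track leaks if leak detection is enabled or the worker thread is not a daemon
       leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
       // Maximum number of pending timeouts (<= 0 means unlimited)
       this.maxPendingTimeouts = maxPendingTimeouts;
       // Log an error once when more than INSTANCE_COUNT_LIMIT (64) instances have been created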
       if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
           WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
           reportTooManyInstances();
       }
   }
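The wheel length is a power of two so that mask = wheel.length - 1 can replace a modulo. The sketch below assumes a non-power-of-two ticksPerWheel is rounded up and shows that, for a power-of-two size, tick & mask equals tick % size. It is an illustrative re-implementation, not Netty's createWheel (whose helper may differ in detail); the class name is hypothetical.

```java
// Illustrative re-implementation of wheel sizing; not Netty's own helper.
final class WheelSizing {
    private WheelSizing() {}

    /** Round ticksPerWheel up to the next power of two (e.g. 500 -> 512). */
    static int normalize(int ticksPerWheel) {
        if (ticksPerWheel <= 0 || ticksPerWheel > (1 << 30)) {
            throw new IllegalArgumentException("ticksPerWheel out of range: " + ticksPerWheel);
        }
        int n = 1;
        while (n < ticksPerWheel) {
            n <<= 1;   // double until we reach or pass the requested size
        }
        return n;
    }

    public static void main(String[] args) {
        int size = normalize(500);   // 512
        int mask = size - 1;         // 511, i.e. nine low bits set
        long tick = 1_000_003L;
        // For a power-of-two size and non-negative tick, (tick & mask) == tick % size.
        System.out.println(size + " " + (tick & mask) + " " + (tick % size));
    }
}
```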

3. HashedWheelBucket: managing timeout tasks

HashedWheelBucket manages timeout tasks (HashedWheelTimeout). The timeouts in a bucket are chained into a doubly linked list, which supports fast (O(1)) removal.

a. Data structure

  • head: the first node; iteration starts here
  • tail: the last node; new timeouts are appended here
 private HashedWheelTimeout head;
 private HashedWheelTimeout tail; 

b. addTimeout: adding a timeout

  • A plain doubly linked list append at the tail
       public void addTimeout(HashedWheelTimeout timeout) {
           assert timeout.bucket == null;
           timeout.bucket = this;
           if (head == null) {
               head = tail = timeout;
           } else {
               tail.next = timeout;
               timeout.prev = tail;
               tail = timeout;
           }
       } 

c. remove: removing a timeout

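A plain doubly linked list removal. It returns the next node so that expireTimeouts can keep iterating, and it decrements the timer's pendingTimeouts counter.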

        public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
            HashedWheelTimeout next = timeout.next;
            // remove timeout that was either processed or cancelled by updating the linked-list
            if (timeout.prev != null) {
                timeout.prev.next = next;
            }
            if (timeout.next != null) {
                timeout.next.prev = timeout.prev;
            }

            if (timeout == head) {
                // if timeout is also the tail we need to adjust the entry too
                if (timeout == tail) {
                    tail = null;
                    head = null;
                } else {
                    head = next;
                }
            } else if (timeout == tail) {
                // if the timeout is the tail modify the tail to be the prev node.
                tail = timeout.prev;
            }
            // Null out the references so the removed timeout can be garbage-collected
            timeout.prev = null;
            timeout.next = null;
            timeout.bucket = null;
            timeout.timer.pendingTimeouts.decrementAndGet();
            return next;
        }
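The point of the doubly linked list is that, given a node, both neighbours are known, so unlinking takes O(1) with no search. A minimal self-contained sketch of the same add/remove logic, using a hypothetical Node type with a String payload instead of HashedWheelTimeout (and without the pendingTimeouts counter):

```java
// Simplified stand-in for HashedWheelBucket's list handling; all names are hypothetical.
final class MiniBucket {
    static final class Node {
        final String name;
        Node prev, next;
        MiniBucket bucket;   // which bucket this node currently belongs to

        Node(String name) { this.name = name; }
    }

    Node head, tail;

    /** Append at the tail, as addTimeout() does. */
    void add(Node n) {
        n.bucket = this;
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    /** Unlink n in O(1) and return its successor, as remove() does. */
    Node remove(Node n) {
        Node next = n.next;
        if (n.prev != null) n.prev.next = next;
        if (next != null) next.prev = n.prev;
        if (n == head) head = next;     // becomes null if n was the only node
        if (n == tail) tail = n.prev;   // becomes null if n was the only node
        n.prev = null;                  // drop references so the node can be collected
        n.next = null;
        n.bucket = null;
        return next;
    }

    String contents() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) sb.append(n.name);
        return sb.toString();
    }

    public static void main(String[] args) {
        MiniBucket bucket = new MiniBucket();
        Node a = new Node("a"), b = new Node("b"), c = new Node("c");
        bucket.add(a);
        bucket.add(b);
        bucket.add(c);
        bucket.remove(b);                                                // middle node
        System.out.println(bucket.contents());                           // ac
        bucket.remove(a);                                                // head
        bucket.remove(c);                                                // last remaining node
        System.out.println(bucket.head == null && bucket.tail == null);  // true
    }
}
```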

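d. expireTimeouts

  • Run timeouts whose remaining rounds have reached zero
  • Drop timeouts that have been cancelled
  • Both cases remove the timeout from the bucket; every other timeout has its remainingRounds decremented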
       public void expireTimeouts(long deadline) {
           HashedWheelTimeout timeout = head;
           // process all timeouts
           while (timeout != null) {
               HashedWheelTimeout next = timeout.next;
               // remainingRounds: how many more full revolutions the timeout must wait.
               // It is decremented each time the pointer visits this bucket; at zero the task is due.
               if (timeout.remainingRounds <= 0) {
                   next = remove(timeout); // unlink
                   if (timeout.deadline <= deadline) {
                       // run the task
                       timeout.expire();
                   } else {
                       // the timeout was placed in the wrong slot
                       throw new IllegalStateException(...);
                   }
               } else if (timeout.isCancelled()) { // cancelled
                   next = remove(timeout); // unlink
               } else {
                   timeout.remainingRounds --;
               }
               timeout = next;
           }
       } 
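To see the round-counting rule in isolation, the sketch below replays it on a plain ArrayList (an assumption for brevity; the real bucket uses the linked list shown earlier) and omits the deadline sanity check. Entry A is due immediately, B needs two more revolutions, and C is cancelled. RoundExpiry and Entry are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the round-counting rule in expireTimeouts(); not Netty code.
final class RoundExpiry {
    static final class Entry {
        final String name;
        long remainingRounds;
        boolean cancelled;

        Entry(String name, long remainingRounds) {
            this.name = name;
            this.remainingRounds = remainingRounds;
        }
    }

    /** One pass of the pointer over a bucket; returns the names of the entries that fire. */
    static List<String> visit(List<Entry> bucket) {
        List<String> fired = new ArrayList<>();
        for (Iterator<Entry> it = bucket.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();                 // due on this revolution: unlink first
                if (!e.cancelled) {
                    fired.add(e.name);       // like expire(): a cancelled entry does not run
                }
            } else if (e.cancelled) {
                it.remove();                 // cancelled: unlink without running
            } else {
                e.remainingRounds--;         // not due yet: wait one more revolution
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        Entry c = new Entry("C", 1);
        c.cancelled = true;
        List<Entry> bucket = new ArrayList<>();
        bucket.add(new Entry("A", 0));
        bucket.add(new Entry("B", 2));
        bucket.add(c);
        for (int pass = 1; pass <= 3; pass++) {
            System.out.println("pass " + pass + ": " + visit(bucket));
        }
    }
}
```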

4. HashedWheelTimeout

a. Interface class diagram

b. Structure

       private static final int ST_INIT = 0;
       private static final int ST_CANCELLED = 1;
       private static final int ST_EXPIRED = 2;
       private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
               AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
       // Deadline in nanoseconds, relative to the timer's startTime
       private final long deadline;

       // State: 0 - init, 1 - cancelled, 2 - expired
       private volatile int state = ST_INIT;

       // Remaining full revolutions before this timeout is due;
       // set by Worker.transferTimeoutsToBuckets()
       long remainingRounds;

       // Links of the bucket's doubly linked list. Only the worker thread touches them,
       // so neither locking nor volatile is needed.
       HashedWheelTimeout next;
       HashedWheelTimeout prev;
       private final HashedWheelTimer timer;
       private final TimerTask task;
       // The bucket to which the timeout was added
       HashedWheelBucket bucket;

c. expire: running the task

       public void expire() {
           // Transition INIT -> EXPIRED; give up if the timeout was already cancelled or expired
           if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
               return;
           }

           try {
               // Run the task
               task.run(this);
           } catch (Throwable t) {
               ...
           }
       }

d. cancel: cancelling the timeout

       public boolean cancel() {
           // only update the state it will be removed from HashedWheelBucket on next tick.
           if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
               return false;
           }
           // Put the cancelled timeout on a separate queue, which the worker drains on every tick.
           timer.cancelledTimeouts.add(this);
           return true;
       }
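Because both expire() and cancel() start with a CAS from ST_INIT, exactly one of them can win, even when a user thread cancels while the worker thread is expiring. A minimal sketch of that state machine with AtomicIntegerFieldUpdater (the same JDK class used above); the class name and the runs counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical reduction of HashedWheelTimeout's state machine; the class name is made up.
final class TimeoutState {
    static final int ST_INIT = 0;
    static final int ST_CANCELLED = 1;
    static final int ST_EXPIRED = 2;

    private static final AtomicIntegerFieldUpdater<TimeoutState> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(TimeoutState.class, "state");

    private volatile int state = ST_INIT;
    int runs;   // counts how often the (simulated) task ran

    /** INIT -> EXPIRED; only the winner of the CAS runs the task. */
    boolean expire() {
        if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) {
            return false;
        }
        runs++;   // stands in for task.run(this)
        return true;
    }

    /** INIT -> CANCELLED; fails if the timeout already expired or was cancelled. */
    boolean cancel() {
        return STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED);
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        TimeoutState cancelledFirst = new TimeoutState();
        System.out.println(cancelledFirst.cancel() + " " + cancelledFirst.expire()); // true false
        TimeoutState expiredFirst = new TimeoutState();
        System.out.println(expiredFirst.expire() + " " + expiredFirst.cancel());     // true false
    }
}
```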

5. Worker

a. run

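       // Number of ticks elapsed so far
       private long tick;
       @Override
       public void run() {
           // Initialize the startTime.
           startTime = System.nanoTime();
           if (startTime == 0) {
               // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
               startTime = 1;
           }

           // Notify the other threads waiting for the initialization at start().
           startTimeInitialized.countDown();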

           do {
               final long deadline = waitForNextTick();
               if (deadline > 0) {
                   int idx = (int) (tick & mask);
                   processCancelledTasks();
                   HashedWheelBucket bucket =
                           wheel[idx];
                   // Move newly queued timeouts into their buckets
                   transferTimeoutsToBuckets();
                   // Expire the due timeouts in the current bucket
                   bucket.expireTimeouts(deadline);
                   tick++;
               }
           } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

           // Collect the unprocessed timeouts into unprocessedTimeouts so that stop() can return them.
           for (HashedWheelBucket bucket: wheel) {
               bucket.clearTimeouts(unprocessedTimeouts);
           }
           for (;;) {
               HashedWheelTimeout timeout = timeouts.poll();
               if (timeout == null) {
                   break;
               }
               if (!timeout.isCancelled()) {
                   unprocessedTimeouts.add(timeout);
               }
           }
           processCancelledTasks();
       } 

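b. waitForNextTick

Computes the target nanoTime from startTime and the current tick count, then sleeps until that target is reached. Once the tick boundary has passed it returns the elapsed time relative to startTime; if the timer is shut down while sleeping, it returns Long.MIN_VALUE.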

        private long waitForNextTick() {
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                final long currentTime = System.nanoTime() - startTime;
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }

                // Check whether we are running on Windows; if so, round the sleep time down
                // to a multiple of 10 ms as a workaround for a bug that only affects the JVM on Windows.
                //
                // See https://github.com/netty/netty/issues/356
                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                }

                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }
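The expression (deadline - currentTime + 999999) / 1000000 is a ceiling division from nanoseconds to milliseconds, so truncation alone never makes the worker sleep less than the remaining time; the surrounding loop re-checks the clock anyway. A small sketch of that arithmetic with a hypothetical TickSleep helper and example values (100 ms tick, about 12.3 ms already elapsed). On Windows, a remaining 1 to 9 ms rounds down to 0, so the loop calls Thread.sleep(0) and re-checks until the deadline passes.

```java
// Hypothetical helper reproducing the sleep arithmetic of waitForNextTick(); not Netty API.
final class TickSleep {
    private TickSleep() {}

    /** Time left until deadlineNanos, rounded UP to whole ms (<= 0 means the tick is due). */
    static long sleepMillis(long deadlineNanos, long currentNanos, boolean windows) {
        long ms = (deadlineNanos - currentNanos + 999_999) / 1_000_000;
        if (windows) {
            ms = ms / 10 * 10;   // Windows workaround: round down to a multiple of 10 ms
        }
        return ms;
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L;        // 100 ms in nanoseconds
        long deadline = tickDuration * (0 + 1);  // end of tick 0, relative to startTime
        long elapsed = 12_345_678L;              // about 12.3 ms since startTime
        System.out.println(sleepMillis(deadline, elapsed, false)); // 88
        System.out.println(sleepMillis(deadline, elapsed, true));  // 80
    }
}
```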

c. processCancelledTasks: handling cancelled timeouts

        private void processCancelledTasks() {
            for (;;) {
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                try {
                    timeout.remove();
                } catch (Throwable t) {
                    ...
                }
            }
        }

d. transferTimeoutsToBuckets: moving new timeouts into buckets

        private void transferTimeoutsToBuckets() {
            // Transfer at most 100000 timeouts per tick, so that a thread that keeps adding
            // new timeouts in a loop cannot stall the workerThread.
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }

                long calculated = timeout.deadline / tickDuration;
                // Remaining full rounds before the timeout is due: the heart of the algorithm
                timeout.remainingRounds = (calculated - tick) / wheel.length;

                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for the past.
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }
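To make the calculation concrete, assume a 100 ms tick and a 512-slot wheel (example values). calculated is the absolute tick at which the timeout is due, remainingRounds is how many full revolutions remain from the current tick, and Math.max pushes an overdue timeout into the current slot, which run() expires in the same iteration because bucket.expireTimeouts is called right after transferTimeoutsToBuckets. The helper name is hypothetical.

```java
// Hypothetical helper mirroring the arithmetic of transferTimeoutsToBuckets(); not Netty API.
final class BucketPlacement {
    private BucketPlacement() {}

    /** Returns {stopIndex, remainingRounds}; wheelLength must be a power of two. */
    static long[] place(long deadlineNanos, long tickDurationNanos, long tick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos;        // absolute tick at which it is due
        long remainingRounds = (calculated - tick) / wheelLength;   // full revolutions still to wait
        long ticks = Math.max(calculated, tick);                    // never target a slot already passed
        int stopIndex = (int) (ticks & (wheelLength - 1));
        return new long[] {stopIndex, remainingRounds};
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L; // 100 ms
        // Due at 60 s while the wheel is at tick 0: tick 600 -> slot 88 after 1 full round.
        long[] future = place(60_000_000_000L, tickDuration, 0, 512);
        // Due at 0.5 s but the wheel is already at tick 10: current slot, 0 rounds.
        long[] late = place(500_000_000L, tickDuration, 10, 512);
        System.out.println(future[0] + " " + future[1]); // 88 1
        System.out.println(late[0] + " " + late[1]);     // 10 0
    }
}
```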

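6. start: starting the timer

  • CAS the state from WORKER_STATE_INIT to WORKER_STATE_STARTED
  • If the CAS succeeds, call workerThread.start()
  • Wait until the worker thread has initialized startTime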
   public void start() {
       switch (WORKER_STATE_UPDATER.get(this)) {
           case WORKER_STATE_INIT:
               if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                   workerThread.start();
               }
               break;
           case WORKER_STATE_STARTED:
               break;
           case WORKER_STATE_SHUTDOWN:
               throw new IllegalStateException("cannot be started once stopped");
           default:
               throw new Error("Invalid WorkerState");
       }

       // Wait until the startTime is initialized by the worker.
       while (startTime == 0) {
           try {
               startTimeInitialized.await();
           } catch (InterruptedException ignore) {
           }
       }
   } 
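The pattern here is lazy, exactly-once startup: the CAS guarantees that only one caller starts the worker thread, and every caller then blocks on the CountDownLatch until the worker has published startTime (0 is reserved as the "not initialized" sentinel, which is why run() replaces a nanoTime of 0 with 1). A stripped-down sketch of the same handshake; LazyStarter and threadStarts are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of the lazy-start handshake in start(); not Netty code.
final class LazyStarter {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime;          // 0 means "not initialized yet"
    final AtomicInteger threadStarts = new AtomicInteger();

    private final Thread worker = new Thread(() -> {
        long now = System.nanoTime();
        startTime = (now == 0) ? 1 : now;     // never publish the sentinel value 0
        startTimeInitialized.countDown();     // release threads blocked in start()
        // a real timer would now loop over ticks
    });

    void start() {
        switch (state.get()) {
            case INIT:
                if (state.compareAndSet(INIT, STARTED)) {   // only one caller wins
                    threadStarts.incrementAndGet();
                    worker.start();
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
        // Every caller, winner or not, waits until the worker has set startTime.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // keep waiting, as start() above does
            }
        }
    }

    long startTime() {
        return startTime;
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStarter s = new LazyStarter();
        Thread other = new Thread(s::start);
        other.start();
        s.start();
        other.join();
        System.out.println(s.threadStarts.get()); // 1
    }
}
```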

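7. stop: stopping the timer and returning unprocessed timeouts

  • stop() must not be called from the workerThread itself
  • CAS the state from WORKER_STATE_STARTED to WORKER_STATE_SHUTDOWN
  • workerThread.interrupt() wakes the worker from its sleep
  • workerThread.join(100) waits up to 100 ms per attempt, repeating while the worker is still alive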
  @Override
   public Set<Timeout> stop() {
       if (Thread.currentThread() == workerThread) {
           throw new IllegalStateException(...);
       }

       if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
           // workerState can be 0 or 2 at this moment - let it always be 2.
           if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
               INSTANCE_COUNTER.decrementAndGet();
               if (leak != null) {
                   boolean closed = leak.close(this);
                   assert closed;
               }
           }

           return Collections.emptySet();
       }

       try {
           boolean interrupted = false;
           while (workerThread.isAlive()) {
               workerThread.interrupt();
               try {
                   workerThread.join(100);
               } catch (InterruptedException ignored) {
                   interrupted = true;
               }
           }

           if (interrupted) {
               Thread.currentThread().interrupt();
           }
       } finally {
           INSTANCE_COUNTER.decrementAndGet();
           if (leak != null) {
               boolean closed = leak.close(this);
               assert closed;
           }
       }
       return worker.unprocessedTimeouts();
   } 

8. newTimeout: creating a timeout

   @Override
   public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
       ...
       long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

       if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
           pendingTimeouts.decrementAndGet();
           throw new RejectedExecutionException(...);
       }

       start();

       // Add the timeout to the timeouts queue; it will be processed on the next tick.
       // During processing, all queued HashedWheelTimeouts are added to the correct HashedWheelBucket.
       long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
       HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
       timeouts.add(timeout);
       return timeout;
   } 
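Two details of newTimeout() are easy to miss: the pending counter is incremented optimistically and rolled back on rejection, and the deadline is stored relative to startTime so that the later deadline / tickDuration counts ticks from the worker's start. A sketch of just that bookkeeping, with the current time passed in explicitly for testability; PendingLimiter and reserve are hypothetical names. (In the real timer the counter is decremented again when a timeout is removed from its bucket, as remove() above shows.)

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduction of newTimeout()'s bookkeeping; class and method names are made up.
final class PendingLimiter {
    private final AtomicLong pendingTimeouts = new AtomicLong();
    private final long maxPendingTimeouts;   // <= 0 means unlimited
    private final long startTime;            // nanoTime captured when the worker started

    PendingLimiter(long maxPendingTimeouts, long startTime) {
        this.maxPendingTimeouts = maxPendingTimeouts;
        this.startTime = startTime;
    }

    /** Reserve a pending slot and return the deadline relative to startTime, in nanoseconds. */
    long reserve(long nowNanos, long delay, TimeUnit unit) {
        long count = pendingTimeouts.incrementAndGet();
        if (maxPendingTimeouts > 0 && count > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();   // roll back the optimistic increment
            throw new RejectedExecutionException(
                    "pending timeouts (" + count + ") exceed the maximum (" + maxPendingTimeouts + ")");
        }
        return nowNanos + unit.toNanos(delay) - startTime;
    }

    long pending() {
        return pendingTimeouts.get();
    }

    public static void main(String[] args) {
        PendingLimiter limiter = new PendingLimiter(2, 1_000L);
        System.out.println(limiter.reserve(5_000L, 3, TimeUnit.MILLISECONDS)); // 3004000
        limiter.reserve(5_000L, 1, TimeUnit.SECONDS);
        try {
            limiter.reserve(5_000L, 1, TimeUnit.SECONDS);
        } catch (RejectedExecutionException e) {
            System.out.println("rejected, pending = " + limiter.pending()); // rejected, pending = 2
        }
    }
}
```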

III. Summary

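  • Concurrency only arises when tasks are added to the timeout queue. Assigning tasks to buckets happens on the workerThread alone, so the buckets need no synchronization
  • It is a single-level timing wheel
  • The worker sleeps between ticks. That idle time could be used to move queued timeouts into their buckets and process cancellations, which might improve performance; in my view, though, it is not worth doing when the sleep interval is short
  • It uses JCTools' high-performance queues (via PlatformDependent.newMpscQueue())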