一、线程模型

Netty线程模型 这个博客写不错;

二、NioEventLoopGroup线程池分析

1.NioEventLoopGroup的类图结构

2.EventLoopGroup接口图

  • next()属于分配EventLoop
  • register注册Channel
  • shutdownGracefully优雅关机

1.AbstractEventExecutorGroup 抽象类源码

execute,submit和schedule调用next()执行,最终调用EventExecutor类实现

2.MultithreadEventExecutorGroup 抽象类源码

(1).数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    //线程池
    private final EventExecutor[] children; 
    //不可以更改线程池,只能可读效果;
    private final Set<EventExecutor> readonlyChildren;
    //终止线程数量
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    //EventExecutor选择器
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
}

(2).构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
       ...
        // 线程执行器,ThreadFactory执行任务
        if (executor == null) {
            //threadFactory.newThread(command).start();
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 创建线程池数组
        // 调用newChild抽象方法,创建EventExecutor
        children = new EventExecutor[nThreads];//
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args); //抽象方法,子类实现;
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException(...);
            } finally {
                if (!success) { 
                    //创建线程池失败,优雅关机终止
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    // awaitTermination等待终止,是同步方法
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch...
                    }
                }
            }
        }
        // 创建EventExecutor的选择器
        chooser = chooserFactory.newChooser(children);
        ...
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);//把Set改成不可改变Set
    }

a.chooserFactory.newChooser 创建EventExecutor线程选择器

chooserFactory默认 DefaultEventExecutorChooserFactory

b.DefaultEventExecutorChooserFactory 源码分析

  • EventExecutor数量是2次方幂,就创建PowerOfTwoEventExecutorChooser,否GenericEventExecutorChooser
  • PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser区别在于’&‘比’%‘性能高
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    //获取EventExecutorChooser线程选择器
    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        //判断是否2次方幂用意?
        //executors的长度是2次方幂,快速定位executors数组下标,idx&executors.length-1
        //不是2次方幂,idx % executors.length定位数组下标
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    //判断是否2次方幂
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        ...
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        ...
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

(3).next 获取EventExecutor线程对象

  • 调用EventExecutorChooser算法分配EventExecutor
1
2
3
4
   @Override
    public EventExecutor next() {
        return chooser.next();
    }

(4).shutdownGracefully 循环调用EventExecutor#shutdownGracefully

1
2
3
4
5
6
7
   @Override
   public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
       for (EventExecutor l: children) {
           l.shutdownGracefully(quietPeriod, timeout, unit);
       }
       return terminationFuture();
   }

3.MultithreadEventLoopGroup

(1).next() 分配EventLoop

EventLoop继承EventExecutor,这里可以强制转化

1
2
3
4
 @Override
 public EventLoop next() {
     return (EventLoop) super.next();
 }

(2).register 注册channel,EventLoop执行;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
   @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }

(2)newDefaultThreadFactory,创建DefaultThreadFactory

最终创建FastThreadLocalThread线程

1
2
3
4
  @Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

4.NioEventLoopGroup 源码分析

NioEventLoopGroup实现MultithreadEventExecutorGroup抽象类;

(1).构造

1
2
3
4
5
6
7
    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider, //
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }

(2).newChild 创建NioEventLoop

1
2
3
4
5
   @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

二、NioEventLoop分析

1.NioEventLoop的实现类图结构

2.EventLoop接口类图

  • OrderedEventExecutor 标记状态,将以有序/串行方式处理所有提交的任务。

3.AbstractEventExecutor源码分析

(1).数据结构

1
2
3
4
5
6
7
8
9
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
    //所属EventExecutorGroup
    private final EventExecutorGroup parent; 
    //单元素集合
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
}

(2).next() 指定当前EventExecutor

1
2
3
4
    @Override
    public EventExecutor next() {
        return this;
    } 

(3).shutdownGracefully()

1
2
3
4
5
  @Override
     public Future<?> shutdownGracefully() {
         //设置默认时间
         return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
     }

(4).pollScheduledTask 获取调度任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
   protected final Runnable pollScheduledTask(long nanoTime) {
       assert inEventLoop();

       Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
       ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
       if (scheduledTask == null) {
           return null;
       }

       if (scheduledTask.deadlineNanos() <= nanoTime) {
           scheduledTaskQueue.remove();
           return scheduledTask;
       }
       return null;
   } 

(5).cancelScheduledTasks 取消调度任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
   protected void cancelScheduledTasks() {
       assert inEventLoop();
       PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
       if (isNullOrEmpty(scheduledTaskQueue)) {
           return;
       }

       final ScheduledFutureTask<?>[] scheduledTasks =
               scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);

       for (ScheduledFutureTask<?> task: scheduledTasks) {
           task.cancelWithoutRemove(false);
       }

       scheduledTaskQueue.clearIgnoringIndexes();
   } 

4.AbstractScheduledEventExecutor支持调度的的抽象基类

(1).数据结构,创建PriorityQueue(优先队列),利用优先队列,支持调度

1
2
3
4
5
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {  
    ...
    //优先级队列
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
    }

(2).优先级队列任务实现ScheduledFutureTask

a.ScheduledFutureTask的类图

  • 类图关系很复杂,最核心怎么实现时间调度
  • PromiseTask类后面去分析

b.数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
    private static final AtomicLong nextTaskId = new AtomicLong();//调度任务ID生成器
    private static final long START_TIME = System.nanoTime();
    //调度任务id
    private final long id = nextTaskId.getAndIncrement();
    //调度任务截止时间即到了改时间点任务将被执行
    private long deadlineNanos;
    /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
    //0--表示调度任务不重复,>0--表示按固定频率重复(at fixed rate),<0--表示按固定延迟重复(with fixed delay)。
    private final long periodNanos;

    private final int queueIndex = INDEX_NOT_IN_QUEUE;
}

c.nanoTime() 获取当前相对纳秒时间

1
2
3
   static long nanoTime() {
       return System.nanoTime() - START_TIME;
   }   

d.run 核心执行方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 @Override
    public void run() {
        assert executor().inEventLoop();
        try {
            if (periodNanos == 0) { //为0情况,只执行一次
                if (setUncancellableInternal()) {
                    V result = task.call();   //执行任务
                    setSuccessInternal(result);
                }
            } else {
                // check if is done as it may was cancelled 验证是否取消
                if (!isCancelled()) {
                    task.call();
                    if (!executor().isShutdown()) {
                        long p = periodNanos;
                        if (p > 0) {
                            deadlineNanos += p; //固定频率重复
                        } else {
                            deadlineNanos = nanoTime() - p;//固定延迟重复
                        }
                        if (!isCancelled()) {
                            // scheduledTaskQueue can never be null as we lazy init it before submit the task!
                            Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
                                    ((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
                            assert scheduledTaskQueue != null;
                            //重新添加到队列中
                            scheduledTaskQueue.add(this);
                        }
                    }
                }
            }
        }...
    }

e.getDelay 获取延迟时间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
    }

    //动态变化值,nanoTime()获取当前相对纳秒时间
    public long delayNanos() {
        return Math.max(0, deadlineNanos() - nanoTime());
    }

(3).scheduledTaskQueue 获取优先级队列

实现DefaultPriorityQueue,根PriorityQueue底层设计类似,都是用堆数据结构设计;这里不做具体分析;

1
2
3
4
5
6
7
8
9
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    // Use same initial capacity as java.util.PriorityQueue
                    11);
        }
        return scheduledTaskQueue;
    }

(4).schedule

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
   @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        ...
        if (delay < 0) {
            delay = 0;
        }
        return schedule(new ScheduledFutureTask<Void>(
                this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }
  <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
      if (inEventLoop()) {
          scheduledTaskQueue().add(task);
      } else {
          execute(new Runnable() {
              @Override
              public void run() {
                  scheduledTaskQueue().add(task);
              }
          });
      }
      return task;
  }      

5.SingleThreadEventExecutor源码分析

一个线程中执行所有提交的任务。

(1).数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    ...
    
    //state 状态
    private static final int ST_NOT_STARTED = 1; //未启动
    private static final int ST_STARTED = 2;     //启动
    private static final int ST_SHUTTING_DOWN = 3; //正在关闭
    private static final int ST_SHUTDOWN = 4;       //已关闭
    private static final int ST_TERMINATED = 5;     //终止

    private static final Runnable WAKEUP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };

    //CAS修改state和threadProperties属性
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");

    //默认创建堵塞队列(LinkedBlockingQueue)
    private final Queue<Runnable> taskQueue;//队列任务

    private volatile Thread thread;         //线程
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;//线程配置
    private final Executor executor; 
    private volatile boolean interrupted;//中断状态

    private final Semaphore threadLock = new Semaphore(0);//信号量,来控制awaitTermination等待
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    private final boolean addTaskWakesUp;   // 添加任务时是否唤醒线程
    private final int maxPendingTasks;      //队列中,未执行任务最大数量
    private final RejectedExecutionHandler rejectedExecutionHandler;//队列满来,拒绝处理

    private long lastExecutionTime;   //最后执行时间

    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;   //默认状态ST_NOT_STARTED

    private volatile long gracefulShutdownQuietPeriod; //优雅的关机安静期
    private volatile long gracefulShutdownTimeout; //优雅的关闭超时
    private long gracefulShutdownStartTime;
    // 线程终止异步结果
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
    
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
     } 

}

(2).execute 添加任务,

  • 添加任务,未启动线程,先启动线程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
   @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) { 
            addTask(task);
        } else {
            startThread();//开启一个线程
            addTask(task);//添加任务
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

1).inEventLoop 执行任务线程是否EventLoop线程

1
2
3
4
5
6
7
8
9
   @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }
    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
 

2).startThread 开启线程

  • state 状态控制启动线程,防止重复启动
  • 状态分5种,代码如下
    1
    2
    3
    4
    5
    
         private static final int ST_NOT_STARTED = 1; //未启动
         private static final int ST_STARTED = 2;     //启动
         private static final int ST_SHUTTING_DOWN = 3; //正在关闭
         private static final int ST_SHUTDOWN = 4;       //已关闭
         private static final int ST_TERMINATED = 5;     //终止
    
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
   private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();//正在启动一个线程
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    ...
                }
            }
        }
    } 
a.doStartThread 启动线程开始
  • executor.execute启动一个线程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
       
  private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
                boolean success = false;
                //设置lastExecutionTime时间
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run(); //抽象方法,子类去实现
                    success = true;
                } catch (Throwable t) {
                    ...
                } finally {
                    ...优雅退出机制介绍
                }
            }
        });
    }    

(3).addTask 添加任务

  • 添加队列满交给rejectedExecutionHandler处理
  • 当前状态关闭或者以终止throw的RejectedExecutionException异常
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
   
     protected void addTask(Runnable task) {
         if (task == null) {
             throw new NullPointerException("task");
         }
         if (!offerTask(task)) {//添加队列满,交给rejectedExecutionHandler处理
             reject(task);
         }
     }
 
     final boolean offerTask(Runnable task) {
         if (isShutdown()) {//关闭或者终止,throw RejectedExecutionException
             reject();
         }
         return taskQueue.offer(task);
     }
    

(4).runAllTasks() 运行所有任务

  • 获取所有调度的任务,添加到队列任务中
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;
        // 获取调度任务,添加到队列任务中
        do {
            fetchedAll = fetchFromScheduledTaskQueue();//fetchedAll 返回false任务队列满了,
            if (runAllTasksFrom(taskQueue)) { //执行所有任务
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); //循环重新获取调度任务添加到执行任务中

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        //运行所有任务钩子方法 
        afterRunningAllTasks(); 
        return ranAtLeastOne;
    }

a.fetchFromScheduledTaskQueue 获取调度任务,添加到队列任务中;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        //获取调度任务
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                //任务队列满了,重新添加到队列中
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime); //while形式获取调度任务
        }
        return true;
    }

b.runAllTasksFrom 执行队列所有任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
     protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
         //过滤WAKEUP_TASK任务
         Runnable task = pollTaskFrom(taskQueue);
         if (task == null) {
             return false;
         }
         for (;;) {
             safeExecute(task);
             task = pollTaskFrom(taskQueue);
             if (task == null) {
                 return true;
             }
         }
     } 

(5).runAllTasks(timeoutNanos) 设置超时时间运行所有任务

  • 执行获取调度任务,添加到队列任务中
  • 循环执行队列任务
    • 每执行64个任务,检测是否超时
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    protected boolean runAllTasks(long timeoutNanos) {
        //执行获取调度任务,添加到队列任务中
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask(); //获取任务
        if (task == null) {
            afterRunningAllTasks(); //钩子
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);
            runTasks ++;
            //每执行64个任务,检测是否超时,主要调用nanoTime是相对昂贵的;
            if ((runTasks & 0x3F) == 0) { //0x3F=63
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    } 

(6).shutdownGracefully 优雅退出机制

  • 修改状态为ST_SHUTTING_DOWN
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
   @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        ...验证参数
        //已经执行关闭或者已终止
        if (isShuttingDown()) {
            return terminationFuture();
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            wakeup = true;
            oldState = state;
            //CAS修改状态ST_SHUTTING_DOWN
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { 
                break;
            }
        }
        //优雅退出安静期 
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
       //优雅退出超时时间
        gracefulShutdownTimeout = unit.toNanos(timeout);
        //以前状态未开始线程,先开启线程
        if (oldState == ST_NOT_STARTED) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_TERMINATED);
                terminationFuture.tryFailure(cause);
                ...
                return terminationFuture;
            }
        }

        if (wakeup) {
            wakeup(inEventLoop);
        }
        return terminationFuture();
    }
   

a.正常运行finally =>doStartThread

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
                for (;;) {
                        int oldState = state;
                        // 没有调用优雅关闭,状态可能正常,CAS修改成ST_SHUTTING_DOWN
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    ...
                    try {
                        //运行所有剩余的任务和关闭挂钩。
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup(); //清除钩子方法
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); //状态改成ST_TERMINATED
                            threadLock.release();
                            ...
                            terminationFuture.setSuccess(null);
                        }
                    } 

b.confirmShutdown 确认关闭

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  
    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }
        if (!inEventLoop()) {
            throw new IllegalStateException(...);
        }
        //取消调度队列任务
        cancelScheduledTasks(); 
        //设置优雅退出开始时间
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }
        // 队列没有任务执行,调用runShutdownHooks方法
        //  调用shutdownHooks的钩子方法集合,有任务执行返回true,已经清空集合
        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                // 不再有新任务。
                return true;
            }
            //  队列中有任务。再等待一会儿,直到在静默期没有任何任务排队,或者在静默期为0时终止。
            // See https://github.com/netty/netty/issues/4241
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            //往队列添加WAKEUP_TASK任务
            wakeup(true);
            return false;
        }

        final long nanoTime = ScheduledFutureTask.nanoTime();
        //超过优雅退出超时时间,立即返回
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }
        //优雅退出安静期 
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            // 检查每100毫秒是否将任何任务添加到队列中。
            wakeup(true);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }
            return false;
        }
        // 在上一个安静的时期未添加任何任务-希望可以安全关闭。(希望是因为我们确实不能保证用户不会执行execute())
        return true;
    }

b. wakeup 队列中加入唤醒任务

1
2
3
4
5
6
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop || state == ST_SHUTTING_DOWN) {
            //使用offer因为我们实际上只需要这个来解锁线程,如果offer失败,我们不在乎,因为队列中已经有东西了。
            taskQueue.offer(WAKEUP_TASK); //队列中加入唤醒任务
        }
    }

3.SingleThreadEventLoop源码分析

单个线程中执行所有提交的任务。

(1).数据结构

1
2
3
4
5
6
7
8
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

   //这个队列做啥子
    private final Queue<Runnable> tailTasks; 
}

(3).register 注册channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);//调用Unsafe对象
        return promise;
    }

(4).afterRunningAllTasks

SingleThreadEventExecutor调用runAllTasks最后调用afterRunningAllTasks,在这里就是执行这个队列的任务;

1
2
3
4
  @Override
   protected void afterRunningAllTasks() {
       runAllTasksFrom(tailTasks);
   } 

(5).executeAfterEventLoopIteration 只有这个方法才能添加任务

添加一个任务,该任务在下一次(或当前){Eventloop}迭代结束时运行一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   public final void executeAfterEventLoopIteration(Runnable task) {
       ObjectUtil.checkNotNull(task, "task");
       if (isShutdown()) {
           reject();
       }

       if (!tailTasks.offer(task)) {
           reject(task);
       }

       if (wakesUpForTask(task)) {
           wakeup(inEventLoop());
       }
   } 

(6).tailTasks用处

  • 用于批量提交写入
    • 这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。

4.NioEventLoop源码分析

(1).属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
  /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

    //控制的布尔值确定阻塞的Selector.select是否应该突破其选择过程。
    // 在我们的例子中,我们使用select方法的超时时间,除非唤醒,否则select方法将阻塞那段时间。
    //唤醒标记,由于select()方法会阻塞
    private final AtomicBoolean wakenUp = new AtomicBoolean();

    private final SelectStrategy selectStrategy;//选择策略

    private volatile int ioRatio = 50;// IO任务占总任务(IO+普通任务)比例
    //SelectionKey取消次数(cancelledKeys)达到CLEANUP_INTERVAL时,needsToSelectAgain为true,获取任务,处理SelectedKeys都要重新selectNow
    private int cancelledKeys;
    private boolean needsToSelectAgain;

(2).构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

(3).run 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
    @Override
    protected void run() {
        for (;;) {
            try {
                //hasTasks():先判断SingleThreadEventExecutor.taskQueue队列不为空,在判断SingleThreadEventLoop.tailTasks不空
                // 为true,selector.selectNow()
                // 为false,执行SelectStrategy.SELECT逻辑==>select
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        // 1.在调用'selector.wakeup()'之前,总是评估'wakenUp.compareAndSet(false,true)',以减少唤醒开销(Selector.wakeup()是一个昂贵的操作)。
                      
                        if (wakenUp.get()) {
                            //API文档有具体说明,调用wakeup没有堵塞,下个select时立即返回
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();   //SingleThreadEventExecutor
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) { 
                //异常Thread.sleep(1000);
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                //异常Thread.sleep(1000);
                handleLoopException(t);
            }
        }
    }
    
     private void processSelectedKeys() {
         if (selectedKeys != null) {
             processSelectedKeysOptimized(); //优化
         } else {
             processSelectedKeysPlain(selector.selectedKeys());//普通
         }
     }
     //关闭所有通道/NioTask任务
    private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }     

a.NIO API说明

select() 阻塞直到有一个感兴趣的IO事件就绪

select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout

selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪

wakeUp()方法,其功能是唤醒一个阻塞在select()上的线程,使其继续运行。如果先调用了wakeUp()方法,那么下一个select()操作也会立即返回。

此外,wakeUp()是一个昂贵的方法,应尽量减少其调用次数。

b.select

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
             // delayNanos返回的是最近的一个调度任务的到期时间,没有调度任务返回1秒
             // selectDeadLineNanos指可以进行select操作的截止时间点
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) { 
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                // 如果某个任务在wakenUp值为true时提交,则该任务没有机会调用Selector#wakeup。
                // 所以我们需要在执行选择操作之前再次检查任务队列。如果我们不这样做,那么这个任务可能会被暂停,直到选择操作超时。
                // 如果管道中存在IdleStateHandler,则可能会挂起直到空闲超时。
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                //selectedKeys等于0情况,考虑老wakenUp,新wakenUp,任务队列大小,调度任务大小情况
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    ...
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    //已超时,未选择任何内容
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    ...
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }
            ...
        }...
    }

由于JDK BUG导致select(timeout)方法并不阻塞而直接返回且 返回值为0,从而出现空轮询使CPU完全耗尽。

Netty解决的办法是:调用selector.select(time)操作计数(selectCnt++),如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个Selector,将注册到老的 Selector上的channel重新注册到新的elector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。

i.rebuildSelector 重新创建Selector
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
    public void rebuildSelector() {
       if (!inEventLoop()) {
           execute(new Runnable() {
               @Override
               public void run() {
                   rebuildSelector0();
               }
           });
           return;
       }
       rebuildSelector0();
   }

   private void rebuildSelector0() {
       final Selector oldSelector = selector;
       final SelectorTuple newSelectorTuple;

       if (oldSelector == null) {
           return;
       }

       try {
           newSelectorTuple = openSelector();//重新创建Selector
       } catch (Exception e) {
           ...
           return;
       }

       // Register all channels to the new Selector.
       int nChannels = 0;
       for (SelectionKey key: oldSelector.keys()) {
           Object a = key.attachment();
           try {
               //判断监听SelectionKey是否有效
               if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                   continue;
               }

               int interestOps = key.interestOps();
               key.cancel();
               SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
               if (a instanceof AbstractNioChannel) {
                   // Update SelectionKey
                   ((AbstractNioChannel) a).selectionKey = newKey;
               }
               nChannels ++;
           } catch (Exception e) {
               if (a instanceof AbstractNioChannel) {
                   AbstractNioChannel ch = (AbstractNioChannel) a;
                   ch.unsafe().close(ch.unsafe().voidPromise());
               } else {
                   @SuppressWarnings("unchecked")
                   NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                   invokeChannelUnregistered(task, key, e);
               }
           }
       }

       selector = newSelectorTuple.selector;
       unwrappedSelector = newSelectorTuple.unwrappedSelector;

       try {
           // time to close the old selector as everything else is registered to the new one
           oldSelector.close();
       }...

   }

c.processSelectedKeys

1
2
3
4
5
6
7
   private void processSelectedKeys() {
         if (selectedKeys != null) {
             processSelectedKeysOptimized(); //优化
         } else {
             processSelectedKeysPlain(selector.selectedKeys());//普通
         }
     } 
1).processSelectedKeysOptimized 优化处理SelectedKeys
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
  private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            //在数组中空出条目以允许在频道关闭时使其GC'ed
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                //在数组中空出条目以允许在频道关闭时使其GC'ed
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
    
    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }    
  1. selectedKeys在openSelector()方法中初始化,Netty通过反射修改了Selector的selectedKeys成员和publicSelectedKeys成员。替换成了自己的实现——SelectedSelectionKeySet。
  2. 从OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均采用了HashSet实现。HashSet采用HashMap实现,插入需要计算Hash并解决Hash冲突并挂链,而SelectedSelectionKeySet实现使用了双数组,每次插入尾部,扩展策略为double,调用flip()则返回当前数组并切换到另外一个数据。
  3. https://github.com/netty/netty/issues/6058#,目前在4.1.9.Final 版本中,netty已经将SelectedSelectionKeySet.java 底层使用一个数组了
2).processSelectedKey 处理SelectedKey
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

  private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
      final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
      if (!k.isValid()) {
          final EventLoop eventLoop;
          try {
              eventLoop = ch.eventLoop();
          } catch (Throwable ignored) {
              // If the channel implementation throws an exception because there is no event loop, we ignore this
              // because we are only trying to determine if ch is registered to this event loop and thus has authority
              // to close ch.
              //如果通道实现抛出一个异常,因为没有事件循环,我们忽略这个,因为我们只是试图确定ch是否注册到这个事件循环,因此有权关闭ch。
              return;
          }
          // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
          // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
          // still healthy and should not be closed.
          //如果ch仍然被注册到这个EventLoop,只关闭ch。 ch可以从事件循环中注销,因此SelectionKey可以作为注销过程的一部分被取消,但是频道仍然健康,
          // 不应该被关闭
          // See https://github.com/netty/netty/issues/5125
          if (eventLoop != this || eventLoop == null) {
              return;
          }
          // close the channel if the key is not valid anymore
          unsafe.close(unsafe.voidPromise());
          return;
      }

      try {
          int readyOps = k.readyOps();
          if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
              int ops = k.interestOps();
              ops &= ~SelectionKey.OP_CONNECT;
              k.interestOps(ops);
              unsafe.finishConnect();
          }

          // 首先处理OP_WRITE,因为我们可能会写入一些排队的缓冲区和空闲内存。
          if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              //有写的事件,立即执行Flush,一旦没有任何信息写入,它也将清除OP_WRITE
              ch.unsafe().forceFlush();
          }
          //同时检查readOps为0以解决可能导致旋转循环的可能的JDK错误
          if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
              unsafe.read();
          }
      } catch (CancelledKeyException ignored) {
          unsafe.close(unsafe.voidPromise());
      }
  }

2).processSelectedKeysPlain
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
   private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

(4).重写newTaskQueue方法,MpscQueue提高了性能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21

   @Override
   protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
       // This event loop never calls takeTask()
       return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                   : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
   }

   @Override
   public int pendingTasks() {
       // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
       // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
       //当我们使用MpscQueue时,我们需要确保pendingTasks()只能在EventLoop中执行,否则我们可能会看到意外的行为(因为size()只允许被单个使用者调用)。
       // See https://github.com/netty/netty/issues/5297
       if (inEventLoop()) {
           return super.pendingTasks();
       } else {
           return submit(pendingTasksCallable).syncUninterruptibly().getNow();
       }
   }