【注意】最后更新于 April 4, 2019,文中内容可能已过时,请谨慎使用。
一、线程模型
Netty线程模型 这个博客写不错;
二、NioEventLoopGroup线程池分析
1.NioEventLoopGroup的类图结构

2.EventLoopGroup接口图

- next()属于分配EventLoop
- register注册Channel
- shutdownGracefully优雅关机
1.AbstractEventExecutorGroup 抽象类源码
execute,submit和schedule调用next()执行,最终调用EventExecutor类实现
2.MultithreadEventExecutorGroup 抽象类源码
(1).数据结构
1
2
3
4
5
6
7
8
9
10
11
|
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//线程池
private final EventExecutor[] children;
//不可以更改线程池,只能可读效果;
private final Set<EventExecutor> readonlyChildren;
//终止线程数量
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
//EventExecutor选择器
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
}
|
(2).构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
// 线程执行器,ThreadFactory执行任务
if (executor == null) {
//threadFactory.newThread(command).start();
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 创建线程池数组
// 调用newChild抽象方法,创建EventExecutor
children = new EventExecutor[nThreads];//
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args); //抽象方法,子类实现;
success = true;
} catch (Exception e) {
throw new IllegalStateException(...);
} finally {
if (!success) {
//创建线程池失败,优雅关机终止
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// awaitTermination等待终止,是同步方法
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch...
}
}
}
}
// 创建EventExecutor的选择器
chooser = chooserFactory.newChooser(children);
...
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);//把Set改成不可改变Set
}
|
a.chooserFactory.newChooser 创建EventExecutor线程选择器
chooserFactory默认 DefaultEventExecutorChooserFactory
b.DefaultEventExecutorChooserFactory 源码分析
- EventExecutor数量是2次方幂,就创建PowerOfTwoEventExecutorChooser,否GenericEventExecutorChooser
- PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser区别在于’&‘比’%‘性能高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
//获取EventExecutorChooser线程选择器
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//判断是否2次方幂用意?
//executors的长度是2次方幂,快速定位executors数组下标,idx&executors.length-1
//不是2次方幂,idx % executors.length定位数组下标
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
//判断是否2次方幂
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
...
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
...
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
|
(3).next 获取EventExecutor线程对象
- 调用EventExecutorChooser算法分配EventExecutor
1
2
3
4
|
@Override
public EventExecutor next() {
return chooser.next();
}
|
(4).shutdownGracefully 循环调用EventExecutor#shutdownGracefully
1
2
3
4
5
6
7
|
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
|
3.MultithreadEventLoopGroup
(1).next() 分配EventLoop
EventLoop继承EventExecutor,这里可以强制转化
1
2
3
4
|
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
|
(2).register 注册channel,EventLoop执行;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
|
(2)newDefaultThreadFactory,创建DefaultThreadFactory
最终创建FastThreadLocalThread线程
1
2
3
4
|
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
|
4.NioEventLoopGroup 源码分析
NioEventLoopGroup实现MultithreadEventExecutorGroup抽象类;
(1).构造
1
2
3
4
5
6
7
|
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider, //
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
|
(2).newChild 创建NioEventLoop
1
2
3
4
5
|
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
|
二、NioEventLoop分析
1.NioEventLoop的实现类图结构

2.EventLoop接口类图

- OrderedEventExecutor 标记状态,将以有序/串行方式处理所有提交的任务。
3.AbstractEventExecutor源码分析
(1).数据结构
1
2
3
4
5
6
7
8
9
|
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
//所属EventExecutorGroup
private final EventExecutorGroup parent;
//单元素集合
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);
}
|
(2).next() 指定当前EventExecutor
1
2
3
4
|
@Override
public EventExecutor next() {
return this;
}
|
(3).shutdownGracefully()
1
2
3
4
5
|
@Override
public Future<?> shutdownGracefully() {
//设置默认时间
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
|
(4).pollScheduledTask 获取调度任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
|
(5).cancelScheduledTasks 取消调度任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
protected void cancelScheduledTasks() {
assert inEventLoop();
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
return;
}
final ScheduledFutureTask<?>[] scheduledTasks =
scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
for (ScheduledFutureTask<?> task: scheduledTasks) {
task.cancelWithoutRemove(false);
}
scheduledTaskQueue.clearIgnoringIndexes();
}
|
4.AbstractScheduledEventExecutor支持调度的的抽象基类
(1).数据结构,创建PriorityQueue(优先队列),利用优先队列,支持调度
1
2
3
4
5
|
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
...
//优先级队列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
}
|
(2).优先级队列任务实现ScheduledFutureTask
a.ScheduledFutureTask的类图

- 类图关系很复杂,最核心怎么实现时间调度
- PromiseTask类后面去分析
b.数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
|
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final AtomicLong nextTaskId = new AtomicLong();//调度任务ID生成器
private static final long START_TIME = System.nanoTime();
//调度任务id
private final long id = nextTaskId.getAndIncrement();
//调度任务截止时间即到了改时间点任务将被执行
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
//0--表示调度任务不重复,>0--表示按固定频率重复(at fixed rate),<0--表示按固定延迟重复(with fixed delay)。
private final long periodNanos;
private final int queueIndex = INDEX_NOT_IN_QUEUE;
}
|
c.nanoTime() 获取当前相对纳秒时间
1
2
3
|
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
|
d.run 核心执行方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@Override
public void run() {
assert executor().inEventLoop();
try {
if (periodNanos == 0) { //为0情况,只执行一次
if (setUncancellableInternal()) {
V result = task.call(); //执行任务
setSuccessInternal(result);
}
} else {
// check if is done as it may was cancelled 验证是否取消
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p; //固定频率重复
} else {
deadlineNanos = nanoTime() - p;//固定延迟重复
}
if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task!
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
//重新添加到队列中
scheduledTaskQueue.add(this);
}
}
}
}
}...
}
|
e.getDelay 获取延迟时间
1
2
3
4
5
6
7
8
9
10
|
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
//动态变化值,nanoTime()获取当前相对纳秒时间
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
|
(3).scheduledTaskQueue 获取优先级队列
实现DefaultPriorityQueue,根PriorityQueue底层设计类似,都是用堆数据结构设计;这里不做具体分析;
1
2
3
4
5
6
7
8
9
|
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}
|
(4).schedule
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
...
if (delay < 0) {
delay = 0;
}
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
|
5.SingleThreadEventExecutor源码分析
一个线程中执行所有提交的任务。
(1).数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
...
//state 状态
private static final int ST_NOT_STARTED = 1; //未启动
private static final int ST_STARTED = 2; //启动
private static final int ST_SHUTTING_DOWN = 3; //正在关闭
private static final int ST_SHUTDOWN = 4; //已关闭
private static final int ST_TERMINATED = 5; //终止
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final Runnable NOOP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
//CAS修改state和threadProperties属性
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
//默认创建堵塞队列(LinkedBlockingQueue)
private final Queue<Runnable> taskQueue;//队列任务
private volatile Thread thread; //线程
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;//线程配置
private final Executor executor;
private volatile boolean interrupted;//中断状态
private final Semaphore threadLock = new Semaphore(0);//信号量,来控制awaitTermination等待
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp; // 添加任务时是否唤醒线程
private final int maxPendingTasks; //队列中,未执行任务最大数量
private final RejectedExecutionHandler rejectedExecutionHandler;//队列满来,拒绝处理
private long lastExecutionTime; //最后执行时间
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED; //默认状态ST_NOT_STARTED
private volatile long gracefulShutdownQuietPeriod; //优雅的关机安静期
private volatile long gracefulShutdownTimeout; //优雅的关闭超时
private long gracefulShutdownStartTime;
// 线程终止异步结果
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
}
|
(2).execute 添加任务,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();//开启一个线程
addTask(task);//添加任务
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
|
1).inEventLoop 执行任务线程是否EventLoop线程
1
2
3
4
5
6
7
8
9
|
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
|
2).startThread 开启线程
- state 状态控制启动线程,防止重复启动
- 状态分5种,代码如下
1
2
3
4
5
|
private static final int ST_NOT_STARTED = 1; //未启动
private static final int ST_STARTED = 2; //启动
private static final int ST_SHUTTING_DOWN = 3; //正在关闭
private static final int ST_SHUTDOWN = 4; //已关闭
private static final int ST_TERMINATED = 5; //终止
|
1
2
3
4
5
6
7
8
9
10
11
12
|
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();//正在启动一个线程
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
...
}
}
}
}
|
a.doStartThread 启动线程开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
//设置lastExecutionTime时间
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); //抽象方法,子类去实现
success = true;
} catch (Throwable t) {
...
} finally {
...优雅退出机制介绍
}
}
});
}
|
(3).addTask 添加任务
- 添加队列满交给rejectedExecutionHandler处理
- 当前状态关闭或者以终止throw的RejectedExecutionException异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {//添加队列满,交给rejectedExecutionHandler处理
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {//关闭或者终止,throw RejectedExecutionException
reject();
}
return taskQueue.offer(task);
}
|
(4).runAllTasks() 运行所有任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
// 获取调度任务,添加到队列任务中
do {
fetchedAll = fetchFromScheduledTaskQueue();//fetchedAll 返回false任务队列满了,
if (runAllTasksFrom(taskQueue)) { //执行所有任务
ranAtLeastOne = true;
}
} while (!fetchedAll); //循环重新获取调度任务添加到执行任务中
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
//运行所有任务钩子方法
afterRunningAllTasks();
return ranAtLeastOne;
}
|
a.fetchFromScheduledTaskQueue 获取调度任务,添加到队列任务中;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//获取调度任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
//任务队列满了,重新添加到队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime); //while形式获取调度任务
}
return true;
}
|
b.runAllTasksFrom 执行队列所有任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
//过滤WAKEUP_TASK任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
|
(5).runAllTasks(timeoutNanos) 设置超时时间运行所有任务
- 执行获取调度任务,添加到队列任务中
- 循环执行队列任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
protected boolean runAllTasks(long timeoutNanos) {
//执行获取调度任务,添加到队列任务中
fetchFromScheduledTaskQueue();
Runnable task = pollTask(); //获取任务
if (task == null) {
afterRunningAllTasks(); //钩子
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
//每执行64个任务,检测是否超时,主要调用nanoTime是相对昂贵的;
if ((runTasks & 0x3F) == 0) { //0x3F=63
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
|
(6).shutdownGracefully 优雅退出机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
...验证参数
//已经执行关闭或者已终止
if (isShuttingDown()) {
return terminationFuture();
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture();
}
int newState;
wakeup = true;
oldState = state;
//CAS修改状态ST_SHUTTING_DOWN
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
//优雅退出安静期
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
//优雅退出超时时间
gracefulShutdownTimeout = unit.toNanos(timeout);
//以前状态未开始线程,先开启线程
if (oldState == ST_NOT_STARTED) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_TERMINATED);
terminationFuture.tryFailure(cause);
...
return terminationFuture;
}
}
if (wakeup) {
wakeup(inEventLoop);
}
return terminationFuture();
}
|
a.正常运行finally =>doStartThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
for (;;) {
int oldState = state;
// 没有调用优雅关闭,状态可能正常,CAS修改成ST_SHUTTING_DOWN
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
...
try {
//运行所有剩余的任务和关闭挂钩。
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup(); //清除钩子方法
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); //状态改成ST_TERMINATED
threadLock.release();
...
terminationFuture.setSuccess(null);
}
}
|
b.confirmShutdown 确认关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException(...);
}
//取消调度队列任务
cancelScheduledTasks();
//设置优雅退出开始时间
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// 队列没有任务执行,调用runShutdownHooks方法
// 调用shutdownHooks的钩子方法集合,有任务执行返回true,已经清空集合
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// 不再有新任务。
return true;
}
// 队列中有任务。再等待一会儿,直到在静默期没有任何任务排队,或者在静默期为0时终止。
// See https://github.com/netty/netty/issues/4241
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
//往队列添加WAKEUP_TASK任务
wakeup(true);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
//超过优雅退出超时时间,立即返回
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
//优雅退出安静期
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// 检查每100毫秒是否将任何任务添加到队列中。
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// 在上一个安静的时期未添加任何任务-希望可以安全关闭。(希望是因为我们确实不能保证用户不会执行execute())
return true;
}
|
b. wakeup 队列中加入唤醒任务
1
2
3
4
5
6
|
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTTING_DOWN) {
//使用offer因为我们实际上只需要这个来解锁线程,如果offer失败,我们不在乎,因为队列中已经有东西了。
taskQueue.offer(WAKEUP_TASK); //队列中加入唤醒任务
}
}
|
3.SingleThreadEventLoop源码分析
单个线程中执行所有提交的任务。
(1).数据结构
1
2
3
4
5
6
7
8
|
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
//这个队列做啥子
private final Queue<Runnable> tailTasks;
}
|
(3).register 注册channel
1
2
3
4
5
6
7
8
9
10
11
|
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);//调用Unsafe对象
return promise;
}
|
(4).afterRunningAllTasks
SingleThreadEventExecutor调用runAllTasks最后调用afterRunningAllTasks,在这里就是执行这个队列的任务;
1
2
3
4
|
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
|
(5).executeAfterEventLoopIteration 只有这个方法才能添加任务
添加一个任务,该任务在下一次(或当前){Eventloop}迭代结束时运行一次。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (isShutdown()) {
reject();
}
if (!tailTasks.offer(task)) {
reject(task);
}
if (wakesUpForTask(task)) {
wakeup(inEventLoop());
}
}
|
(6).tailTasks用处
- 用于批量提交写入
- 这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。
4.NioEventLoop源码分析
(1).属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
//控制的布尔值确定阻塞的Selector.select是否应该突破其选择过程。
// 在我们的例子中,我们使用select方法的超时时间,除非唤醒,否则select方法将阻塞那段时间。
//唤醒标记,由于select()方法会阻塞
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;//选择策略
private volatile int ioRatio = 50;// IO任务占总任务(IO+普通任务)比例
//SelectionKey取消次数(cancelledKeys)达到CLEANUP_INTERVAL时,needsToSelectAgain为true,获取任务,处理SelectedKeys都要重新selectNow
private int cancelledKeys;
private boolean needsToSelectAgain;
|
(2).构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
|
(3).run 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
@Override
protected void run() {
for (;;) {
try {
//hasTasks():先判断SingleThreadEventExecutor.taskQueue队列不为空,在判断SingleThreadEventLoop.tailTasks不空
// 为true,selector.selectNow()
// 为false,执行SelectStrategy.SELECT逻辑==>select
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 1.在调用'selector.wakeup()'之前,总是评估'wakenUp.compareAndSet(false,true)',以减少唤醒开销(Selector.wakeup()是一个昂贵的操作)。
if (wakenUp.get()) {
//API文档有具体说明,调用wakeup没有堵塞,下个select时立即返回
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks(); //SingleThreadEventExecutor
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
//异常Thread.sleep(1000);
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
//异常Thread.sleep(1000);
handleLoopException(t);
}
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(); //优化
} else {
processSelectedKeysPlain(selector.selectedKeys());//普通
}
}
//关闭所有通道/NioTask任务
private void closeAll() {
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
|
a.NIO API说明
select() 阻塞直到有一个感兴趣的IO事件就绪
select(long timeout) 与select()类似,但阻塞的最长时间为给定的timeout
selectNow() 不会阻塞,直接返回而不管是否有IO事件就绪
wakeUp()方法,其功能是唤醒一个阻塞在select()上的线程,使其继续运行。如果先调用了wakeUp()方法,那么下一个select()操作也会立即返回。
此外,wakeUp()是一个昂贵的方法,应尽量减少其调用次数。
b.select
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// delayNanos返回的是最近的一个调度任务的到期时间,没有调度任务返回1秒
// selectDeadLineNanos指可以进行select操作的截止时间点
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果某个任务在wakenUp值为true时提交,则该任务没有机会调用Selector#wakeup。
// 所以我们需要在执行选择操作之前再次检查任务队列。如果我们不这样做,那么这个任务可能会被暂停,直到选择操作超时。
// 如果管道中存在IdleStateHandler,则可能会挂起直到空闲超时。
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//selectedKeys等于0情况,考虑老wakenUp,新wakenUp,任务队列大小,调度任务大小情况
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
...
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
//已超时,未选择任何内容
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
...
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
...
}...
}
|
由于JDK BUG导致select(timeout)方法并不阻塞而直接返回且
返回值为0,从而出现空轮询使CPU完全耗尽。
Netty解决的办法是:调用selector.select(time)操作计数(selectCnt++),如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个Selector,将注册到老的
Selector上的channel重新注册到新的elector上。阈值SELECTOR_AUTO_REBUILD_THRESHOLD可由用户使用系统变量io.netty.selectorAutoRebuildThreshold配置,默认为512。
i.rebuildSelector 重新创建Selector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();//重新创建Selector
} catch (Exception e) {
...
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
//判断监听SelectionKey是否有效
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
}...
}
|
c.processSelectedKeys
1
2
3
4
5
6
7
|
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(); //优化
} else {
processSelectedKeysPlain(selector.selectedKeys());//普通
}
}
|
1).processSelectedKeysOptimized 优化处理SelectedKeys
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
//在数组中空出条目以允许在频道关闭时使其GC'ed
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
//在数组中空出条目以允许在频道关闭时使其GC'ed
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
|
- selectedKeys在openSelector()方法中初始化,Netty通过反射修改了Selector的selectedKeys成员和publicSelectedKeys成员。替换成了自己的实现——SelectedSelectionKeySet。
- 从OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均采用了HashSet实现。HashSet采用HashMap实现,插入需要计算Hash并解决Hash冲突并挂链,而SelectedSelectionKeySet实现使用了双数组,每次插入尾部,扩展策略为double,调用flip()则返回当前数组并切换到另外一个数据。
- https://github.com/netty/netty/issues/6058#,目前在4.1.9.Final 版本中,netty已经将
SelectedSelectionKeySet.java
底层使用一个数组了
2).processSelectedKey 处理SelectedKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
//如果通道实现抛出一个异常,因为没有事件循环,我们忽略这个,因为我们只是试图确定ch是否注册到这个事件循环,因此有权关闭ch。
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
//如果ch仍然被注册到这个EventLoop,只关闭ch。 ch可以从事件循环中注销,因此SelectionKey可以作为注销过程的一部分被取消,但是频道仍然健康,
// 不应该被关闭
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 首先处理OP_WRITE,因为我们可能会写入一些排队的缓冲区和空闲内存。
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//有写的事件,立即执行Flush,一旦没有任何信息写入,它也将清除OP_WRITE
ch.unsafe().forceFlush();
}
//同时检查readOps为0以解决可能导致旋转循环的可能的JDK错误
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
|
2).processSelectedKeysPlain
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
|
(4).重写newTaskQueue方法,MpscQueue提高了性能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
@Override
public int pendingTasks() {
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
//当我们使用MpscQueue时,我们需要确保pendingTasks()只能在EventLoop中执行,否则我们可能会看到意外的行为(因为size()只允许被单个使用者调用)。
// See https://github.com/netty/netty/issues/5297
if (inEventLoop()) {
return super.pendingTasks();
} else {
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
}
}
|