一、背景
rocketmq-remoting模块贯穿RocketMq网络层以及RPC实现;基于Netty实现;
二、基于Netty实现线程模型

线程数 |
线程名 |
线程具体说明 |
1 |
NettyBoss_%d |
Reactor 线程池(1) |
N |
NettyServerEPOLLSelector_%d_%d |
Reactor 线程池 |
M1 |
NettyServerCodecThread_%d |
Worker线程池 |
M2 |
RemotingExecutorThread_%d |
业务RequestProcessor处理线程池 |
盗用官方的图片以及描述
三、协议定义
1、消息协议
1
2
3
4
|
+----------------------+-------------------------------+----------------+-----------+
| Message total length | serializeType & header length | header data | body data |
| (4byte) | 4(1 + 3 ) | | |
+----------------------+-------------------------------+----------------+-----------+
|
serializeType有两种:
- JSON (fastjson 序列化)
- ROCKETMQ (自定义如下)
1
2
3
4
|
+------+---------------+------------+--------+------+---------------+--------------------------------------------------+
| code | LanguageCode | version | opaque | flag | remark | extFields |
| (2) | 1 | 2 | 4 | 4 | length(4) + data | length(4)+ keySize(2) + Key+ valSize(4) + val |
+------+---------------+------------+--------+------+---------------+--------------------------------------------------+
|
Header字段 |
类型 |
Request说明 |
Response说明 |
code |
int |
请求操作码,应答方根据不同的请求码进行不同的业务处理 |
应答响应码。0表示成功,非0则表示各种错误 |
language |
LanguageCode |
请求方实现的语言 |
应答方实现的语言 |
version |
int |
请求方程序的版本 |
应答方程序的版本 |
opaque |
int |
相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 |
应答不做修改直接返回 |
flag |
int |
区分是普通RPC还是onewayRPC得标志 |
区分是普通RPC还是onewayRPC得标志 |
remark |
String |
传输自定义文本信息 |
传输自定义文本信息 |
extFields |
HashMap<String, String> |
请求自定义扩展信息 |
响应自定义扩展信息 |
- extFields 用于继承CommandCustomHeader实现类,把相应字段存放extFields,按一定协议序列化以及发序列化操作
四、源码分析
从这个rocketmq-remoting模块开始,就是RocketMq网络层以及RPC实现的模块;
1.核心接口类图

相关方法具体说明一下:
- registerProcessor 根据请求code处理相应业务逻辑NettyRequestProcessor;
- invokeSync,invokeAsync,invokeOneway,根据方面名字就知道意思,调用方式:同步,异步,单工(请求没有返回结果)
基于接口编程,为扩展性,NettyRequestProcessor改一下,我觉得更好,不跟Netty网络协议名称定义;那天开源比Netty更好的网络协议;更好扩展这块;
2.RemotingServer(RPC服务端实现)
(1).实现类图结构

基于netty实现NettyRemotingServer类;
(2).如何利用netty工具包启动服务?
a.第一步查看一下NettyRemotingServer构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {
// 1.创建两个Semaphore,作用于单工调用和异步调用的限流控制;
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
// 2.创建ServerBootstrap,这个就是Netty工具包,Server启动类
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
// 连接Channel生命周期监听事件,后面具体说明
this.channelEventListener = channelEventListener;
// 3.创建公共线程池,作用于在注册NettyRequestProcessor,没用设置线程池默认用该线程池
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// NettyServerPublicExecutor_
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums,....
// 4.创建EventLoopGroup,根据epoll选择EpollEventLoopGroup或者NioEventLoopGroup;
// 创建两个线程组:1.boss线程组,用于ACCEPT,2.selector线程组,用于select(),读或者写操作;这个是经典Reactor模型;
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, ....
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads()...
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1,....
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(),...
}
loadSslContext();
}
|
b.netty启动开始,start方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
@Override
public void start() {
// 1.创建ChannelHandler处理线程组,用于解密或者编码等等....
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads()....
// 2.创建共享ChannelHandler
prepareSharableHandlers();
// 3.配置netty 配置信息,已经添加ChannelHandler
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//用defaultEventExecutorGroup处理相应的ChannelHandler
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
// 创建内存池管理
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
// 4.启动netty server服务;
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} ...
// 5.启动channelEvent线程,用于channelEventListener
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 6.每1秒钟扫描超时请求触发终止操作。
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
private void prepareSharableHandlers() {
handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
//编码协议
encoder = new NettyEncoder();
connectionManageHandler = new NettyConnectManageHandler();
//request和reposne核心处理
serverHandler = new NettyServerHandler();
}
|
c.why useEpoll()这样判断呢?Nio在liunx系统默认实现epoll,归根到底原因epoll的水平触发和边沿触发模式,Nio未实现边沿触发模式,Netty自己开发这块实现,提高性能;
水平触发和边沿触发模式区别
d.根据上面netty启动分析,线程模型如下

(3).RPC中定义协议很重要,如何定义协议?NettyEncoder编码协议和NettyDecoder解码协议分析
a.NettyEncoder编码,实现MessageToByteEncoder;
MessageToByteEncoder是netty工具自己实现编码类,消息类型转换byte;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
// 1.编码Header信息
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
// 2.编码Header信息
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
}....
}
|
b.NettyDecoder解码,实现LengthFieldBasedFrameDecoder
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
//解密
return RemotingCommand.decode(byteBuffer);
} ....
}
|
协议的编码或者解码;最终RemotingCommand类处理协议规范操作;
c.RemotingCommand结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
//请求操作码或者应答响应码
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
//请求ID
private int opaque = requestId.getAndIncrement();
// 从右到左开始,第1bit,标识请求类型 第2bit,rpc类型,异步,同步和单工
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
|
d.RemotingCommand编码和解码协议跟上面协议定义
看看是怎么编码以及解码呢?
1).encode(编码)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
//创建 ByteBuffer
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
private byte[] headerEncode() {
//把customHeader的属性存放到extFields的Map中,这样编码存在一个问题,实现类的属性只能基本数据类型,RocketMq支持类型:
//int,long,dubbo,bool已经相应的封装基本类型和String
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
}...
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
|
3).decode(解码)就是把字节反序列化RemotingCommand,其次很简单;在看看decodeCommandCustomHeader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
CommandCustomHeader objectHeader;
try {
objectHeader = classHeader.newInstance();
}...
if (this.extFields != null) {
Field[] fields = getClazzFields(classHeader);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String fieldName = field.getName();
if (!fieldName.startsWith("this")) {
try {
String value = this.extFields.get(fieldName);
if (null == value) {
//@CFNotNull主键验证操作
if (!isFieldNullable(field)) {
...
}
continue;
}
field.setAccessible(true);
String type = getCanonicalName(field.getType());
Object valueParsed;
if (type.equals(STRING_CANONICAL_NAME)) {
valueParsed = value;
} else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
valueParsed = Integer.parseInt(value);
} else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
valueParsed = Long.parseLong(value);
} else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
} else {
...
}
field.set(objectHeader, valueParsed);
}...
}
}
}
objectHeader.checkFields();
}
return objectHeader;
}
|
decodeCommandCustomHeader方法很简单,很容易理解反序列化CommandCustomHeader;实现简单容易理解,问题CommandCustomHeader实现类字段属性多,这块序列化字节占一定内存;
(4).RPC协议定义好,数据传输操作,发送操作
a.发送消息,有返回结果,是怎么确定该发送消息结果呢? 具体看看invokeSync同步发送消息方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
@Override
public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
// 1.(request.getOpaque(),responseFuture)缓存到responseTable;
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
//发送数据成功操作,
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
//发送失败操作
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// 2.等待返回结果操作
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
// 3.移除 ResponseFuture
this.responseTable.remove(opaque);
}
}
|
1).回到上面问题,是怎么确定返回消息;
-
先按(request.getOpaque(),responseFuture)缓存到responseTable;
-
返回结果中opaque跟请求一样子ID值,responseTable找到相应ResponseFuture,ResponseFuture做相应处理;
1
2
3
4
5
6
|
/**
* This map caches all on-going requests.
* 缓存正在进行的请求等待返回结果
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
|
2).responseTable缓存存在问题?同步好说,请求或者超时都会删除该请求缓存;异步发送,一直缓存没有返回结果,缓存会刷爆;
在Netty start启动方法中有个每秒调度扫描responseTable缓存超时处理;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
// 超时判断
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
//异步处理 executeInvokeCallback;
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
|
b.ResponseFuture实现异步操作
1).结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
// 请求id
private final int opaque;
private final Channel processChannel;
private final long timeoutMillis;
//回调操作
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
//控制线程等待操作
private final CountDownLatch countDownLatch = new CountDownLatch(1);
//用于异步发送或者单工有流量限制,释放资源;AtomicBoolean控制只准一次;
private final SemaphoreReleaseOnlyOnce once;
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
// 返回结果
private volatile RemotingCommand responseCommand;
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
|
2).线程等待
1
2
3
4
5
6
7
8
9
|
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
|
c.invokeAsync异步发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)...{
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
//Semaphore控制异步发送消息数据
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
}...
}...
}
|
d.invokeOneway
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)... {
request.markOnewayRPC();
// semaphoreOneway信号控制发送频率
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} ...
} ...
}
|
(5).RPC协议定义好,数据传输操作,读操作,NettyServerHandler处理相应这块逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
//接受数据处理,请求数据和返回结果数据处理
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
|
a. processRequestCommand请求数据处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
//1.根据 RemotingCommand的code获取相应NettyRequestProcessor,未获取到,使用默认;
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
//2.创建任务,线程池处理NettyRequestProcessor
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
//调用RpcHooks方法,执行前或者执行后
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
//最终调用的方法
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
//判断是否是单工请求
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} ....
} else {
}
}
} ....
}
};
// 3.拒绝要求判断操作
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 4.线程池提交任务
pair.getObject2().submit(requestTask);
} ...
} ...
}
|
b. processResponseCommand 返回数据处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
// ResponseFuture完成操作,已经相应回调操作
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} ...
}
|
(6).NettyRequestProcessor实现接口处理相应业务逻辑;
a.NettyRemotingAbstract管理NettyRequestProcessor
NettyRemotingAbstract这个类看起来很别扭,一般不是Abstract写在前面吗?
1
2
3
4
5
6
7
8
9
10
|
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
|
b.NettyRemotingServer注册NettyRequestProcessor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = this.publicExecutor;
}
Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
|
3.NettyRemotingClient RPC客服端实现
(1).实现类图结构

(2).如何实现Netty创建链接?
a.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
private final Bootstrap bootstrap = new Bootstrap();
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads()...
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
...
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});
//扫描请求返回结果超时
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
|
b.createChannel(final String addr) 创建链接
channelTables缓存中存在Channel,先关闭重新创建一个链接Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
private Channel createChannel(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
cw.getChannel().close();
channelTables.remove(addr);
}
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
cw.getChannel().close();
this.channelTables.remove(addr);
createNewConnection = true;
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
//创建链接,ChannelFuture缓存channelTables
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
}...
finally {
this.lockChannelTables.unlock();
}
}...
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} ...
}...
}
return null;
}
|
(3).发送消息
a.invokeSync同步发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)... {
long beginStartTime = System.currentTimeMillis();
//1.按地址获取Channel,地址为空,获取NameServerChannel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
//调用Netty Server相同处理结果
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} ....
}...
}
|
b.getAndCreateChannel获取Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
if (null == addr) {
// 地址为空,获取NameServerChannel
// namesrvAddrList集合中随机创建Channel
return getAndCreateNameserverChannel();
}
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
//创建新的Channel
return this.createChannel(addr);
}
|
c.invokeAsync,invokeOneway跟invokeSync类同,最终调用相应Netty Server相同处理方法,这里不做具体分析;
(4).接受消息,NettyClientHandler核心类
1
2
3
4
5
6
7
|
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
|
4.ChannelEventListener 在服务端以及客服端构造方法中都有这个参数,来了解作用
(1).结构展示出来

上图ChannelEventListener接口方法描述就是Channel的生命周期事件触发;
(2).服务端或者客户端start()
1
2
3
4
|
// 5.启动channelEvent线程,用于channelEventListener
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
|
NettyEventExecutor触发事件点,NettyEventExecutor是线程;NettyEventExecutor实现简单生产者消费者模式;下面解析NettyEventExecutor;
a.生产者消费者模式必须有消息事件,ChannelEventListener相当于消费,在接口中参数类似于我们消息,封装成NettyEvent
1
2
3
4
5
6
7
8
9
10
11
12
|
public class NettyEvent {
private final NettyEventType type;
private final String remoteAddr;
private final Channel channel;
}
public enum NettyEventType {
CONNECT,
CLOSE,
IDLE,
EXCEPTION
}
|
b.NettyEventExecutor构造就启动线程,调用NettyEventExecutor.run方法;从堵塞队列中拿数据,根据相应事件类型调用相应ChannelEventListener方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@Override
public void run() {
...
final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
while (!this.isStopped()) {
try {
//从堵塞队列中拿数据
NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
if (event != null && listener != null) {
switch (event.getType()) {
case IDLE:
listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
break;
case CLOSE:
listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
break;
case CONNECT:
listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
break;
case EXCEPTION:
listener.onChannelException(event.getRemoteAddr(), event.getChannel());
break;
default:
break;
}
}
....
}
|
c.putNettyEvent发布事件
1
2
3
4
5
6
7
8
9
10
11
12
|
//有界堵塞队列
private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
private final int maxSize = 10000;
public void putNettyEvent(final NettyEvent event) {
//队列数量达到1万,该事件忽略掉
if (this.eventQueue.size() <= maxSize) {
this.eventQueue.add(event);
} else {
log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
}
}
|
d.在哪里生产这个事件呢?服务端以及客户端对应NettyConnectManageHandler的ChannelHandler处理相应发布事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
// 如下服务端代码,客户端不列出来,都类同;
@ChannelHandler.Sharable
class NettyConnectManageHandler extends ChannelDuplexHandler {
....
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
...
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
....
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
...
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
...
if (NettyRemotingServer.this.channelEventListener != null) {
NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
RemotingUtil.closeChannel(ctx.channel());
}
|
五、配置
1.NettyServerConfig 看看Server配置做相应优化操作
1
2
3
4
5
6
7
8
9
|
//编码或者解密,心跳以及触发RequestProcessor任务
private int serverWorkerThreads = 8;
//默认4,RequestProcessor为配置线程池,就用到这个配置线程池
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
|
2.NettyClientConfig 看看Client配置做相应优化操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
private int clientWorkerThreads = 4;
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
//65535
private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
//65535
private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
private int connectTimeoutMillis = 3000;
private long channelNotActiveInterval = 1000 * 60;
/**
* IdleStateEvent will be triggered when neither read nor write was performed for
* the specified period of this time. Specify {@code 0} to disable
*/
private int clientChannelMaxIdleTimeSeconds = 120;
private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
|
六、总结