一、背景

rocketmq-remoting模块贯穿RocketMq网络层以及RPC实现;基于Netty实现;

二、基于Netty实现线程模型

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor 线程池(1)
N NettyServerEPOLLSelector_%d_%d Reactor 线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务RequestProcessor处理线程池

盗用官方的图片以及描述

三、协议定义

1、消息协议

1
2
3
4
   +----------------------+-------------------------------+----------------+-----------+
   | Message total length | serializeType & header length |  header data   | body data |
   |        (4byte)       |           4(1 + 3 )           |                |           |
   +----------------------+-------------------------------+----------------+-----------+

2、header data 协议

serializeType有两种:

  • JSON (fastjson 序列化)
  • ROCKETMQ (自定义如下)
1
2
3
4
   +------+---------------+------------+--------+------+---------------+--------------------------------------------------+
   | code | LanguageCode  |  version   | opaque | flag |     remark    |              extFields                           |
   |  (2) |    1          |     2      |   4    | 4    | length(4) + data | length(4)+ keySize(2) + Key+ valSize(4) + val |
   +------+---------------+------------+--------+------+---------------+--------------------------------------------------+
Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap<String, String> 请求自定义扩展信息 响应自定义扩展信息
  • extFields 用于继承CommandCustomHeader实现类,把相应字段存放extFields,按一定协议序列化以及发序列化操作

四、源码分析

从这个rocketmq-remoting模块开始,就是RocketMq网络层以及RPC实现的模块;

1.核心接口类图

相关方法具体说明一下:

  • registerProcessor 根据请求code处理相应业务逻辑NettyRequestProcessor;
  • invokeSync,invokeAsync,invokeOneway,根据方面名字就知道意思,调用方式:同步,异步,单工(请求没有返回结果)

基于接口编程,为扩展性,NettyRequestProcessor改一下,我觉得更好,不跟Netty网络协议名称定义;那天开源比Netty更好的网络协议;更好扩展这块;

2.RemotingServer(RPC服务端实现)

(1).实现类图结构

基于netty实现NettyRemotingServer类;

(2).如何利用netty工具包启动服务?

a.第一步查看一下NettyRemotingServer构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {
       // 1.创建两个Semaphore,作用于单工调用和异步调用的限流控制;    
       super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
       // 2.创建ServerBootstrap,这个就是Netty工具包,Server启动类
       this.serverBootstrap = new ServerBootstrap();
       this.nettyServerConfig = nettyServerConfig;
       // 连接Channel生命周期监听事件,后面具体说明
       this.channelEventListener = channelEventListener;
       // 3.创建公共线程池,作用于在注册NettyRequestProcessor,没用设置线程池默认用该线程池
       int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
       if (publicThreadNums <= 0) {
           publicThreadNums = 4;
       }
       // NettyServerPublicExecutor_
       this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums,....
       // 4.创建EventLoopGroup,根据epoll选择EpollEventLoopGroup或者NioEventLoopGroup;
       //   创建两个线程组:1.boss线程组,用于ACCEPT,2.selector线程组,用于select(),读或者写操作;这个是经典Reactor模型;
       if (useEpoll()) {
           this.eventLoopGroupBoss = new EpollEventLoopGroup(1, ....
           this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads()...
       } else {
           this.eventLoopGroupBoss = new NioEventLoopGroup(1,....
           this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(),...
       }
       loadSslContext();
   } 

b.netty启动开始,start方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
 public void start() {
     // 1.创建ChannelHandler处理线程组,用于解密或者编码等等....
     this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads()....
     // 2.创建共享ChannelHandler
     prepareSharableHandlers();
     // 3.配置netty 配置信息,已经添加ChannelHandler
     ServerBootstrap childHandler =
         this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)...
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     //用defaultEventExecutorGroup处理相应的ChannelHandler
                     ch.pipeline()
                         .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                         .addLast(defaultEventExecutorGroup,
                             encoder,
                             new NettyDecoder(),
                             new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                             connectionManageHandler,
                             serverHandler
                         );
                 }
             });
     //  创建内存池管理
     if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
         childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
     }
     // 4.启动netty server服务;
     try {
         ChannelFuture sync = this.serverBootstrap.bind().sync();
         InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
         this.port = addr.getPort();
     } ...
     // 5.启动channelEvent线程,用于channelEventListener
     if (this.channelEventListener != null) {
         this.nettyEventExecutor.start();
     }
     // 6.每1秒钟扫描超时请求触发终止操作。
     this.timer.scheduleAtFixedRate(new TimerTask() {

         @Override
         public void run() {
             try {
                 NettyRemotingServer.this.scanResponseTable();
             } catch (Throwable e) {
                 log.error("scanResponseTable exception", e);
             }
         }
     }, 1000 * 3, 1000);
 }

 private void prepareSharableHandlers() {
     handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
     //编码协议
     encoder = new NettyEncoder();
     connectionManageHandler = new NettyConnectManageHandler();
     //request和reposne核心处理
     serverHandler = new NettyServerHandler();
 }   

c.why useEpoll()这样判断呢?Nio在liunx系统默认实现epoll,归根到底原因epoll的水平触发和边沿触发模式,Nio未实现边沿触发模式,Netty自己开发这块实现,提高性能;

水平触发和边沿触发模式区别

d.根据上面netty启动分析,线程模型如下

(3).RPC中定义协议很重要,如何定义协议?NettyEncoder编码协议和NettyDecoder解码协议分析

a.NettyEncoder编码,实现MessageToByteEncoder;

MessageToByteEncoder是netty工具自己实现编码类,消息类型转换byte;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   @Override
   public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
       throws Exception {
       try {
           // 1.编码Header信息
           ByteBuffer header = remotingCommand.encodeHeader();
           out.writeBytes(header);
          // 2.编码Header信息
           byte[] body = remotingCommand.getBody();
           if (body != null) {
               out.writeBytes(body);
           }
       }....
   } 

b.NettyDecoder解码,实现LengthFieldBasedFrameDecoder

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   @Override
   public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
       ByteBuf frame = null;
       try {
           frame = (ByteBuf) super.decode(ctx, in);
           if (null == frame) {
               return null;
           }

           ByteBuffer byteBuffer = frame.nioBuffer();
           //解密
           return RemotingCommand.decode(byteBuffer);
       } ....
   } 

协议的编码或者解码;最终RemotingCommand类处理协议规范操作;

c.RemotingCommand结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
      //请求操作码或者应答响应码
      private int code;
      private LanguageCode language = LanguageCode.JAVA;
      private int version = 0;
      //请求ID
      private int opaque = requestId.getAndIncrement();
      // 从右到左开始,第1bit,标识请求类型 第2bit,rpc类型,异步,同步和单工
      private int flag = 0;
      private String remark;
      private HashMap<String, String> extFields;
      private transient CommandCustomHeader customHeader;
  
      private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
  
      private transient byte[] body;

d.RemotingCommand编码和解码协议跟上面协议定义

看看是怎么编码以及解码呢?

1).encode(编码)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
  public ByteBuffer encode() {
      // 1> header length size
      int length = 4;

      // 2> header data length
      byte[] headerData = this.headerEncode();
      length += headerData.length;

      // 3> body data length
      if (this.body != null) {
          length += body.length;
      }
      //创建 ByteBuffer
      ByteBuffer result = ByteBuffer.allocate(4 + length);

      // length
      result.putInt(length);

      // header length
      result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

      // header data
      result.put(headerData);

      // body data;
      if (this.body != null) {
          result.put(this.body);
      }

      result.flip();

      return result;
  }  
2).headerEncode(head编码)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
   private byte[] headerEncode() {
       //把customHeader的属性存放到extFields的Map中,这样编码存在一个问题,实现类的属性只能基本数据类型,RocketMq支持类型:
       //int,long,dubbo,bool已经相应的封装基本类型和String
       this.makeCustomHeaderToNet();
       if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
           return RocketMQSerializable.rocketMQProtocolEncode(this);
       } else {
           return RemotingSerializable.encode(this);
       }
   }
   public void makeCustomHeaderToNet() {
       if (this.customHeader != null) {
           Field[] fields = getClazzFields(customHeader.getClass());
           if (null == this.extFields) {
               this.extFields = new HashMap<String, String>();
           }
           for (Field field : fields) {
               if (!Modifier.isStatic(field.getModifiers())) {
                   String name = field.getName();
                   if (!name.startsWith("this")) {
                       Object value = null;
                       try {
                           field.setAccessible(true);
                           value = field.get(this.customHeader);
                       }...
                       if (value != null) {
                           this.extFields.put(name, value.toString());
                       }
                   }
               }
           }
       }
   } 
3).decode(解码)就是把字节反序列化RemotingCommand,其次很简单;在看看decodeCommandCustomHeader
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
   public CommandCustomHeader decodeCommandCustomHeader(
       Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
       CommandCustomHeader objectHeader;
       try {
           objectHeader = classHeader.newInstance();
       }...

       if (this.extFields != null) {
           Field[] fields = getClazzFields(classHeader);
           for (Field field : fields) {
               if (!Modifier.isStatic(field.getModifiers())) {
                   String fieldName = field.getName();
                   if (!fieldName.startsWith("this")) {
                       try {
                           String value = this.extFields.get(fieldName);
                           if (null == value) {
                               //@CFNotNull主键验证操作
                               if (!isFieldNullable(field)) {
                                  ...
                               }
                               continue;
                           }
                           field.setAccessible(true);
                           String type = getCanonicalName(field.getType());
                           Object valueParsed;
                           if (type.equals(STRING_CANONICAL_NAME)) {
                               valueParsed = value;
                           } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
                               valueParsed = Integer.parseInt(value);
                           } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
                               valueParsed = Long.parseLong(value);
                           } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
                               valueParsed = Boolean.parseBoolean(value);
                           } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
                               valueParsed = Double.parseDouble(value);
                           } else {
                               ...
                           }
                           field.set(objectHeader, valueParsed);
                       }...
                   }
               }
           }
           objectHeader.checkFields();
       }
       return objectHeader;
   } 

decodeCommandCustomHeader方法很简单,很容易理解反序列化CommandCustomHeader;实现简单容易理解,问题CommandCustomHeader实现类字段属性多,这块序列化字节占一定内存;

(4).RPC协议定义好,数据传输操作,发送操作

a.发送消息,有返回结果,是怎么确定该发送消息结果呢? 具体看看invokeSync同步发送消息方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
 @Override
   public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
       throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
       return this.invokeSyncImpl(channel, request, timeoutMillis);
   }
 public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
       final long timeoutMillis)
       throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
       final int opaque = request.getOpaque();

       try {
           // 1.(request.getOpaque(),responseFuture)缓存到responseTable;
           final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
           this.responseTable.put(opaque, responseFuture);
           final SocketAddress addr = channel.remoteAddress();
           channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture f) throws Exception {
                   if (f.isSuccess()) {
                     //发送数据成功操作,
                       responseFuture.setSendRequestOK(true);
                       return;
                   } else {
                       responseFuture.setSendRequestOK(false);
                   }
                   //发送失败操作
                   responseTable.remove(opaque);
                   responseFuture.setCause(f.cause());
                   responseFuture.putResponse(null);
                   log.warn("send a request command to channel <" + addr + "> failed.");
               }
           });
           // 2.等待返回结果操作
           RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
           if (null == responseCommand) {
               if (responseFuture.isSendRequestOK()) {
                   throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                       responseFuture.getCause());
               } else {
                   throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
               }
           }

           return responseCommand;
       } finally {
           // 3.移除 ResponseFuture
           this.responseTable.remove(opaque);
       }
   } 
1).回到上面问题,是怎么确定返回消息;
  • 先按(request.getOpaque(),responseFuture)缓存到responseTable;

  • 返回结果中opaque跟请求一样子ID值,responseTable找到相应ResponseFuture,ResponseFuture做相应处理;

1
2
3
4
5
6
    /**
     * This map caches all on-going requests.
     * 缓存正在进行的请求等待返回结果
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);
2).responseTable缓存存在问题?同步好说,请求或者超时都会删除该请求缓存;异步发送,一直缓存没有返回结果,缓存会刷爆;

在Netty start启动方法中有个每秒调度扫描responseTable缓存超时处理;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  public void scanResponseTable() {
      final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
      Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
      while (it.hasNext()) {
          Entry<Integer, ResponseFuture> next = it.next();
          ResponseFuture rep = next.getValue();
          // 超时判断
          if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
              rep.release();
              it.remove();
              rfList.add(rep);
              log.warn("remove timeout request, " + rep);
          }
      }

      for (ResponseFuture rf : rfList) {
          try {
            //异步处理 executeInvokeCallback;
              executeInvokeCallback(rf);
          } catch (Throwable e) {
              log.warn("scanResponseTable, operationComplete Exception", e);
          }
      }
  }  

b.ResponseFuture实现异步操作

1).结构
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
   // 请求id
   private final int opaque;
   private final Channel processChannel;
   private final long timeoutMillis;
   //回调操作
   private final InvokeCallback invokeCallback;
   private final long beginTimestamp = System.currentTimeMillis();
   //控制线程等待操作
   private final CountDownLatch countDownLatch = new CountDownLatch(1);
   //用于异步发送或者单工有流量限制,释放资源;AtomicBoolean控制只准一次;
   private final SemaphoreReleaseOnlyOnce once;

   private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
   // 返回结果
   private volatile RemotingCommand responseCommand;
   private volatile boolean sendRequestOK = true;
   private volatile Throwable cause; 
2).线程等待
1
2
3
4
5
6
7
8
9
    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }

c.invokeAsync异步发送消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
   @Override
   public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
       throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
       this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
   }
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback)...{
       long beginStartTime = System.currentTimeMillis();
       final int opaque = request.getOpaque();
        //Semaphore控制异步发送消息数据
       boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
       if (acquired) {
           final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
           long costTime = System.currentTimeMillis() - beginStartTime;
           if (timeoutMillis < costTime) {
               once.release();
               throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
           }

           final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
           this.responseTable.put(opaque, responseFuture);
           try {
               channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                   @Override
                   public void operationComplete(ChannelFuture f) throws Exception {
                       if (f.isSuccess()) {
                           responseFuture.setSendRequestOK(true);
                           return;
                       }
                       requestFail(opaque);
                       log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                   }
               });
           }...
       }...
   } 

d.invokeOneway

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
   public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)... {
       request.markOnewayRPC();
       // semaphoreOneway信号控制发送频率
       boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
       if (acquired) {
           final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
           try {
               channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                   @Override
                   public void operationComplete(ChannelFuture f) throws Exception {
                       once.release();
                       if (!f.isSuccess()) {
                           log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                       }
                   }
               });
           } ...
       } ...
   }

(5).RPC协议定义好,数据传输操作,读操作,NettyServerHandler处理相应这块逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
   //接受数据处理,请求数据和返回结果数据处理
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

a. processRequestCommand请求数据处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
  public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
         //1.根据 RemotingCommand的code获取相应NettyRequestProcessor,未获取到,使用默认;
         final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
         final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
         final int opaque = cmd.getOpaque();
         //2.创建任务,线程池处理NettyRequestProcessor
         if (pair != null) {
             Runnable run = new Runnable() {
                 @Override
                 public void run() {
                     try {
                         //调用RpcHooks方法,执行前或者执行后
                         doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                         //最终调用的方法 
                         final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                         doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                         //判断是否是单工请求
                         if (!cmd.isOnewayRPC()) {
                             if (response != null) {
                                 response.setOpaque(opaque);
                                 response.markResponseType();
                                 try {
                                     ctx.writeAndFlush(response);
                                 } ....
                             } else {
 
                             }
                         }
                     } ....
                 }
             };
             // 3.拒绝要求判断操作
             if (pair.getObject1().rejectRequest()) {
                 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                     "[REJECTREQUEST]system busy, start flow control for a while");
                 response.setOpaque(opaque);
                 ctx.writeAndFlush(response);
                 return;
             }
 
             try {
                 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
             // 4.线程池提交任务   
                 pair.getObject2().submit(requestTask);
             } ...
         } ...
     }

b. processResponseCommand 返回数据处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
   public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
       final int opaque = cmd.getOpaque();
       // ResponseFuture完成操作,已经相应回调操作
       final ResponseFuture responseFuture = responseTable.get(opaque);
       if (responseFuture != null) {
           responseFuture.setResponseCommand(cmd);

           responseTable.remove(opaque);

           if (responseFuture.getInvokeCallback() != null) {
               executeInvokeCallback(responseFuture);
           } else {
               responseFuture.putResponse(cmd);
               responseFuture.release();
           }
       } ...
   } 

(6).NettyRequestProcessor实现接口处理相应业务逻辑;

a.NettyRemotingAbstract管理NettyRequestProcessor

NettyRemotingAbstract这个类看起来很别扭,一般不是Abstract写在前面吗?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
   /**
    * This container holds all processors per request code, aka, for each incoming request, we may look up the
    * responding processor in this map to handle the request.
    */
   protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
       new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); 
   /**
    * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
    */
   protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

b.NettyRemotingServer注册NettyRequestProcessor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  @Override
  public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
      ExecutorService executorThis = executor;
      if (null == executor) {
          executorThis = this.publicExecutor;
      }

      Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
      this.processorTable.put(requestCode, pair);
  }

  @Override
  public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
      this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
  }  

3.NettyRemotingClient RPC客服端实现

(1).实现类图结构

(2).如何实现Netty创建链接?

a.start()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private final Bootstrap bootstrap = new Bootstrap(); 

   @Override
   public void start() {
       this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads()...

       Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
           ...
           .handler(new ChannelInitializer<SocketChannel>() {
               @Override
               public void initChannel(SocketChannel ch) throws Exception {
                   ChannelPipeline pipeline = ch.pipeline();
                   if (nettyClientConfig.isUseTLS()) {
                       if (null != sslContext) {
                           pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                           log.info("Prepend SSL handler");
                       } else {
                           log.warn("Connections are insecure as SSLContext is null!");
                       }
                   }
                   pipeline.addLast(
                       defaultEventExecutorGroup,
                       new NettyEncoder(),
                       new NettyDecoder(),
                       new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                       new NettyConnectManageHandler(),
                       new NettyClientHandler());
               }
           });
      //扫描请求返回结果超时
       this.timer.scheduleAtFixedRate(new TimerTask() {
           @Override
           public void run() {
               try {
                   NettyRemotingClient.this.scanResponseTable();
               } catch (Throwable e) {
                   log.error("scanResponseTable exception", e);
               }
           }
       }, 1000 * 3, 1000);

       if (this.channelEventListener != null) {
           this.nettyEventExecutor.start();
       }
   }

b.createChannel(final String addr) 创建链接

channelTables缓存中存在Channel,先关闭重新创建一个链接Channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
   private Channel createChannel(final String addr) throws InterruptedException {
       ChannelWrapper cw = this.channelTables.get(addr);
       if (cw != null && cw.isOK()) {
           cw.getChannel().close();
           channelTables.remove(addr);
       }
      
       if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
           try {
               boolean createNewConnection;
               cw = this.channelTables.get(addr);
               if (cw != null) {

                   if (cw.isOK()) {
                       cw.getChannel().close();
                       this.channelTables.remove(addr);
                       createNewConnection = true;
                   } else if (!cw.getChannelFuture().isDone()) {
                       createNewConnection = false;
                   } else {
                       this.channelTables.remove(addr);
                       createNewConnection = true;
                   }
               } else {
                   createNewConnection = true;
               }

               if (createNewConnection) {
                   //创建链接,ChannelFuture缓存channelTables
                   ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                   log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                   cw = new ChannelWrapper(channelFuture);
                   this.channelTables.put(addr, cw);
               }
           }...
           finally {
               this.lockChannelTables.unlock();
           }
       }...

       if (cw != null) {
           ChannelFuture channelFuture = cw.getChannelFuture();
           if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
               if (cw.isOK()) {
                   log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                   return cw.getChannel();
               } ...
           }...
       }

       return null;
   } 

(3).发送消息

a.invokeSync同步发送消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
   @Override
   public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)... {
       long beginStartTime = System.currentTimeMillis();
       //1.按地址获取Channel,地址为空,获取NameServerChannel
       final Channel channel = this.getAndCreateChannel(addr);
       if (channel != null && channel.isActive()) {
           try {
               doBeforeRpcHooks(addr, request);
               long costTime = System.currentTimeMillis() - beginStartTime;
               if (timeoutMillis < costTime) {
                   throw new RemotingTimeoutException("invokeSync call timeout");
               }
               //调用Netty Server相同处理结果
               RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
               doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
               return response;
           } ....
       }...
   } 

b.getAndCreateChannel获取Channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   private Channel getAndCreateChannel(final String addr) throws InterruptedException {
       if (null == addr) {
          // 地址为空,获取NameServerChannel
          // namesrvAddrList集合中随机创建Channel
           return getAndCreateNameserverChannel();
       }

       ChannelWrapper cw = this.channelTables.get(addr);
       if (cw != null && cw.isOK()) {
           return cw.getChannel();
       }
       //创建新的Channel
       return this.createChannel(addr);
   } 

c.invokeAsync,invokeOneway跟invokeSync类同,最终调用相应Netty Server相同处理方法,这里不做具体分析;

(4).接受消息,NettyClientHandler核心类

1
2
3
4
5
6
7
   class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

       @Override
       protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
           processMessageReceived(ctx, msg);
       }
   } 

4.ChannelEventListener 在服务端以及客服端构造方法中都有这个参数,来了解作用

(1).结构展示出来

上图ChannelEventListener接口方法描述就是Channel的生命周期事件触发;

(2).服务端或者客户端start()

1
2
3
4
       // 5.启动channelEvent线程,用于channelEventListener
       if (this.channelEventListener != null) {
           this.nettyEventExecutor.start();
       } 

NettyEventExecutor触发事件点,NettyEventExecutor是线程;NettyEventExecutor实现简单生产者消费者模式;下面解析NettyEventExecutor;

a.生产者消费者模式必须有消息事件,ChannelEventListener相当于消费,在接口中参数类似于我们消息,封装成NettyEvent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
 public class NettyEvent {
   private final NettyEventType type;
   private final String remoteAddr;
   private final Channel channel;
 }

public enum NettyEventType {
   CONNECT,
   CLOSE,
   IDLE,
   EXCEPTION
}

b.NettyEventExecutor构造就启动线程,调用NettyEventExecutor.run方法;从堵塞队列中拿数据,根据相应事件类型调用相应ChannelEventListener方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
      @Override
      public void run() {
         ...
          final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
          while (!this.isStopped()) {
              try {
                  //从堵塞队列中拿数据
                  NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                  if (event != null && listener != null) {
                      switch (event.getType()) {
                          case IDLE:
                              listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                              break;
                          case CLOSE:
                              listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                              break;
                          case CONNECT:
                              listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                              break;
                          case EXCEPTION:
                              listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                              break;
                          default:
                              break;

                      }
                  }
   ....
      }  

c.putNettyEvent发布事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
       //有界堵塞队列  
       private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
       private final int maxSize = 10000;

       public void putNettyEvent(final NettyEvent event) {
          //队列数量达到1万,该事件忽略掉
           if (this.eventQueue.size() <= maxSize) {
               this.eventQueue.add(event);
           } else {
               log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
           }
       } 

d.在哪里生产这个事件呢?服务端以及客户端对应NettyConnectManageHandler的ChannelHandler处理相应发布事件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
  // 如下服务端代码,客户端不列出来,都类同;
   @ChannelHandler.Sharable
   class NettyConnectManageHandler extends ChannelDuplexHandler {
        ....

       @Override
       public void channelActive(ChannelHandlerContext ctx) throws Exception {
           ...
           if (NettyRemotingServer.this.channelEventListener != null) {
               NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
           }
       }

       @Override
       public void channelInactive(ChannelHandlerContext ctx) throws Exception {
           ....
           if (NettyRemotingServer.this.channelEventListener != null) {
               NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
           }
       }

       @Override
       public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
           if (evt instanceof IdleStateEvent) {
               IdleStateEvent event = (IdleStateEvent) evt;
               if (event.state().equals(IdleState.ALL_IDLE)) {
                   final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                   ...
                   RemotingUtil.closeChannel(ctx.channel());
                   if (NettyRemotingServer.this.channelEventListener != null) {
                       NettyRemotingServer.this
                           .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                   }
               }
           }
           ctx.fireUserEventTriggered(evt);
       }

       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
           ...
           if (NettyRemotingServer.this.channelEventListener != null) {
               NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
           }
           RemotingUtil.closeChannel(ctx.channel());
       } 

五、配置

1.NettyServerConfig 看看Server配置做相应优化操作

1
2
3
4
5
6
7
8
9
  //编码或者解密,心跳以及触发RequestProcessor任务 
  private int serverWorkerThreads = 8;
  //默认4,RequestProcessor为配置线程池,就用到这个配置线程池
  private int serverCallbackExecutorThreads = 0;
  private int serverSelectorThreads = 3;
  private int serverOnewaySemaphoreValue = 256;
  private int serverAsyncSemaphoreValue = 64;
  private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
  private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;  

2.NettyClientConfig 看看Client配置做相应优化操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
   private int clientWorkerThreads = 4;
   private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
   //65535
   private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
   //65535
   private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
   private int connectTimeoutMillis = 3000;
   private long channelNotActiveInterval = 1000 * 60;

   /**
    * IdleStateEvent will be triggered when neither read nor write was performed for
    * the specified period of this time. Specify {@code 0} to disable
    */
   private int clientChannelMaxIdleTimeSeconds = 120;

   private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
   private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; 

六、总结

  • 基于Netty实现网络层,自定义自己协议;

  • NettyRequestProcessor接口实现相应请求业务逻辑;根据code区分NettyRequestProcessor;

  • 基于Netty线程模式,在创建两个线程池,一个解码或者编码,一个处理NettyRequestProcessor线程池,提高性能;