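I. Dubbo overall architecture

1. Start with the architecture diagram

The architecture is layered, and each layer handles its own part of the logic.

The official documentation covers this very well.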

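2. In Dubbo's core domain model:

  • Protocol is the service domain. It is the main entry point for exporting and referring Invokers, and it manages the Invoker life cycle.

  • Invoker is the entity domain and Dubbo's core model. Every other model converges on it or is converted into it. It represents an executable to which an invoke call can be issued; it may be a local implementation, a remote implementation, or a cluster implementation.

  • Invocation is the session domain. It holds the variables of a call, such as the method name and the arguments.

  • Basic design principles

  • Microkernel + Plugin: the Microkernel only assembles Plugins, and Dubbo's own features are implemented through extension points as well, so every feature of Dubbo can be replaced by a user-defined extension.

  • URL is the unified format for configuration: every extension point receives its configuration through the URL that is passed to it.

    • Example: registry://192.168.1.7:9090/com.alibaba.service1?param1=value1&param2=value2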
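
To make the "URL as configuration" idea concrete, here is a minimal sketch that splits such a URL into protocol, host, port, path and parameters. ConfigUrlSketch and its fields are hypothetical names for illustration; this is not Dubbo's own URL class, and it relies only on java.net.URI.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not Dubbo's URL class): splits a configuration URL into its parts.
final class ConfigUrlSketch {
    final String protocol;
    final String host;
    final int port;
    final String path;
    final Map<String, String> parameters = new LinkedHashMap<>();

    ConfigUrlSketch(String raw) {
        URI uri = URI.create(raw);
        this.protocol = uri.getScheme();
        this.host = uri.getHost();
        this.port = uri.getPort();
        // Drop the leading '/' so that path is the bare service name.
        String p = uri.getPath();
        this.path = p.startsWith("/") ? p.substring(1) : p;
        String query = uri.getQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    parameters.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
    }

    // Reads a parameter with a default, the way an extension point would read its settings.
    String getParameter(String key, String defaultValue) {
        return parameters.getOrDefault(key, defaultValue);
    }
}
```

An extension point that receives such an object needs no other configuration channel: everything it has to know travels in the protocol, the address and the parameters.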

a. Domain model class diagram

II. How Dubbo exports a service

1. Spring invokes afterPropertiesSet(), which exports the service

Call sequence:

ServiceBean.afterPropertiesSet() -> ServiceConfig.export() -> doExport() -> doExportUrls() -> doExportUrlsFor1Protocol()

2. ServiceConfig holds the adaptive extensions of Protocol and ProxyFactory as static final constants, both obtained from ExtensionLoader:

    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

(1). ExtensionLoader generates the Protocol$Adaptive class for Protocol automatically. Look at its structure and check that it matches the earlier ExtensionLoader analysis:

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
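
The generated class boils down to "read the protocol name from the URL, look up the extension with that name, delegate". The following is a simplified sketch of that dispatch. MiniProtocol, EXTENSIONS and the returned strings are hypothetical stand-ins; a plain map replaces ExtensionLoader, and URLs are plain strings.

```java
import java.util.HashMap;
import java.util.Map;

final class AdaptiveSketch {
    // Hypothetical stand-in for an extension interface such as Protocol.
    interface MiniProtocol {
        String export(String url);
    }

    // Stand-in for ExtensionLoader's name -> extension lookup.
    static final Map<String, MiniProtocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
        EXTENSIONS.put("injvm", url -> "InjvmProtocol exported " + url);
    }

    // The adaptive object has no logic of its own; it only picks the real extension per call.
    static final MiniProtocol ADAPTIVE = url -> {
        int idx = url.indexOf("://");
        String extName = idx < 0 ? "dubbo" : url.substring(0, idx); // default name is "dubbo"
        MiniProtocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.export(url);
    };
}
```

This is why the single static protocol field in ServiceConfig can serve registry://, dubbo:// and injvm:// URLs alike: the choice is made on every call from the URL that is passed in.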

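(2). SPI implementations of Protocol

Every Protocol instance that is created gets wrapped by both ProtocolFilterWrapper and ProtocolListenerWrapper.

(3). SPI implementations of ProxyFactory (dynamic proxies)

a. Interface class diagram

  • getProxy is used on the consumer side to create a proxy that implements the service interface
  • getInvoker is used on the provider side to create the Invoker that wraps the service implementation
    • it creates a subclass of the abstract class AbstractProxyInvoker to do the delegation

b. SPI implementation classes

javassist is the default dynamic proxy. Every instance that is created gets wrapped by StubProxyFactoryWrapper; the details are covered in the SPI mechanism write-up rather than here.

c. JavassistProxyFactory generates the proxy source code and compiles it into a class at runtime, which avoids most reflection and improves performance.

d. StubProxyFactoryWrapper supports Dubbo's local stub: the consumer first runs the logic of a local implementation class.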
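
The two directions of ProxyFactory can be sketched with plain JDK reflection. Note the swap: Dubbo's default is javassist code generation, while this sketch uses java.lang.reflect for brevity. MiniInvoker and DemoService are hypothetical, simplified types, not the Dubbo interfaces.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

final class ProxyFactorySketch {
    // Hypothetical, simplified Invoker: one entry point for any method call.
    interface MiniInvoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args) throws Exception;
    }

    // Sample service used only for the demonstration.
    interface DemoService {
        String sayHello(String name);
    }

    // Provider side: wrap the implementation object into an Invoker.
    static MiniInvoker getInvoker(Object ref, Class<?> type) {
        return (methodName, parameterTypes, args) -> {
            Method method = type.getMethod(methodName, parameterTypes);
            return method.invoke(ref, args);
        };
    }

    // Consumer side: turn an Invoker back into an object that implements the interface.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(MiniInvoker invoker, Class<T> type) {
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type},
                (proxy, method, args) -> invoker.invoke(method.getName(), method.getParameterTypes(), args));
    }
}
```

In a real deployment the Invoker handed to getProxy would forward the call over the network; here both ends live in one JVM, which is enough to show that every call funnels through a single invoke method.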

3. Part of ServiceConfig#doExportUrlsFor1Protocol

  • The registryURLs to register with
    • registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.6.0&pid=28302&registry=zookeeper&timestamp=1593676329574
  • dubbo://192.168.31.122:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.31.122&bind.port=20881&dubbo=2.6.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=28302&server=netty4&side=provider&timestamp=1593676329586
      // Build the URL; name defaults to "dubbo"
      URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
      // Configuration
      if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
              .hasExtension(url.getProtocol())) {
          url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                  .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
      }
       // scope "none": export nothing
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // unless scope is "remote", export locally ("remote" means remote export only)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // unless scope is "local", export remotely ("local" means local export only)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                ...
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        ...
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        Exporter<?> exporter = protocol.export(invoker); // calls ProtocolFilterWrapper -> ProtocolListenerWrapper -> RegistryProtocol
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    Exporter<?> exporter = protocol.export(invoker); // no registry: calls ProtocolFilterWrapper -> ProtocolListenerWrapper -> the protocol named in url (DubboProtocol for dubbo://)
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }
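
The three scope checks above combine into a small decision table. The sketch below reproduces only that decision. ExportScopeSketch is a hypothetical name, and the literal values "none", "remote" and "local" are assumed to be what Constants.SCOPE_NONE, SCOPE_REMOTE and SCOPE_LOCAL hold.

```java
import java.util.ArrayList;
import java.util.List;

final class ExportScopeSketch {
    // Returns which exports the scope setting allows; scope may be null (not configured).
    static List<String> plan(String scope) {
        List<String> steps = new ArrayList<>();
        if ("none".equalsIgnoreCase(scope)) {
            return steps;           // nothing is exported
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            steps.add("local");     // in-JVM export via exportLocal
        }
        if (!"local".equalsIgnoreCase(scope)) {
            steps.add("remote");    // export through the registry or directly
        }
        return steps;
    }
}
```

With no scope configured, both branches run, so a service is exported in-JVM and remotely at the same time.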

4. exportLocal: exporting locally (in-JVM)

        private void exportLocal(URL url) {
            if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                URL local = URL.valueOf(url.toFullString())
                        .setProtocol(Constants.LOCAL_PROTOCOL)
                        .setHost(LOCALHOST)
                        .setPort(0);
                ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
                Exporter<?> exporter = protocol.export(                   // calls ProtocolFilterWrapper -> ProtocolListenerWrapper -> InjvmProtocol -> InjvmExporter
                        proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // calls StubProxyFactoryWrapper -> JavassistProxyFactory to create the proxy dynamically
                exporters.add(exporter);
                ...
            }
        }

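5. Exporting to the registry

ProtocolFilterWrapper -> ProtocolListenerWrapper -> RegistryProtocol

(1). ProtocolFilterWrapper#export

  • If the URL uses the registry protocol, the Invoker is passed through without a Filter chain.
  • Otherwise the Invoker is wrapped into a Filter chain.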
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { // registry protocol URL: pass the Invoker through without a Filter chain
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
    
    
   // Wrap the Invoker into a chain of Filters
   private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
       Invoker<T> last = invoker;
       List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
       if (filters.size() > 0) {
           for (int i = filters.size() - 1; i >= 0; i--) {
               final Filter filter = filters.get(i);
               final Invoker<T> next = last;
               last = new Invoker<T>() {

                   public Class<T> getInterface() {
                       return invoker.getInterface();
                   }

                   public URL getUrl() {
                       return invoker.getUrl();
                   }

                   public boolean isAvailable() {
                       return invoker.isAvailable();
                   }

                   public Result invoke(Invocation invocation) throws RpcException {
                       return filter.invoke(next, invocation);
                   }

                   public void destroy() {
                       invoker.destroy();
                   }

                   @Override
                   public String toString() {
                       return invoker.toString();
                   }
               };
           }
       }
       return last;
   }

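getActivateExtension returns the Filter implementations annotated with @Activate; @Activate selects them by group and value.

key: Constants.SERVICE_FILTER_KEY, group: Constants.PROVIDER

GenericFilter handles Dubbo's generic invocation.

TimeoutFilter only logs a message when an invocation times out.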
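
The backwards loop in buildInvokerChain is what makes the first filter in the list the outermost one at call time. A minimal sketch of the same loop shape, with hypothetical MiniInvoker and MiniFilter types and strings in place of Invocation and Result:

```java
import java.util.List;

final class FilterChainSketch {
    interface MiniInvoker {
        String invoke(String invocation);
    }

    interface MiniFilter {
        String invoke(MiniInvoker next, String invocation);
    }

    // Same loop shape as buildInvokerChain: iterate backwards so filters.get(0) ends up outermost.
    static MiniInvoker buildChain(MiniInvoker target, List<MiniFilter> filters) {
        MiniInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final MiniFilter filter = filters.get(i);
            final MiniInvoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    // A filter that records when it is entered, then delegates to the next Invoker.
    static MiniFilter tracing(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }
}
```

Building the chain from filters A and B and invoking it enters A first, then B, then the target, which matches the order of the list returned by getActivateExtension.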

a. The Filter SPI interface


/**
* Filter. (SPI, Singleton, ThreadSafe)
*/
@SPI
public interface Filter {

  /**
   * do invoke filter.
   * <p>
   * <code>
   * // before filter
   * Result result = invoker.invoke(invocation);
   * // after filter
   * return result;
   * </code>
   *
   * @param invoker    service
   * @param invocation invocation.
   * @return invoke result.
   * @throws RpcException
   * @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation)
   */
  Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

}


(2). ProtocolListenerWrapper#export

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

No ExporterListener implementation is provided.

a). The ExporterListener SPI interface

/**
 * ExporterListener. (SPI, Singleton, ThreadSafe)
 */
@SPI
public interface ExporterListener {

    /**
     * The exporter exported.
     *
     * @param exporter
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Protocol#export(Invoker)
     */
    void exported(Exporter<?> exporter) throws RpcException;

    /**
     * The exporter unexported.
     *
     * @param exporter
     * @throws RpcException
     * @see com.alibaba.dubbo.rpc.Exporter#unexport()
     */
    void unexported(Exporter<?> exporter);

}

(3). RegistryProtocol#export

  public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        boolean register = registedProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

        if (register) {
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }

            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

a). doLocalExport delegates to DubboProtocol

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }
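
doLocalExport uses double-checked locking on the bounds map so that the underlying protocol exports each service at most once. The same "export once per key" guarantee can be sketched with ConcurrentHashMap.computeIfAbsent, which is a different technique from the synchronized block above. ExportCacheSketch, the string values and exportCount are hypothetical and exist only to make the behaviour observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ExportCacheSketch {
    private final ConcurrentMap<String, String> bounds = new ConcurrentHashMap<>();
    final AtomicInteger exportCount = new AtomicInteger();

    // Exports at most once per key; later calls return the cached exporter.
    String doLocalExport(String cacheKey) {
        return bounds.computeIfAbsent(cacheKey, key -> {
            exportCount.incrementAndGet();   // stands in for protocol.export(invokerDelegete)
            return "exporter-for-" + key;
        });
    }
}
```

Calling doLocalExport twice with the same key returns the same object and leaves exportCount at 1.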

b). getRegistry connects to the registry

    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }
i. The RegistryFactory SPI interface
/**
 * RegistryFactory. (SPI, Singleton, ThreadSafe)
 *
 * @author william.liangf
 * @see com.alibaba.dubbo.registry.support.AbstractRegistryFactory
 */
@SPI("dubbo")
public interface RegistryFactory {

    /**
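     * Connect to the registry.
     * <p>
     * Contract for connecting to the registry:<br>
     * 1. With check=false the connection is not checked; otherwise an exception is thrown when the connection fails.<br>
     * 2. Supports username:password authentication on the URL.<br>
     * 3. Supports backup=10.20.153.10 as a fallback registry cluster address.<br>
     * 4. Supports file=registry.cache as a local disk cache file.<br>
     * 5. Supports timeout=1000 as the request timeout.<br>
     * 6. Supports session=60000 as the session timeout or expiry.<br>
     *
     * @param url registry address, must not be null
     * @return registry reference, never null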
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

}

c). register: registers the provided service with the registry

    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

For example, the Zookeeper registration path:
/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.31.12….
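
The path above has the shape root / interface / category / URL-encoded provider URL. A small sketch of how such a path can be assembled follows; ZkPathSketch and toUrlPath are hypothetical names, and the provider URL used with it is made up for illustration rather than taken from the truncated example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class ZkPathSketch {
    // root is "/dubbo" in the example; category is "providers" for an exported service.
    static String toUrlPath(String root, String serviceInterface, String category, String providerUrl) {
        try {
            return root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(providerUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }
}
```

Encoding the provider URL turns "://" into "%3A%2F%2F", which is why the last path segment starts with dubbo%3A%2F%2F and can hold a full URL inside a single node name.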

d). Subscribing to the configuration under the Constants.CONFIGURATORS_CATEGORY path

  
     registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    private URL getSubscribedOverrideUrl(URL registedProviderUrl) {
        return registedProviderUrl.setProtocol(Constants.PROVIDER_PROTOCOL)
                .addParameters(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false));
    }

(4). DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url); // e.g. group/com.alibaba.dubbo.demo.DemoService:version:20881
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // create the DubboExporter
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                ...
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
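
The exporterMap key follows the group/interface:version:port pattern mentioned in the comment on serviceKey. As a rough sketch of how such a key could be assembled (ServiceKeySketch is a hypothetical helper, and the rule that an empty group or version is simply left out is an assumption of this sketch):

```java
final class ServiceKeySketch {
    // Builds "group/serviceName:version:port", skipping group and version when they are empty.
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        return buf.append(':').append(port).toString();
    }
}
```

Because the port is part of the key, the same interface exported on two ports gets two distinct entries, and an incoming request can be matched to its exporter from the path, version, group and local port alone.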

(a). openServer starts the Netty server

   private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        // the isserver parameter decides whether a server is opened for client calls
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // reset the existing server with the new URL configuration
                server.reset(url);
            }
        }
    }
    // Create the ExchangeServer
    private ExchangeServer createServer(URL url) {
        // send a read-only event when the server closes; enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // heartbeat setting
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
        ...
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            //ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type).bind()
            server = Exchangers.bind(url, requestHandler);
        }...
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

(b). Exchangers

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    /**
     * bind.
     *  exchanger
     * @param url
     * @param handler
     * @return message server
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    /**
     * connect.
     *
     * @param url
     * @param handler
     * @return message channel
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

There is only one implementation, HeaderExchanger:

/**
 * DefaultMessenger
 *
 *
 */
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        //Transporters.connect=ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension().connect
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
         //Transporters.bind=ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension().bind
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}
i. HeaderExchangeServer handles heartbeats
ii. DecodeHandler wraps HeaderExchangeHandler, which wraps ExchangeHandler

(c). HeaderExchangeHandler#received: receiving messages

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                //DefaultFuture.received(channel, response);
                handleResponse(channel, (Response) message);
            }...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
i. Request and Response

ii. Request

(d). HeaderExchangeChannel#request: sending a request

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
i. DefaultFuture implements the Future pattern and waits for the Response
  • Caches of the pending DefaultFuture instances
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

  • The constructor creates the DefaultFuture and adds it to the caches
   public DefaultFuture(Channel channel, Request request, int timeout) {
       this.channel = channel;
       this.request = request;
       this.id = request.getId();
       this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
       // put into waiting map.
       FUTURES.put(id, this);
       CHANNELS.put(id, channel);
   } 
  • A dedicated thread running RemotingInvocationTimeoutScan handles DefaultFutures that have timed out
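
The core of this pattern is correlating a response with its request through the request id. A minimal sketch follows; it substitutes the JDK's CompletableFuture for DefaultFuture, uses strings as responses, and leaves out the timeout scan. PendingRequestsSketch and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    private static final Map<Long, CompletableFuture<String>> FUTURES = new ConcurrentHashMap<>();

    // Sender side: register a future under a fresh request id before sending.
    static long newRequest() {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, new CompletableFuture<>());
        return id;
    }

    // Returns the pending future for the id, or null once it has been answered.
    static CompletableFuture<String> futureOf(long id) {
        return FUTURES.get(id);
    }

    // Receiver side: complete and forget the future that matches the response id.
    static boolean received(long id, String response) {
        CompletableFuture<String> future = FUTURES.remove(id);
        return future != null && future.complete(response);
    }
}
```

The caller blocks on the future while the I/O thread calls received; a response whose id is no longer in the map (for instance because it arrived after a timeout) is simply dropped.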

(e). requestHandler: the method a client request finally reaches

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv); // looks up the DubboExporter in exporterMap and returns its Invoker
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
               // the actual invocation
               return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            ...
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

(f).Transporter

@SPI("netty")
public interface Transporter {

    /**
     * Bind a server.
     *
     * @param url     server url
     * @param handler
     * @return server
     * @throws RemotingException
     * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, Receiver, ChannelHandler)
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * Connect to a server.
     *
     * @param url     server url
     * @param handler
     * @return client
     * @throws RemotingException
     * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, Receiver, ChannelListener)
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

NettyTransporter source:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener); // start the Netty server
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener); // start the Netty client
    }

}

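6. Registry

Interface class diagram:

![image/dubbo25.jpg]

(1). Implementation classes

![image/dubbo26.jpg]

  • FailbackRegistry: failure handling
  • ZookeeperRegistry: Zookeeper registry
  • RedisRegistry: Redis registry

(2). The AbstractRegistry abstract class

a. Constructor

  • The cache file path is "/.dubbo/dubbo-registry-" + APPLICATION + "-" + Address + ".cache" under the user's home directory
  • The parent directory is created if it is missing, and the file contents are loaded into the properties cache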
    public AbstractRegistry(URL url) {
        setUrl(url);
        // Start file save timer
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        loadProperties();
        notify(url.getBackupUrls());
    }
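
For orientation, the default file name expression in the constructor can be isolated as below. This is only a sketch: RegistryCacheFileSketch and its two methods are hypothetical, and the values passed in stand for user.home, the application parameter and url.getAddress().

```java
import java.io.File;

final class RegistryCacheFileSketch {
    // Builds the default cache file location from the application name and registry address.
    static File defaultCacheFile(String userHome, String application, String registryAddress) {
        return new File(userHome + "/.dubbo/dubbo-registry-" + application + "-" + registryAddress + ".cache");
    }

    // Creates the parent directory when it is missing, mirroring the check in the constructor.
    static void ensureParentDirectory(File file) {
        File parent = file.getParentFile();
        if (!file.exists() && parent != null && !parent.exists() && !parent.mkdirs()) {
            throw new IllegalArgumentException("Failed to create directory " + parent);
        }
    }
}
```

Since the application name and the registry address are both part of the file name, each application gets one cache file per registry it talks to.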

b. Data structures

   private final Set<URL> registered = new ConcurrentHashSet<URL>();
   private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
   private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();

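These fields show that the registered, subscribed, and notified state is all kept in concurrent collections (one ConcurrentHashSet and two ConcurrentMaps); the details are not covered here.

c.notify: triggered as soon as configuration changes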

   protected void notify(URL url, NotifyListener listener, List<URL> urls) {
       ...
       Map<String, List<URL>> result = new HashMap<String, List<URL>>();
       // group the URLs by category
       for (URL u : urls) {
           if (UrlUtils.isMatch(url, u)) {
               String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
               List<URL> categoryList = result.get(category);
               if (categoryList == null) {
                   categoryList = new ArrayList<URL>();
                   result.put(category, categoryList);
               }
               categoryList.add(u);
           }
       }
       if (result.size() == 0) {
           return;
       }
       // cache the result in notified,
       // then persist it to the cache file,
       // then trigger NotifyListener#notify
       Map<String, List<URL>> categoryNotified = notified.get(url);
       if (categoryNotified == null) {
           notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
           categoryNotified = notified.get(url);
       }
       for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
           String category = entry.getKey();
           List<URL> categoryList = entry.getValue();
           categoryNotified.put(category, categoryList);
           saveProperties(url);
           listener.notify(categoryList);
       }
   } 
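
The grouping step at the start of notify can be sketched without any Dubbo classes. This is a simplified illustration under stated assumptions: URLs are plain strings, the category is read from a category= query parameter, and the default category is assumed to be "providers". CategoryGrouping and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CategoryGrouping {
    // Assumption: stands in for Constants.DEFAULT_CATEGORY.
    static final String DEFAULT_CATEGORY = "providers";

    // Reads the "category" query parameter of a URL string, falling back to the default.
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return DEFAULT_CATEGORY;
    }

    // Groups URLs by category, keeping the order in which categories first appear.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String u : urls) {
            result.computeIfAbsent(categoryOf(u), k -> new ArrayList<>()).add(u);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(groupByCategory(Arrays.asList(
                "dubbo://10.0.0.1:20880/com.foo.Bar",
                "override://0.0.0.0/com.foo.Bar?category=configurators")));
    }
}
```

Each resulting list is what a listener would receive in one notify call for that category.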

(3).FailbackRegistry fault tolerance

How are errors such as a failed registration or a failed subscription handled?

a.Data structures

   private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

   private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

   private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

   private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

   private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); 

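Each collection records the failed operations of the corresponding kind (register, unregister, subscribe, unsubscribe, notify).

b.Constructor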

   public FailbackRegistry(URL url) {
       super(url);
       // retry period in milliseconds; runs every 5 seconds by default
       int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
       this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
           public void run() {
               // Check and connect to the registry
               try {
                   retry();
               } catch (Throwable t) { // Defensive fault tolerance
                   logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
               }
           }
       }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
   }

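The mechanism is simple: a scheduled executor fires every retryPeriod milliseconds (5 seconds by default) and calls retry() to replay the recorded failed operations.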
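
The record-then-retry idea can be sketched in a few lines. This is a minimal sketch, not the FailbackRegistry implementation: it handles only registration, URLs are plain strings, and FailbackSketch, pending(), and startRetryTimer() are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class FailbackSketch {
    // Failed registrations waiting for the next retry round.
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    // The real registration call; it signals failure by throwing a RuntimeException.
    private final Consumer<String> doRegister;

    FailbackSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url) {
        try {
            doRegister.accept(url);
            failedRegistered.remove(url);
        } catch (RuntimeException e) {
            failedRegistered.add(url); // remember it instead of failing the caller
        }
    }

    // Replays every recorded failure; entries that fail again stay in the set.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // still failing: keep it for the next round
            }
        }
    }

    int pending() {
        return failedRegistered.size();
    }

    // Periodic trigger, analogous to scheduleWithFixedDelay in the constructor above.
    ScheduledExecutorService startRetryTimer(long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(this::retry, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }

    public static void main(String[] args) {
        FailbackSketch registry = new FailbackSketch(url -> {
            throw new IllegalStateException("registry unreachable");
        });
        registry.register("dubbo://10.0.0.1:20880/com.foo.Bar");
        System.out.println("pending retries: " + registry.pending());
    }
}
```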

(4).ZookeeperRegistry

a.Constructor

   public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
       super(url);
       if (url.isAnyHost()) {
           throw new IllegalStateException("registry address == null");
       }
       String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
       if (!group.startsWith(Constants.PATH_SEPARATOR)) {
           group = Constants.PATH_SEPARATOR + group;
       }
       this.root = group;
       zkClient = zookeeperTransporter.connect(url);
       zkClient.addStateListener(new StateListener() {
           public void stateChanged(int state) {
               if (state == RECONNECTED) {
                   try {
                       // after reconnecting, re-register and re-subscribe
                       recover();
                   }...
               }
           }
       });
   } 
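  • The ZookeeperTransporter interface creates the ZooKeeper client; the user chooses which client implementation to use.
  • ZookeeperTransporter itself is not analyzed here.

b.doRegister: registering a service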

   protected void doRegister(URL url) {
       try {
           zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
       } catch (Throwable e) {
           throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
       }
   } 

toUrlPath builds the node path as root ("/dubbo" by default) + "/" + service interface + "/" + category + "/" + URL-encoded full URL.
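
A small sketch of that path layout follows. It assumes the last segment is the full URL encoded with standard URL encoding; ZkPathSketch is a hypothetical class, and the service name and address are sample values.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ZkPathSketch {
    // Builds root/serviceInterface/category/encodedUrl.
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        try {
            return root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("/dubbo", "com.foo.BarService", "providers",
                "dubbo://10.0.0.1:20880/com.foo.BarService?version=1.0"));
    }
}
```

Encoding the URL keeps its slashes and query characters from being interpreted as extra path levels in ZooKeeper.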

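c.doSubscribe: subscribing

  • When the service interface is *, it subscribes to everything under the root (dubbo) path.
  • When it is a concrete service name, it watches that service's paths and forwards changes to this.notify.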

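(5).RedisRegistry

RedisRegistry connects to Redis through JedisPool. Here is how registration and subscription are implemented.

a.doRegister: registering a service

  • hset(key, value, expire)
  • publish(key, Constants.REGISTER): publishes a message to the given channel.
  • expire is the expiry timestamp. How is an expired registration handled?
    • expirePeriod defaults to 60 seconds.
    • A scheduled thread runs every expirePeriod/2 and re-issues the hset and publish commands to renew the entry.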
    @Override
    public void doRegister(URL url) {
        String key = toCategoryPath(url);
        String value = url.toFullString();
        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
            JedisPool jedisPool = entry.getValue();
            try {
                Jedis jedis = jedisPool.getResource();
                try {
                    jedis.hset(key, value, expire);
                    jedis.publish(key, Constants.REGISTER);
                    success = true;
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                } finally {
                    jedisPool.returnResource(jedis);
                }
            }...
        }
        ...
    }

b.doSubscribe: subscribing

  • Starts a Notifier thread that subscribes with the psubscribe command.

7.NettyServer source analysis

(1).Constructor

   public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
      //new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
      //                 .getAdaptiveExtension().dispatch(handler, url))
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
  • MultiMessageHandler handles packets that carry multiple messages
  • HeartbeatHandler handles heartbeats
  • Together they form a ChannelHandler chain:
    • MultiMessageHandler -> HeartbeatHandler -> XXXChannelHandler(Dispatcher) -> DecodeHandler -> HeaderExchangeHandler -> ExchangeHandler

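(2).ChannelHandler: channel event handling

The method names are self-explanatory.

(3).Dispatcher: thread dispatcher

The Dispatcher decides which channel messages are handed to the thread pool.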

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     *
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}

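  • all: every message is dispatched to the thread pool, including requests, responses, connect events, disconnect events, and heartbeats.
  • direct: no message is dispatched to the thread pool; everything runs directly on the IO thread.
  • message: only request and response messages are dispatched to the thread pool; connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
  • execution: only request messages are dispatched to the thread pool; responses, connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
  • connection: a single-thread pool is created and connect and disconnect events are queued on it so they run one by one in order; all other messages are dispatched to the thread pool.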
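
The five policies above can be read as a routing table. The sketch below encodes only that table; it does not reproduce the Dispatcher implementations, and the Event and Target enums and the route method are hypothetical names introduced for illustration.

```java
class DispatchPolicySketch {
    enum Event { REQUEST, RESPONSE, CONNECTED, DISCONNECTED, HEARTBEAT }

    enum Target { IO_THREAD, THREAD_POOL, ORDERED_QUEUE }

    // Returns where an event runs under the named dispatcher policy.
    static Target route(String dispatcher, Event e) {
        switch (dispatcher) {
            case "all":
                return Target.THREAD_POOL;
            case "direct":
                return Target.IO_THREAD;
            case "message":
                return (e == Event.REQUEST || e == Event.RESPONSE)
                        ? Target.THREAD_POOL : Target.IO_THREAD;
            case "execution":
                return e == Event.REQUEST ? Target.THREAD_POOL : Target.IO_THREAD;
            case "connection":
                return (e == Event.CONNECTED || e == Event.DISCONNECTED)
                        ? Target.ORDERED_QUEUE : Target.THREAD_POOL;
            default:
                throw new IllegalArgumentException("unknown dispatcher: " + dispatcher);
        }
    }

    public static void main(String[] args) {
        for (Event e : Event.values()) {
            System.out.println("message / " + e + " -> " + route("message", e));
        }
    }
}
```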

a.AllDispatcher source analysis

/**
 * default thread pool configure
 */
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}

b.AllChannelHandler: handling client requests in the thread pool

AllChannelHandler extends WrappedChannelHandler.

i.The WrappedChannelHandler constructor creates the business thread pool
    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }
ii.AllChannelHandler submits ChannelEventRunnable tasks to the thread pool
  public void connected(Channel channel) throws RemotingException {
       ExecutorService cexecutor = getExecutorService();
       try {
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
       }...
   }

   public void disconnected(Channel channel) throws RemotingException {
       ExecutorService cexecutor = getExecutorService();
       try {
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
       }...
   }

   public void received(Channel channel, Object message) throws RemotingException {
       ExecutorService cexecutor = getExecutorService();
       try {
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
       }...
   }

   public void caught(Channel channel, Throwable exception) throws RemotingException {
       ExecutorService cexecutor = getExecutorService();
       try {
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
       }...
   } 

c.ThreadPool SPI implementations

/**
 * ThreadPool
 */
@SPI("fixed")
public interface ThreadPool {

    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);
}

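  • fixed: a fixed-size thread pool; threads are created at startup, never closed, and held for the lifetime of the pool.
  • cached: a cached thread pool; threads idle for one minute are removed and recreated when needed.
  • limited: an elastic thread pool whose thread count only grows and never shrinks.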
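
The three pool flavours map naturally onto java.util.concurrent.ThreadPoolExecutor parameters. The sketch below is an approximation under assumptions: the queue types and the factory names fixed, cached, and limited are chosen for illustration and may differ from what the Dubbo implementations configure. Note that ThreadPoolExecutor starts threads lazily unless prestartAllCoreThreads() is called.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolSketch {
    // fixed: core == max, so the pool never shrinks below its configured size.
    static ThreadPoolExecutor fixed(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // cached: no core threads; idle threads are reclaimed after 60 seconds.
    static ThreadPoolExecutor cached(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // limited: grows up to maxThreads; the practically infinite keep-alive means it never shrinks.
    static ThreadPoolExecutor limited(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = cached(8);
        pool.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```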

d.DataStore SPI implementation

@SPI("simple")
public interface DataStore {

    /**
     * return a snapshot value of componentName
     */
    Map<String, Object> get(String componentName);

    Object get(String componentName, String key);

    void put(String componentName, String key, Object value);

    void remove(String componentName, String key);

}

  • SimpleDataStore
        // <component name or id, <data-name, data-value>>
        private ConcurrentMap<String, ConcurrentMap<String, Object>> data =
                new ConcurrentHashMap<String, ConcurrentMap<String, Object>>();
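
How a two-level map like this supports put, get, and remove can be sketched as follows. This is an illustrative sketch, not the SimpleDataStore source; DataStoreSketch is a hypothetical class, and the component name and key in main are sample values.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DataStoreSketch {
    // <component name, <key, value>>
    private final ConcurrentMap<String, ConcurrentMap<String, Object>> data =
            new ConcurrentHashMap<>();

    void put(String componentName, String key, Object value) {
        // Create the inner map atomically on first use.
        data.computeIfAbsent(componentName, k -> new ConcurrentHashMap<>()).put(key, value);
    }

    Object get(String componentName, String key) {
        ConcurrentMap<String, Object> component = data.get(componentName);
        return component == null ? null : component.get(key);
    }

    void remove(String componentName, String key) {
        ConcurrentMap<String, Object> component = data.get(componentName);
        if (component != null) {
            component.remove(key);
        }
    }

    public static void main(String[] args) {
        DataStoreSketch store = new DataStoreSketch();
        // WrappedChannelHandler stores its executor under a component key and the port.
        store.put("executor-component", "20880", "executor placeholder");
        System.out.println(store.get("executor-component", "20880"));
    }
}
```
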
    

e.ChannelEventRunnable#run: task handling in the business thread pool

 public void run() {
     switch (state) {
         case CONNECTED:
             try {
                 handler.connected(channel);
             }...
             break;
         case DISCONNECTED:
             try {
                 handler.disconnected(channel);
             }...
             break;
         case SENT:
             try {
                 handler.sent(channel, message);
             }...
             break;
         case RECEIVED:
             try {
                 handler.received(channel, message);
             }...
             break;
         case CAUGHT:
             try {
                 handler.caught(channel, exception);
             }...
             break;
         default:
             logger.warn("unknown state: " + state + ", message is " + message);
     }
 }   

(4).doOpen: starting the server

The server is started with Netty 4:

   @Override
   protected void doOpen() throws Throwable {
       NettyHelper.setNettyLoggerFactory();

       bootstrap = new ServerBootstrap();

       bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
       workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
               new DefaultThreadFactory("NettyServerWorker", true));

       final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
       channels = nettyServerHandler.getChannels();

       bootstrap.group(bossGroup, workerGroup)
               .channel(NioServerSocketChannel.class)
               .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
               .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
               .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
               .childHandler(new ChannelInitializer<NioSocketChannel>() {
                   @Override
                   protected void initChannel(NioSocketChannel ch) throws Exception {
                       NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                       ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                               .addLast("decoder", adapter.getDecoder()) 
                               .addLast("encoder", adapter.getEncoder())
                               .addLast("handler", nettyServerHandler);
                   }
               });
       // bind
       ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
       channelFuture.syncUninterruptibly();
       channel = channelFuture.channel();

   } 

a.NettyCodecAdapter: codec adapter

i.encode: ultimately calls codec.encode
   private class InternalEncoder extends MessageToByteEncoder {

       protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
           com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
           Channel ch = ctx.channel();
           NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
           try {
               codec.encode(channel, buffer, msg);
           } finally {
               NettyChannel.removeChannelIfDisconnected(ch);
           }
       }
   } 
ii.decode: ultimately calls codec.decode
private class InternalDecoder extends ByteToMessageDecoder {

        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

            ChannelBuffer message = new NettyBackedChannelBuffer(input);

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            Object msg;

            int saveReaderIndex;

            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    try {
                        msg = codec.decode(channel, message);
                    } catch (IOException e) {
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
}
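
The key move in the decode loop is the rollback: when the codec reports NEED_MORE_INPUT, the reader index is restored so the partial frame is decoded again once more bytes arrive. The sketch below shows that pattern with java.nio.ByteBuffer and a made-up wire format (4-byte length prefix followed by the body); FrameDecodeSketch and its methods are hypothetical and unrelated to the Dubbo protocol itself.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class FrameDecodeSketch {
    // Sentinel returned when the buffer does not yet hold a complete frame.
    static final Object NEED_MORE_INPUT = new Object();

    // Hypothetical codec: a 4-byte big-endian length followed by that many body bytes.
    static Object decodeOne(ByteBuffer in) {
        if (in.remaining() < 4) {
            return NEED_MORE_INPUT;
        }
        int len = in.getInt();
        if (in.remaining() < len) {
            return NEED_MORE_INPUT;
        }
        byte[] body = new byte[len];
        in.get(body);
        return body;
    }

    // Decodes as many complete frames as possible and leaves a partial frame unread.
    static List<byte[]> decodeAll(ByteBuffer in) {
        List<byte[]> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int saved = in.position();      // remember where this frame starts
            Object msg = decodeOne(in);
            if (msg == NEED_MORE_INPUT) {
                in.position(saved);         // roll back so the next read retries the frame
                break;
            }
            out.add((byte[]) msg);
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putInt(2).put((byte) 1).put((byte) 2).putInt(9); // one full frame, one partial
        buf.flip();
        System.out.println("frames decoded: " + decodeAll(buf).size());
    }
}
```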

b.Codec2: the codec

(1).DubboCodec is the default codec. It extends ExchangeCodec, which performs the actual encoding and decoding.

ExchangeCodec source

   public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    }
(2).ExchangeCodec#encodeRequest
   protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
       // obtain the Serialization
       Serialization serialization = getSerialization(channel);
       // the header is 16 bytes long
       byte[] header = new byte[HEADER_LENGTH];
       // the magic number 0xdabb occupies 2 bytes
       Bytes.short2bytes(MAGIC, header);

       // set the packet type (Request/Response) and the serialization id
       // byte FLAG_REQUEST = (byte) 0x80;
       // byte FLAG_TWOWAY = (byte) 0x40;
       // byte FLAG_EVENT = (byte) 0x20;
       // int SERIALIZATION_MASK = 0x1f;
       header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

       if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
       if (req.isEvent()) header[2] |= FLAG_EVENT;

       // request id: 8 bytes (a long), written at offsets 4-11
       Bytes.long2bytes(req.getId(), header, 4);

       // encode request data.
       int savedWriteIndex = buffer.writerIndex();
       buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
       ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
       ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
       if (req.isEvent()) {
           encodeEventData(channel, out, req.getData());
       } else {
           encodeRequestData(channel, out, req.getData());
       }
       out.flushBuffer();
       if (out instanceof Cleanable) {
           ((Cleanable) out).cleanup();
       }
       bos.flush();
       bos.close();
       int len = bos.writtenBytes();
       checkPayload(channel, len);
       Bytes.int2bytes(len, header, 12);

       // write
       buffer.writerIndex(savedWriteIndex);
       buffer.writeBytes(header); // write header.
       buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
   } 
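
From the offsets used in encodeRequest, the 16-byte header appears to be laid out as: magic (bytes 0-1), flag byte (byte 2), status (byte 3, set only in responses), request id (bytes 4-11), body length (bytes 12-15). The sketch below rebuilds that layout with java.nio.ByteBuffer, assuming big-endian byte order; DubboHeaderSketch and its methods are hypothetical helpers, and the constants are copied from the comments in the listing above.

```java
import java.nio.ByteBuffer;

class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    static byte[] requestHeader(int serializationId, boolean twoWay, boolean event,
                                long requestId, int bodyLength) {
        byte flag = (byte) (FLAG_REQUEST | (serializationId & SERIALIZATION_MASK));
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH); // big-endian by default
        buf.putShort(MAGIC);     // bytes 0-1: magic number
        buf.put(flag);           // byte 2: request/two-way/event flags + serialization id
        buf.put((byte) 0);       // byte 3: status, left at 0 for requests
        buf.putLong(requestId);  // bytes 4-11: request id
        buf.putInt(bodyLength);  // bytes 12-15: body length
        return buf.array();
    }

    static long requestId(byte[] header) {
        return ByteBuffer.wrap(header).getLong(4);
    }

    static int bodyLength(byte[] header) {
        return ByteBuffer.wrap(header).getInt(12);
    }

    public static void main(String[] args) {
        byte[] header = requestHeader(2, true, false, 42L, 100);
        System.out.printf("flag=0x%02x id=%d len=%d%n",
                header[2] & 0xff, requestId(header), bodyLength(header));
    }
}
```

The low five bits of the flag byte carry the serialization id, which is why decodeBody masks the flag with SERIALIZATION_MASK before looking up the Serialization.
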
(3).DubboCodec#encodeRequestData
   @Override
   protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
       RpcInvocation inv = (RpcInvocation) data;

       out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
       out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
       out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

       out.writeUTF(inv.getMethodName());
       out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
       Object[] args = inv.getArguments();
       if (args != null)
           for (int i = 0; i < args.length; i++) {
               out.writeObject(encodeInvocationArgument(channel, inv, i));
           }
       out.writeObject(inv.getAttachments());
   } 
(4).ExchangeCodec#encodeResponse
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
       int savedWriteIndex = buffer.writerIndex();
       try {
           Serialization serialization = getSerialization(channel);
           // header.
           byte[] header = new byte[HEADER_LENGTH];
           // set magic number.
           Bytes.short2bytes(MAGIC, header);
           // set request and serialization flag.
           header[2] = serialization.getContentTypeId();
           if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
           // set response status.
           byte status = res.getStatus();
           header[3] = status;
           // set request id.
           Bytes.long2bytes(res.getId(), header, 4);

           buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
           ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
           ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
           // encode response data or error message.
           if (status == Response.OK) {
               if (res.isHeartbeat()) {
                   encodeHeartbeatData(channel, out, res.getResult());
               } else {
                   encodeResponseData(channel, out, res.getResult());
               }
           } else out.writeUTF(res.getErrorMessage());
           out.flushBuffer();
           if (out instanceof Cleanable) {
               ((Cleanable) out).cleanup();
           }
           bos.flush();
           bos.close();

           int len = bos.writtenBytes();
           checkPayload(channel, len);
           Bytes.int2bytes(len, header, 12);
           // write
           buffer.writerIndex(savedWriteIndex);
           buffer.writeBytes(header); // write header.
           buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
       } catch (Throwable t) {
           // clear buffer
           buffer.writerIndex(savedWriteIndex);
           // send error message to Consumer, otherwise, Consumer will wait till timeout.
           if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
               Response r = new Response(res.getId(), res.getVersion());
               r.setStatus(Response.BAD_RESPONSE);

               if (t instanceof ExceedPayloadLimitException) {
                   logger.warn(t.getMessage(), t);
                   try {
                       r.setErrorMessage(t.getMessage());
                       channel.send(r);
                       return;
                   }...
               } else {
                   // FIXME log error message in Codec and handle in caught() of IoHanndler?
                   logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
                   try {
                       r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                       channel.send(r);
                       return;
                   }...
               }
           }
           ....
       }
   } 
(5).DubboCodec#decodeBody
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status.
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    } else {
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                                Constants.DECODE_IN_IO_THREAD_KEY,
                                Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                    (Invocation) getRequestData(id), proto);
                            result.decode();
                        } else {
                            result = new DecodeableRpcResult(channel, res,
                                    new UnsafeByteArrayInputStream(readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    res.setResult(data);
                } ...
            } ...
            return res;
        } else {
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            }...
            return req;
        }
    }

c.NettyServerHandler

  • channelActive calls ChannelHandler.connected
  • channelInactive calls ChannelHandler.disconnected
  • channelRead calls ChannelHandler.received
  • write calls ChannelHandler.sent
  • exceptionCaught calls ChannelHandler.caught
  • NettyChannel caches the channel context
  @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
         
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            if (channel != null) {
                channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
            }
            handler.connected(channel);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

(5).NettyServer implements the ChannelHandler interface

8.Filter

1.ExecuteLimitFilter: limits concurrency on the provider side

  • The per-method executes parameter sets the concurrency limit
  • A Semaphore enforces the limit
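
A Semaphore-based limit of this kind can be sketched in a few lines. This is only a sketch of the technique: ExecuteLimitSketch is a hypothetical class, the invocation is modelled as a Supplier, and rejecting with IllegalStateException is an assumption made here (the filter itself reports an RpcException).

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class ExecuteLimitSketch {
    private final Semaphore permits;

    ExecuteLimitSketch(int executes) {
        this.permits = new Semaphore(executes);
    }

    // Runs the call if a permit is free; rejects immediately otherwise.
    <T> T invoke(Supplier<T> call) {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("concurrent invocations exceed the executes limit");
        }
        try {
            return call.get();
        } finally {
            permits.release(); // always return the permit, even when the call throws
        }
    }

    public static void main(String[] args) {
        ExecuteLimitSketch limit = new ExecuteLimitSketch(2);
        System.out.println(limit.<String>invoke(() -> "handled"));
    }
}
```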

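2.ActiveLimitFilter: limits concurrency on the consumer side

  • The per-method actives parameter sets the concurrency limit
  • The per-method timeout parameter sets how long a call waits for a free slot
  • An AtomicInteger tracks the active count
  • RpcStatus keeps the statistics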

3.GenericFilter: generic invocation on the provider side

  • The method name is $invoke
  • The invocation carries the generic parameter
  public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
      if (inv.getMethodName().equals(Constants.$INVOKE)
              && inv.getArguments() != null
              && inv.getArguments().length == 3
              && !ProtocolUtils.isGeneric(invoker.getUrl().getParameter(Constants.GENERIC_KEY))) {
          String name = ((String) inv.getArguments()[0]).trim();
          String[] types = (String[]) inv.getArguments()[1];
          Object[] args = (Object[]) inv.getArguments()[2];
          try {
              Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
              Class<?>[] params = method.getParameterTypes();
              if (args == null) {
                  args = new Object[params.length];
              }
              String generic = inv.getAttachment(Constants.GENERIC_KEY);
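              // default generic serialization (generic=true)
              if (StringUtils.isEmpty(generic)
                      || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                  // convert the arguments to the declared parameter types via reflection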
                  args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
              } else if (ProtocolUtils.isJavaGenericSerialization(generic)) { //nativejava
                  for (int i = 0; i < args.length; i++) {
                      if (byte[].class == args[i].getClass()) {
                          try {
                              // deserialize with NativeJavaSerialization
                              UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
                              args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                      .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                                      .deserialize(null, is).readObject();
                          }...
                      }...
                  }
              } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                  for (int i = 0; i < args.length; i++) {
                      // deserialize the bean descriptor
                      if (args[i] instanceof JavaBeanDescriptor) {
                          args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                      }...
                  }
              }
              Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
              if (result.hasException()
                      && !(result.getException() instanceof GenericException)) {
                  return new RpcResult(new GenericException(result.getException()));
              }
              if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                  try {
                      // serialize the result with nativejava
                      UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                      ExtensionLoader.getExtensionLoader(Serialization.class)
                              .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                              .serialize(null, os).writeObject(result.getValue());
                      return new RpcResult(os.toByteArray());
                  }...
              }...
          }...
      }
      return invoker.invoke(inv);
  }  
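
The core of the provider-side handling is turning the three $invoke arguments (method name, parameter type names, argument values) into a reflective call on the real service. The sketch below shows just that step with plain java.lang.reflect. GenericInvokeSketch and GreetingService are hypothetical, and the sketch resolves types with Class.forName, so primitive parameter types and the PojoUtils conversion step are not covered.

```java
import java.lang.reflect.Method;

class GenericInvokeSketch {
    // Hypothetical service used only for the demonstration.
    public static class GreetingService {
        public String sayHello(String name) {
            return "Hello, " + name;
        }
    }

    // Resolves the method from its name and parameter type names, then invokes it.
    static Object invokeGeneric(Object target, String methodName,
                                String[] parameterTypes, Object[] args) {
        try {
            Class<?>[] types = new Class<?>[parameterTypes.length];
            for (int i = 0; i < types.length; i++) {
                types[i] = Class.forName(parameterTypes[i]);
            }
            Method method = target.getClass().getMethod(methodName, types);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("generic invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        Object result = invokeGeneric(new GreetingService(), "sayHello",
                new String[]{"java.lang.String"}, new Object[]{"dubbo"});
        System.out.println(result);
    }
}
```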

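4.GenericImplFilter: generic invocation on the consumer side

  • Generic invocation is enabled when the URL contains the generic parameter
  • Arguments are serialized according to the value of that parameter
  • The call goes through GenericService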