一、dubbo引用服务

1.Spring afterPropertiesSet()执行方法发布服务

ReferenceBean.getObject()->ReferenceConfig.get()->init()->createProxy()

(1).createProxy创建暴露服务接口的代理对象,就是Invoker代理对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
            ...
            if (urls.size() == 1) {
                //创建Invoke
                invoker = refprotocol.refer(interfaceClass, urls.get(0));//ProtocolFilterWrapper->ProtocolListenerWrapper->RegistryProtocol
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                 //多个注册中心,引用cluster概念,创建StaticDirectory
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        ...
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

(2).ProtocolFilterWrapper#refer

1
2
3
4
5
6
7
8
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        //这里buildInvokerChain跟exporter一样,创建Filter处理链
        //protocol.refer(type, url)=>ProtocolListenerWrapper.refer=>DubboProtocol.refer 处理Filter
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

默认情况grop:Constants.CONSUMER和key:Constants.REFERENCE_FILTER_KEY进行筛选

FutureFilter 返回结果处理

(3).ProtocolListenerWrapper#refer

1
2
3
4
5
6
7
8
9
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

触发InvokerListener监听,的dubbo未做处理

(4).RegistryProtocol#refer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //链接注册中心
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

(a). doRefer

  • 创建Directory
  • 订阅Constants.PROVIDERS_CATEGORY服务提供信息,Constants.CONFIGURATORS_CATEGORY系统配置,Constants.ROUTERS_CATEGORY路由配置信息
  • Cluster#join=>Invoker
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
       //创建RegistryDirectory
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            //客服端往注册中心注册信息
            //例如/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F....
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        //订阅Constants.PROVIDERS_CATEGORY服务提供信息,Constants.CONFIGURATORS_CATEGORY系统配置,Constants.ROUTERS_CATEGORY路由配置信息
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
        return invoker;
    }

2.Directory 作用用于服务发现

(1).实现类关系

  • StaticDirectory静态发现服务,作用于多个注册中心时;
  • RegistryDirectory 注册中心发现服务

(2).AbstractDirectory#list

根据Router获取Invoker集合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
   public List<Invoker<T>> list(Invocation invocation) throws RpcException {
       ...
       List<Invoker<T>> invokers = doList(invocation);
       List<Router> localRouters = this.routers; // local reference
       if (localRouters != null && localRouters.size() > 0) {
           for (Router router : localRouters) {
               try {
                   if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                       invokers = router.route(invokers, getConsumerUrl(), invocation);
                   }
               }...
           }
       }
       return invokers;
   } 

(3).StaticDirectory手动设置集合

(4).RegistryDirectory从注册中心获取Invoker集合,支持实时

a.directory.subscribe 调用RegistryDirectory.notify

配置,路由规则以及服务集合变化都通知

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
   public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category)
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } ...
        }
        // configurators 配置
        if (configuratorUrls != null && configuratorUrls.size() > 0) {
            this.configurators = toConfigurators(configuratorUrls);
        }
        // routers 路由
        if (routerUrls != null && routerUrls.size() > 0) {
            List<Router> routers = toRouters(routerUrls);
            if (routers != null) { // null - do nothing
                setRouters(routers);
            }
        }
        List<Configurator> localConfigurators = this.configurators; // local reference
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        if (localConfigurators != null && localConfigurators.size() > 0) {
            for (Configurator configurator : localConfigurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
        // providers 提供服务
        refreshInvoker(invokerUrls);
    }

b.toRouters URL转化为Router

  • 根据RouterFactory路由工厂创建路由规则
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
   private List<Router> toRouters(List<URL> urls) {
       List<Router> routers = new ArrayList<Router>();
       ...
       if (urls != null && urls.size() > 0) {
           for (URL url : urls) {
               if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
                   continue;
               }
               String routerType = url.getParameter(Constants.ROUTER_KEY);
               if (routerType != null && routerType.length() > 0) {
                   url = url.setProtocol(routerType);
               }
               try {
                   Router router = routerFactory.getRouter(url);
                   if (!routers.contains(router))
                       routers.add(router);
               }...
           }
       }
       return routers;
   } 

c.setRouters 设置路由

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    protected void setRouters(List<Router> routers) {
        // copy list
        routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
        // url手动配置路由
        String routerkey = url.getParameter(Constants.ROUTER_KEY);
       // 根据
        if (routerkey != null && routerkey.length() > 0) {
            RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
            routers.add(routerFactory.getRouter(url));
        }
        // 最后添加MockInvokersSelector
        // 降级处理
        routers.add(new MockInvokersSelector());
        Collections.sort(routers);
        this.routers = routers;
    }

d.RouterFactory工厂创建Router

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@SPI
public interface RouterFactory {

    /**
     * Create router.
     *
     * @param url
     * @return router
     */
    @Adaptive("protocol")
    Router getRouter(URL url);
}

  • ConditionRouterFactory创建ConditionRouter
i.Router 路由
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public interface Router extends Comparable<Router> {

   /**
    * get the router url.
    *
    * @return url
    */
   URL getUrl();

   /**
    * route.
    *
    * @param invokers
    * @param url        refer url
    * @param invocation
    * @return routed invokers
    * @throws RpcException
    */
   <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

} 
ii.ConditionRouter条件路由

看看你页面操作

1).属性
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
   //优先级
   private final int priority;
   private final boolean force;
   //host = 10.20.153.10 => host = 10.20.153.11
   //消费者
   private final Map<String, MatchPair> whenCondition;
   //提供者
   private final Map<String, MatchPair> thenCondition; 

   private static final class MatchPair {
       final Set<String> matches = new HashSet<String>();
       final Set<String> mismatches = new HashSet<String>();
   ... 
2).route路由规则选择
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
   public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
           throws RpcException {
       if (invokers == null || invokers.size() == 0) {
           return invokers;
       }
       try {
           //消费者验证
           if (!matchWhen(url, invocation)) {
               return invokers;
           }
           List<Invoker<T>> result = new ArrayList<Invoker<T>>();
           if (thenCondition == null) {
               ...                
               return result;
           }
           //提供者过滤操作
           for (Invoker<T> invoker : invokers) {
               if (matchThen(invoker.getUrl(), url)) {
                   result.add(invoker);
               }
           }
           if (result.size() > 0) {
               return result;
           } else if (force) {
              ...                
             return result;
           }
       }...
       return invokers;
   } 

e.refreshInvoker刷新Invoker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
     private void refreshInvoker(List<URL> invokerUrls) {
            ...
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.size() == 0) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            ...
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                //destroy操作;下线操作
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

f.toInvokers URL转化为InvokerDelegate

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.size() == 0) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<String>();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            ...
            //合并参数
            URL url = mergeUrl(providerUrl);
            //只要配置有改动就要重新创建Invoker
            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            //缓存
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                   //这里一般处理服务禁用操作
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        //ProtocolFilterWrapper->ProtocolListenerWrapper=>DubboProtocol
                        invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                }...
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

g.DubboProtocol#refer创建DubboInvoker

1
2
3
4
5
6
7
8
  public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        //getClients 创建ExchangeClient
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
(1).getClients创建ExchangeClient

ExchangeClient的时实现类HeaderExchangeClient

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   /**
     * Create new connection
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
        boolean compatible = (version != null && version.startsWith("1.0."));
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        ... 
        ExchangeClient client;
        try {
            // 懒载创建客户端
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                //ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type).connect
                // HeaderExchanger.connect
                client = Exchangers.connect(url, requestHandler);
            }
        }...
        return client;
    }
(3).LazyConnectExchangeClient懒载创建客户端,请求创建链接
(2).HeaderExchanger#connect
1
2
3
4
5
6
7
8
public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}

3.Cluster集群处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException; 
}

  • MockClusterWrapper - 包装类降级处理 -> 创建MockClusterInvoker
  • FailoverCluster - 失败自动切换 -> FailoverInvoker
  • FailfastCluster - 快速失败 -> 下面类同
  • FailsafeCluster - 失败安全
  • FailbackCluster - 失败自动恢复
  • ForkingCluster - 并行调用多个服务提供者
  • BroadcastCluster - 广播
  • MergeableCluster - 合并

默认MockClusterWrapper->FailoverCluster

(1).MockClusterWrapper创建MockClusterInvoker

MockClusterInvoker是服务降级处理

  • mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
  • 还可以改为 mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。

a.invoke

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   public Result invoke(Invocation invocation) throws RpcException {
       Result result = null;

       String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
       if (value.length() == 0 || value.equalsIgnoreCase("false")) {
           //no mock
           result = this.invoker.invoke(invocation);
       } else if (value.startsWith("force")) {
           ...
           //force:direct mock
           result = doMockInvoke(invocation, null);
       } else {
           //fail-mock
           try {
               result = this.invoker.invoke(invocation);
           } catch (RpcException e) {
               if (e.isBiz()) {
                   throw e;
               } else {
                   ...
                   result = doMockInvoke(invocation, e);
               }
           }
       }
       return result;
   } 

b.doMockInvoke

创建MockInvoker处理降级处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
   private Result doMockInvoke(Invocation invocation, RpcException e) {
       Result result = null;
       Invoker<T> minvoker;

       List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
       if (mockInvokers == null || mockInvokers.size() == 0) {
           minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
       } else {
           minvoker = mockInvokers.get(0);
       }
       try {
           result = minvoker.invoke(invocation);
       }...
       return result;
   } 

(2).FailoverCluster创建FailoverClusterInvoker

a.调用AbstractClusterInvoker.invoke

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16

   public Result invoke(final Invocation invocation) throws RpcException {
       ...
       LoadBalance loadbalance;
       //directory.list(invocation);
       List<Invoker<T>> invokers = list(invocation);
       //负载均衡操作
       if (invokers != null && invokers.size() > 0) {
           loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                   .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
       } else {
           loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
       }
       RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
       return doInvoke(invocation, invokers, loadbalance);
   }

b.FailoverClusterInvoker.doInvoke

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
   public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
       List<Invoker<T>> copyinvokers = invokers;
       checkInvokers(copyinvokers, invocation);
       //获取重试机会
       int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
       if (len <= 0) {
           len = 1;
       }
       // retry loop.
       RpcException le = null; // last exception.
       List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
       Set<String> providers = new HashSet<String>(len);
       for (int i = 0; i < len; i++) {
           //Reselect before retry to avoid a change of candidate `invokers`.
           //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
           if (i > 0) {
               checkWhetherDestroyed();
               copyinvokers = list(invocation);
               // check again
               checkInvokers(copyinvokers, invocation);
           }
           Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
           invoked.add(invoker);
           RpcContext.getContext().setInvokers((List) invoked);
           try {
               Result result = invoker.invoke(invocation);
               ...
               return result;
           } catch (RpcException e) {
               if (e.isBiz()) { // biz exception.
                   throw e;
               }
               le = e;
           } catch (Throwable e) {
               le = new RpcException(e.getMessage(), e);
           } finally {
               providers.add(invoker.getUrl().getAddress());
           }
       }
       ...异常
   } 

c.进行负载均衡AbstractClusterInvoker.doselect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
       if (invokers == null || invokers.size() == 0)
           return null;
       if (invokers.size() == 1)
           return invokers.get(0);
       // 只有两个Invoker,进行轮询操作,不走LoadBalance逻辑
       if (invokers.size() == 2 && selected != null && selected.size() > 0) {
           return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
       }
       //负载均衡操作
       Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
       if ((selected != null && selected.contains(invoker))
               || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
           try {
               Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
               if (rinvoker != null) {
                   invoker = rinvoker;
               } else {
                   int index = invokers.indexOf(invoker);
                   try {
                       //Avoid collision
                       invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                   }...
               }
           }...
       }
       return invoker;
   } 

d.重新选择AbstractClusterInvoker.reselect

重新选择,首先使用不在“ selected”中的调用者,如果所有调用者都在“ selected”中,则使用负载均衡策略选择一个可用的调用者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
   private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                               List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
           throws RpcException {

       //Allocating one in advance, this list is certain to be used.
       List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

       //First, try picking a invoker not in `selected`.
       if (availablecheck) { // invoker.isAvailable() should be checked
           for (Invoker<T> invoker : invokers) {
               if (invoker.isAvailable()) {
                   if (selected == null || !selected.contains(invoker)) {
                       reselectInvokers.add(invoker);
                   }
               }
           }
           if (reselectInvokers.size() > 0) {
               return loadbalance.select(reselectInvokers, getUrl(), invocation);
           }
       } else { // do not check invoker.isAvailable()
           for (Invoker<T> invoker : invokers) {
               if (selected == null || !selected.contains(invoker)) {
                   reselectInvokers.add(invoker);
               }
           }
           if (reselectInvokers.size() > 0) {
               return loadbalance.select(reselectInvokers, getUrl(), invocation);
           }
       }
       // Just pick an available invoker using loadbalance policy
       {
           if (selected != null) {
               for (Invoker<T> invoker : selected) {
                   if ((invoker.isAvailable()) // available first
                           && !reselectInvokers.contains(invoker)) {
                       reselectInvokers.add(invoker);
                   }
               }
           }
           if (reselectInvokers.size() > 0) {
               return loadbalance.select(reselectInvokers, getUrl(), invocation);
           }
       }
       return null;
   } 

(3).ForkingCluster创建ForkingClusterInvoker 并发操作

a.ForkingClusterInvoker#doInvoke

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
   public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
       checkInvokers(invokers, invocation);
       final List<Invoker<T>> selected;
       //获取并发数量
       final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
       //超时处理
       final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
       if (forks <= 0 || forks >= invokers.size()) {
           selected = invokers;
       } else {
           selected = new ArrayList<Invoker<T>>();
           for (int i = 0; i < forks; i++) {
               // TODO. Add some comment here, refer chinese version for more details.
               Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
               if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                   selected.add(invoker);
               }
           }
       }
       RpcContext.getContext().setInvokers((List) selected);
       final AtomicInteger count = new AtomicInteger();
       final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
       for (final Invoker<T> invoker : selected) {
           //线程池执行,运用newCachedThreadPool
           executor.execute(new Runnable() {
               public void run() {
                   try {
                       Result result = invoker.invoke(invocation);
                       ref.offer(result);
                   }...
               }
           });
       }
       try {
           //看谁处理最快;或者等待超时处理
           Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
           if (ret instanceof Throwable) {
               Throwable e = (Throwable) ret;
               throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
           }
           return (Result) ret;
       }...
   }

(4).MergeableCluster创建MergeableClusterInvoker 并发操作

a.MergeableClusterInvoker#invoke

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
 public Result invoke(final Invocation invocation) throws RpcException {
       List<Invoker<T>> invokers = directory.list(invocation);
       //获取merger
       String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
       if (ConfigUtils.isEmpty(merger)) { // 如果某个方法没有合并,则仅调用一个invoker
           for (final Invoker<T> invoker : invokers) {
               if (invoker.isAvailable()) {
                   return invoker.invoke(invocation);
               }
           }
           return invokers.iterator().next().invoke(invocation);
       }

       Class<?> returnType;
       try {
           returnType = getInterface().getMethod(
                   invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
       }...

       Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
       for (final Invoker<T> invoker : invokers) {
          //线程池执行,运用newCachedThreadPool
           Future<Result> future = executor.submit(new Callable<Result>() {
               public Result call() throws Exception {
                   return invoker.invoke(new RpcInvocation(invocation, invoker));
               }
           });
           results.put(invoker.getUrl().getServiceKey(), future);
       }

       Object result = null;

       List<Result> resultList = new ArrayList<Result>(results.size());
       //获取超时时间 
       int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
       for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
           Future<Result> future = entry.getValue();
           try {
               Result r = future.get(timeout, TimeUnit.MILLISECONDS);
               if (r.hasException()) {
                   ...
               } else {
                   resultList.add(r);
               }
           } catch (Exception e) {
              ...
           }
       }

       if (resultList.size() == 0) {
           return new RpcResult((Object) null);
       } else if (resultList.size() == 1) {
           return resultList.iterator().next();
       }

       if (returnType == void.class) {
           return new RpcResult((Object) null);
       }

       if (merger.startsWith(".")) { //直接用反射处理合并结果的操作
           merger = merger.substring(1);
           Method method;
           try {
               method = returnType.getMethod(merger, returnType);
           } catch (NoSuchMethodException e) {
               ...
           }
           if (method != null) {
               if (!Modifier.isPublic(method.getModifiers())) {
                   method.setAccessible(true);
               }
               result = resultList.remove(0).getValue();
               try {
                   if (method.getReturnType() != void.class
                           && method.getReturnType().isAssignableFrom(result.getClass())) {
                       for (Result r : resultList) {
                           result = method.invoke(result, r.getValue());
                       }
                   } else {
                       for (Result r : resultList) {
                           method.invoke(result, r.getValue());
                       }
                   }
               } ...
           }...
       } else {
           Merger resultMerger;
           // "true".equalsIgnoreCase(value)|| "default".equalsIgnoreCase(value);
           if (ConfigUtils.isDefault(merger)) {
               resultMerger = MergerFactory.getMerger(returnType);
           } else {
               resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
           }
           if (resultMerger != null) {
               List<Object> rets = new ArrayList<Object>(resultList.size());
               for (Result r : resultList) {
                   rets.add(r.getValue());
               }
               result = resultMerger.merge(
                       rets.toArray((Object[]) Array.newInstance(returnType, 0)));
           }...
       }
       return new RpcResult(result);
   } 

b.Merger接口就一个方法merge

i.IntArrayMerger int数组操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class IntArrayMerger implements Merger<int[]> {

   public int[] merge(int[]... items) {
       int totalLen = 0;
       for (int[] item : items) {
           totalLen += item.length;
       }
       int[] result = new int[totalLen];
       int index = 0;
       for (int[] item : items) {
           for (int i : item) {
               result[index++] = i;
           }
       }
       return result;
   }

} 

4.LoadBalance负载均衡

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
 @SPI(RandomLoadBalance.NAME)
 public interface LoadBalance {
 
     /**
      * select one invoker in list.
      *
      * @param invokers   invokers.
      * @param url        refer url
      * @param invocation invocation.
      * @return selected invoker.
      */
     @Adaptive("loadbalance")
     <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
 
 }

  • 默认负载均衡RandomLoadBalance

(1).AbstractLoadBalance抽象类,几个负载均衡继承抽象类,几个公用方法

a.select做基础验证,最终调用子类实现doSelect

1
2
3
4
5
6
7
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        return doSelect(invokers, url, invocation);
    }

b.获取权重getWeight

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   protected int getWeight(Invoker<?> invoker, Invocation invocation) {
       int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
       if (weight > 0) {
           //获取服务提供者启动时间
           long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
           if (timestamp > 0L) {
               //获取服务提供者运行时间
               int uptime = (int) (System.currentTimeMillis() - timestamp);
               //获取服务预热时间,默认为10分钟
               int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
               if (uptime > 0 && uptime < warmup) {
                   //重新计算服务权重
                   weight = calculateWarmupWeight(uptime, warmup, weight);
               }
           }
       }
       return weight;
   } 
  static int calculateWarmupWeight(int uptime, int warmup, int weight) {
   // 计算权重,下面代码逻辑上形似于 (uptime / warmup) * weight。
   // 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
   int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
   return ww < 1 ? 1 : (ww > weight ? weight : ww);
 }

(2).RandomLoadBalance加权随机算法

a.doSelect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       int length = invokers.size(); // Number of invokers
       int totalWeight = 0; // The sum of weights
       boolean sameWeight = true; // Every invoker has the same weight?
       for (int i = 0; i < length; i++) {
           int weight = getWeight(invokers.get(i), invocation);
           totalWeight += weight; // Sum
           //sameWeight为true,说明weight都相同,就随机选择;不按权重选择随机选择
           if (sameWeight && i > 0
                   && weight != getWeight(invokers.get(i - 1), invocation)) {
               sameWeight = false;
           }
       }
       if (totalWeight > 0 && !sameWeight) {
           // 按权重值大小随机值,随机值遍历相减,减值为小于零,选择当前Invoker
           int offset = random.nextInt(totalWeight);
           // Return a invoker based on the random value.
           for (int i = 0; i < length; i++) {
               offset -= getWeight(invokers.get(i), invocation);
               if (offset < 0) {
                   return invokers.get(i);
               }
           }
       }
       // If all invokers have the same weight value or totalWeight=0, return evenly.
       return invokers.get(random.nextInt(length));
   } 

(3).RoundRobinLoadBalance#doSelect 加权轮询负载均衡

a.轮询意思肯定缓存保存现在执行下标位置

1
 private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>(); 

b.doSelect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       // key = 全限定类名 + "." + 方法名,
       String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
       int length = invokers.size(); // Number of invokers
       int maxWeight = 0; // The maximum weight
       int minWeight = Integer.MAX_VALUE; // The minimum weight
       final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
       int weightSum = 0;
       for (int i = 0; i < length; i++) {
           int weight = getWeight(invokers.get(i), invocation);
           maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
           minWeight = Math.min(minWeight, weight); // Choose the minimum weight
           if (weight > 0) {
               invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
               weightSum += weight;
           }
       }
       AtomicPositiveInteger sequence = sequences.get(key);
       if (sequence == null) {
           sequences.putIfAbsent(key, new AtomicPositiveInteger());
           sequence = sequences.get(key);
       }
       int currentSequence = sequence.getAndIncrement();
       if (maxWeight > 0 && minWeight < maxWeight) {
           int mod = currentSequence % weightSum;
           for (int i = 0; i < maxWeight; i++) {
               //轮询减mod
               for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                   final Invoker<T> k = each.getKey();
                   final IntegerWrapper v = each.getValue();
                   if (mod == 0 && v.getValue() > 0) {
                       return k;
                   }
                   if (v.getValue() > 0) {
                       v.decrement();
                       mod--;
                   }
               }
           }
       }
       // Round robin
       return invokers.get(currentSequence % length);
   } 

(4).LeastActiveLoadBalance最小活跃数负载均衡

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); 
        int leastActive = -1; // 最小的活跃数
        int leastCount = 0; // 具有相同“最小活跃数”的服务者提供者(Invoker)数量
        int[] leastIndexs = new int[length]; // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
        int totalWeight = 0; // 具有相同“最小活跃数”的总权重
        int firstWeight = 0; // 具有相同“最小活跃数”,比较权重是否相同
        boolean sameWeight = true; 
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            //活跃数
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); 
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
            if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                leastActive = active; // Record the current least active value
                leastCount = 1; // Reset leastCount, count again based on current leastCount
                leastIndexs[0] = i; // Reset
                totalWeight = weight; // Reset
                firstWeight = weight; // Record the weight the first invoker
                sameWeight = true; // Reset, every invoker has the same weight value?
            } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
                leastIndexs[leastCount++] = i; // Record index number of this invoker
                totalWeight += weight; // Add this invoker's weight to totalWeight.
                // If every invoker has the same weight?
                if (sameWeight && i > 0
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // assert(leastCount > 0)
        if (leastCount == 1) {
            return invokers.get(leastIndexs[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // 这里代码类同加权随机算法
            int offsetWeight = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

(5).ConsistentHashLoadBalance一致性hash算法

  • 首先根据ip或者其他的信息为缓存节点生成一个hash,
  • 并将这个hash投射到[0,2^64-1]的圆环上。
  • 当有查询或写入请求时,则为根据相应规则生成的key,key生成一个hash值。然后查找第一个大于或等于该hash值的缓存节点;
  • 上一步查找节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其hash值的缓存节点即可。
  • 数据倾斜问题:由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。
    • 虚拟节点

a.doSelect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
   private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

   @SuppressWarnings("unchecked")
   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
       int identityHashCode = System.identityHashCode(invokers);
       ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。
       // 此时 selector.identityHashCode != identityHashCode 条件成立
       if (selector == null || selector.identityHashCode != identityHashCode) {
           selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
           selector = (ConsistentHashSelector<T>) selectors.get(key);
       }
       return selector.select(invocation);
   } 

b.ConsistentHashSelector一致性hash算法核心类

(1).构建[0,2^64-1]的圆环数据结构
1
private final TreeMap<Long, Invoker<T>> virtualInvokers;
(2).创建圆环数据结构以及虚拟节点数量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            //圆环数据结构
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            //获取虚拟节点数量
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            //获取请求生产key参数
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            //将节点地址hash投射圆环数据结构上
            //还用md5加密操作
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
(3).hash 运用位操作运行,尽量hash分散
1
2
3
4
5
6
7
private long hash(byte[] digest, int number) {
          return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                  | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                  | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                  | (digest[number * 4] & 0xFF))
                  & 0xFFFFFFFFL;
      }  
(4).查询select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
     public Invoker<T> select(Invocation invocation) { 
           //组装请求key
           String key = toKey(invocation.getArguments());
           //进行MD5加密进行hash
           byte[] digest = md5(key);
           return selectForKey(hash(digest, 0));
       }

       private String toKey(Object[] args) {
           StringBuilder buf = new StringBuilder();
           for (int i : argumentIndex) {
               if (i >= 0 && i < args.length) {
                   buf.append(args[i]);
               }
           }
           return buf.toString();
       }

       private Invoker<T> selectForKey(long hash) {
           Invoker<T> invoker;
           Long key = hash;
           if (!virtualInvokers.containsKey(key)) {//是否hash相同值
               //获取hash大于key集合;从小到大排序
               SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
               if (tailMap.isEmpty()) {
                   key = virtualInvokers.firstKey();
               } else {
                   key = tailMap.firstKey();
               }
           }
           invoker = virtualInvokers.get(key);
           return invoker;
       } 

5.代理暴露接口调用方法

用代理对象处理;最终调用JavassistProxyFactory.getProxy方法

最终调用JavassistProxyFactory.getProxy方法

1
2
3
4
5
   @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

a).InvokerInvocationHandler 调用服务方法都要经过InvokerInvocationHandler处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

执行流程:

  • MockClusterInvoker#invoke
  • FailoverClusterInvoker#invoke
  • Router#route
  • LoadBalance#select
  • DubboInvoker#doInvoke
  • ExchangeClient#request

(1).DubboInvoker#doInvoke

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
   @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) { //单向处理
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) { //异步处理
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {   //双向处理
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();//等待结果
            }
        }...
    }

(2).ExchangeClient#request返回DefaultFuture

HeaderExchangeChannel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

6.NettyClient

(1).构造

1
2
3
4
5
6
  public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
      //   url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
      //   new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
      //                  .getAdaptiveExtension().dispatch(handler, url)));
       super(url, wrapChannelHandler(url, handler));
   } 
  • 也运用的线程池,cached

(2).doOpen 这里是客户端链接netty配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

(2).AbstractClient#connect=>NettyClient#doConnect正在链接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
   protected void doConnect() throws Throwable {
       long start = System.currentTimeMillis();
       ChannelFuture future = bootstrap.connect(getConnectAddress());
       try {
           boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);

           if (ret && future.isSuccess()) {
               Channel newChannel = future.channel();
               try {
                   // Close old channel
                   Channel oldChannel = NettyClient.this.channel; // copy reference
                   ...
               } finally {
                   if (NettyClient.this.isClosed()) {
                       try {
                           if (logger.isInfoEnabled()) {
                               logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                           }
                           newChannel.close();
                       } finally {
                           NettyClient.this.channel = null;
                           NettyChannel.removeChannelIfDisconnected(newChannel);
                       }
                   } else {
                       //channel赋值
                       NettyClient.this.channel = newChannel;
                   }
               }
           }...
       } finally {
           if (!isConnected()) {
               //future.cancel(true);
           }
       }
   } 

(3).NettyHandler委托处理ChannelHandler,具体不说明