【注意】最后更新于 December 20, 2018,文中内容可能已过时,请谨慎使用。
一、dubbo引用服务
1.Spring afterPropertiesSet()执行方法发布服务
ReferenceBean.getObject()->ReferenceConfig.get()->init()->createProxy()
(1).createProxy创建暴露服务接口的代理对象,就是Invoker代理对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
...
if (urls.size() == 1) {
//创建Invoke
invoker = refprotocol.refer(interfaceClass, urls.get(0));//ProtocolFilterWrapper->ProtocolListenerWrapper->RegistryProtocol
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
//多个注册中心,引用cluster概念,创建StaticDirectory
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
...
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
|
(2).ProtocolFilterWrapper#refer
1
2
3
4
5
6
7
8
|
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
//这里buildInvokerChain跟exporter一样,创建Filter处理链
//protocol.refer(type, url)=>ProtocolListenerWrapper.refer=>DubboProtocol.refer 处理Filter
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
|
默认情况grop:Constants.CONSUMER和key:Constants.REFERENCE_FILTER_KEY进行筛选

FutureFilter 返回结果处理
(3).ProtocolListenerWrapper#refer
1
2
3
4
5
6
7
8
9
|
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
|
触发InvokerListener监听,的dubbo未做处理
(4).RegistryProtocol#refer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//链接注册中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}
|
(a). doRefer
- 创建Directory
- 订阅Constants.PROVIDERS_CATEGORY服务提供信息,Constants.CONFIGURATORS_CATEGORY系统配置,Constants.ROUTERS_CATEGORY路由配置信息
- Cluster#join=>Invoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//创建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//客服端往注册中心注册信息
//例如/dubbo/com.alibaba.dubbo.demo.DemoService/consumers/consumer%3A%2F....
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//订阅Constants.PROVIDERS_CATEGORY服务提供信息,Constants.CONFIGURATORS_CATEGORY系统配置,Constants.ROUTERS_CATEGORY路由配置信息
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker;
}
|
2.Directory 作用用于服务发现

(1).实现类关系

- StaticDirectory静态发现服务,作用于多个注册中心时;
- RegistryDirectory 注册中心发现服务
(2).AbstractDirectory#list
根据Router获取Invoker集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
...
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
}...
}
}
return invokers;
}
|
(3).StaticDirectory手动设置集合
(4).RegistryDirectory从注册中心获取Invoker集合,支持实时
a.directory.subscribe 调用RegistryDirectory.notify
配置,路由规则以及服务集合变化都通知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} ...
}
// configurators 配置
if (configuratorUrls != null && configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers 路由
if (routerUrls != null && routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers 提供服务
refreshInvoker(invokerUrls);
}
|
b.toRouters URL转化为Router
- 根据RouterFactory路由工厂创建路由规则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
private List<Router> toRouters(List<URL> urls) {
List<Router> routers = new ArrayList<Router>();
...
if (urls != null && urls.size() > 0) {
for (URL url : urls) {
if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {
continue;
}
String routerType = url.getParameter(Constants.ROUTER_KEY);
if (routerType != null && routerType.length() > 0) {
url = url.setProtocol(routerType);
}
try {
Router router = routerFactory.getRouter(url);
if (!routers.contains(router))
routers.add(router);
}...
}
}
return routers;
}
|
c.setRouters 设置路由
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
protected void setRouters(List<Router> routers) {
// copy list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
// url手动配置路由
String routerkey = url.getParameter(Constants.ROUTER_KEY);
// 根据
if (routerkey != null && routerkey.length() > 0) {
RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
routers.add(routerFactory.getRouter(url));
}
// 最后添加MockInvokersSelector
// 降级处理
routers.add(new MockInvokersSelector());
Collections.sort(routers);
this.routers = routers;
}
|
d.RouterFactory工厂创建Router
1
2
3
4
5
6
7
8
9
10
11
12
|
@SPI
public interface RouterFactory {
/**
* Create router.
*
* @param url
* @return router
*/
@Adaptive("protocol")
Router getRouter(URL url);
}
|

- ConditionRouterFactory创建ConditionRouter
i.Router 路由
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public interface Router extends Comparable<Router> {
/**
* get the router url.
*
* @return url
*/
URL getUrl();
/**
* route.
*
* @param invokers
* @param url refer url
* @param invocation
* @return routed invokers
* @throws RpcException
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
|
ii.ConditionRouter条件路由
看看你页面操作

1).属性
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//优先级
private final int priority;
private final boolean force;
//host = 10.20.153.10 => host = 10.20.153.11
//消费者
private final Map<String, MatchPair> whenCondition;
//提供者
private final Map<String, MatchPair> thenCondition;
private static final class MatchPair {
final Set<String> matches = new HashSet<String>();
final Set<String> mismatches = new HashSet<String>();
...
|
2).route路由规则选择
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
throws RpcException {
if (invokers == null || invokers.size() == 0) {
return invokers;
}
try {
//消费者验证
if (!matchWhen(url, invocation)) {
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
if (thenCondition == null) {
...
return result;
}
//提供者过滤操作
for (Invoker<T> invoker : invokers) {
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
if (result.size() > 0) {
return result;
} else if (force) {
...
return result;
}
}...
return invokers;
}
|
e.refreshInvoker刷新Invoker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private void refreshInvoker(List<URL> invokerUrls) {
...
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.size() == 0) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
...
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
//destroy操作;下线操作
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
|
f.toInvokers URL转化为InvokerDelegate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.size() == 0) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
...
//合并参数
URL url = mergeUrl(providerUrl);
//只要配置有改动就要重新创建Invoker
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
//缓存
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
//这里一般处理服务禁用操作
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
//ProtocolFilterWrapper->ProtocolListenerWrapper=>DubboProtocol
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
}...
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
|
g.DubboProtocol#refer创建DubboInvoker
1
2
3
4
5
6
7
8
|
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
//getClients 创建ExchangeClient
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
|
(1).getClients创建ExchangeClient
ExchangeClient的时实现类HeaderExchangeClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
/**
* Create new connection
*/
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
...
ExchangeClient client;
try {
// 懒载创建客户端
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type).connect
// HeaderExchanger.connect
client = Exchangers.connect(url, requestHandler);
}
}...
return client;
}
|
(3).LazyConnectExchangeClient懒载创建客户端,请求创建链接
1
2
3
4
5
6
7
8
|
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
|
3.Cluster集群处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@SPI(FailoverCluster.NAME)
public interface Cluster {
/**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
|

- MockClusterWrapper - 包装类降级处理 -> 创建MockClusterInvoker
- FailoverCluster - 失败自动切换 -> FailoverInvoker
- FailfastCluster - 快速失败 -> 下面类同
- FailsafeCluster - 失败安全
- FailbackCluster - 失败自动恢复
- ForkingCluster - 并行调用多个服务提供者
- BroadcastCluster - 广播
- MergeableCluster - 合并
默认MockClusterWrapper->FailoverCluster
(1).MockClusterWrapper创建MockClusterInvoker
MockClusterInvoker是服务降级处理
mock=force:return+null
表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
- 还可以改为
mock=fail:return+null
表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
a.invoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
...
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
...
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
|
b.doMockInvoke
创建MockInvoker处理降级处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
private Result doMockInvoke(Invocation invocation, RpcException e) {
Result result = null;
Invoker<T> minvoker;
List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
if (mockInvokers == null || mockInvokers.size() == 0) {
minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
} else {
minvoker = mockInvokers.get(0);
}
try {
result = minvoker.invoke(invocation);
}...
return result;
}
|
(2).FailoverCluster创建FailoverClusterInvoker
a.调用AbstractClusterInvoker.invoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public Result invoke(final Invocation invocation) throws RpcException {
...
LoadBalance loadbalance;
//directory.list(invocation);
List<Invoker<T>> invokers = list(invocation);
//负载均衡操作
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
|
b.FailoverClusterInvoker.doInvoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
//获取重试机会
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
...
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
...异常
}
|
c.进行负载均衡AbstractClusterInvoker.doselect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 只有两个Invoker,进行轮询操作,不走LoadBalance逻辑
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
//负载均衡操作
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
}...
}
}...
}
return invoker;
}
|
d.重新选择AbstractClusterInvoker.reselect
重新选择,首先使用不在“ selected”中的调用者,如果所有调用者都在“ selected”中,则使用负载均衡策略选择一个可用的调用者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
throws RpcException {
//Allocating one in advance, this list is certain to be used.
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
//First, try picking a invoker not in `selected`.
if (availablecheck) { // invoker.isAvailable() should be checked
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (reselectInvokers.size() > 0) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
} else { // do not check invoker.isAvailable()
for (Invoker<T> invoker : invokers) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (reselectInvokers.size() > 0) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
// Just pick an available invoker using loadbalance policy
{
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (reselectInvokers.size() > 0) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}
|
(3).ForkingCluster创建ForkingClusterInvoker 并发操作
a.ForkingClusterInvoker#doInvoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
//获取并发数量
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
//超时处理
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
//线程池执行,运用newCachedThreadPool
executor.execute(new Runnable() {
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
}...
}
});
}
try {
//看谁处理最快;或者等待超时处理
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
}...
}
|
(4).MergeableCluster创建MergeableClusterInvoker 并发操作
a.MergeableClusterInvoker#invoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
public Result invoke(final Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
//获取merger
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
if (ConfigUtils.isEmpty(merger)) { // 如果某个方法没有合并,则仅调用一个invoker
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}
Class<?> returnType;
try {
returnType = getInterface().getMethod(
invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
}...
Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
for (final Invoker<T> invoker : invokers) {
//线程池执行,运用newCachedThreadPool
Future<Result> future = executor.submit(new Callable<Result>() {
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
}
Object result = null;
List<Result> resultList = new ArrayList<Result>(results.size());
//获取超时时间
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) {
...
} else {
resultList.add(r);
}
} catch (Exception e) {
...
}
}
if (resultList.size() == 0) {
return new RpcResult((Object) null);
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}
if (returnType == void.class) {
return new RpcResult((Object) null);
}
if (merger.startsWith(".")) { //直接用反射处理合并结果的操作
merger = merger.substring(1);
Method method;
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
...
}
if (method != null) {
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
result = resultList.remove(0).getValue();
try {
if (method.getReturnType() != void.class
&& method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result, r.getValue());
}
} else {
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} ...
}...
} else {
Merger resultMerger;
// "true".equalsIgnoreCase(value)|| "default".equalsIgnoreCase(value);
if (ConfigUtils.isDefault(merger)) {
resultMerger = MergerFactory.getMerger(returnType);
} else {
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
if (resultMerger != null) {
List<Object> rets = new ArrayList<Object>(resultList.size());
for (Result r : resultList) {
rets.add(r.getValue());
}
result = resultMerger.merge(
rets.toArray((Object[]) Array.newInstance(returnType, 0)));
}...
}
return new RpcResult(result);
}
|
b.Merger接口就一个方法merge
i.IntArrayMerger int数组操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class IntArrayMerger implements Merger<int[]> {
public int[] merge(int[]... items) {
int totalLen = 0;
for (int[] item : items) {
totalLen += item.length;
}
int[] result = new int[totalLen];
int index = 0;
for (int[] item : items) {
for (int i : item) {
result[index++] = i;
}
}
return result;
}
}
|
4.LoadBalance负载均衡
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
|

(1).AbstractLoadBalance抽象类,几个负载均衡继承抽象类,几个公用方法
a.select做基础验证,最终调用子类实现doSelect
1
2
3
4
5
6
7
|
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}
|
b.获取权重getWeight
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
//获取服务提供者启动时间
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
//获取服务提供者运行时间
int uptime = (int) (System.currentTimeMillis() - timestamp);
//获取服务预热时间,默认为10分钟
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
//重新计算服务权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重,下面代码逻辑上形似于 (uptime / warmup) * weight。
// 随着服务运行时间 uptime 增大,权重计算值 ww 会慢慢接近配置值 weight
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
|
(2).RandomLoadBalance加权随机算法
a.doSelect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
//sameWeight为true,说明weight都相同,就随机选择;不按权重选择随机选择
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// 按权重值大小随机值,随机值遍历相减,减值为小于零,选择当前Invoker
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
}
|
(3).RoundRobinLoadBalance#doSelect 加权轮询负载均衡
a.轮询意思肯定缓存保存现在执行下标位置
1
|
private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
|
b.doSelect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key = 全限定类名 + "." + 方法名,
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int length = invokers.size(); // Number of invokers
int maxWeight = 0; // The maximum weight
int minWeight = Integer.MAX_VALUE; // The minimum weight
final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
int weightSum = 0;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
minWeight = Math.min(minWeight, weight); // Choose the minimum weight
if (weight > 0) {
invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
weightSum += weight;
}
}
AtomicPositiveInteger sequence = sequences.get(key);
if (sequence == null) {
sequences.putIfAbsent(key, new AtomicPositiveInteger());
sequence = sequences.get(key);
}
int currentSequence = sequence.getAndIncrement();
if (maxWeight > 0 && minWeight < maxWeight) {
int mod = currentSequence % weightSum;
for (int i = 0; i < maxWeight; i++) {
//轮询减mod
for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
final Invoker<T> k = each.getKey();
final IntegerWrapper v = each.getValue();
if (mod == 0 && v.getValue() > 0) {
return k;
}
if (v.getValue() > 0) {
v.decrement();
mod--;
}
}
}
}
// Round robin
return invokers.get(currentSequence % length);
}
|
(4).LeastActiveLoadBalance最小活跃数负载均衡
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int leastActive = -1; // 最小的活跃数
int leastCount = 0; // 具有相同“最小活跃数”的服务者提供者(Invoker)数量
int[] leastIndexs = new int[length]; // leastIndexs 用于记录具有相同“最小活跃数”的 Invoker 在 invokers 列表中的下标信息
int totalWeight = 0; // 具有相同“最小活跃数”的总权重
int firstWeight = 0; // 具有相同“最小活跃数”,比较权重是否相同
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
//活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
leastActive = active; // Record the current least active value
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeight = weight; // Reset
firstWeight = weight; // Record the weight the first invoker
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
leastIndexs[leastCount++] = i; // Record index number of this invoker
totalWeight += weight; // Add this invoker's weight to totalWeight.
// If every invoker has the same weight?
if (sameWeight && i > 0
&& weight != firstWeight) {
sameWeight = false;
}
}
}
// assert(leastCount > 0)
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}
if (!sameWeight && totalWeight > 0) {
// 这里代码类同加权随机算法
int offsetWeight = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
|
(5).ConsistentHashLoadBalance一致性hash算法
- 首先根据ip或者其他的信息为缓存节点生成一个hash,
- 并将这个hash投射到[0,2^64-1]的圆环上。
- 当有查询或写入请求时,则为根据相应规则生成的key,key生成一个hash值。然后查找第一个大于或等于该hash值的缓存节点;
- 上一步查找节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其hash值的缓存节点即可。
- 数据倾斜问题:由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况。
a.doSelect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果 invokers 是一个新的 List 对象,意味着服务提供者数量发生了变化,可能新增也可能减少了。
// 此时 selector.identityHashCode != identityHashCode 条件成立
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
|
b.ConsistentHashSelector一致性hash算法核心类
(1).构建[0,2^64-1]的圆环数据结构
1
|
private final TreeMap<Long, Invoker<T>> virtualInvokers;
|
(2).创建圆环数据结构以及虚拟节点数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
//圆环数据结构
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
//获取虚拟节点数量
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
//获取请求生产key参数
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
//将节点地址hash投射圆环数据结构上
//还用md5加密操作
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
|
(3).hash 运用位操作运行,尽量hash分散
1
2
3
4
5
6
7
|
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
|
(4).查询select
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public Invoker<T> select(Invocation invocation) {
//组装请求key
String key = toKey(invocation.getArguments());
//进行MD5加密进行hash
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Invoker<T> invoker;
Long key = hash;
if (!virtualInvokers.containsKey(key)) {//是否hash相同值
//获取hash大于key集合;从小到大排序
SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
if (tailMap.isEmpty()) {
key = virtualInvokers.firstKey();
} else {
key = tailMap.firstKey();
}
}
invoker = virtualInvokers.get(key);
return invoker;
}
|
5.代理暴露接口调用方法
用代理对象处理;最终调用JavassistProxyFactory.getProxy方法
最终调用JavassistProxyFactory.getProxy方法
1
2
3
4
5
|
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
|
a).InvokerInvocationHandler 调用服务方法都要经过InvokerInvocationHandler处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
|
执行流程:
- MockClusterInvoker#invoke
- FailoverClusterInvoker#invoke
- Router#route
- LoadBalance#select
- DubboInvoker#doInvoke
- ExchangeClient#request
(1).DubboInvoker#doInvoke
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) { //单向处理
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) { //异步处理
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { //双向处理
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();//等待结果
}
}...
}
|
(2).ExchangeClient#request返回DefaultFuture
HeaderExchangeChannel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
|
6.NettyClient
(1).构造
1
2
3
4
5
6
|
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
// new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
// .getAdaptiveExtension().dispatch(handler, url)));
super(url, wrapChannelHandler(url, handler));
}
|
(2).doOpen 这里是客户端链接netty配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
|
(2).AbstractClient#connect=>NettyClient#doConnect正在链接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
...
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
//channel赋值
NettyClient.this.channel = newChannel;
}
}
}...
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
|
(3).NettyHandler委托处理ChannelHandler,具体不说明