dubbo服务发布
文章目录
【注意】最后更新于 December 10, 2018,文中内容可能已过时,请谨慎使用。
一、dubbo整体架构
1.先看看架构图
分层架构,一目了然,每一层处理相应的业务逻辑;
官方文档写很棒;
2.在 Dubbo 的核心领域模型中:
-
Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
-
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
-
Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
-
基本设计原则
-
采用 Microkernel + Plugin 模式,Microkernel 只负责组装 Plugin,Dubbo 自身的功能也是通过扩展点实现的,也就是 Dubbo 的所有功能点都可被用户自定义扩展所替换。
-
采用 URL 作为配置信息的统一格式,所有扩展点都通过传递 URL 携带配置信息。
- 例如:registry://192.168.1.7:9090/com.alibaba.service1?param1=value1¶m2=value2
a.领域模型类图
二、dubbo发布服务
1.Spring#afterPropertiesSet()执行方法发布服务
执行方法顺序:
ServiceBean.afterPropertiesSet()->ServiceConfig.export()->doExport()->doExportUrls() ->doExportUrlsFor1Protocol()
2.ServiceConfig中定义常量(static final)Protocol和ProxyFactory的适配器类,从ExtensionLoader获取;
|
|
(1).ExtensionLoader自动生成Protocol生成Protocol$Adaptive类,看看结构,跟ExtensionLoader分析是不是一样;
|
|
(2).Protocol的SPI实现
创建对象被ProtocolFilterWrapper和ProtocolListenerWrapper一起包装
(3).ProxyFactory的SPI实现动态代理
a.接口类图
- getProxy这个用于消费端实现接口创建代理
- getInvoker作用于的服务端创建Invoker代理
- 创建AbstractProxyInvoker抽象类进行代理
b.SPI实现类
默认情况javassist动态代理,创建实例对象时,对象都被StubProxyFactoryWrapper包装; 具体不做分析,在SPI机制中说明;
c.JavassistProxyFactory 代理动态代理,创建代码,编译class,减少用反射,提高性能
d.StubProxyFactoryWrapper用于Dubbo的本地存根,消费端先调用本地的实现类逻辑处理;
3.ServiceConfig#doExportUrlsFor1Protocol部分代码
- 注册registryURLs
- registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.6.0&pid=28302®istry=zookeeper×tamp=1593676329574
- dubbo://192.168.31.122:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.31.122&bind.port=20881&dubbo=2.6.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=28302&server=netty4&side=provider×tamp=1593676329586
|
|
4. exportLocal 发布本地
|
|
5. 发布注册中心
ProtocolFilterWrapper->ProtocolListenerWrapper->RegistryProtocol
(1). ProtocolFilterWrapper#exporter
- URL是registry注册协议。不封装Filter链的Invoker
- Invoker封装构成Filter连图;
|
|
getActivateExtension获取实现Filter接口注解@Activate对象;@Activate按group,value做筛选;
key:Constants.SERVICE_FILTER_KEY, group:Constants.PROVIDER
GenericFilter处理Dubbo的泛化;
TimeoutFilter只是超时日志打印处理;
a.Filter的SPI实现
|
|
(2). ProtocolListenerWrapper#exporter
|
|
ExporterListener 未做实现
a).ExporterListener的SPI实现
|
|
(3). RegistryProtocol#exporter
|
|
a).doLocalExport处理DubboProtocol
|
|
b).getRegistry 链接注册中心
|
|
i. RegistryFactory的SPI实现
|
|
c).register 提供服务注册到注册中心
|
|
例如 Zookeeper注册地址:
/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.31.12….
d).订阅Constants.CONFIGURATORS_CATEGORY路径配置信息
|
|
(4). DubboProtocol#exporter
|
|
(a). openServer 开启Netty服务
|
|
(b). Exchangers
|
|
就一个类HeaderExchanger
|
|
i.HeaderExchangeServer处理心跳
ii.DecodeHandler包装–>HeaderExchangeHandler包装->ExchangeHandler
(c).HeaderExchangeHandler#received 接受
|
|
i.Request和Response
ii.Request
(d).HeaderExchangeHandler#request请求
|
|
i.DefaultFuture实现Future模式,等待Response结果
- 缓存等待DefaultFuture
|
|
- 创建DefaultFuture,添加到缓存中
|
|
- 开启一个线程处理超时DefaultFuture的RemotingInvocationTimeoutScan
(e).requestHandler 客服端请求最终调用方法
|
|
(f).Transporter
|
|
NettyTransporter 源码
|
|
6.Registry 注册中心
看看接口类图
![image/dubbo25.jpg]
(1).实现类
![image/dubbo26.jpg]
- FailbackRegistry故障处理
- ZookeeperRegistry Zookeeper注册中心
- RedisRegistry redis注册中心
(2).AbstractRegistry抽象类
a.构造
- “/.dubbo/dubbo-registry-” + APPLICATION + “-” + Addres + “.cache"缓存文件path
- 文件不存在就创建,加载到缓存properties中
|
|
b.数据结构
|
|
从这个数据结构中,一目了然注册和监听都是ConcurrentMap控制,这里具体不说了
c.notify配置信息有变动实时触发
|
|
(3).FailbackRegistry容错处理
例如注册失败或者订阅失败等错误是怎么处理呢??
a.看看数据结构
|
|
记录错误操作对应相应操作
b.构造器
|
|
原理就是线程池每5秒触发一次,重试操作;是不是很简单
(4).ZookeeperRegistry
a.构造方法
|
|
- ZookeeperTransporter接口创建不同的zookeeper客户端;用户自己选择
- 这里不具体分析ZookeeperTransporter
b.doRegister 注册服务
|
|
toUrlPath: dubbo+/+ServiceInterface+/+CATEGORY_KEY+/+Url
c.doSubscrib订阅
- ServiceInterface为*时,订阅dubbo下路径
- 为正式服务名时,监听服务路径通知this.notify方法
(2).RedisRegistry
链接Redis工具包JedisPool;看看注册服务和订阅是怎么实现
a.doRegister注册服务
- hset(key, value, expire)
- publish(key, Constants.REGISTER); 命令用于将信息发送到指定的频道。
- expire过期时间,注册服务过期怎么处理呢?
- expirePeriod默认60秒
- 调度线程每expirePeriod/2;重新hset和publish命令
|
|
b.doSubscribe订阅
- 开启Notifier线程,用命令psubscribe订阅操作
7.NettyServer 源码分析
(1).构造
|
|
- MultiMessageHandler处理多个消息
- HeartbeatHandler心跳处理
- 形成ChannelHandler连
- MultiMessageHandler -> HeartbeatHandler -> XXXChannelHandler(Dispatcher) -> DecodeHandler–>HeaderExchangeHandler->ExchangeHandle
(2).ChannelHandler Channel处理
顾名思义知道方法意思
(3).Dispatcher线程派发器
通道信息派发器
|
|
- all:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。
- direct:所有消息都不派发到线程池,全部在IO线程上直接执行。
- message:只有请求响应消息派发到线程池,其他连接断开事件、心跳等消息,直接在IO线程上执行。
- execution:只请求消息派发到线程池,不含响应,响应和其他连接断开事件、心跳等消息,直接在IO线程上执行。
- connection:创建单个线程池,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池。
a.AllDispatcher源码分析
|
|
b.AllChannelHandler线程池执行客户端请求
AllChannelHandler继续WrappedChannelHandler抽象方法
i.WrappedChannelHandler#构造方法创建业务线程池
|
|
ii.WrappedChannelHandler往线程池中添加ChannelEventRunnable任务
|
|
c. ThreadPool的SPI实现
|
|
- fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。
- cached: 缓存线程池,空闲一分钟自动删除,需要时重建。
- limited:可伸缩线程池,但池中的线程只会增长不会收缩。
d. DataStore的SPI实现
|
|
- SimpleDataStore
1 2 3
// <component name or id, <data-name, data-value>> private ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<String, ConcurrentMap<String, Object>>();
d.ChannelEventRunnable#run业务线程池任务处理
|
|
(4).doOpen start Server
Netty4启动服务
|
|
a.NettyCodecAdapter编解码器适配器
i.encode编码,最终调用codec.encode
|
|
ii.decode解码,最终调用codec.decode
|
|
b.Codec2编解码器
(1).DubboCodec默认编解码器,继承ExchangeCodec,ExchangeCodec最终进行编解码器
ExchangeCodec源码
|
|
(2).ExchangeCodec#encodeRequest
|
|
(3).DubboCodec#encodeRequestData
|
|
(4).ExchangeCodec#encodeResponse
|
|
(5).DubboCodec#decodeBody
|
|
c.NettyServerHandler
- channelActive调用ChannelHandler.connected
- channelInactive调用ChannelHandler.connected
- channelRead调用ChannelHandler.received
- write调用ChannelHandler.sent
- exceptionCaughtd调用ChannelHandler.caught
- NettyChannel缓存上下文
|
|
(5).NettyServer实现ChannelHandler接口
7.Filter
1.ExecuteLimitFilter用于提供者限制并发数量
- 方法+executes 配置并发数量
- Semaphore控制
2.ActiveLimitFilter用于消费者限制并发数量
- 方法+actives配置并发数量
- 方法+timeout设置等待
- AtomicInteger控制数量
- RpcStatus做统计
3.GenericFilter用于泛化提供者实现
- 方法名称:$invoke
- 参数包含generic
|
|
4.GenericImplFilter用于泛化消费者实现
- URL参数包含generic就可以进行泛化
- 按参数内容进行序列化操作
- GenericService进行调用