Apache Dubbo 服务发布源码分析
Posted chun_soft
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Dubbo 服务发布源码分析相关的知识,希望对你有一定的参考价值。
1、源码分析思考
按照对于 dubbo 的理解,如果要实现服务发布和注册,需要做哪些事情?
- 配置文件解析或者注解解析
- 服务注册
- 启动 netty 服务实现远程监听
2、Dubbo 对于 Spring 的扩展
最早我们使用 Spring 的配置,来实现 dubbo 服务的发布,方便大家的同时,也意味着 Dubbo 里面和 Spring 肯定有那种说不清的关系。
/**
* (1)spring.schemas文件用来配置schame文件的位置;
* (2)当spring容器扫描到配置文件,例如dubbo-consumeer.xml,遇到名称空间:xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
* (3)就会通过名称空间去查询对应的xsd约束文件,就如schemaLocation中配置的:xsi:schemaLocation="
* http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"
* (4)再用这个找到的xsd去spring.schemas文件中找到xsd文件的位置,并校验xsd文件的正确性,返回当前文件(applicationContext.xml)的Document对象
* (5)最终加载spring.handlers文件,然后调用DubboNamespaceHandler的init方法,然后是registerBeanDefinitionParser方法,该方法将节点名称和解析封装在NamespaceHandlerSupport的map中。
*/
2.1 Spring 的标签扩展
在 spring 中定义了两个接口
- NamespaceHandler:注册一堆 BeanDefinitionParser,利用他们来进行解析
- BeanDefinitionParser:用于解析每个 element 的内容。
Spring 默认会加载 jar 包下的 META-INF/spring.handlers 文件寻找对应的 NamespaceHandler。Dubbo-config 模块下的 dubbo-config-spring。
http\\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
2.2 Dubbo 的接入实现
Dubbo 中 spring 扩展就是使用 spring 的自定义类型,所以同样也有NamespaceHandler、BeanDefinitionParser。而 NamespaceHandler 是 DubboNamespaceHandler
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我们想看 dubbo:service 的配置,就直接看
DubboBeanDefinitionParser(ServiceBean.class,true)
这个里面主要做了一件事,把不同的配置分别转化成 spring 容器中的 bean 对象
application 对应 ApplicationConfig
registry 对应 RegistryConfig
monitor 对应 MonitorConfig
provider 对应 ProviderConfig
consumer 对应 ConsumerConfig
我们仔细看,发现涉及到服务发布和服务调用的两个配置的解析,使用用的是 ServiceBean 和 referenceBean。并不是 config 结尾的,这两个类稍微特殊些,当然他同时也继承了 ServiceConfig 和 ReferenceConfig
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
2.3 DubboBeanDefinitionParser
这里面是实现具体配置文件解析的入口,它重写了 parse 方法,对 spring 的配置进行解析。我们关注一下 ServiceBean 的解析。实际就是解析 dubbo:service 这个标签中对应的属性:
else if (ServiceBean.class.equals(beanClass)) {
String className = resolveAttribute(element, "class", parserContext);
if (StringUtils.isNotEmpty(className)) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition, parserContext);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
}
2.4 ServiceBean 的实现
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware {
}
ServiceBean 这个类,分别实现了 InitializingBean, DisposableBean, ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware
InitializingBean
接口为 bean 提供了初始化方法的方式,它只包括 afterPropertiesSet 方法,凡是继承该接口的类,在初始化 bean 的时候会执行该方法。被重写的方法为 afterPropertiesSet
DisposableBean
被重写的方法为 destroy,bean 被销毁的时候,spring 容器会自动执行 destory 方法,比如释放资源
ApplicationContextAware
实现了这个接口的 bean,当 spring 容器初始化的时候,会自动的将 ApplicationContext 注入进来
ApplicationListener
ApplicationEvent 事件监听,spring 容器启动后会发一个事件通知。被重写的方法为:onApplicationEvent,onApplicationEvent 方法传入的对象是 ContextRefreshedEvent。这个对象是当 Spring 的上下文被刷新或者加载完毕的时候触发的。因此服务就是在 Spring 的上下文刷新后进行导出操作的。
BeanNameAware
获得自身初始化时,本身的 bean 的 id 属性,被重写的方法为 setBeanName
ApplicationEventPublisherAware
这个是一个异步事件发送器。被重写的方法为 setApplicationEventPublisher,简单来说,在 spring 里面提供了类似于消息队列的异步事件解耦功能。(典型的观察者模式的应用)
spring 事件发送监听由 3 个部分组成
1.ApplicationEvent:表示事件本身,自定义事件需要继承该类 2.ApplicationEventPublisherAware:事件发送器,需要实现该接口 3.ApplicationListener:事件监听器接口
3、ServiceBean 中服务暴露过程
在 ServiceBean 中,我们暂且只需要关注两个方法,分别是:
在初始化 bean 的时候会执行该方法 afterPropertiesSet, spring 容器启动后会发一个事件通知 onApplicationEvent。
-
afterPropertiesSet
我们发现这个方法里面,就是把 dubbo 中配置的 application、registry、service、protocol 等信息,加载到对应的 config 实体中,便于后续的使用。 -
onApplicationEvent
spring 容器启动之后,会收到一个这样的事件通知,这里面做了两个事情
- 判断服务是否已经发布过
- 如果没有发布,则调用调用 export 进行服务发布的流程(这里就是入口)
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
export
serviceBean 中,重写了 export 方法,实现了一个事件的发布。并且调用了 super.export() ,也就是会调用父类的 export 方法
@Override
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
4、ServiceConfig 配置类
先整体来看一下这个类的作用,从名字来看,它应该和其他所有 config 类一样去实现对配置文件中 service 的配置信息的存储。实际上这个类并不单纯,所有的配置它都放在了一个 AbstractServiceConfig 的抽象类,自己实现了很多对于服务发布之前要做的操作逻辑。
4.1 export
public synchronized void export() {
// 检查并更新配置信息
checkAndUpdateSubConfigs();
// 当前的服务是否需要发布, 通过配置实现:@Service(export = false)
if (!shouldExport()) {
return;
}
// 检查是否需要延时发布,通过配置@Service(delay = 1000)实现,单位毫秒
if (shouldDelay()) {
// 这里的延时是通过定时器来实现
delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 如果没有配置 delay,则直接调用 export 进行发布
doExport();
}
}
4.2 doExport
这里仍然还是在实现发布前的各种判断,比如判断:
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// 服务是否已经发布过
if (exported) {
return;
}
// 设置发布状态
exported = true;
// path 表示服务路径,默认使用 interfaceName
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
4.3 doExportUrls
- 记载所有配置的注册中心地址
- 遍历所有配置的协议,protocols
- 针对每种协议发布一个对应协议的服务
private void doExportUrls() {
// 加载所有配置的注册中心的地址,组装成一个 URL
// (registry://ip:port/org.apache.dubbo.registry.RegistryService 的东西)
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
// group 跟 version 组成一个 pathKey(serviceName)
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// applicationModel 用来存储 ProviderModel,发布的服务的元数据,后续会用到
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// 发布指定协议
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
4.4 doExportUrlsFor1Protocol
发布指定协议的服务,我们以 Dubbo 服务为例,由于代码太多,就不全部贴出来
- 前面的一大串 if else 代码,是为了把当前服务下所配置的dubbo:method参数进行解析,保存到 map 集合中
- 获得当前服务需要暴露的 ip 和端口
- 把解析到的所有数据,组装成一个 URL,大概应该是: dubbo://192.168.13.1:20881/com.xx.ISayHelloService
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略一大串 ifelse 代码,用于解析<dubbo:method> 配置
// 省略解析<dubbo:service>中配置参数的代码,比如 token、比如 service 中的 method 名称等存储在 map 中
// 获得当前服务要发布的目标 ip 和 port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 组装 URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 这里是通过 ConfiguratorFactory 去实现动态改变配置的功能,这里暂时不涉及后续再分析
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
如果 scope!="none"则发布服务,默认 scope 为 null。如果 scope 不为 none,判断是否为 local 或 remote,从而发布 Local 服务或 Remote 服务,默认两个都会发布
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// injvm 发布到本地
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// 发布远程服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
-
Local
服务只是 injvm 的服务,提供一种消费者和提供者都在一个 jvm 内的调用方式。使用了 Injvm 协议,是一个伪协议,它不开启端口,不发起远程调用,只在 JVM 内直接关联,(通过集合的方式保存了发布的服务信息),但执行 Dubbo 的 Filter 链。简单来说,就是你本地的 dubbo 服务调用,都依托于 dubbo 的标准来进行。这样可以享受到 dubbo 的一些配置服务。 -
remote
表示根据根据配置的注册中心进行远程发布。遍历多个注册中心,进行协议的发布
- Invoker 是一个代理类,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。(后续单独分析)
- DelegateProviderMetaDataInvoker,因为 2.7 引入了元数据,所以这里对 invoker 做了委托,把 invoker 交给 DelegateProviderMetaDataInvoker 来处理
- 调用 protocol.export(invoker)来发布这个代理
- 添加到 exporters 集合
for (URL registryURL : registryURLs) {
// 省略部分代码...
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded (EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
4.5 protocol.export
protocol.export,这个 protocol 是什么呢?找到定义处发现它是一个自适应扩展点,打开 Protocol 这个扩展点,又可以看到它是一个在方法层面上的自适应扩展,意味着它实现了对于 export 这个方法的适配。也就意味着这个 Protocol 是一个动态代理类, Protocol$Adaptive
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
Protocol$Adaptive
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol. destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol. getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc. RpcException {
if (arg0 == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Pro
tocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Pro
tocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extNameApache Dubbo 服务发布源码分析