dubbo学习-服务暴露与注册源码剖析
Posted ljming的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dubbo学习-服务暴露与注册源码剖析相关的知识,希望对你有一定的参考价值。
1. 服务导出简介
服务导出的方法入口位于ServiceBean这个类,我们配置的<dubbo:service/>
标签或者被@Service注解修饰的bean,都会转换成ServiceBean。ServiceBean继承自ServiceConfig,实现了InitializingBean、ApplicationListener等接口。ServiceBean重写了ApplicationListener的onApplicationEvent方法,在spring容器发布刷新事件时被触发执行这个方法,在onApplicationEvent方法中会执行服务的导出逻辑。整个导出流程可分为三个部分:
第一部分是前置工作,主要对配置参数进行检查,组装成URL;
第二部分是服务导出,服务导出分为本地导出(injvm)和远程导出两个过程;
第三部分是服务注册,向注册中心注册服务信息,用于服务发现。
ServiceBean实现了InitializingBean接口,重写了afterPropertiesSet方法。在Bean的实例化过程中,最后会调用afterPropertiesSet方法,该方法的作用主要是设置相关的配置信息,如<dubbo:provider/>
、<dubbo:applicaiton/>
、<dubbo:registy/>
等。在该方法最后,会判断supportApplicationListener的值,如果为false,则不会经过spring容器的刷新通过进行服务导出,而是直接进行服务导出,否则的话,通过spring容器发布刷新事件进行导出。supportedApplicationListener表示当前spring容器是否支持ApplicationListener,这个值的初始值为false,在spring容器将自己设置到ServiceBean的时候,ServiceBean 的 setApplicationContext 方法会检测 Spring 容器是否支持 ApplicationListener。若支持,则将 supportedApplicationListener 置为 true。ServiceBean 是 Dubbo 与 Spring 框架进行整合的关键,可以看做是两个框架之间的桥梁。具有同样作用的类还有 ReferenceBean。
2. 前置工作
从上面我们可以了解到服务的导出有两个地方,一个是afterPropertiesSet方法,如果不支持ApplicationListener,则会在该方法内部调用export方法进行服务导出。另一个地方则是,支持ApplicationListener,spring发布容器刷新机制,在ServiceBean的onApplicationEvent方法内部会调用export方法。而ServiceBean的export方法调用了其父类ServiceConfig的export方法来实现。
public synchronized void export() {
checkAndUpdateSubConfigs(); // ①
if (!shouldExport()) { // ②
return;
}
if (shouldDelay()) { // ③
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport(); // ④
}
}
doExport方法内部没有做实质性的逻辑,在方法内部主要调用了doExportUrls方法,进行多协议多注册中的服务导出。
①: 检测配置信息。
②: 判断是否需要导出服务。主要判断<dubbo: service/>标签或者<dubbo: provider/>标签配置的export值,缺省配置为true。如果配置的export为false,则不执行导出。
③: 判断是否进行延迟导出。主要判断<dubbo: service/>标签或者<dubbo: provider/>标签配置的delay值,缺省配置为空,表示不延迟暴露。如果delay的值大于0表示延迟暴露,否者不延迟暴露。延迟暴露会通过一个定时任务,延迟delay毫秒数调用doExport方法进行服务导出。
④: 执行服务导出。
doExport方法内部没有做实质性的逻辑,在方法内部主要调用了doExportUrls方法,进行多协议多注册中的服务导出。
2.1 doExportUrls方法实现
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true); // ①
for (ProtocolConfig protocolConfig : protocols) { // ②
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs); // ③
}
}
①: 加载配置中的<dubbo: registry/>配置,可以配置多个。将配置的信息组装成URL对象。
Registry相关的URL信息中,protocol=registry,表示是注册协议。
②: 循环每个协议,进行多协议多注册中心服务导出。
③: 调用doExportUrlsFor1Protocol方法,逐个协议暴露。
2.2 doExportUrlsFor1Protocol方法实现
在doExportUrlsFor1Protocol方法中,首先会构建服务导出的URL,然后执行服务导出和注册。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName(); // 协议名称,默认为dubbo
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 将配置参数放到map中,key为对应的配置名称,value为配置的值
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// 解析MethodConfig对象设置的方法级别的配置,并将参数放到map中
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
// 遍历每个method配置,添加方法级别的配置信息
} // end of methods for
}
// 判断调用类型,如果是泛化调用,则设置泛化类型
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
// 如果不是泛化调用,设置对应的配置值
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 获取接口定义的所有方法,将所有的方法名用,分割作为value,key为methods,方法map
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 判断调用该service服务是否需要token,如果需要将配置的token信息放入map
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// 执行服务导出流程
// 获取服务host,默认为本机ip地址
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// 获取服务暴露的端口,默认为20880
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 将map中的参数信息,构建成URL对象
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 获取配置的scope值
String scope = url.getParameter(SCOPE_KEY);
// 如果配置的scope值为none,则不暴露服务
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果配置的scope值不为remote,则进行本地暴露
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 如果配置的scope值不为local,则进行远程暴露
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 循环每一个注册地址,进行服务暴露
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加载监控中心的配置信息,并放入到URL中
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// 获取配置的proxy值,用来指定用哪个proxy工厂的getInvoker方法生成invoker类的
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 将服务的实现类转换成为Invoker对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// 对Invoker对象进行一次封装
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 执行服务暴露
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// ...省略不相关代码...
}
// ...省略不相关代码...
}
}
this.urls.add(url);
}
该方法在执行服务导出之前,先会读取服务的配置信息,将其放入map中,map的key为配置的名称, value为配置的值。在最后构建url的地方,将协议名称name、host,port、map等信息进行封装,转换为URL对象。
将配置信息转换为URL对象之后,然后就进行导出。首先会获取服务的scope配置,如果配置的scope值为none,则不执行服务导出。如果scope的不是remote,则进行本地导出。如果scope的值不是local,则进行远程导出。所以scope的缺省配置下,会进行本地和远程导出。
3. 服务导出
服务的导出分为本地导出和远程导出,本地暴露使用injvm协议,不会开启端口,不发起远程调用,只在jvm内关联。相比远程调用而言,不需要注册到注册中心。绑定的host:port 为127.0.0.1:0。这里不聚焦此处。主要看远程导出的实现。这个步骤总体分两步,首先是将本地接口实现类转换成为Invoker类;然后将Invoker进行导出,转换为Exporter类。
3.1 本地接口实现类转换为Invoker实现
在进行服务导致之前,会将本地接口实现类转换成为Invoker对象。通过调用ProxyFactory的getInvoker方法来实现。
// 获取配置的proxy值,用来指定用哪个proxy工厂的getInvoker方法生成invoker类的
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
配置的proxy的值表示使用哪个proxy工厂来生成Invoker对象。proxy的值也会通过放到URL中,进行传递。缺省配置为javassist。
3.1.1 ProxyFactory的getInoker方法实现
PROXY_FACTORY在ServiceConfig的定义如下;
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
PROXY_FACTORY是ProxyFactory的自适应扩展类ProxyFactory$Adaptive
,getInvoker方法被@Adaptive注解修饰,我们可以看看自适应扩展类ProxyFactory$Adaptive
的getInvoker方法内容。
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
从getInvoker方法的内容可以看出,内部实现会通过proxy字段值来选择具体的扩展实现类的getInvoker实现。缺省配置的扩展了名称为javassist,也就是说ProxyFactory的默认自适应扩展类为JavassistFactory。
dubbo提供了基于jdk和javassit的ProxyFactory实现,JdkProxyFactory和JavassistProxyFactory。
3.1.1.1 JdkProxyFacotry的getInvoker实现
我们先来看下getInvoker方法在JdkProxyFactory里的实现。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 通过反射获取proxy的方法,然后进行调用
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
该方法返回一个AbstractProxyInvoker对象,重写了doInvoker方法。doInvoker方法的原理就是通过反射的方式来调用目标方法。
3.1.1.2 JavassitProxyFactory的getInvoker实现
基于javassist的getInvoker实现。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
/**
* JavassitProxyFactory作为ProxyFactory的默认自适应实现,getInvoker方法通过创建Wrapper子类,对proxy类进行包装,返回AbstractProxyInvoker对象。
* 在wrapper子类中实现invokerMethod方法,方法体内会为每个proxy的方法做方法名和方法参数匹配校验,匹配成共则进行调用。相比JdkProxyFactory实现,省去了反射调用的开销。
*/
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
方法也返回一个AbstractProxyInvoker对象,重写了doInvoker方法。和JdkProxyFactory不同的是,JavassistProxyFactory会生成一个Wrapper类。Wrapper了是一个包装类,用于包装目标类,尽可以通过getWrapper方法创建子类。在创建wrapper子类的过程中,子类代码生成逻辑会对传入的class信息进行解析,拿到诸如方法、成员变量等信息。以及生成invokerMenthod方法和其它一些方法代码。代码生成完毕后,通过javassist生成class对象,最后再通过反射生成Wrapper实例。
和基于Jdk的实现不同的是,基于Javassit的实现,在调用过程中,jdk每次都是基于反射调用,而javassist通过生成包装类,通过包装类去完成实际调用,相比之下,javassist消耗性能更小。
为了携带服务的配置元数据信息,通过DelegateProviderMetaDataInvoker来对Invoker类进行封装,DelegateProviderMetaDataInvoker实现了Invoker接口,并持有Invoker对象的引用和配置元数据信息的引用。接下来的导出流程将基于这个类进行。
3.1.2 小结
本地接口进行导出之前,需要将接口实现类转换为Invoker对象。dubbo默认提供了基于Jdk和Javassit的两种实现,基于Javassit的实现减少了反射调用,性能更好。
3.2 将Invoker对象转换为Exporter
实现Invoker对象转换成为Exporter对象是在DubboProtocol中实现的,但是根据Invoker对象的URL信息可以发现,URL的信息中将registry的配置信息和服务的配置信息结合在了一起,完成这一操作的就是实现本地接口转换成Invoker的实现。
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
registryURL.addParameterAndEncoded这个方法,将url(也就是服务的配置信息)都结合到了一次。
从图中可以看出,url信息中包含了注册信息和服务导出信息,使用export字段将服务信息结合,此时url的protocol值为registry。
服务导出的实现是调用Protocol的export方法,Protocol是一个扩展接口,export方法被@Adaptive注解修饰。
Exporter<?> exporter = protocol.export(wrapperInvoker);
3.2.1 Protocol的export方法实现
上面说到Invoker实现Expoter的转换是通过Protocol的export方法实现,protocol变量在ServiceConfig的定义如下:
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
protocol是Protocol扩展接口的自适应扩展类Protocol$Adaptive
,可以看下export方法在Protocol$Adaptive
类中的内容:
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
可以看到export方法会从url中获取protocol的值作为具体的扩展类名称,默认为dubbo。然后根据扩展类名称获取具体的扩展实现类,再调用这个类的export方法。
和服务导出相关的Protocol扩展实现类有RegistryProtocol、DubboProtocol、ProtocolFilterWrapper、ProtocolListenerWrapper等。
当然这里只针对基于dubbo协议的服务导出,如果是基于其它协议的,诸如WebServiceProtocol、HttpProtocol、RedisProtocol、XmlRpcProtocol等,可以看它们的具体实现。
3.2.1.1 RegistryProtocol的export方法实现
上面说到调用protocol.export方法是,invoker对象中的url的protocol值为registry,所以根据spi的自适应机制,此时export的具体实现在RegistryProtocol的export方法。但其实在进入到RegistryProtocol的export内部之前,现需要经过ProtocolFilterWrapper和ProtocolListenerWrapper的调用,这两个类是扩展接口的包装类,这个两个类的export方法实现,对于protocol值为registry不做任何操作,直接调用RegistryProtocol的export。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 取出注册中心的url
URL registryUrl = getRegistryUrl(originInvoker);
// 取出服务暴露的url
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 执行本地暴露,将Invoker转换为Exporter
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 省略服务注册的代码
}
该方法总体分为三个步骤: 1,将URL中的注册信息和服务导出信息分类;2,进行服务导出;3,进行服务注册。这里只关注1,2步,先不用关注服务注册的实现。
registryUrl是服务注册的URL信息。提取方法实现如下:
private URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl(); // 原始的url信息,registry开头
if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { // 是registry协议才提取
// 获取url中参数名为registry的值,因为配置的<dubbo: registry="zookeeper://localhost:2181">,所以protocol值为zookeeper
String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
// 将协议修改为具体的注册协议(zookeeper),并去除registry参数,形成新的registryUrl信息
registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
return registryUrl;
}
providerUrl是服务导出的URL信息,提取方法实现如下:
private URL getProviderUrl(final Invoker<?> originInvoker) {
String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY); // 取出URL中参数为export的值
if (export == null || export.length() == 0) {
throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
}
return URL.valueOf(export);
}
从图可以看出,此时registryUrl的协议为zookeeper,providerUrl的协议是dubbo。
拿到providerUrl之后,就会调用doLocalExport方法尽心服务导出。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
// 服务暴露的key,服务暴露之后,会放入map缓存
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// protocol.export方法,配置的dubbo协议,所以调用DubboProtocol#export方法,但是会先经过ProtocolFilterWrapper和ProtocolListenerWrapper
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
因为我们知道此时providerUrl的协议是dubbo,所有将Invoker转换为Exporter的实现在DubboProtocol的export方法中。
3.2.1.1 DubboProtocol的export方法实现
在DubboProtocol的export方法内部,会开启NettyServer进行服务监听。而在进入DubboProtocol的export方法内部之前,也会进入到ProtocolFilterWrapper和ProtocolListenerWrapper的调用。
ProtocolListenerWrapper注入了监听器列表,ExporterListener也是一个SPI扩展接口,我们可自己实现该扩展点,然后在服务完成导出之后,回到ExporterListener的exported方法做些其它的事情。
ProtocolFilterWrapper的export方法会为Invoker对象构建一个调用链。最终返回一个 CallbackRegistrationInvoker对象。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 加载所有的Filter自动激活扩展实现类,也就是所有的被@Activate注解修饰的扩展实现类
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
// 构建调用链,filters或根据@Activate注解设置的排序字段进行排序,构建的调用链Filter实现类在前,invoker在最后
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
// 省略无关代码
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
// 省略无关代码
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
最后方法返回的CallbackRegistrationInvoker对象,它是ProtocolFilterWrapper的内部类, 实现Invoker接口。封装了Invoker调用链和所有的过滤器。
static class CallbackRegistrationInvoker<T> implements Invoker<T> {
private final Invoker<T> filterInvoker; // 封装了filter的invoker调用链,真正的调用实现类在链尾
private final List<Filter> filters; // 所有的过滤器实现类
public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
this.filterInvoker = filterInvoker;
this.filters = filters;
}
}
完成调用链的构建之后,就会进入到DubboProtocol的export方法完成服务导出。基本流程就是讲Invoker转换为Exporter,然后放到exportMap缓存中,然后调用operServer方法开启NettyServer,进行服务监听。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 根据url获取服务暴露的key
String key = serviceKey(url); // ①
// 将Invoker对象转换成为Exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // ②
// 将Exporter放入map缓存
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 开启NettyServer,进行监听
openServer(url); // ③
optimizeSerialization(url);
return exporter;
}
①: 根据invoker的url构建服务的key。
②: 将Invoker对象转换为Exporter,并放入exporterMap中。
③: 调用openServer方法开启NettyServer,进行服务监听。
3.2.2 openServer方法实现
private void openServer(URL url) {
// 获取提供者机器地址+ip
String key = url.getAddress();
// 只有提供者才需要启动监听
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 先从缓存中获取server,缓存中没有的话,则通过createServer创建。每个机器的ip:port是唯一的,所以只会创建一次NettyServer,之后都是从缓存中取
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
该方法实现首先会判断当前URL是否为服务提供方的信息,只有是提供者才需要启动监听。然后会先从serverMap中根据ip:port获取ExchangeServer对象,每个ip:port是唯一的,所以只会创建一次Server对象,之后都是从缓存中获取。如果缓存中没有,调用调用createServer方法创建Server,在双重检测的机制下确保只创建一次。
3.2.3 createServer方法实现
dubbo默认使用netty作为网络通讯服务器。
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// 当服务准备关闭时,运行发送只读事件
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 开启心跳机制
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build(); // ①
// 获取使用了那种Server,默认使用Netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER); // ②
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // ③
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
①: 向URL添加其它参数信息。如服务准备关闭,是否开启只处理只读事件;是否开启心跳机制;
②: 获取配置的server字段值,默认为netty。dubbo也提供了基于如Mina服务的实现。
③: 调用Exchangers.bind方法开启服务。
Exchangers.bind方法中首先获取Exchanger对象,Exchanger是扩展点接口,默认扩展实现了是HeaderExchanger。所以进到HeaderExchanger的bind方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
方法最终返回HeaderExchangeServer对象。在返回之前会先调用Transporters.bind方法,创建一个Server对象。创建Server对象,会传入一个DecodeHandler,创建DecodeHandler会传入HeaderExchangeHandler,创建HeaderExchangeHandler又会传入一个ExchangeHandler,这个ExchangeHandler是位于DubboProtocol的ExchangeHandlerAdapter实例——requestHandler。ExchangeHandlerAdapter实现了ExchangeHandler接口。
通过上面的分析,Transporters.bind方法传入的ChannelHandler实例为DecoderHandler。
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取Transporter扩展类实例,然后执行bind操作。getTransporter方法返回的是自适应扩展类。
// bind方法会先根据server的值来决定使用那个扩展实现类,如果server值为空,则使用transporter的值来决定使用哪个扩展实现类
// 这里server为netty,所以调用NettyTransporter#bind方法,并发挥NettyServer对象
return getTransporter().bind(url, handler);
}
该方法的实现,也是通过URL中的参数来决定选择Transporter扩展接口的实际扩展实现类。getTransporter方法返回Transporter$Adaptive自适应类,根据bind方法上的@Adaptive注解配置key,从URL中选择参数为key的值。默认为netty,所以会调用NettyTransporter的bind方法。
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
NettyTransporter的bind方法,返回一个NettyServer实例。接下来看NettyServer的构造函数。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
构造函数主要是调用父类的构造函数。在调用父类构造函数之前,会调用ChannelHandlers.wrap方法,对传进来的ChannelHandler进行封装,此时传进来的ChannelHandler为DecoderHandler。
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url) 默认Dispatcher为all,所以返回AllChannelHandler
// MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)));
}
wrap方法内部调用wrapInternal方法对Handler进行封装。直接看最后一句,Dispatcher是一个扩展接口,dispatch方法被@Adaptive注解修饰,根据URL中的dispatcher、dispather、channel.handler参数值来选择具体的扩展实现类。这里默认是all,所以dispatch方法的默认实现是AllDispatcher的dispatch方法。AllDispatcher的dispatch方法,直接返回AllChannelHandler对象。
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
所以warp方法将传进来的DecoderHandler进行封装后,返回的是MultiMessageHandler对象。内部的引用关系为:
MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecoderHandler -> HeaderExchangeHandler -> ExchangeHandlerAdapter。
完成Handler的封装之后,就会进入到父类AbstractServer的构造函数,这时传进来的ChannelHandler实例为MultiMessageHandler。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler); // ①
// 本地地址
localAddress = getUrl().toInetSocketAddress();
// 绑定的ip
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
// 绑定端口
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort); // ②
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); // ③
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); // ④
try {
// 启动服务监听
doOpen(); // ⑤
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
①: 继续调用父类的构造函数,设置handler、timeout、connectTimeout、codec、url等属性值。
③: 获取服务器允许最大连接数。默认为0,表示不限制。
④: 获取连接空闲时间。默认为600s。
⑤: 调用doOpen方法,开启服务监听。
doOpen方法在AbstractServer是一个抽象方法,有子类实现。具体看NettyServer的doOpen方法。
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap(); // 创建ServerBootStrap
// ①
// 创建主线程组
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// 创建工作线程组
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
// 创建NettyServerHandler ②
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
// ③
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 添加解码器Handler到管道
.addLast("encoder", adapter.getEncoder()) // 添加编码码器Handler到管道
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler); // 添加处理业务的handler到管道
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
这就是一个典型的Netty开启服务的实现。
①: Dubbo使用了Netty的reactor模式,使用主从多线程来出来连接事件和业务事件。
②: 创建处理业务的Handler。而构造函数传入的ChannelHandler对象是NettyServer对象,从NettyServer的继承关系可以,NettyServer继承自AbstractPeer,而AbstractPeer实现了ChannelHandler接口,所有NettyServer本质上也会一个ChannelHandler。根据NettyServer的构造函数可知,此时NettyServer对象内部持有的ChannelHandler对象是MultiMessageHandler。
③: 添加编码器handler、解码器handler和处理业务的handler对象到pipeline中。
经过上述开启NettyServer的过程,ChannelHandler的最终的连接关系如下图:
NettyServer接收到一个请求之后,就会逐一经过这里ChannelHandler,最后到达ExchangeHandlerAdapter实例,也就是DubboProtocol的requestHandler,最终由它来完成业务处理。这块会在后面的内容中讲到。
3.3 服务导出小结
服务的导出是将接口实现类转换成Invoker对象,Invoker转换成Exporter对象的过程。首先ServiceConfig类拿到对外提供服务的的实际类ref(例如:DemoServiceImpl),然后通过ProxyFactory类的getInvoker方法,将ref转换为一个AbstractProxyInvoker实例,到这一步就完成了接口实现类到Invoker的转换。Invoker转换为Exporter的过程,根据具体的导出协议,如dubbo,则会在DubboProtocol中的export方法完成转换。
4. 服务注册
在分析服务导出的地方,只给出了RegistryProtocol的export方法的服务导出部分代码。完成服务的导出之后,接下来就会执行服务注册的逻辑。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
/**
* 省略服务导出的代码
*/
// 根据注册中心url获取对应的Registry扩展实现类。这里配置的是注册协议是zookeeper,所以registry对象是ZookeeperRegistry
final Registry registry = getRegistry(originInvoker); // ①
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // ②
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 根据配置判断,是否需要注册服务
boolean register = registeredProviderUrl.getParameter("register", true); // ③
if (register) {
// 进行服务注册
register(registryUrl, registeredProviderUrl); // ④
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
①: 根据Invoker对象获取Registry对象。
②: 获取服务提供者需要注册到注册中心的URL信息。
③: 从registeredProviderUrl获取register参数信息,也就是配置的register值,如果为false,则不会进行服务注册,反正进行服务注册。默认为true。
④: 进行服务注册。
4.1 getRegistry方法实现
进行服务注册之前,需要通过Invoker中的URL信息获取Registry对象。
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
方法逻辑首先会获取registryUrl,getRegistryUrl方法在服务导出时已经分析过了,registryUrl对象的protocol值为zookeeper;然后调用RegistryFactory的getRegistry方法获取Registry实例。
RegistryFactory是一个扩展接口,getRegistry方法被@Adaptive注解修饰,根据protocol的值来选择具体的实现类。这里registryUrl的protocol值为zookeeper,所有getRegistry方法实现要看ZookeeperRegistryFactory。ZookeeperRegistryFactory继承自AbstractRegistryFactory,getRegistry方法在父类中已经被实现。
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// 先从缓存中获取,没有的话通过createRegistry方法创建
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// createRegistry方法是一个抽象方法,有子类实现,因为此时的注册协议为zookeeper,所以看ZookeeperRegistryFactory#createRegistry方法
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 放入缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
该方法主要从缓存中获取Registry实例,如果缓存中没有就调用createRegistry方法创建,然后在放入缓存。通过ReentrantLock来确保Registry不会被重复创建。createRegistry方法是一个抽象方法,由子类去实现,因为此时的protocol值为zookeeper,所有看ZookeeperRegistryFactory的createRegistry方法实现。
public Registry createRegistry(URL url) {
// 返回一个ZookeeperRegistry对象
return new ZookeeperRegistry(url, zookeeperTransporter);
}
该方法简单粗暴的返回了一个ZookeeperRegistry对象。
4.1.1 ZookeeperRegistry构造函数实现
ZookeeperRegistry继承自FailbackRegistry,构造函数中首先会调用父类的构造函数,主要是设置注册失败后重试机制。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 父类的构造函数,会设置注册失败的轮询
super(url); // ①
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 连接上zkServer
zkClient = zookeeperTransporter.connect(url); // ②
// 添加状态监控器,当发生重连接事件时,对注册和订阅信息进行覆盖
zkClient.addStateListener(state -> { // ③
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
①: 调用父类构造函数,设置注册失败重试。基于HashedWheelTimer,默认每5s重试一次,最多重试128次。
②: 连接Zookeeper服务器。
③: 添加Zookeeper状态监听器,当发生重连接事件时,对注册信和订阅信息进行覆盖。
4.2 执行服务注册
经过上面的步骤,获取到了ZookeeperRegistry对象。接下来就会执行注册逻辑。在执行注册逻辑之前,会获取通过调用getRegisteredProvinceUrl方法获取提供者的信息。简单理解处理过程就是: 将服务暴露的URL信息进行了修改,去掉了如monitor、bind.ip、bind.port、qos.enable、interfaces等的参数信息。
拿到更新之后的服务URL信息之后,从URL信息中获取register参数值,如为false,则不进行服务注册。默认为true,进行服务注册。
public void register(URL registryUrl, URL registeredProviderUrl) {
// RegistryFactory是个扩展接口,getRegistry方法被@Adaptive修饰,通过protocol的值选择具体的扩展实现类。
// RegisterFactory的扩展实现类都继承了AbstractRegisterFactory类,所以先回执行AbstractRegisterFactory#getRegistry方法,返回ZookeeperRegistry对象
Registry registry = registryFactory.getRegistry(registryUrl);
// ZookeeperRegistry对象继承FailbackRegistry,先执行FailbackRegistry#register方法
registry.register(registeredProviderUrl);
}
register方法中会再次获取Registry实例,因为之前已经创建过了,所以这次直接从缓存中获取即可。因为此时注册协议为zookeeper,ZookeeperRegistry继承了FailbackRegistry,且父类对register方法做了实现。
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 抽象方法,由子类实现,这里是ZookeeperRegistry
doRegister(url);
} catch (Exception e) {
// 省略异常处理代码
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
该方法的主体实现还是需要看子类ZookeeperRegistry的doRegister方法实现。
public void doRegister(URL url) {
try {
// 通过调用zkClient.create方法注册服务节点信息到zookeeper
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
该方法通过zkClient向Zookeeper服务器创建url相关的目录。
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
// 判断是否是临时节点
if (ephemeral) {
createEphemeral(path);
} else {
// 第一层的root节点,也就是/dubbo目录是持久节点
// 第二层的service节点,也就是/dubbo/xxx.xxx.xxxService目录是持久节点
// 第三层的type节点,也就是/dubbo/xxx.xxx.xxxService/providers目录是持久化节点
createPersistent(path);
}
}
create方法对path进行递归创建,如果path是临时节点,则会创建临时目录,反正创建持久目录。在dubbo服务中,第一层的root节点、第二层的service节点、第三层的type节点,都是持久化目录,其余是临时目录。
4.3 服务注册小结
服务注册总体分为两步,首先根据协议获取Registry实例,之后通过注册中心实例注册服务。注册服务的过程也就是向Zookeeper服务器创建服务相关信息的目录。
以上是关于dubbo学习-服务暴露与注册源码剖析的主要内容,如果未能解决你的问题,请参考以下文章