Dubbo服务的发布流程

Posted Java Miraculous

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo服务的发布流程相关的知识,希望对你有一定的参考价值。

今天这篇文章非常关键,堪称Dubbo的核心内容,出去面试的时候如果问你Dubbo相关的问题,十有八九是Dubbo服务的发布和引用流程,由于内容太多,今天先分析Dubbo的服务发布流程。

中的写的那个demo吗,其中Provider中的testProvider方法的第6步(导出服务),也就是serviceConfig.export()这个方法,服务提供方需要调用这个方法来发布服务。

看下serviceConfig.export()这个方法的代码:

public synchronized void export() { checkAndUpdateSubConfigs();        //一、是否需要导出服务 if (!shouldExport()) { return; } //二、是否需要延迟发布 if (shouldDelay()) { delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS); } else {            //三、直接发布 doExport(); } }

接下来对这个方法中的关键方法进行解析,checkAndUpdateSubConfigs()这个方法就不细说了,就是检测下配置,自己可以点点看

一、shouldExport()方法:

private boolean shouldExport() { //1 /**      是否要导出这个服务 protected Boolean export;       */ Boolean shouldExport = getExport(); //2 /**      provider配置 private ProviderConfig provider;         */ if (shouldExport == null && provider != null) { shouldExport = provider.getExport();        }        //3 // default value is true if (shouldExport == null) { return true;        } return shouldExport; }

1.1、getExport()这个方法你点进去你会发现他是dubbo配置类的一个属性,叫做:

/**     * Whether to export the service(是否导出服务) */ protected Boolean export;

你可以初始化serviceConfig的时候给它设置值的:

1.2、如果没有配置export属性并且provider不为空(注意下这个provider

/** * The provider configuration */ private ProviderConfig provider;

其实也是serviceConfig的一个配置)就从provider中获取

1.3、经过前两步之后如果shouldExport还是空,那就默认为true,也就是导出服务

二、shouldDelay()方法,是否延迟发布

private boolean shouldDelay() { //1 Integer delay = getDelay(); //2 /**         延时发布多少毫秒 protected Integer delay;        */ if (delay == null && provider != null) { delay = provider.getDelay(); } //3 return delay != null && delay > 0; }

2.1、getDeley()方法其实取得是serviceConfig的delay属性

/** * The time delay register service (milliseconds) */ protected Integer delay;

可以看到单位是毫秒,如果设置了这个属性,就会延时多少毫秒之后发布服务。

2.2、如果没有设置delay的值并且provider配置不为空的话,就从provider中获取。

2.3、如果设置的delay不为空并且大于0的话肯定就是要延迟发布了。

通过上面的代码可知,Dubbo延迟发布其实是使用ScheduledExcutorService(定时任务线程池)来实现的

/** * A delayed exposure service timer */ private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));

三、doExport()直接发布,其实延时发布最终调用的也是这个方法

protected synchronized void doExport() { //1  /**          服务导出或者不导出的一个标识,如果true表示不用导出这个服务 private transient volatile boolean unexported; */ if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } //2 /**        服务是否已经被导出,true表示该服务已经被导出了 private transient volatile boolean exported;       */ if (exported) { return; }        //3 开始导出前把是否已经被导出标记成已导出 exported = true; //4 /**      服务的名称      private String path; */ /**         被导出服务的接口名          private String interfaceName;       */ if (StringUtils.isEmpty(path)) { path = interfaceName; } //5 导出成url doExportUrls(); }

3.1、判断这个服务是不是被标记成不导出,如果标记了就直接抛出异常

3.2、判断服务是否已被导出,如果已被导出,就直接返回

3.3、准备导出,导出前将exported标记为true

3.4、判断如果path为空,就把要导出服务的接口名字赋值给path

3.5、导出成url,可以看到又是一个方法,继续看doExportUrls()这个方法吧

@SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { //1 加载注册中心 List<URL> registryURLs = loadRegistries(true); //2 for (ProtocolConfig protocolConfig : protocols) {            //2.1 封装pathKey String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);            //2.2 将要暴露的服务封装成一个pojo ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); //2.3 将要暴露的服务加入缓存 ApplicationModel.initProviderModel(pathKey, providerModel);            //2.4  doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }

3.5.1、loadRegistries(),加载注册中心

protected List<URL> loadRegistries(boolean provider) { List<URL> registryList = new ArrayList<URL>();        // 1 判断注册中心是否为空 if (CollectionUtils.isNotEmpty(registries)) { for (RegistryConfig config : registries) {                //2 获取注册中心地址 String address = config.getAddress(); if (StringUtils.isEmpty(address)) {                    //3 如果注册中心地址不存在,就给个默认值0.0.0.0 address = Constants.ANYHOST_VALUE; }                //4 如果注册中新地址不可用,也就是直连的意思  N/A,前面加非,意思就是不是直连的时候 if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>();                    //5 应用信息添加到map中,比如应用名 appendParameters(map, application);                    //6 将注册中心的配置放入map appendParameters(map, config);                    //7 将path放入map,"path" -> "org.apache.dubbo.registry.RegistryService" map.put(Constants.PATH_KEY, RegistryService.class.getName());                    //8 将dubbo的版本,当前的时间戳以及运行时的进程号放入map appendRuntimeParameters(map);                    //9 如果参数中不包含协议,那么协议默认为Dubbo协议 if (!map.containsKey(Constants.PROTOCOL_KEY)) { map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL); }                    //10 把map转换成url zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.0.2&pid=1736&release=2.7.1&timestamp=1606877890879 List<URL> urls = UrlUtils.parseURLs(address, map);                    //11 遍历url for (URL url : urls) {                        //12 改变url registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-provider&dubbo=2.0.2&pid=1736&registry=zookeeper&release=2.7.1&timestamp=1606877890879 url = URLBuilder.from(url) .addParameter(Constants.REGISTRY_KEY, url.getProtocol()) .setProtocol(Constants.REGISTRY_PROTOCOL) .build(); if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {                             //13 放入注册中心集合,因为可能有多个注册中心 registryList.add(url); } } } } } return registryList; }

接下来对上面的每一步都进行详细的解析,因为真的很重要:

3.5.1.1、判断注册中心是否为空,我们看下这个registries

/**     * Registry centers  注册中心配置 */ protected List<RegistryConfig> registries;

为什么是个list?因为dubbo服务可以被暴露到多个注册中心,比如我可以同时把我的服务暴露到zk和redis,就是这个道理

Dubbo服务的发布流程

3.5.1.3、如果这个adress为空的话,就给个缺省值0.0.0.0

3.5.1.4、如果注册中心不可用,意思就是直连,用过dubbo的人应该知道N/A,其实就是不连注册中心,消费者直接调用提供者,这里是不是直连的时候进入,所以配置不能是N/A,我现在用的zk,所以肯定会进入的,继续往下走

3.5.1.5、将应用信息放入map,其实就是初始化ApplicationConfig的时候传入的那个dubbo-provider,标识提供者的名称

Dubbo服务的发布流程

3.5.1.6、将注册中心配置放入map,可以看到并未放入任何东西,map的大小还是1

Dubbo服务的发布流程

3.5.1.7、将path放入map

Dubbo服务的发布流程

3.5.1.8、添加一些运行时的信息

Dubbo服务的发布流程

3.5.1.9、添加协议,如果没有设置,默认是Dubbo协议

Dubbo服务的发布流程

Dubbo服务的发布流程

3.5.1.11-13、转换url,注意看下和上面的有什么不同

Dubbo服务的发布流程

对比一下你会发现url的前缀由zookeeper变成了registry了,并且连接中拼上了registry=zookeeper,然后将url放入注册中心集合。

3.5.2、测试类里咱并未给serviceConfig设置protocols属性,看看默认的是什么

Dubbo服务的发布流程

3.5.2.1、封装pathKey

这里面path是服务名,也就是你要暴露的服务,值是你暴露的接口的名字,group就是你设置的dubbo的分组,version就是服务的版本,所以经过这句代码之后,看下pathKey的值

String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);

Dubbo服务的发布流程

3.5.2.2、将要暴露的服务封装成一个pojo

ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);

先看下这句代码里的providerModel这个类:

package org.apache.dubbo.rpc.model;
import java.lang.reflect.Method;import java.util.ArrayList;import java.util.Arrays;import java.util.HashMap;import java.util.List;import java.util.Map;
/**    要发布的服务的模型,意思就是把你要发布的服务封装成一个pojo了 * ProviderModel which is about published services */public class ProviderModel { //服务名,这个其实就是pathKey private final String serviceName; //服务的实例 private final Object serviceInstance;    //服务的接口名 private final Class<?> serviceInterfaceClass;    //接口里的方法 private final Map<String, List<ProviderMethodModel>> methods = new HashMap<String, List<ProviderMethodModel>>();
public ProviderModel(String serviceName, Object serviceInstance, Class<?> serviceInterfaceClass) { if (null == serviceInstance) { throw new IllegalArgumentException("Service[" + serviceName + "]Target is NULL."); }
this.serviceName = serviceName; this.serviceInstance = serviceInstance; this.serviceInterfaceClass = serviceInterfaceClass;
initMethod(); }

public String getServiceName() { return serviceName; }
public Class<?> getServiceInterfaceClass() { return serviceInterfaceClass; }
public Object getServiceInstance() { return serviceInstance; }
public List<ProviderMethodModel> getAllMethods() { List<ProviderMethodModel> result = new ArrayList<ProviderMethodModel>(); for (List<ProviderMethodModel> models : methods.values()) { result.addAll(models); } return result; }
public ProviderMethodModel getMethodModel(String methodName, String[] argTypes) { List<ProviderMethodModel> methodModels = methods.get(methodName); if (methodModels != null) { for (ProviderMethodModel methodModel : methodModels) { if (Arrays.equals(argTypes, methodModel.getMethodArgTypes())) { return methodModel; } } } return null; }
private void initMethod() { Method[] methodsToExport = null; methodsToExport = this.serviceInterfaceClass.getMethods();
for (Method method : methodsToExport) { method.setAccessible(true);
List<ProviderMethodModel> methodModels = methods.get(method.getName()); if (methodModels == null) { methodModels = new ArrayList<ProviderMethodModel>(1); methods.put(method.getName(), methodModels); } methodModels.add(new ProviderMethodModel(method, serviceName)); } }
}

3.5.2.3、将要暴露的服务加入缓存

ApplicationModel.initProviderModel(pathKey, providerModel);

先看下initProviderModel这个方法:

public static void initProviderModel(String serviceName, ProviderModel providerModel) { if (providedServices.putIfAbsent(serviceName, providerModel) != null) { LOGGER.warn("Already register the same:" + serviceName); } }
/** * full qualified class name -> provided service */ private static final ConcurrentMap<String, ProviderModel> providedServices = new ConcurrentHashMap<>();

其实就是往这个concurrentHashMap中放值。key和value如下:

Dubbo服务的发布流程

3.5.2.4、接下来又是一个方法doExportUrlsFor1Protocol,继续看,这个方法可是有点长了

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {        //1.取出协议名,没有设置的话默认是dubbo协议 String name = protocolConfig.getName(); if (StringUtils.isEmpty(name)) { name = Constants.DUBBO; }        //2.初始化一个hashMap Map<String, String> map = new HashMap<String, String>();        //放入side map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);        //放入运行时的一些信息,比如时间戳,dubbo的版本,进程号,服务版本号等信息 appendRuntimeParameters(map); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this);        //3.判断方法的相关配置是否为空 if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } List<ArgumentConfig> arguments = method.getArguments(); if (CollectionUtils.isNotEmpty(arguments)) { for (ArgumentConfig argument : arguments) { // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); // visit all methods if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // target the method, and get its signature if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); // one callback in the method if (argument.getIndex() != -1) { if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // multiple callbacks in the method for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"); }
} } } // end of methods for }        //4 是否是泛型调用 if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { //5获取修订版本号,其实取得就是接口的版本号 String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); }            //6 获取接口里的方法 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) {                //如果接口中没有方法,就打印个警告日志:接口里没有方法 logger.warn("No method found in service interface " + interfaceClass.getName());                //往map里放入 methods->* map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else {                //如果接口里存在方法,就将方法名放入map,多个用逗号隔开 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } //7 判断token是否为空 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } // 8 获取提供者的ip地址和端口号 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }         //9 获取scope,这里为null,如果scope设置成none的话就不进行导出了 String scope = url.getParameter(Constants.SCOPE_KEY); // 如果配置成none就不导出了        if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {            // 如果scope的值不是remote,就进行本地导出 if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { //10 本地导出 exportLocal(url); }            // 11 如果scope的值不是local,就进行远程导出 if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); }                //12 判断注册中心是否为空 if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) {                        //给url拼接上dynamic参数,这个参数是从注册中心的url上获取,没有的话默认是false url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));                        //获取dubbo监控中心的url URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { //不为空的话拼接到url上面 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); }
                        // 13 从url上获取代理,对于提供者来说,可以用这个自定义的代理方式去创建invoker的 String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) {                            //不为空的话拼接到注册中心的url上 registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); }                        //14 通过代理生成invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));                        //对exporter进行包装 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);                        //通过相关协议将invoker导出成exporter Exporter<?> exporter = protocol.export(wrapperInvoker);                        //添加到要导出的服务列表里 exporters.add(exporter); } } else { //如果注册中心为空,直接生成invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);                    //将invoker转换成exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */                 //15 元数据存储 MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }
@SuppressWarnings({"unchecked", "rawtypes"}) private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URLBuilder.from(url) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST_VALUE) .setPort(0) .build(); Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }

3.5.2.4.1、先看一眼入参:

Dubbo服务的发布流程

3.5.2.4.2、一直运行到判断methods这一行,看下map中都存了什么玩意,先有个大概印象:

Dubbo服务的发布流程

3.5.2.4.3、判断方法的相关配置是否为空,咱的测试类里并未设置这个配置,所以会跳过去

3.5.2.4.4、判断是否是泛型调用,如果是的话设置泛型类型,这里也是跳过

3.5.2.4.5、获取修订的版本号,其实取得就是接口的版本号

3.5.2.4.6、将接口里的方法名都放入map,多个用逗号隔开

Dubbo服务的发布流程

3.5.2.4.7、判断token是否为空,这里有必要说下这个token,这个token是权限校验用的,意思要校验消费者有没有权限去调用这个服务,这个token属于提供者端的配置,消费者只有通过注册中心才能获取,目的就是为了防止消费者越过注册中心直接去调用用提供者。


Dubbo服务的发布流程

3.5.2.4.9、如果scope配置成none的话就不进行导出了,然后如果scope的值只要不是remote,就进行本地导出,因为这里咱没配置scope,所以值是null,那妥妥的本地导出了。

3.5.2.4.10、exportLocal()方法,本地导出

@SuppressWarnings({"unchecked", "rawtypes"}) private void exportLocal(URL url) {        //1.只要协议不是injvm,就导出 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {            //2.将url转换为本地导出的url URL local = URLBuilder.from(url) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST_VALUE) .setPort(0) .build();            //3.调用protocol的export方法得到一个exporter,并添加到要导出的集合exporters中 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }

3.5.2.4.10.1、只要协议不是injvm,就进行本地导出

3.5.2.4.10.2、传进来的url转换为本地导出的url,咱看下转换后的url是什么样子的:injvm://127.0.0.1/com.ayo.dubbo.service.UserService?anyhost=true&application=dubbo-provider&bind.ip=10.2.116.32&bind.port=20880&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&group=member&interface=com.ayo.dubbo.service.UserService&methods=say&pid=10384&register=true&release=2.7.1&revision=1.0.0&side=provider&timestamp=1606903863883&version=1.0.0

看见没,协议变成了injvm了,ip变成了127.0.0.1,端口号没了,这就是进行本地导出时候的url的变化,很重要,务必记住。

3.5.2.4.10.3、首先根据ref(要暴露接口的实现类)、接口类、本地导出的url调用proxyFactory的getInvoker方法获取一个invoker,然后再调用protocol的export方法将这个invoker导出成一个exporter!!本地导出完毕

3.5.2.4.11、如果scope的值不是local,就进行远程导出,由此可以推断,如果是local的时候只进行本地导出

3.5.2.4.12、判断注册中心url是否为空,其实就是判断是否存在注册中心,因为远程暴露其实就是把服务注册到注册中心

3.5.2.4.13、判断代理方式是否为空,不为空的话会选择传过来的代理方式生成invoker,可以自定义。

3.5.2.4.14、通过代理生成invoker,通过相关协议将invoker导出成exporter,对exporter进行包装,然后将exporter添加到要导出的服务列表里。

这篇文章就先写到这里,主要分析了服务提供者导出服务的过程,但其实核心的export方法底层做了很多工作,限于篇幅,下一篇再说!

以上是关于Dubbo服务的发布流程的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo工作流程

面试官问我:解释一下Dubbo服务暴露

dubbo与zookeeper

Dubbo实践(十四)生产者发布服务

dubbo分布式系统---环境搭建

Dubbo源码——服务的创建和暴露