Dubbo中服务注册与发现实现原理

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo中服务注册与发现实现原理相关的知识,希望对你有一定的参考价值。

我们知道,现在一般微服务为了更好的管理都会提供对应的服务注册与服务发现机制,Dubbo作为一个RPC框架和微服务组件也提供了服务注册和服务发现机制,接下来我们看看Dubbo是怎么实现的

服务注册

我们还是基于之前 dubbo服务端启动源码分析
我们知道,在对外发布服务的时候是通过ServiceConfig.export来进行服务的暴露发布的,而服务注册的时机也是在这个时候,ServiceConfig.export,会调用doExportUrls来进行服务发布和注册:

private void doExportUrls() 
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );

        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

        for (ProtocolConfig protocolConfig : protocols) 
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        
    

List registryURLs = ConfigValidationUtils.loadRegistries(this, true);

这个方法会判断当前是否是有注册中心的配置,系统是否配置了如下配置:

dubbo.registries.beijing.id=beijing
dubbo.registries.beijing.address=xxxx
dubbo.registries.beijing.zone=beijing

dubbo.registry.protocol=

那么这时候会重新生成基于注册中心的URL,会将URL.protocol设置为registry,这个时候暴露服务的URL就改变了,我们接着往下看:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) 
        Map<String, String> map = buildAttributes(protocolConfig);
        serviceMetadata.getAttachments().putAll(map);
        URL url = buildUrl(protocolConfig, registryURLs, map);
        exportUrl(url, registryURLs);
    
    private void exportUrl(URL url, List<URL> registryURLs) 
        String scope = url.getParameter(SCOPE_KEY);
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) 
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) 
                exportLocal(url);
            
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) 
                url = exportRemote(url, registryURLs);
                MetadataUtils.publishServiceDefinition(url);
            
        
        this.urls.add(url);
    
private URL exportRemote(URL url, List<URL> registryURLs) 
        if (CollectionUtils.isNotEmpty(registryURLs)) 
            for (URL registryURL : registryURLs) 
                if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) 
                    url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
                
                if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) 
                    continue;
                
                url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                if (monitorUrl != null) 
                    url = url.putAttribute(MONITOR_KEY, monitorUrl);
                
                String proxy = url.getParameter(PROXY_KEY);
                if (StringUtils.isNotEmpty(proxy)) 
                    registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                
                doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
            
         else 
            if (MetadataService.class.getName().equals(url.getServiceInterface())) 
                MetadataUtils.saveMetadataURL(url);
            
            doExportUrl(url, true);
        
        return url;
    
private void doExportUrl(URL url, boolean withMetaData) 
        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
        if (withMetaData) 
            invoker = new DelegateProviderMetaDataInvoker(invoker, this);
        
        Exporter<?> exporter = PROTOCOL.export(invoker);
        exporters.add(exporter);
    

可以看到最终通过

PROTOCOL.export(invoker);

来进行发布,而这里的PROTOCOL:

private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

Protocol$Adaptive:

public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException 
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);

这里获取的Protocol则是获取URL中对应的protocol类型,而上面分析这里对应就是的registry,而Dubbo中的dubbo-registry中META-INF.dubbo.internalorg.apache.dubbo.rpc.Protocol文件中内容如下:

registry=org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol
service-discovery-registry=org.apache.dubbo.registry.integration.RegistryProtocol

registry获取到的Protocol为InterfaceCompatibleRegistryProtocol,而其暴露服务在其父类RegistryProtocol助攻完成:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException 
        URL registryUrl = getRegistryUrl(originInvoker);
        URL providerUrl = getProviderUrl(originInvoker);
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        final Registry registry = getRegistry(registryUrl);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) 
            register(registry, registeredProviderUrl);
        
        registerStatedUrl(registryUrl, registeredProviderUrl, register);
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        notifyExport(exporter);
        return new DestroyableExporter<>(exporter);
    

这里首先会将服务暴露出去:

final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl);

服务暴露成功后,在将服务注册到注册中心去:

register(registry, registeredProviderUrl);

private void register(Registry registry, URL registeredProviderUrl) 
        registry.register(registeredProviderUrl);
    

而这里具体执行的`Registry``则通过如下方式获得:

Registry registry = getRegistry(registryUrl);
protected Registry getRegistry(final URL registryUrl) 
        return registryFactory.getRegistry(registryUrl);
    

而这里的registryFactoryRegistryProtocol一个属性,提供了get、set方法,我们需要注意的是,这里是通过SPI机制来加载RegistryProtocol,在加载RegistryProtocol的时候SPI机制同时会判断其属性有没有set方法,如果有set方法,会在用SPI机制加载属性对应的接口实现并注入。我们以nacos举例,对应加载的RegistryFactoryNacosRegistryFactory创建的为NacosRegistry.而在NacosRegistry通过NacosNamingService将服务信息注册到了Nacos中。
而我们来看看注册的时候注册了哪些东西:

public void doRegister(URL url) 
        final String serviceName = getServiceName(url);
        final Instance instance = createInstance(url);
        execute(namingService -> namingService.registerInstance(serviceName,
                getUrl().getGroup(Constants.DEFAULT_GROUP), instance));
    

这里获取服务名称逻辑如下:

private String getServiceName(URL url, String category) 
        return category + SERVICE_NAME_SEPARATOR + url.getColonSeparatedKey();
    
    public String getColonSeparatedKey() 
        StringBuilder serviceNameBuilder = new StringBuilder();
        serviceNameBuilder.append(this.getServiceInterface());
        append(serviceNameBuilder, VERSION_KEY, false);
        append(serviceNameBuilder, GROUP_KEY, false);
        return serviceNameBuilder.toString();
    

可以看到,getServiceName返回的是一个多个部分拼接起来的字符串,包含如下部分:

  • 类别,主要有providers,consumers,routers,configurators
  • 接口全名称
  • 版本号
  • 分组

这几个信息用:连接。
Instance创建逻辑如下:

private Instance createInstance(URL url) 
        // Append default category if absent
        String category = url.getCategory(DEFAULT_CATEGORY);
        URL newURL = url.addParameter(CATEGORY_KEY, category);
        newURL = newURL.addParameter(PROTOCOL_KEY, url.getProtocol());
        newURL = newURL.addParameter(PATH_KEY, url.getPath());
        String ip = url.getHost();
        int port = url.getPort();
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setMetadata(new HashMap<>(newURL.getParameters()));
        return instance;
    

然后将上述信息注册到Nacos中,这样就完成了服务端的服务注册。
需要注意的是,如果我们配置register=false的话,那么这里是不会注册到注册中心的,也就是我们可以在DubboService的属性中配置register=false这样被注解的服务就不会注册到注册中心了。

服务发现

与服务注册类似,服务发现是在RegistryProtocol.refer中实现的,通过之前Dubbo消费端启动流程、处理逻辑,方法调用实现
的分析,我们发现,消费端启动流程在RegistryProtocol.refer中返回的是一个ServiceDiscoveryMigrationInvoker,而其触发消费端的注册和消费端的服务订阅则是在interceptInvoker:

protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl, URL registryURL) 
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) 
            return invoker;
        

        for (RegistryProtocolListener listener : listeners) 
            listener.onRefer(this, invoker, consumerUrl, registryURL);
        
        return invoker;
    

Dubbo中默认在MigrationRuleListener实现了onRefer的事件处理:

public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) 
        MigrationRuleHandler<?> migrationRuleHandler = handlers.computeIfAbsent((MigrationInvoker<?>) invoker, _key -> 
            ((MigrationInvoker<?>) invoker).setMigrationRuleListener(this);
            return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);
        );

        migrationRuleHandler.doMigrate(rule);
    

而这里最后会调用RegistryProtocol.getServiceDiscoveryInvoker:

public <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) 
        DynamicDirectory<T> directory = new ServiceDiscoveryRegistryDirectory<>(type, url);
        return doCreateInvoker(directory, cluster, registry, type);
    
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) 
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL urlToRegistry = new ServiceConfigURL(
            parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
            parameters.remove(REGISTER_IP_KEY), 0, getPath(parameters, type), parameters);
        if (directory.isShouldRegister()) 
            directory.setRegisteredConsumerUrl(urlToRegistry);
            registry.以上是关于Dubbo中服务注册与发现实现原理的主要内容,如果未能解决你的问题,请参考以下文章

微服务系列:服务注册与发现的实现原理及实现优劣势比较

Nacos服务注册与发现的原理

Eureka服务注册发现原理流程

Nacos 服务注册与发现原理分析

Eureka 服务注册与发现

Dubbo3终极特性「云原生三中心架构」带你探索Dubbo3体系下的配置中心和元数据中心注册中心的原理及开发实战(上)