Dubbo之Directory源码分析

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之Directory源码分析相关的知识,希望对你有一定的参考价值。

Directory的作用

首先看下Directory的接口定义

 
   
   
 
  1. public interface Directory<T> extends Node {

  2.    Class<T> getInterface();

  3.    List<Invoker<T>> list(Invocation invocation) throws RpcException;

  4. }

每个Directory实例会对应一个接口服务,它的主要功能是为Cluster提供远程对等调用invoker目录服务,list方法用于获取远程服务提供者的对等调用Invokers

 
   
   
 
  1. public interface Cluster {

  2.    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

  3. }

Directory用于获取多个远程对等调用invoker,而Cluster用于将这些invoker伪装成一个invoker进行集群调用,Cluster源码会单独讲解

Directory的两种实现

Directory有两种实现,StaticDirectory和RegistryDirectory,分别对应静态和动态的Invoker目录服务 这两个实现都继承了模板类AbstractDirectory,让我们来看下AbstractDirectory封装了什么逻辑 可以在AbstractDirectory中可以看到这么一个变量

 
   
   
 
  1. private volatile List<Router> routers;

routers用于路由,用于过滤远程对等调用invoker,关于Router路由源码我会单独讲解

可以看到AbstractDirectory实现了Directory接口的list方法

 
   
   
 
  1. public List<Invoker<T>> list(Invocation invocation) throws RpcException {

  2.        if (destroyed) {

  3.            throw new RpcException("Directory already destroyed .url: " + getUrl());

  4.        }

  5.        List<Invoker<T>> invokers = doList(invocation);

  6.        List<Router> localRouters = this.routers; // local reference

  7.        if (localRouters != null && !localRouters.isEmpty()) {

  8.            for (Router router : localRouters) {

  9.                try {

  10.                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {

  11.                        invokers = router.route(invokers, getConsumerUrl(), invocation);

  12.                    }

  13.                } catch (Throwable t) {

  14.                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);

  15.                }

  16.            }

  17.        }

  18.        return invokers;

  19.    }

上面逻辑留了模板方法doList给子类实现

 
   
   
 
  1. protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

子类实现doList方法,只负责invokers的获取,在AbstractDirectory中增加的router的过滤逻辑。消费者拿到的invoker集合,是经过routers过滤的。

对于StaticDirectory和RegistryDirectory,我们只要关注如何获取远程对等调用invokers的逻辑即可

StaticDirectory

StaticDirectory没什么好讲的,doList方法直接返回设置的invokers

 
   
   
 
  1. protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {

  2.        return invokers;

  3.    }

RegistryDirectory

RegistryDirectory实现了NotifyListener接口,会随着提供者的上下线动态刷新本地invoker缓存

对于invoker,在RegistryDirectory有两个缓存

 
   
   
 
  1. private volatile Map<String, Invoker<T>> urlInvokerMap;

  2. private volatile Map<String, List<Invoker<T>>> methodInvokerMap;

urlInvokerMap缓存url对应的invoker methodInvokerMap缓存方法对应的invokers,方法对应invoker可以存在多个,所以是 List<invoker >,在dolist方法中会用到 notify回调会刷新这两个缓存</invoker

我们先来看下RegistryDirectory的订阅操作

 
   
   
 
  1. public void subscribe(URL url) {

  2.        setConsumerUrl(url);

  3.        registry.subscribe(url, this);

  4.    }

这边的registry可以认为就是之前讲的ZookeeperRegistry,把RegistryDirectory自身作为订阅回调,一旦监控的路径发生变化,就会回调RegistryDirectory的notify方法

那么subscribe会订阅那些url?在RegistryProtocol中可以看到

 
   
   
 
  1. //directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现

  2.        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,

  3.                Constants.PROVIDERS_CATEGORY

  4.                        + "," + Constants.CONFIGURATORS_CATEGORY

  5.                        + "," + Constants.ROUTERS_CATEGORY));

会监听这个接口的provider,configurators,routers目录

接下来看notify的实现

 
   
   
 
  1. public synchronized void notify(List<URL> urls) {

  2.        //这里的通知会一次性传递对应监听目录下所有的url

  3.        List<URL> invokerUrls = new ArrayList<URL>();

  4.        List<URL> routerUrls = new ArrayList<URL>();

  5.        List<URL> configuratorUrls = new ArrayList<URL>();

  6.        //对url进行分类

  7.        for (URL url : urls) {

  8.            String protocol = url.getProtocol();

  9.            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

  10.            if (Constants.ROUTERS_CATEGORY.equals(category)

  11.                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {

  12.                routerUrls.add(url);

  13.            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)

  14.                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {

  15.                configuratorUrls.add(url);

  16.            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {

  17.                invokerUrls.add(url);

  18.            } else {

  19.                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());

  20.            }

  21.        }

  22.        // configurators

  23.        //刷新configuratorUrls

  24.        if (configuratorUrls != null && !configuratorUrls.isEmpty()) {

  25.            this.configurators = toConfigurators(configuratorUrls);

  26.        }

  27.        // routers

  28.        //刷新routers

  29.        if (routerUrls != null && !routerUrls.isEmpty()) {

  30.            List<Router> routers = toRouters(routerUrls);

  31.            if (routers != null) { // null - do nothing

  32.                setRouters(routers);

  33.            }

  34.        }

  35.        List<Configurator> localConfigurators = this.configurators; // local reference

  36.        // merge override parameters

  37.        this.overrideDirectoryUrl = directoryUrl;

  38.        //override这个overrideDirectoryUrl什么用?

  39.        if (localConfigurators != null && !localConfigurators.isEmpty()) {

  40.            for (Configurator configurator : localConfigurators) {

  41.                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);

  42.            }

  43.        }

  44.        // providers

  45.        //刷新客户端对等invoker

  46.        refreshInvoker(invokerUrls);

  47.    }

方法的开始,会对url进行分类,一共provider,configurators,routers三种类型 先刷新RegistryDirectory中的configurators,routers,再使用provider urls增量刷新invoker缓存 如果provider urls不存在,那么根据上一次的缓存provider urls,再使用router增量刷新invoker缓存

 
   
   
 
  1. private void refreshInvoker(List<URL> invokerUrls) {

  2.        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null

  3.                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

  4.            //传入的url protocol = empty,directory设置为禁用

  5.            this.forbidden = true; // Forbid to access

  6.            this.methodInvokerMap = null; // Set the method invoker map to null

  7.            //摧毁客户端的对等调用invoker

  8.            destroyAllInvokers(); // Close all invokers

  9.        } else {

  10.            this.forbidden = false; // Allow to access

  11.            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference

  12.            //invokerUrls为空,因为通知的url可能只改变了router或者configurator,提供者并没有变化,但是对应invoker配置还是需要被更改的

  13.            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {

  14.                //invokerUrls为空使用缓存的invokers urls,也就是上一次回调拿到invokers

  15.                invokerUrls.addAll(this.cachedInvokerUrls);

  16.            } else {

  17.                //更新缓存

  18.                this.cachedInvokerUrls = new HashSet<URL>();

  19.                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison

  20.            }

  21.            //invokerUrls为空,中止

  22.            if (invokerUrls.isEmpty()) {

  23.                return;

  24.            }

  25.            //把url转换为invoker,已经存在的invoker不会重新创建

  26.            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

  27.            //把newUrlInvokerMap转换为methodInvokerMap

  28.            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map

  29.            // state change

  30.            // If the calculation is wrong, it is not processed.

  31.            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {

  32.                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));

  33.                return;

  34.            }

  35.            //如果存在group配置,对method对应的invoker进行cluster伪装

  36.            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;

  37.            this.urlInvokerMap = newUrlInvokerMap;

  38.            try {

  39.                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker

  40.            } catch (Exception e) {

  41.                logger.warn("destroyUnusedInvokers error. ", e);

  42.            }

  43.        }

  44.    }

在refreshInvoker方法中,会根据消费者url的protocol过滤掉不匹配的提供者url,然后对过滤后的提供者url生成远程对等调用invoker,如果invoker已经存在,那么不用再重复创建

接下来看下doList方法的实现

 
   
   
 
  1. public List<Invoker<T>> doList(Invocation invocation) {

  2.        if (forbidden) {

  3.            // 1. No service provider 2. Service providers are disabled

  4.            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,

  5.                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()

  6.                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");

  7.        }

  8.        List<Invoker<T>> invokers = null;

  9.        //针对每个方法,有不同的invokers列表。可能存在路由配置

  10.        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference

  11.        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {

  12.            //从invocaion获取方法名和参数,可能是$invoke泛化调用

  13.            String methodName = RpcUtils.getMethodName(invocation);

  14.            Object[] args = RpcUtils.getArguments(invocation);

  15.            if (args != null && args.length > 0 && args[0] != null

  16.                    && (args[0] instanceof String || args[0].getClass().isEnum())) {

  17.                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter

  18.            }

  19.            if (invokers == null) {

  20.                invokers = localMethodInvokerMap.get(methodName);

  21.            }

  22.            if (invokers == null) {

  23.                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);

  24.            }

  25.            if (invokers == null) {

  26.                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();

  27.                if (iterator.hasNext()) {

  28.                    invokers = iterator.next();

  29.                }

  30.            }

  31.        }

  32.        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;

  33.    }

直接从methodInvokerMap中获取对应的invoker集合即可。优先通过方法名查找,如果找不到,通过*作为key查找,再找不到,返回methodInvokerMap第一个invoker集合。

讲解完子类需要实现的doList方法后,下面看下RegistryDirectory是如何被使用到的

RegistryDirectory的使用

RegistryDirectory封装了获取远程对等invokers的逻辑,主要使用在RegistryProtocol的doRefer方法

 
   
   
 
  1. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

  2.        //这边的url为consumer url

  3.        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);

  4.        directory.setRegistry(registry);

  5.        //这里的protocol为spi注入的适配类

  6.        directory.setProtocol(protocol);

  7.        // all attributes of REFER_KEY

  8.        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

  9.        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

  10.        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())

  11.                && url.getParameter(Constants.REGISTER_KEY, true)) {

  12.            //注册consumer url

  13.            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,

  14.                    Constants.CHECK_KEY, String.valueOf(false)));

  15.        }

  16.        //directory订阅url对应interface的provider,configurators,routers接口目录,回调接口NotifyListener由RegistryDirectory实现

  17.        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,

  18.                Constants.PROVIDERS_CATEGORY

  19.                        + "," + Constants.CONFIGURATORS_CATEGORY

  20.                        + "," + Constants.ROUTERS_CATEGORY));

  21.        //通过cluster封装获取invoker的逻辑,将对多个invoker的集群调用封装成一个invoker

  22.        Invoker invoker = cluster.join(directory);

  23.        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);

  24.        return invoker;

  25.    }

RegistryDirectory通过与Cluster配合,将对多个invoker的集群调用封装成一个invoker,然后通过代理把invoker转换为代理对象bean,放入spring容器中去,就和正常使用本地bean一样。这就是RPC。

总结

Dubbo的目录服务,用于获取远程对等invoker,其实这种设计在业务场景中也能用到。 比如我们公司项目的司机池功能,对于每个订单都有一个司机池,并且这个司机池会随着司机状态变化而发生变化,也可以参考Directory的接口设计。


以上是关于Dubbo之Directory源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo 源码分析 - 集群容错之 Router

dubbo源码-从客户端看Registry和Directory

Dubbo之服务导出源码分析

Dubbo源码分析之XML的Bean解析

源码分析 Sentinel 之 Dubbo 适配原理

源码分析 Sentinel 之 Dubbo 适配原理