Dubbo原理和源码解析之服务引用

Posted 漫谈Java架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo原理和源码解析之服务引用相关的知识,希望对你有一定的参考价值。

一、框架设计

在官方 《 Dubbo 开发指南 》 框架设计部分,给出了引用服务时序图:

另外,在官方 《 Dubbo 用户指南 》 集群容错部分,给出了服务引用的各功能组件关系图:

public Object getObject() throws Exception {    return get();
}

ReferenceConfig.java

public synchronized T get() {    if (destroyed){        throw new IllegalStateException("Already destroyed!");
    }    if (ref == null) {
        init();
    }    return ref;
}private void init() {    //.......忽略
    ref = createProxy(map);
}private T createProxy(Map<String, String> map) {    //.....忽略
    invoker = refprotocol.refer(interfaceClass, urls.get(0));    //.....忽略
    // 创建服务代理
    return (T) proxyFactory.getProxy(invoker);
}

2.2 服务发现

因为通过注册中心,因此在 ReferenceConfig.java#createProxy()  方法中,进入 RegistryProtocol.java#refer()  方法。

RegistryProtocol.java

private Cluster cluster;public void setCluster(Cluster cluster) {
    this.cluster = cluster;
}private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}

RegistryDirectory 通过 RegistryDirectory#subscribeUrl() 向 Zookeeper 订阅服务节点信息并 watch 变更,这样就实现了服务自动发现。

2.3 Invoker选取

2.3.1 Cluster

上面我之所以把设置 Cluster 的代码贴上,是因为此处涉及到一个 Dubbo 服务框架核心的概念——微内核和插件机制( 此处会单独一篇文章详细介绍 ):

有关 Dubbo 的设计原则,请查看Dubbo《 一些设计上的基本常识 》。

Cluster 类的定义如下:

Cluster.java

@SPI(FailoverCluster.NAME)public interface Cluster {    /**
     * Merge the directory invokers to a virtual invoker.
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

cluster 的类型是 Cluster$Adaptive ,实际上是一个通用的代理类,它会根据 URL 中的 cluster 参数值定位到实际的 Cluster 实现类(默认是 FailoverCluster)。 由于 ExtensionLoader  在实例化对象时,会在实例化完成之后自动套上 Wrapper 类,而 MockerClusterWrapper  就是这样一个 Wrapper。

MockerClusterWrapper.java

public class MockClusterWrapper implements Cluster {    private Cluster cluster;    public MockClusterWrapper(Cluster cluster) {        this.cluster = cluster;
    }    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {        return new MockClusterInvoker<T>(directory, this.cluster.join(directory));
    }
}

也就是说,实例化出来的 FailoverCluster  会作为参数赋予 MockerClusterWrapper#cluster ,而 MockClusterWrapper  会作为参数赋予 RegistryProtocol#cluster 。因此 RegistryProtocol#doRefer()  中调用 cluster.join(directory)  实际上是调用的 MockClusterWrapper#join(directory) 。 使用这种机制,可以把一些公共的处理放在 Wrapper 类中,实现代码和功能收敛。

MockClusterInvoker.java

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(),
                             Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
    if (value.length() == 0 || value.equalsIgnoreCase("false")){        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {        if (logger.isWarnEnabled()) {
            logger.info("force-mock: " + invocation.getMethodName() + 
                        " force-mock enabled , url : " +  directory.getUrl());
        }        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        }catch (RpcException e) {            if (e.isBiz()) {                throw e;
            } else {                if (logger.isWarnEnabled()) {
                    logger.info("fail-mock: " + invocation.getMethodName() + 
                            " fail-mock enabled , url : " +  directory.getUrl(), e);
                }                //fail:mock
                result = doMockInvoke(invocation, e);
            }
        }
    }    return result;
}

这里还涉及到 Dubbo 另外一个核心机制——Mock。Mock 可以在测试中模拟服务调用的各种异常情况,还用来实现服务降级。 从 MockClusterWrapper.join()  方法可知,实际创建的 ClusterInvoker  是封装了 FailoverClusterInvoker  的 MockerClusterInvoker 。

在 MockerClusterInvoker  中,调用之前 Dubbo 会先检查 URL 中是否有 mock 参数(通过服务治理后台 Consumer 端的屏蔽和容错进行设置,或者直接动态设置 mock 参数值),如果存在且以 force 开头,则不发起远程调用直接执行降级逻辑;如果存在且以 fail 开头,则在远程调用异常时才会执行降级逻辑。

因此,通过 MockerClusterWrapper  成功地在 Invoker 中植入了 Mock 机制。

2.3.2 Directory

在 RegistryProtocol#doRefer()  中可以看到,服务发现过程是通过 RegistryDirectory  向 Zookeeper 订阅来实现的。 先看看 Directory 类之间的关系:

public interface Directory<T> extends Node {    
    Class<T> getInterface();

    List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

Directory 可以看做是对应 Interface 的 Invoker 列表,而这个列表可能是动态变化的,比如注册中心推送变更。

通过 ReferenceConfig#createProxy()  方法可知, StaticDirectory  主要用于多注册中心引用的场景,它的 invoker 列表是通过参数传入的、固定的。在此不做更详细的解析了。

RegistryDirectory 用于使用单注册中心发现服务的场景。 RegistryDirectory  没有重写 list() 方法,所以使用 AbstractDirectory#list()  方法:

AbstractDirectory.java 

public List<Invoker<T>> list(Invocation invocation) throws RpcException {    if (destroyed) {        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }    List<Invoker<T>> invokers = doList(invocation);    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {        for (Router router : localRouters) {            try {                if (router.getUrl() == null || 
                         router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + 
                              ", cause: " + t.getMessage(), t);
            }
        }
    }    return invokers;
}

RegistryDirectory.java

/**
 * 获取 invoker 列表
 */public List<Invoker<T>> doList(Invocation invocation) {    if (forbidden) {        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +
                      NetUtils.getLocalHost() + " access service " + getInterface().getName() + 
                      " from registry " + getUrl().getAddress() + " use dubbo version " +
                       Version.getVersion() + 
                       ", Please check registry access list (whitelist/blacklist).");
    }    List<Invoker<T>> invokers = null;    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;  //本地缓存
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {        String methodName = RpcUtils.getMethodName(invocation);        //根据方法名从本地缓存中获取invoker列表,此处略
        //……
    }    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}/**
 * 节点变更通知
 */public synchronized void notify(List<URL> urls) {    List<URL> invokerUrls = new ArrayList<URL>();    List<URL> routerUrls = new ArrayList<URL>();    List<URL> configuratorUrls = new ArrayList<URL>();    for (URL url : urls) {        String protocol = url.getProtocol();        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);        if (Constants.ROUTERS_CATEGORY.equals(category) 
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + 
                        url + " from registry " + getUrl().getAddress() + 
                        " to consumer " + NetUtils.getLocalHost());
        }
    }    // configurators 
    if (configuratorUrls != null && configuratorUrls.size() >0 ){        this.configurators = toConfigurators(configuratorUrls);
    }    // routers
    if (routerUrls != null && routerUrls.size() >0 ){        List<Router> routers = toRouters(routerUrls);        if(routers != null){ // null - do nothing
            setRouters(routers);
        }
    }    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override参数
    this.overrideDirectoryUrl = directoryUrl;    if (localConfigurators != null && localConfigurators.size() > 0) {        for (Configurator configurator : localConfigurators) {            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }    // providers
    refreshInvoker(invokerUrls);
}/**
 * 根据invokerURL列表转换为invoker列表
 */private void refreshInvoker(List<URL> invokerUrls){    //......
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
    //......}/**
 * 合并url参数 顺序为override > -D >Consumer > Provider
 */private Map<String, Invoker<T>> toInvokers(List<URL> urls) {    //......
    URL url = mergeUrl(providerUrl);    //......}

在 dolist() 方法中,如果通过服务治理禁止 Consumer 访问的话,此处直接抛出响应的异常。

RegistryDirectory 实现了 NotifyListener ,在 ZK 节点变化时能收到通知更新内存缓存,其中 RegistryDirectory# mergeUrl()  方法中会按照优先级合并参数(动态配置在此处生效)。

服务引用时从内存缓存中获取并返回invoker列表,并根据路由规则再进行一次过滤。

2.3.3 Router

Router 的作用就是从 Directory 的 invoker 列表中刷选出符合路由规则的 invoker 子集。目前 Dubbo 提供了基于IP、应用名和协议等的静态路由功能,功能和实现比较简单,在此不做过多解释。

2.3.4 LoadBalance

public Result invoke(final Invocation invocation) throws RpcException {

    checkWheatherDestoried();

    LoadBalance loadbalance;
    
    List<Invoker<T>> invokers = list(invocation);    if (invokers != null && invokers.size() > 0) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class)
                     .getExtension(invokers.get(0).getUrl()
                     .getMethodParameter(invocation.getMethodName(),
                                         Constants.LOADBALANCE_KEY, 
                                         Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class)
                     .getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);    return doInvoke(invocation, invokers, loadbalance);
}

FailoverClusterInvoker.java

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);    int len = getUrl().getMethodParameter(invocation.getMethodName(), 
                       Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;    if (len <= 0) {
        len = 1;
    }    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());    Set<String> providers = new HashSet<String>(len);    for (int i = 0; i < len; i++) {        //重试时,进行重新选择,避免重试时invoker列表已发生变化.
        //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
        if (i > 0) {
            checkWheatherDestoried();
            copyinvokers = list(invocation);            //重新检查一下
            checkInvokers(copyinvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);        try {
            Result result = invoker.invoke(invocation);            if (le != null && logger.isWarnEnabled()) {
                logger.warn("", le);
            }            return result;
        } catch (RpcException e) {            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }    throw new RpcException();
}

在调用或重试时,每次都通过 LoadBalance 选出一个 Invoker 进行调用。

至此,调用流程结束。


以上是关于Dubbo原理和源码解析之服务引用的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo原理和源码解析之服务暴露

Dubbo原理何源码解析之服务暴露

dubbo框架-学习-dubbo原理

【dubbo源码】13. 服务消费方之@Reference依赖注入原理

Dubbo 源码分析系列之三 —— 架构原理

Dubbo推刊