Dubbo消费端集群负载均衡实现路由Router

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo消费端集群负载均衡实现路由Router相关的知识,希望对你有一定的参考价值。

我们知道,一般Dubbo服务端会存在多个实例,Dubbo消费端内置了策略,可以进行负载均衡调度。我们接下来大致看下。
官方给出的路由、负载均衡整体架构如下:

在开始之前,先说下Dubbo中个几个概念:

  • Cluster 俗称的集群,为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样,在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环境下出现了多个服务提供者。这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。
  • Directory服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。而Invoker就是最终持有的远程方法调用
  • LoadBalance负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。负载均衡可分为软件负载均衡和硬件负载均衡。在我们日常开发中,一般很难接触到硬件负载均衡。但软件负载均衡还是可以接触到的,比如 nginx。在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。
  • Router服务路由,。服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。Dubbo 目前提供了三种服务路由实现,分别为条件路由 ConditionRouter、脚本路由 ScriptRouter 和标签路由 TagRouter。

上述解释来自Dubbo逛网。
可以看到,Dubbo的负载均衡就是借助这几个模块来实现。而其
基于之前的分析Dubbo消费端启动流程、处理逻辑,方法调用实现(基于Dubbo3)
我们知道在ReferenceConfig进行了Invoker的初始化:

private T createProxy(Map<String, String> map) 
        if (shouldJvmRefer(map)) 
            URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) 
                logger.info("Using injvm service " + interfaceClass.getName());
            
         else 
            urls.clear();
            if (url != null && url.length() > 0)  // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) 
                    for (String u : us) 
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) 
                            url = url.setPath(interfaceName);
                        
                        if (UrlUtils.isRegistry(url)) 
                            urls.add(url.putAttribute(REFER_KEY, map));
                         else 
                            URL peerURL = ClusterUtils.mergeUrl(url, map);
                            peerURL = peerURL.putAttribute(PEER_KEY, true);
                            urls.add(peerURL);
                        
                    
                
             else  // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) 
                    checkRegistry();
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) 
                        for (URL u : us) 
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) 
                                u = u.putAttribute(MONITOR_KEY, monitorUrl);
                            
                            urls.add(u.putAttribute(REFER_KEY, map));
                        
                    
                    if (urls.isEmpty()) 
                        throw new IllegalStateException(
                                "No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
                                        " use dubbo version " + Version.getVersion() +
                                        ", please config <dubbo:registry address=\\"...\\" /> to your spring config.");
                    
                
            

            if (urls.size() == 1) 
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
             else 
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) 
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) 
                        registryURL = url; // use last registry url
                    
                

                if (registryURL != null)  // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                 else  // not a registry url, must be direct invoke.
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ?
                            (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
                                    Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                
            
        
        if (logger.isInfoEnabled()) 
            logger.info("Referred dubbo service " + interfaceClass.getName());
        
        URL consumerURL = new ServiceConfigURL(CONSUMER_PROTOCOL, map.get(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        MetadataUtils.publishServiceDefinition(consumerURL);

        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    

在这里我们看到,如果只有一个地址且非注册地址的话,那么就直连,不进行任何负载均衡处理。

如果有多个地址,且地址是注册地址的话,那么这里对应registry对应的protocol,
这里REF_PROTOCOL是一个AdaptiveExtension,

private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

在Dubhbo中对应的是Protocol$Adaptive,其refer实现如下:

public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException 
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);

这里我们就获取到了一个RegistryProtocol,其对应的refer实现如下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException 
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) 
            return proxyFactory.getInvoker((T) registry, type, url);
        

        // group="a,b" or group="*"
        Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) 
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) 
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
            
        

        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url, qs);
    

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) 
        Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
        consumerAttribute.remove(REFER_KEY);
        URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
            null,
            null,
            parameters.get(REGISTER_IP_KEY),
            0, getPath(parameters, type),
            parameters,
            consumerAttribute);
        url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        return interceptInvoker(migrationInvoker, url, consumerUrl, url);
    

如果是RegistryService服务,直接注册。其他走doRefer.
而这里可以看到,如果我们配置了group属性,那么会在这里进行合并,获取到的是一个MergeableCluster.
而接下来的doRefer中则会进行实际的服务引用处理:

protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) 
        Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
        consumerAttribute.remove(REFER_KEY);
        URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
            null,
            null,
            parameters.get(REGISTER_IP_KEY),
            0, getPath(parameters, type),
            parameters,
            consumerAttribute);
        url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
        return interceptInvoker(migrationInvoker, url, consumerUrl, url);
    

ClusterInvoker migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);

这里返回的是一个ServiceDiscoveryMigrationInvoker.
另外需要注意看下这个方法interceptInvoker:

protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl, URL registryURL) 
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) 
            return invoker;
        

        for (RegistryProtocolListener listener : listeners) 
            listener.onRefer(this, invoker, consumerUrl, registryURL);
        
        return invoker;
    

这个方法相当于是调用了对应监听器,而Dubbo中默认的实现是MigrationRuleListener,当触发refer事件之后,会调用MigrationRuleHandler.doMigrate,最后会触发:

serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);

最后是返回了一个InvocationInterceptorInvoker.

回到ReferenceConfigcreateProxy

if (registryURL != null) // registry url is available
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
else
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));

这里如果是通过服务注册来获取的话,那么返回的是一个:ZoneAwareClusterInvoker(用来处理不同机房zone)
如果我们没有特殊指定,ZoneAwareClusterInvoker会持有一个FailoverClusterInvoker,同样的FailoverClusterInvoker里面会在持有一个Invoker
ZoneAwareClusterInvoker => FailoverClusterInvoker => Invoker
大概是这样一个封装流程。

zone,多注册中心机房处理

对于在配置中使用了机房配置的话,比如:

dubbo.registries.shanghai.id=shanghai
dubbo.registries.shanghai.address=xxxx
dubbo.registries.shanghai.zone=shanghai

dubbo.registries.beijing.id=beijing
dubbo.registries.beijing.address=xxxx
dubbo.registries.beijing.zone=beijing

为了进行流量的分离,需要北京机房的只能消费北京,上海机房只能消费上海的。
这时候ZoneAwareClusterInvoker首先会根据zone在多个机房里面选择一个:

public Result 猜测的rpc负载均衡原理,基于dubbo的架构

Dubbo在集群模式下的容错机制和负载均衡策略

Dubbo中集群Cluster,负载均衡,容错,路由解析

RPC原来就是socket——RPC框架到dubbo的服务动态注册,服务路由,负载均衡演化

dubbo:服务路由的实现

Dubbo 源码分析 - 集群容错之 Router