Dubbo消费端集群负载均衡实现路由Router
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo消费端集群负载均衡实现路由Router相关的知识,希望对你有一定的参考价值。
我们知道,一般Dubbo服务端会存在多个实例,Dubbo消费端内置了策略,可以进行负载均衡调度。我们接下来大致看下。
官方给出的路由、负载均衡整体架构如下:
在开始之前,先说下Dubbo中个几个概念:
Cluster
俗称的集群,为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样,在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环境下出现了多个服务提供者。这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。Directory
服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。而Invoker就是最终持有的远程方法调用LoadBalance
负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。负载均衡可分为软件负载均衡和硬件负载均衡。在我们日常开发中,一般很难接触到硬件负载均衡。但软件负载均衡还是可以接触到的,比如 nginx。在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。Router
服务路由,。服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。Dubbo 目前提供了三种服务路由实现,分别为条件路由 ConditionRouter、脚本路由 ScriptRouter 和标签路由 TagRouter。
上述解释来自Dubbo逛网。
可以看到,Dubbo的负载均衡就是借助这几个模块来实现。而其
基于之前的分析Dubbo消费端启动流程、处理逻辑,方法调用实现(基于Dubbo3)
我们知道在ReferenceConfig
进行了Invoker的初始化:
private T createProxy(Map<String, String> map)
if (shouldJvmRefer(map))
URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled())
logger.info("Using injvm service " + interfaceClass.getName());
else
urls.clear();
if (url != null && url.length() > 0) // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0)
for (String u : us)
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath()))
url = url.setPath(interfaceName);
if (UrlUtils.isRegistry(url))
urls.add(url.putAttribute(REFER_KEY, map));
else
URL peerURL = ClusterUtils.mergeUrl(url, map);
peerURL = peerURL.putAttribute(PEER_KEY, true);
urls.add(peerURL);
else // assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol()))
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us))
for (URL u : us)
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null)
u = u.putAttribute(MONITOR_KEY, monitorUrl);
urls.add(u.putAttribute(REFER_KEY, map));
if (urls.isEmpty())
throw new IllegalStateException(
"No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
" use dubbo version " + Version.getVersion() +
", please config <dubbo:registry address=\\"...\\" /> to your spring config.");
if (urls.size() == 1)
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
else
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls)
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url))
registryURL = url; // use last registry url
if (registryURL != null) // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
else // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
if (logger.isInfoEnabled())
logger.info("Referred dubbo service " + interfaceClass.getName());
URL consumerURL = new ServiceConfigURL(CONSUMER_PROTOCOL, map.get(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
MetadataUtils.publishServiceDefinition(consumerURL);
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
在这里我们看到,如果只有一个地址且非注册地址的话,那么就直连,不进行任何负载均衡处理。
如果有多个地址,且地址是注册地址的话,那么这里对应registry
对应的protocol,
这里REF_PROTOCOL
是一个AdaptiveExtension
,
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
在Dubhbo中对应的是Protocol$Adaptive
,其refer实现如下:
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
这里我们就获取到了一个RegistryProtocol
,其对应的refer
实现如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type))
return proxyFactory.getInvoker((T) registry, type, url);
// group="a,b" or group="*"
Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0)
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group))
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters)
Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl, url);
如果是RegistryService
服务,直接注册。其他走doRefer
.
而这里可以看到,如果我们配置了group
属性,那么会在这里进行合并,获取到的是一个MergeableCluster
.
而接下来的doRefer
中则会进行实际的服务引用处理:
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters)
Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl, url);
ClusterInvoker migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
这里返回的是一个ServiceDiscoveryMigrationInvoker
.
另外需要注意看下这个方法interceptInvoker
:
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl, URL registryURL)
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners))
return invoker;
for (RegistryProtocolListener listener : listeners)
listener.onRefer(this, invoker, consumerUrl, registryURL);
return invoker;
这个方法相当于是调用了对应监听器,而Dubbo中默认的实现是MigrationRuleListener
,当触发refer事件之后,会调用MigrationRuleHandler.doMigrate
,最后会触发:
serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
最后是返回了一个InvocationInterceptorInvoker
.
回到ReferenceConfig
的createProxy
处
if (registryURL != null) // registry url is available
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
else
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
这里如果是通过服务注册来获取的话,那么返回的是一个:ZoneAwareClusterInvoker
(用来处理不同机房zone)
如果我们没有特殊指定,ZoneAwareClusterInvoker会持有一个FailoverClusterInvoker,同样的FailoverClusterInvoker里面会在持有一个Invoker
即ZoneAwareClusterInvoker => FailoverClusterInvoker => Invoker
大概是这样一个封装流程。
zone,多注册中心机房处理
对于在配置中使用了机房配置的话,比如:
dubbo.registries.shanghai.id=shanghai
dubbo.registries.shanghai.address=xxxx
dubbo.registries.shanghai.zone=shanghai
dubbo.registries.beijing.id=beijing
dubbo.registries.beijing.address=xxxx
dubbo.registries.beijing.zone=beijing
为了进行流量的分离,需要北京机房的只能消费北京,上海机房只能消费上海的。
这时候ZoneAwareClusterInvoker
首先会根据zone
在多个机房里面选择一个:
public Result 猜测的rpc负载均衡原理,基于dubbo的架构