Dubbo负载均衡与集群容错
Posted 又蠢又笨的懒羊羊程序猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo负载均衡与集群容错相关的知识,希望对你有一定的参考价值。
负载均衡与集群容错
Invoker
在Dubbo中Invoker就是一个具有调用功能的对象,在服务提供端就是实际的服务实现,只是将服务实现封装起来变成一个Invoker。
在服务消费端,从注册中心得到服务提供者的信息之后,将一条条信息封装为Invoker,这个Invoker就具备了远程调用的能力。
综上,Dubbo就是创建了一个统一的模型,将可调用(可执行体)的服务对象都统一封装为Invoker。
而ClusterInvoker
就是将多个服务引入的Invoker封装起来,对外统一暴露一个Invoker,并且赋予这些Invoker集群容错的功能。
服务目录
服务目录,即Directory
,实际上它就是多个Invoker的集合,服务提供端一般都会集群分布,同样的服务会有多个提供者,因此需要一个服务目录来统一存放它们,需要调用服务的时候便从这个服务目录中进行挑选。
同时服务目录还是实现了NotifyListener
接口,当集群中新增了一台服务提供者或者下线了一台服务提供者,目录都会对服务提供者进行更新,新增或者删除对应的Invoker。
从上图中,可以看到用了一个抽象类AbstractDirectory
来实现 Directory
接口,抽象类中运用到了模板方法模式,将一些公共方法和逻辑写好,作为一个骨架,然后具体实现由了两个子类来完成,两个子类分别为StaticDirectory
和RegistryDirectory
。
RegistryDirectory
RegistryDirectory
实现了NotifyListener
接口,可以监听注册中心的变化,当注册中心配置发生变化时,服务目录也可以收到变更通知,然后根据更新之后的配置刷新Invoker列表。
由此可知RegistryDirectory
共有三个作用:
- 获取Invoker列表
- 监听注册中心
- 刷新Invoker列表
获取Invoker列表
RegistryDirectory
实现了父类AbstractDirectory
的抽象方法doList()
,该方法可以得到Invoker列表
public List<Invoker<T>> doList(Invocation invocation) {
if (this.forbidden) {
throw new RpcException(....);
} else {
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; //获取方法调用名和Invoker的映射表
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
//以下就是根据方法名和方法参数获取可调用的Invoker
if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);
}
if (invokers == null) {
invokers = (List)localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = (List)localMethodInvokerMap.get("*");
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = (List)iterator.next();
}
}
}
return (List)(invokers == null ? new ArrayList(0) : invokers);
}
}
监听注册中心
通过实现NotifyListener
接口可以感知注册中心的数据变更。
RegistryDirectory
定义了三个集合invokerUrls
routerUrls
configuratorUrls
分别处理对应的配置然后转化成对象。
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList();
List<URL> routerUrls = new ArrayList();
List<URL> configuratorUrls = new ArrayList();
Iterator i$ = urls.iterator();
while(true) {
while(true) {
while(i$.hasNext()) {
//....根据urls填充上述三个列表
}
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls); //根据urls转化为configurators配置
}
List localConfigurators;
if (routerUrls != null && !routerUrls.isEmpty()) {
localConfigurators = this.toRouters(routerUrls);
if (localConfigurators != null) {
this.setRouters(localConfigurators); //根据urls转化为routers配置
}
}
localConfigurators = this.configurators;
this.overrideDirectoryUrl = this.directoryUrl;
Configurator configurator;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for(Iterator i$ = localConfigurators.iterator(); i$.hasNext(); this.overrideDirectoryUrl = configurator.configure(this.overrideDirectoryUrl)) {
configurator = (Configurator)i$.next();
}
}
this.refreshInvoker(invokerUrls); //根据invokerUrls刷新invoker列表
return;
}
}
}
刷新Invoker列表
private void refreshInvoker(List<URL> invokerUrls) {
//如果invokerUrls只有一个URL并且协议是empty,那么清除所有invoker
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && "empty".equals(((URL)invokerUrls.get(0)).getProtocol())) {
this.forbidden = true;
this.methodInvokerMap = null;
this.destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; //获取旧的Invoker列表
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet();
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
//根据URL生成InvokerMap
Map<String, Invoker<T>> newUrlInvokerMap = this.toInvokers(invokerUrls);
//根据新的InvokerMap生成方法名和Invoker列表对应的Map
Map<String, List<Invoker<T>>> newMethodInvokerMap = this.toMethodInvokers(newUrlInvokerMap);
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = this.multiGroup ? this.toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
this.destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); //销毁无效的Invoker
} catch (Exception var6) {
logger.warn("destroyUnusedInvokers error. ", var6);
}
}
}
上述操作就是根据invokerUrls
数量以及协议头是否为empty来判断是否禁用所有invokers,如果不禁用的话将invokerUrls转化为Invoker,并且得到<url,Invoker>的映射关系。
再进一步进行转化,得到<methodName,List>的映射关系,再将同一组的Invoker进行合并,将合并结果赋值给methodInvokerMap
,这个methodInvokerMap
就是在doList
中使用到的Map。
最后刷新InvokerMap
,销毁无效的Invoker。
StaticDirectory
StaticDirectory是静态目录,所有Invoker是固定的不会删减的,并且所有Invoker由构造器来传入。
内部逻辑也相当简单,只定义了一个列表用于存储Invokers。实现父类的方法也只是将这些Invokers原封不动地返回。
private final List<Invoker<T>> invokers;
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
return this.invokers;
}
服务路由
服务路由规定了服务消费者可以调用哪些服务提供者,Dubbo常用的是条件路由ConditionRouter
。
条件路由由两个条件组成,格式为[服务消费者匹配条件] => [服务提供者匹配条件]
,例如172.26.29.15 => 172.27.19.89
规定了只有IP为172.26.29.15
的服务消费者才可以访问IP为172.27.19.89
的服务提供者,不可以调用其他的服务。
路由一样是通过RegistryDirectory
中的notify()
更新的,在调用toMethodInvokers()
的时候会进行服务器级别的路由和方法级别的路由。
Cluster
在前面的流程中我们已经通过Directory
获取了服务目录,并且通过路由获取了一个或多个Invoker,但是对于服务消费者还是需要进行选择,筛选出一个Invoker进行调用。
Dubbo默认的Cluster实现有多种,如下:
- FailoverCluster
- FailfastCluster
- FailsafeCluster
- FailbackCluster
- BroadcastCluster
- AvailableCluster
每个Cluster内部返回的都是xxxClusterInvoker,例如FailoverCluster:
public class FailoverCluster implements Cluster {
public static final String NAME = "failover";
public FailoverCluster() {
}
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker(directory);
}
}
FailoverClusterInvoker
FailoverClusterInvoker
实现的功能是失败调用(有重试次数)自动切换。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
this.checkInvokers(invokers, invocation);
//重试次数
int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList(invokers.size());
Set<String> providers = new HashSet(len);
//根据重试次数循环调用
for(int i = 0; i < len; ++i) {
if (i > 0) {
this.checkWhetherDestroyed();
copyinvokers = this.list(invocation);
this.checkInvokers(copyinvokers, invocation);
}
//负载均衡筛选出一个Invoker作本次调用
Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
//将使用过的Invoker保存起来,下次重试时做过滤用
invoked.add(invoker);
//记录到上下文中
RpcContext.getContext().setInvokers(invoked);
try {
//发起调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("....");
}
Result var12 = result;
return var12;
} catch (RpcException var17) { //catch异常 继续下次循环重试
if (var17.isBiz()) {
throw var17;
}
le = var17;
} catch (Throwable var18) {
le = new RpcException(var18.getMessage(), var18);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(....);
}
上述方法中,首先获取重试次数len
,根据重试次数进行循环调用,调用发生异常会被catch住,然后重新调用。
每次循环会通过负载均衡选出一个Invoker,然后利用这个Invoker进行远程调用,每次选出的Invoker会记录下来,在下次调用的select()
中会将使用上次调用的Invoker进行重试,如果上一次没有调用或者上次调用的Invoker下线了,那么会重新进行负载均衡进行选择。
FailfastClusterInvoker
FailfastClusterInvoker
只会进行一次远程调用,如果失败后立马抛出异常。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
this.checkInvokers(invokers, invocation);
Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); //负载均衡选择Invoker
try {
return invoker.invoke(invocation); //发起远程调用
} catch (Throwable var6) { //失败调用直接将错误抛出
if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
throw (RpcException)var6;
} else {
throw new RpcException(....);
}
}
}
FailsafeClusterInvoker
FailsafeClusterInvoker
是一种安全失败的cluster,调用发生错误仅仅是记录一下日志,然后就返回了空结果。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
this.checkInvokers(invokers, invocation);
//负载均衡选出Invoker后直接进行调用
Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
return invoker.invoke(invocation);
} catch (Throwable var5) { //调用错误只是打印日志
logger.error("Failsafe ignore exception: " + var5.getMessage(), var5);
return new RpcResult();
}
}
FailbackClusterInvoker
FailbackClusterInvoker
调用失败后,会记录下本次调用,然后返回一个空结果给服务消费者,并且会通过一个定时任务对失败的调用进行重试。适用于执行消息通知等最大努力场景。
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
this.checkInvokers(invokers, invocation);
//负载均衡选出Invoker
Invoker<Dubbo的容错与负载均衡
Dubbo -- dubbo高级特性(序列化 地址缓存 超时与重试机制 多版本:灰度发布 负载均衡 集群容错策略 服务降级)