Dubbo之限流TpsLimitFilter源码分析

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之限流TpsLimitFilter源码分析相关的知识,希望对你有一定的参考价值。

本文基于incubator-dubbo 2.7.0版本

前言

在分布式系统中,限流和熔断是处理并发的两大利器。关于限流和熔断,需要记住一句话,客户端熔断,服务端限流。本文我会讲解Dubbo框架对限流的支持。

限流的作用

我个人理解限流的作用,保护应用,防止雪崩。每个应用都有自己处理请求的上限,一旦应用承受过多请求,首先会对正在处理中的请求造成影响,如果更严重,对上下游也会造成雪崩效应。

TpsLimitFilter分析

Dubbo中的限流通过TpsLimitFilter来实现,会在invoker执行实际业务逻辑前进行拦截,判断单位时间请求数是否超过上限,如果超过,抛出异常阻断调用。 TpsLimitFilter源码如下

 
   
   
 
  1. @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)

  2. public class TpsLimitFilter implements Filter {

  3.    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

  4.    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

  5.        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {

  6.            throw new RpcException(

  7.                    new StringBuilder(64)

  8.                            .append("Failed to invoke service ")

  9.                            .append(invoker.getInterface().getName())

  10.                            .append(".")

  11.                            .append(invocation.getMethodName())

  12.                            .append(" because exceed max service tps.")

  13.                            .toString());

  14.        }

  15.        return invoker.invoke(invocation);

  16.    }

  17. }

从TpsLimitFilter的源码中可以看到,因为是扩展点自动激活配置,首先TpsLimitFilter只对provider端有效,其次provider url的需要包括tps=xxx这个配置才能生效。

通过TPSLimiter的isAllowable实现限流 ,其内部采用了计数器算法,单位时间内限制多少调用次数,超过限制,返回false。

 
   
   
 
  1. public class DefaultTPSLimiter implements TPSLimiter {

  2.    /**

  3.     * 每个Service维护一个计数器

  4.     */

  5.    private final ConcurrentMap<String, StatItem> stats

  6.            = new ConcurrentHashMap<String, StatItem>();

  7.    @Override

  8.    public boolean isAllowable(URL url, Invocation invocation) {

  9.        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);

  10.        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,

  11.                Constants.DEFAULT_TPS_LIMIT_INTERVAL);

  12.        //servicekey并没有和方法绑定,只能限流接口

  13.        String serviceKey = url.getServiceKey();

  14.        if (rate > 0) {

  15.            StatItem statItem = stats.get(serviceKey);

  16.            if (statItem == null) {

  17.                stats.putIfAbsent(serviceKey,

  18.                        new StatItem(serviceKey, rate, interval));

  19.                statItem = stats.get(serviceKey);

  20.            }

  21.            return statItem.isAllowable();

  22.        } else {

  23.            StatItem statItem = stats.get(serviceKey);

  24.            if (statItem != null) {

  25.                stats.remove(serviceKey);

  26.            }

  27.        }

  28.        return true;

  29.    }

  30. }

TPSLimiter 针对每个service都创建一个计数器StatItem,通过StatItem的isAllowable方法判断请求是否有效

 
   
   
 
  1. class StatItem {

  2.    //接口名

  3.    private String name;

  4.    //计数周期开始

  5.    private long lastResetTime;

  6.    //计数间隔

  7.    private long interval;

  8.    //剩余计数请求数

  9.    private AtomicInteger token;

  10.    //总共允许请求数

  11.    private int rate;

  12.    StatItem(String name, int rate, long interval) {

  13.        this.name = name;

  14.        this.rate = rate;

  15.        this.interval = interval;

  16.        this.lastResetTime = System.currentTimeMillis();

  17.        this.token = new AtomicInteger(rate);

  18.    }

  19.    public boolean isAllowable() {

  20.        long now = System.currentTimeMillis();

  21.        if (now > lastResetTime + interval) {

  22.            token.set(rate);

  23.            lastResetTime = now;

  24.        }

  25.        int value = token.get();

  26.        boolean flag = false;

  27.        while (value > 0 && !flag) {

  28.            //乐观锁增加计数

  29.            flag = token.compareAndSet(value, value - 1);

  30.            //失败重新获取

  31.            value = token.get();

  32.        }

  33.        return flag;

  34.    }

  35.    long getLastResetTime() {

  36.        return lastResetTime;

  37.    }

  38.    int getToken() {

  39.        return token.get();

  40.    }

  41.    @Override

  42.    public String toString() {

  43.        return new StringBuilder(32).append("StatItem ")

  44.                .append("[name=").append(name).append(", ")

  45.                .append("rate = ").append(rate).append(", ")

  46.                .append("interval = ").append(interval).append("]")

  47.                .toString();

  48.    }

  49. }

StatItem内的逻辑很简单,针对每段时间(lastResetTime,lastResetTime+interval)允许rate次调用,只要计数器达不到上限,返回true。如果超过lastResetTime+interval,重置计数器。

使用TpsLimitFilter

令人费解的是,Dubbo框架并没有默认通过配置文件启动这个Filter,所以我们需要在classpath的META-INF/dubbo/目录下增加com.alibaba.dubbo.rpc.Filter文件

 
   
   
 
  1. tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

就算加上了这个配置,其实也还是生效不了,我们的provider url需要有tps=xxx参数

问题就来了,怎么加这个配置呢,答案就是override,这个功能的官方介绍如下

override的原理是,其实在RegistryProtocol使用export方法对服务进行本地暴露以及注册Provider Url到zk后,还做了另外一个操作,监听服务对应的 /dubbo/interface/configurations目录,一旦configurations目录下节点发生变化,就会重新生成暴露的url,然后进行reexport。 具体相关源码大家可以细细品味下,我觉得这个设计是dubbo服务治理的核心。 注册监听代码如下

 
   
   
 
  1. //得到override url,用于监听configurations目录

  2.        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);

  3.        //构造监听器,用于provider url被override时 重新发布exporter

  4.        //监听路径为 /dubbo/interface/configurations

  5.        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);

  6.        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

  7.        //向registry订阅这个url路径

  8.        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

回到正题,那么我们怎么让tps生效呢?

在zk的configurations目录下,增加一个目录,目录名如下

 
   
   
 
  1. override://10.111.27.41:20880/com.alibaba.dubbo.demo.DemoService?tps=5&category=configurators

zk操作命令如下

 
   
   
 
  1. create -e /dubbo/com.alibaba.dubbo.demo.DemoService/configurators/override%3a%2f%2f10.111.27.41%3a20880%2fcom.alibaba.dubbo.demo.DemoService%3ftps%3d5%26category%3dconfigurators 1

注意overrider后面这端url需要进行URLEncode,因为里面包含了/符号,zk会误识别为目录。 -e用于创建临时目录,客户端断开后这个目录会失效,也就是限流会失效。创建zk目录的时候需要注意下。最好设置成永久。

我通过以上方式设置tps=5之后,超过第六次调用后,就对客户端抛出异常了

限流算法

Dubbo的限流算法使用了最简单的计数器算法,如果并发流量刚好在上个计数器最后一秒和下个计数器第一秒来临,也不能完全预防突发流量,所以推荐自己使用令牌桶算法或漏桶算法实现自定义限流Filter,并且也可以考虑分布式限流。

关于限流算法,下面这篇文章还不错。 https://blog.csdn.net/tianyaleixiaowu/article/details/74942405

总结

Dubbo设计扩展性真的很强,我们可以通过对Dubbo源码的学习,学习到各个方面的知识,举一反三,应用到实际项目中去,也会有助于对其他框架的源码理解。

最后


以上是关于Dubbo之限流TpsLimitFilter源码分析的主要内容,如果未能解决你的问题,请参考以下文章

TPS限流

SOA架构之限流

高并发之限流实现

rest framework之限流组件

秒杀链路兜底方案之限流&降级实战

Spring Cloud Gateway 之限流操作