RateLimiter 源码分析(Guava 和 Sentinel 实现)

Posted 方志朋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RateLimiter 源码分析(Guava 和 Sentinel 实现)相关的知识,希望对你有一定的参考价值。


做积极的人,而不是积极废人


作者javadoop,资深Java工程师 原文链接https://www.javadoop.com/post/rate-limiter

本文主要介绍关于流控的两部分内容。

第一部分介绍 Guava 中 RateLimiter 的源码,包括它的两种模式,目前网上大部分文章只分析简单的 SmoothBursty 模式,而没有分析带有预热的 SmoothWarmingUp。

第二部分介绍 Sentinel 中流控的实现,本文不要求读者了解 Sentinel,这部分内容和 Sentinel 耦合很低,所以读者不需要有阅读压力。

Sentinel 中流控设计是参考 Guava RateLimiter 的,所以阅读第二部分内容,需要有第一部分内容的背景。

Guava RateLimiter

RateLimiter 基于漏桶算法,但它参考了令牌桶算法,这里不讨论流控算法,请自行查找资料。

RateLimiter 使用介绍

RateLimiter 的接口非常简单,它有两个静态方法用来实例化,实例化以后,我们只需要关心 acquire 就行了,甚至都没有 release 操作。

// RateLimiter 接口列表:

 
   
   
 
  1. // 实例化的两种方式:

  2. public static RateLimiter create(double permitsPerSecond){}

  3. public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {}


  4. public double acquire() {}

  5. public double acquire(int permits) {}


  6. public boolean tryAcquire() {}

  7. public boolean tryAcquire(int permits) {}

  8. public boolean tryAcquire(long timeout, TimeUnit unit) {}

  9. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}


  10. public final double getRate() {}

  11. public final void setRate(double permitsPerSecond) {}

RateLimiter 的作用是用来限流的,我们知道 java 并发包中提供了 Semaphore,它也能够提供对资源使用进行控制,我们看一下下面的代码:

 
   
   
 
  1. // Semaphore

  2. Semaphore semaphore = new Semaphore(10);

  3. for (int i = 0; i < 100; i++) {

  4. executor.submit(new Runnable() {

  5. @Override

  6. public void run() {

  7. semaphore.acquireUninterruptibly(1);

  8. try {

  9. doSomething();

  10. } finally {

  11. semaphore.release();

  12. }

  13. }

  14. });

  15. }

Semaphore 用来控制同时访问某个资源的并发数量,如上面的代码,我们设置 100 个线程工作,但是我们能做到最多只有 10 个线程能同时到 doSomething() 方法中。它控制的是并发数量。

而 RateLimiter 是用来控制访问资源的速率(rate)的,它强调的是控制速率。比如控制每秒只能有 100 个请求通过,比如允许每秒发送 1MB 的数据。

它的构造方法指定一个 permitsPerSecond 参数,代表每秒钟产生多少个 permits,这就是我们的速率。

RateLimiter 允许预占未来的令牌,比如,每秒产生 5 个 permits,我们可以单次请求 100 个,这样,紧接着的下一个请求需要等待大概 20 秒才能获取到 permits。

SmoothRateLimiter 介绍

RateLimiter 目前只有一个子类,那就是抽象类 SmoothRateLimiter,SmoothRateLimiter 有两个实现类,也就是我们这边要介绍的两种模式,我们先简单介绍下 SmoothRateLimiter,然后后面分两个小节分别介绍它的两个实现类。

RateLimiter 源码分析(Guava 和 Sentinel 实现)

RateLimiter 作为抽象类,只有两个属性:

 
   
   
 
  1. private final SleepingStopwatch stopwatch;


  2. private volatile Object mutexDoNotUseDirectly;

stopwatch 非常重要,它用来“计时”,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。

mutexDoNotUseDirectly 用来做锁,RateLimiter 依赖于 synchronized 来控制并发,所以我们之后可以看到,各个属性甚至都没有用 volatile 修饰。

然后我们来看 SmoothRateLimiter 的属性,分别代表什么意思。

 
   
   
 
  1. // 当前还有多少 permits 没有被使用,被存下来的 permits 数量

  2. double storedPermits;


  3. // 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值

  4. double maxPermits;


  5. // 每隔多少时间产生一个 permit,

  6. // 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000

  7. double stableIntervalMicros;


  8. // 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧

  9. private long nextFreeTicketMicros = 0L;

其实,看到这几个属性,我们就可以大致猜一下它的内部实现了:

nextFreeTicketMicros 是一个很关键的属性。我们每次获取 permits 的时候,先拿 storedPermits 的值,如果够,storedPermits 减去相应的值就可以了,如果不够,那么还需要将 nextFreeTicketMicros 往前推,表示我预占了接下来多少时间的量了。那么下一个请求来的时候,如果还没到 nextFreeTicketMicros 这个时间点,需要 sleep 到这个点再返回,当然也要将这个值再往前推。

大家在这里可能会有疑惑,因为时间是一直往前走的,所以 storedPermits 的信息可能是不准确的,不过,只需要在关键的操作中同步一下,重新计算就好了。

SmoothBursty 分析

我们先从比较简单的 SmoothBursty 出发,来分析 RateLimiter 的源码,之后我们再分析 SmoothWarmingUp。

Bursty 是突发的意思,它说的不是下面这个意思:我们设置了 1k 每秒,而我们可以一次性获取 5k 的 permits,这个场景表达的不是突发,而是在说预先占有了接下来几秒产生的 permits。

突发说的是,RateLimiter 会缓存一定数量的 permits 在池中,这样对于突发请求,能及时得到满足。想象一下我们的某个接口,很久没有请求过来,突然同时来了好几个请求,如果我们没有缓存一些 permits 的话,很多线程就需要等待了。

SmoothBursty 默认缓存最多 1 秒钟的 permits,不可以修改。

RateLimiter 的静态构造方法:

 
   
   
 
  1. public static RateLimiter create(double permitsPerSecond) {

  2. return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());

  3. }

构造参数 permitsPerSecond 指定每秒钟可以产生多少个 permits。

 
   
   
 
  1. static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {

  2. RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);

  3. rateLimiter.setRate(permitsPerSecond);

  4. return rateLimiter;

  5. }

我们看到,这里实例化的是 SmoothBursty 的实例,它的构造方法很简单,而且它只有一个属性 maxBurstSeconds,这里就不贴代码了。

构造函数指定了 maxBurstSeconds 为 1.0,也就是说,最多会缓存 1 秒钟,也就是 (1.0 * permitsPerSecond) 这么多个 permits 到池中。

这个 1.0 秒,关系到 storedPermits 和 maxPermits:

0 <= storedPermits <= maxPermits = permitsPerSecond

我们继续往后看 setRate 方法:

 
   
   
 
  1. public final void setRate(double permitsPerSecond) {

  2. checkArgument(

  3. permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");

  4. synchronized (mutex()) {

  5. doSetRate(permitsPerSecond, stopwatch.readMicros());

  6. }

  7. }

setRate 这个方法是一个 public 方法,它可以用来调整速率。我们这边继续跟的是初始化过程,但是大家提前知道这个方法是用来调整速率用的,对理解源码有很大的帮助。注意看,这里用了 synchronized 控制并发。

 
   
   
 
  1. @Override

  2. final void doSetRate(double permitsPerSecond, long nowMicros) {

  3. // 同步

  4. resync(nowMicros);

  5. // 计算属性 stableIntervalMicros

  6. double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;

  7. this.stableIntervalMicros = stableIntervalMicros;

  8. doSetRate(permitsPerSecond, stableIntervalMicros);

  9. }

resync 方法很简单,它用来调整 storedPermits 和 nextFreeTicketMicros。这就是我们说的,在关键的节点,需要先更新一下 storedPermits 到正确的值。

 
   
   
 
  1. void resync(long nowMicros) {

  2. // 如果 nextFreeTicket 已经过掉了,想象一下很长时间都没有再次调用 limiter.acquire() 的场景

  3. // 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits

  4. if (nowMicros > nextFreeTicketMicros) {

  5. double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();

  6. storedPermits = min(maxPermits, storedPermits + newPermits);

  7. nextFreeTicketMicros = nowMicros;

  8. }

  9. }

coolDownIntervalMicros() 这个方法大家先不用关注,可以看到,在 SmoothBursty 类中的实现是直接返回了 stableIntervalMicros 的值,也就是我们说的,每产生一个 permit 的时间长度。

当然了,细心的读者,可能会发现,此时的 stableIntervalMicros 其实没有设置,也就是说,上面发生了一次除以 0 值的操作,得到的 newPermits 其实是一个无穷大。而 maxPermits 此时还是 0 值,不过这里其实没有关系。

我们回到前面一个方法,resync 同步以后,会设置 stableIntervalMicros 为一个正确的值,然后进入下面的方法:

 
   
   
 
  1. @Override

  2. void doSetRate(double permitsPerSecond, double stableIntervalMicros) {

  3. double oldMaxPermits = this.maxPermits;

  4. // 这里计算了,maxPermits 为 1 秒产生的 permits

  5. maxPermits = maxBurstSeconds * permitsPerSecond;

  6. if (oldMaxPermits == Double.POSITIVE_INFINITY) {

  7. // if we don't special-case this, we would get storedPermits == NaN, below

  8. storedPermits = maxPermits;

  9. } else {

  10. // 因为 storedPermits 的值域变化了,需要等比例缩放

  11. storedPermits =

  12. (oldMaxPermits == 0.0)

  13. ? 0.0 // initial state

  14. : storedPermits * maxPermits / oldMaxPermits;

  15. }

  16. }

上面这个方法,我们要这么看,原来的 RateLimiter 是用某个 permitsPerSecond 值初始化的,现在我们要调整这个频率。对于 maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放。

到此,构造方法就完成了,我们得到了一个 RateLimiter 的实现类 SmoothBursty 的实例,可能上面的源码你还是会有一些疑惑,不过也没关系,继续往下看,可能你的很多疑惑就解开了。

接下来,我们来分析 acquire 方法:

 
   
   
 
  1. @CanIgnoreReturnValue

  2. public double acquire() {

  3. return acquire(1);

  4. }


  5. @CanIgnoreReturnValue

  6. public double acquire(int permits) {

  7. // 预约,如果当前不能直接获取到 permits,需要等待

  8. // 返回值代表需要 sleep 多久

  9. long microsToWait = reserve(permits);

  10. // sleep

  11. stopwatch.sleepMicrosUninterruptibly(microsToWait);

  12. // 返回 sleep 的时长

  13. return 1.0 * microsToWait / SECONDS.toMicros(1L);

  14. }

我们来看 reserve 方法:

 
   
   
 
  1. final long reserve(int permits) {

  2. checkPermits(permits);

  3. synchronized (mutex()) {

  4. return reserveAndGetWaitLength(permits, stopwatch.readMicros());

  5. }

  6. }


  7. final long reserveAndGetWaitLength(int permits, long nowMicros) {

  8. // 返回 nextFreeTicketMicros

  9. long momentAvailable = reserveEarliestAvailable(permits, nowMicros);

  10. // 计算时长

  11. return max(momentAvailable - nowMicros, 0);

  12. }

继续往里看:

 
   
   
 
  1. @Override

  2. final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {

  3. // 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)

  4. resync(nowMicros);

  5. // 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值

  6. long returnValue = nextFreeTicketMicros;

  7. // storedPermits 中可以使用多少个 permits

  8. double storedPermitsToSpend = min(requiredPermits, this.storedPermits);

  9. // storedPermits 中不够的部分

  10. double freshPermits = requiredPermits - storedPermitsToSpend;

  11. // 为了这个不够的部分,需要等待多久时间

  12. long waitMicros =

  13. storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0

  14. + (long) (freshPermits * stableIntervalMicros);

  15. // 将 nextFreeTicketMicros 往前推

  16. this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);

  17. // storedPermits 减去被拿走的部分

  18. this.storedPermits -= storedPermitsToSpend;

  19. return returnValue;

  20. }

我们可以看到,获取 permits 的时候,其实是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另一部分来自于预占未来的 freshPermits。

这里提一个关键点吧,我们看到,返回值是 nextFreeTicketMicros 的旧值,因为只要到这个时间点,就说明当次 acquire 可以成功返回了,而不管 storedPermits 够不够。如果 storedPermits 不够,会将 nextFreeTicketMicros 往前推一定的时间,预占了一定的量。

到这里,acquire 方法就分析完了,大家看到这里,逆着往前看就是了。应该说,SmoothBursty 的源码还是非常简单的。

SmoothWarmingUp 分析

分析完了 SmoothBursty,我们再来分析 SmoothWarmingUp 会简单一些。我们说过,SmoothBursty 可以处理突发请求,因为它会缓存最多 1 秒的 permits,而待会我们会看到 SmoothWarmingUp 完全不同的设计。

SmoothWarmingUp 适用于资源需要预热的场景,比如我们的某个接口业务,需要使用到数据库连接,由于连接需要预热才能进入到最佳状态,如果我们的系统长时间处于低负载或零负载状态(当然,应用刚启动也是一样的),连接池中的连接慢慢释放掉了,此时我们认为连接池是冷的。

假设我们的业务在稳定状态下,正常可以提供最大 1000 QPS 的访问,但是如果连接池是冷的,我们就不能让 1000 个请求同时进来,因为这会拖垮我们的系统,我们应该有个预热升温的过程。

对应到 SmoothWarmingUp 中,如果系统处于低负载状态,storedPermits 会一直增加,当请求来的时候,我们要从 storedPermits 中取 permits,最关键的点在于,从 storedPermits 中取 permits 的操作是比较耗时的,因为没有预热。

回顾一下前面介绍的 SmoothBursty,它从 storedPermits 中获取 permits 是不需要等待时间的,而这边洽洽相反,从 storedPermits 获取需要更多的时间,这是最大的不同,先理解这一点,能帮助你更好地理解源码。

大家先有一些粗的概念,然后我们来看下面这个图:

RateLimiter 源码分析(Guava 和 Sentinel 实现)

这个图不容易看懂,X 轴代表 storedPermits 的数量,Y 轴代表获取一个 permits 需要的时间。

假设指定 permitsPerSecond 为 10,那么 stableInterval 为 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是写死的,用户不能修改)。也就是说,当达到 maxPermits 时,此时处于系统最冷的时候,获取一个 permit 需要 300ms,而如果 storedPermits 小于 thresholdPermits 的时候,只需要 100ms。

想象有一条垂直线 x=k,它与 X 轴的交点 k 代表当前 storedPermits 的数量:

  • 当系统在非常繁忙的时候,这条线停留在 x=0 处,此时 storedPermits 为 0

  • 当 limiter 没有被使用的时候,这条线慢慢往右移动,直到 x=maxPermits 处;

  • 如果 limiter 被重新使用,那么这条线又慢慢往左移动,直到 x=0 处;当 storedPermits 处于 maxPermits 状态时,我们认为 limiter 中的 permits 是冷的,此时获取一个 permit 需要较多的时间,因为需要预热,有一个关键的分界点是 thresholdPermits。

预热时间是我们在构造的时候指定的,图中梯形的面积就是预热时间,因为预热完成后,我们能进入到一个稳定的速率中(stableInterval),下面我们来计算出 thresholdPermits 和 maxPermits 的值。

有一个关键点,从 thresholdPermits 到 0 的时间,是从 maxPermits 到 thresholdPermits 时间的一半,也就是梯形的面积是长方形面积的 2 倍,梯形的面积是 warmupPeriod。

RateLimiter 源码分析(Guava 和 Sentinel 实现)

之所以长方形的面积是 warmupPeriod/2,是因为 coldFactor 是硬编码的 3。

梯形面积为 warmupPeriod,即:

 
   
   
 
  1. warmupPeriod = 2 * stableInterval * thresholdPermits

由此,我们得出 thresholdPermits 的值:

 
   
   
 
  1. thresholdPermits = 0.5 * warmupPeriod / stableInterval

然后我们根据梯形面积的计算公式:

 
   
   
 
  1. warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

得出 maxPermits 为:

 
   
   
 
  1. maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

这样,我们就得到了 thresholdPermits 和 maxPermits 的值。

接下来,我们来看一下冷却时间间隔,它指的是 storedPermits 中每个 permit 的增长速度,也就是我们前面说的 x=k 这条垂直线往右的移动速度,为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间,我们将其定义为:

 
   
   
 
  1. @Override

  2. double coolDownIntervalMicros() {

  3. return warmupPeriodMicros / maxPermits;

  4. }

  5. 贴一下代码,大家就知道了,在 resync 中用到的这个:


  6. void resync(long nowMicros) {

  7. if (nowMicros > nextFreeTicketMicros) {

  8. // coolDownIntervalMicros 在这里使用

  9. double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();

  10. storedPermits = min(maxPermits, storedPermits + newPermits);

  11. nextFreeTicketMicros = nowMicros;

  12. }

  13. }

基于上面的分析,我们来看 SmoothWarmingUp 的其他源码。

首先,我们来看它的 doSetRate 方法,有了前面的介绍,这个方法的源码非常简单:

 
   
   
 
  1. @Override

  2. void doSetRate(double permitsPerSecond, double stableIntervalMicros) {

  3. double oldMaxPermits = maxPermits;

  4. // coldFactor 是固定的 3

  5. double coldIntervalMicros = stableIntervalMicros * coldFactor;

  6. // 这个公式我们上面已经说了

  7. thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;

  8. // 这个公式我们上面也已经说了

  9. maxPermits =

  10. thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);

  11. // 计算那条斜线的斜率。数学知识,对边 / 临边

  12. slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);

  13. if (oldMaxPermits == Double.POSITIVE_INFINITY) {

  14. // if we don't special-case this, we would get storedPermits == NaN, below

  15. storedPermits = 0.0;

  16. } else {

  17. storedPermits =

  18. (oldMaxPermits == 0.0)

  19. ? maxPermits // initial state is cold

  20. : storedPermits * maxPermits / oldMaxPermits;

  21. }

  22. }

setRate 方法非常简单,接下来,我们要分析的是 storedPermitsToWaitTime 方法,我们回顾一下下面的代码:

RateLimiter 源码分析(Guava 和 Sentinel 实现)

这段代码是 acquire 方法的核心,waitMicros 由两部分组成,一部分是从 storedPermits 中获取花费的时间,一部分是等待 freshPermits 产生花费的时间。在 SmoothBursty 的实现中,从 storedPermits 中获取 permits 直接返回 0,不需要等待。

而在 SmoothWarmingUp 的实现中,由于需要预热,所以从 storedPermits 中取 permits 需要花费一定的时间,其实就是要计算下图中,阴影部分的面积。

RateLimiter 源码分析(Guava 和 Sentinel 实现)

 
   
   
 
  1. @Override

  2. long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {

  3. double availablePermitsAboveThreshold = storedPermits - thresholdPermits;

  4. long micros = 0;

  5. // 如果右边梯形部分有 permits,那么先从右边部分获取permits,计算梯形部分的阴影部分的面积

  6. if (availablePermitsAboveThreshold > 0.0) {

  7. // 从右边部分获取的 permits 数量

  8. double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);

  9. // 梯形面积公式:(上底+下底)*高/2

  10. double length =

  11. permitsToTime(availablePermitsAboveThreshold)

  12. + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);

  13. micros = (long) (permitsAboveThresholdToTake * length / 2.0);

  14. permitsToTake -= permitsAboveThresholdToTake;

  15. }

  16. // 加上 长方形部分的阴影面积

  17. micros += (long) (stableIntervalMicros * permitsToTake);

  18. return micros;

  19. }


  20. // 对于给定的 x 值,计算 y 值

  21. private double permitsToTime(double permits) {

  22. return stableIntervalMicros + permits * slope;

  23. }

到这里,SmoothWarmingUp 也已经说完了。

如果大家对于 Guava RateLimiter 还有什么疑惑,欢迎在留言区留言,对于 Sentinel 中的流控不感兴趣的读者,看到这里就可以结束了。

Sentinel 中的流控

Sentinel 是阿里开源的流控、熔断工具,这里不做过多的介绍,感兴趣的读者请自行了解。

在 Sentinel 的流控中,我们可以配置流控规则,主要是控制 QPS 和线程数,这里我们不讨论控制线程数,控制线程数的代码不在我们这里的讨论范围内,下面的介绍都是指控制 QPS。

RateLimiterController

RateLimiterController 非常简单,它通过使用 latestPassedTime 属性来记录最后一次通过的时间,然后根据规则中 QPS 的限制,计算当前请求是否可以通过。

举个非常简单的例子:设置 QPS 为 10,那么每 100 毫秒允许通过一个,通过计算当前时间是否已经过了上一个请求的通过时间 latestPassedTime 之后的 100 毫秒,来判断是否可以通过。假设才过了 50ms,那么需要当前线程再 sleep 50ms,然后才可以通过。如果同时有另一个请求呢?那需要 sleep 150ms 才行。

RateLimiter 源码分析(Guava 和 Sentinel 实现)

 
   
   
 
  1. public class RateLimiterController implements TrafficShapingController {


  2. // 排队最大时长,默认 500ms

  3. private final int maxQueueingTimeMs;

  4. // QPS 设置的值

  5. private final double count;

  6. // 上一次请求通过的时间

  7. private final AtomicLong latestPassedTime = new AtomicLong(-1);


  8. public RateLimiterController(int timeOut, double count) {

  9. this.maxQueueingTimeMs = timeOut;

  10. this.count = count;

  11. }


  12. @Override

  13. public boolean canPass(Node node, int acquireCount) {

  14. return canPass(node, acquireCount, false);

  15. }


  16. // 通常 acquireCount 为 1,这里不用关心参数 prioritized

  17. @Override

  18. public boolean canPass(Node node, int acquireCount, boolean prioritized) {

  19. // Pass when acquire count is less or equal than 0.

  20. if (acquireCount <= 0) {

  21. return true;

  22. }

  23. //

  24. if (count <= 0) {

  25. return false;

  26. }


  27. long currentTime = TimeUtil.currentTimeMillis();

  28. // 计算每 2 个请求之间的间隔,比如 QPS 限制为 10,那么间隔就是 100ms

  29. long costTime = Math.round(1.0 * (acquireCount) / count * 1000);


  30. // Expected pass time of this request.

  31. long expectedTime = costTime + latestPassedTime.get();


  32. // 可以通过,设置 latestPassedTime 然后就返回 true 了

  33. if (expectedTime <= currentTime) {

  34. // Contention may exist here, but it's okay.

  35. latestPassedTime.set(currentTime);

  36. return true;

  37. } else {

  38. // 不可以通过,需要等待

  39. long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();

  40. // 等待时长大于最大值,返回 false

  41. if (waitTime > maxQueueingTimeMs) {

  42. return false;

  43. } else {

  44. // 将 latestPassedTime 往前推

  45. long oldTime = latestPassedTime.addAndGet(costTime);

  46. try {

  47. // 需要 sleep 的时间

  48. waitTime = oldTime - TimeUtil.currentTimeMillis();

  49. if (waitTime > maxQueueingTimeMs) {

  50. latestPassedTime.addAndGet(-costTime);

  51. return false;

  52. }

  53. // in race condition waitTime may <= 0

  54. if (waitTime > 0) {

  55. Thread.sleep(waitTime);

  56. }

  57. return true;

  58. } catch (InterruptedException e) {

  59. }

  60. }

  61. }

  62. return false;

  63. }


  64. }

这个策略还是非常好理解的,简单粗暴,快速失败。

WarmUpController

WarmUpController 用来防止突发流量迅速上升,导致系统负载严重过高,本来系统在稳定状态下能处理的,但是由于许多资源没有预热,导致这个时候处理不了了。比如,数据库需要建立连接、需要连接到远程服务等,这就是为什么我们需要预热。

啰嗦一句,这里不仅仅指系统刚刚启动需要预热,对于长时间处于低负载的系统,突发流量也需要重新预热。

Guava 的 SmoothWarmingUp 是用来控制获取令牌的速率的,和这里的控制 QPS 还是有一点区别,但是中心思想是一样的。我们在看完源码以后再讨论它们的区别。

为了帮助大家理解源码,我们这边先设定一个场景:QPS 设置为 100,预热时间设置为 10 秒。代码中使用 “【】” 代表根据这个场景计算出来的值。

 
   
   
 
  1. public class WarmUpController implements TrafficShapingController {


  2. // 阈值

  3. protected double count;

  4. // 3

  5. private int coldFactor;

  6. // 转折点的令牌数,和 Guava 的 thresholdPermits 一个意思

  7. // [500]

  8. protected int warningToken = 0;

  9. // 最大的令牌数,和 Guava 的 maxPermits 一个意思

  10. // [1000]

  11. private int maxToken;

  12. // 斜线斜率

  13. // [1/25000]

  14. protected double slope;


  15. // 累积的令牌数,和 Guava 的 storedPermits 一个意思

  16. protected AtomicLong storedTokens = new AtomicLong(0);

  17. // 最后更新令牌的时间

  18. protected AtomicLong lastFilledTime = new AtomicLong(0);


  19. public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {

  20. construct(count, warmUpPeriodInSec, coldFactor);

  21. }


  22. public WarmUpController(double count, int warmUpPeriodInSec) {

  23. construct(count, warmUpPeriodInSec, 3);

  24. }


  25. // 下面的构造方法,和 Guava 中是差不多的,只不过 thresholdPermits 和 maxPermits 都换了个名字

  26. private void construct(double count, int warmUpPeriodInSec, int coldFactor) {


  27. if (coldFactor <= 1) {

  28. throw new IllegalArgumentException("Cold factor should be larger than 1");

  29. }


  30. this.count = count;


  31. this.coldFactor = coldFactor;


  32. // warningToken 和 thresholdPermits 是一样的意思,计算结果其实是一样的

  33. // thresholdPermits = 0.5 * warmupPeriod / stableInterval.

  34. // 【warningToken = (10*100)/(3-1) = 500】

  35. warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);


  36. // maxToken 和 maxPermits 是一样的意思,计算结果其实是一样的

  37. // maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)

  38. // 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】

  39. maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));


  40. // 斜率计算

  41. // slope

  42. // slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);

  43. // 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】

  44. slope = (coldFactor - 1.0) / count / (maxToken - warningToken);


  45. }


  46. @Override

  47. public boolean canPass(Node node, int acquireCount) {

  48. return canPass(node, acquireCount, false);

  49. }


  50. @Override

  51. public boolean canPass(Node node, int acquireCount, boolean prioritized) {


  52. // Sentinel 的 QPS 统计使用的是滑动窗口


  53. // 当前时间窗口的 QPS

  54. long passQps = (long) node.passQps();


  55. // 这里是上一个时间窗口的 QPS,这里的一个窗口跨度是1分钟

  56. long previousQps = (long) node.previousPassQps();


  57. // 同步。设置 storedTokens 和 lastFilledTime 到正确的值

  58. syncToken(previousQps);


  59. long restToken = storedTokens.get();

  60. // 令牌数超过 warningToken,进入梯形区域

  61. if (restToken >= warningToken) {


  62. // 这里简单说一句,因为当前的令牌数超过了 warningToken 这个阈值,系统处于需要预热的阶段

  63. // 通过计算当前获取一个令牌所需时间,计算其倒数即是当前系统的最大 QPS 容量


  64. long aboveToken = restToken - warningToken;


  65. // 这里计算警戒 QPS 值,就是当前状态下能达到的最高 QPS。

  66. // (aboveToken * slope + 1.0 / count) 其实就是在当前状态下获取一个令牌所需要的时间

  67. double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));

  68. // 如果不会超过,那么通过,否则不通过

  69. if (passQps + acquireCount <= warningQps) {

  70. return true;

  71. }

  72. } else {

  73. // count 是最高能达到的 QPS

  74. if (passQps + acquireCount <= count) {

  75. return true;

  76. }

  77. }


  78. return false;

  79. }


  80. protected void syncToken(long passQps) {

  81. // 下面几行代码,说明在第一次进入新的 1 秒钟的时候,做同步

  82. // 题外话:Sentinel 默认地,1 秒钟分为 2 个时间窗口,分别 500ms

  83. long currentTime = TimeUtil.currentTimeMillis();

  84. currentTime = currentTime - currentTime % 1000;

  85. long oldLastFillTime = lastFilledTime.get();

  86. if (currentTime <= oldLastFillTime) {

  87. return;

  88. }


  89. // 令牌数量的旧值

  90. long oldValue = storedTokens.get();

  91. // 计算新的令牌数量,往下看

  92. long newValue = coolDownTokens(currentTime, passQps);


  93. if (storedTokens.compareAndSet(oldValue, newValue)) {

  94. // 令牌数量上,减去上一分钟的 QPS,然后设置新值

  95. long currentValue = storedTokens.addAndGet(0 - passQps);

  96. if (currentValue < 0) {

  97. storedTokens.set(0L);

  98. }

  99. lastFilledTime.set(currentTime);

  100. }


  101. }


  102. // 更新令牌数

  103. private long coolDownTokens(long currentTime, long passQps) {

  104. long oldValue = storedTokens.get();

  105. long newValue = oldValue;


  106. // 当前令牌数小于 warningToken,添加令牌

  107. if (oldValue < warningToken) {

  108. newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);

  109. } else if (oldValue > warningToken) {

  110. // 当前令牌数量处于梯形阶段,

  111. // 如果当前通过的 QPS 大于 count/coldFactor,说明系统消耗令牌的速度,大于冷却速度

  112. // 那么不需要添加令牌,否则需要添加令牌

  113. if (passQps < (int)count / coldFactor) {

  114. newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);

  115. }

  116. }

  117. return Math.min(newValue, maxToken);

  118. }


  119. }

coolDownTokens 这个方法用来计算新的 token 数量,其实我也没有完全理解作者的设计:


  • 第一、对于令牌的增加,在 Guava 中,使用 warmupPeriodMicros / maxPermits 作为增长率,因为它实现的是 storedPermits 从 0 到 maxPermits 花费的时间为 warmupPeriod。而这里是以每秒 count 个作为增长率,为什么?



  • 第二、else if 分支中的决定我没有理解,为什么用 passQps 和 count / coldFactor 进行对比来决定是否继续添加令牌?



  • 我自己的理解是,count/coldFactor 就是指冷却速度,那么就是说得通的。欢迎大家一起探讨。


最后,我们再简单说说 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的区别。

Guava 在于控制获取令牌的速率,它关心的是,获取 permits 需要多少时间,包括从 storedPermits 中获取,以及获取 freshPermits,以此推进 nextFreeTicketMicros 到未来的某个时间点。

而 Sentinel 在于控制 QPS,它用令牌数来标识当前系统处于什么状态,根据时间推进一直增加令牌,根据通过的 QPS 一直减少令牌。如果 QPS 持续下降,根据推演,可以发现 storedTokens 越来越多,然后越过 warningTokens 这个阈值,之后只有当 QPS 下降到 count/3 以后,令牌才会继续往上增长,一直到 maxTokens。

storedTokens 是以 “count 每秒”的增长率增长的,减少是以 前一分钟的 QPS 来减少的。其实这里我也有个疑问,为什么增加令牌的时候考虑了时间,而减少的时候却不考虑时间因素,提了 issue,似乎没人搭理。

WarmUpRateLimiterController

注意,这个类继承自刚刚介绍的 WarmUpController,它的流控效果定义为排队等待。它的代码其实就是前面介绍的 RateLimiterController 加上 WarmUpController。

 
   
   
 
  1. public class WarmUpRateLimiterController extends WarmUpController {


  2. private final int timeoutInMs;

  3. private final AtomicLong latestPassedTime = new AtomicLong(-1);


  4. public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {

  5. super(count, warmUpPeriodSec, coldFactor);

  6. this.timeoutInMs = timeOutMs;

  7. }


  8. @Override

  9. public boolean canPass(Node node, int acquireCount) {

  10. return canPass(node, acquireCount, false);

  11. }


  12. @Override

  13. public boolean canPass(Node node, int acquireCount, boolean prioritized) {

  14. long previousQps = (long) node.previousPassQps();

  15. syncToken(previousQps);


  16. long currentTime = TimeUtil.currentTimeMillis();


  17. long restToken = storedTokens.get();

  18. long costTime = 0;

  19. long expectedTime = 0;


  20. // 和 RateLimiterController 比较,区别主要就是这块代码


  21. if (restToken >= warningToken) {

  22. long aboveToken = restToken - warningToken;


  23. // current interval = restToken*slope+1/count

  24. double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));

  25. costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);

  26. } else {

  27. costTime = Math.round(1.0 * (acquireCount) / count * 1000);

  28. }

  29. expectedTime = costTime + latestPassedTime.get();


  30. if (expectedTime <= currentTime) {

  31. latestPassedTime.set(currentTime);

  32. return true;

  33. } else {

  34. long waitTime = costTime + latestPassedTime.get() - currentTime;

  35. if (waitTime > timeoutInMs) {

  36. return false;

  37. } else {

  38. long oldTime = latestPassedTime.addAndGet(costTime);

  39. try {

  40. waitTime = oldTime - TimeUtil.currentTimeMillis();

  41. if (waitTime > timeoutInMs) {

  42. latestPassedTime.addAndGet(-costTime);

  43. return false;

  44. }

  45. if (waitTime > 0) {

  46. Thread.sleep(waitTime);

  47. }

  48. return true;

  49. } catch (InterruptedException e) {

  50. }

  51. }

  52. }

  53. return false;

  54. }

  55. }

这个代码很简单,就是 RateLimiter 中的代码,然后加入了预热的内容。

在 RateLimiter 中,单个请求的 costTime 是固定的,就是 1/count,比如设置 100 qps,那么 costTime 就是 10ms。

但是这边,加入了 WarmUp 的内容,就是说,通过令牌数量,来判断当前系统的 QPS 应该是多少,如果当前令牌数超过 warningTokens,那么系统的 QPS 容量已经低于我们预设的 QPS,相应的,costTime 就会延长。

小结

有段时间没写文章了,写得不好之处,欢迎指正。

热门内容:    




  •  



喜欢就点个 "在看" 呗^_^

以上是关于RateLimiter 源码分析(Guava 和 Sentinel 实现)的主要内容,如果未能解决你的问题,请参考以下文章

Guava RateLimiter详解以及源码分析

Java技术指南「并发编程专题」Guava RateLimiter限流器入门到精通(源码分析)

Java难点攻克「Guava RateLimiter」针对于限流器的入门到实战和源码原理分析

Guava RateLimiter限流器使用示例

#私藏项目实操分享#Java技术开发专题系列之Guava RateLimiter针对于限流器的入门到实战(含源码分析介绍)

使用Guava RateLimiter限流以及源码解析