Java技术指南「并发编程专题」Guava RateLimiter限流器入门到精通(源码分析)

Posted 李浩宇Alex

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java技术指南「并发编程专题」Guava RateLimiter限流器入门到精通(源码分析)相关的知识,希望对你有一定的参考价值。

Guava包中限流实现分析

RateLimiter

回顾使用案例

从输出结果可以看出,RateLimiter具有预消费的能力:

  • 请求 1时并没有任何等待直接预消费了1个令牌
  • 请求 2时,由于之前预消费了1个令牌,故而等待了2秒,之后又预消费了6个令牌
  • 请求 3时同理,由于之前预消费了6个令牌,故而等待了12秒
  • RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。

  • 但是某些情况下并不需要这种突发请求处理能力,如某IM厂商提供消息推送接口,但推送接口有严格的频率限制(600次/30秒),在调用该IM厂商推送接口时便不能预消费,否则,则可能出现推送频率超出限制而失败。

  • 其中RateLimiter类为限流的核心类,其为public的抽象类,RateLimiter有一个实现类SmoothRateLimiter,根据不同消耗令牌的策略SmoothRateLimiter又有两个具体实现类SmoothBursty和SmoothWarmingUp。

  • 在实际使用过程中一般直接使用RateLimiter类,其他类对用户是透明的,RateLimiter类的设计使用了类似BUILDER模式的小技巧,并做了一定的调整。

  • 通过RateLimiter类图可见,RateLimiter类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。标准创建者模式UML图如下所示(引用自百度百科)

Guava包中限流工具类

Guava核心限流类介绍

  • RateLimiter类为限流的核心类,其为public的抽象类,RateLimiter有一个实现类SmoothRateLimiter,根据不同消耗令牌的策略SmoothRateLimiter又有两个具体实现类SmoothBursty和SmoothWarmingUp。

Guava有两种限流模式

  • 一种为稳定模式(SmoothBursty:令牌生成速度恒定)
  • 一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值)

Guava RateLimiter核心类实现

  • 在实际使用过程中一般直接使用RateLimiter类,其他类对用户是透明的。RateLimiter类的设计使用了类似BUILDER模式的小技巧,并做了一定的调整。
  • 通过RateLimiter类图可见,RateLimiter类不仅承担了具体实现类的创建职责,同时也确定了被创建出的实际类可提供的方法。
SmoothBursty
  • Guava包RateLimiter类的说明文档,首先使用create函数创建限流器,指定每秒生成2个令牌,在需要调用服务时使用acquire函数或取令牌。
create函数分析
  • create函数具有两个个重载,根据不同的重载可能创建不同的RateLimiter具体实现子类。

  • 目前可返回的实现子类包括SmoothBursty及SmoothWarmingUp两种,具体不同下文详细分析。

  • 在调用create接口时,实际实例化的为SmoothBursty类
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

/**
 * The currently stored permits.
 * 当前存储令牌数
 */
double storedPermits;
/**
 * The maximum number of stored permits.
 * 最大存储令牌数
 */
double maxPermits;
/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔
 */
double stableIntervalMicros;
/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L;
// could be either in the past or future

tryAcquire函数实现机制
  • 就非常容易理解RateLimiter暴露出来的接口
@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}
  • acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}
  • acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
  • tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false
  • canAcquire用于判断timeout时间内是否可以获取令牌
resync函数
/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}
acquire函数分析

acquire函数也具有两个重载类,但分析过程仅仅需要关系具有整形参数的函数重载即可,无参数的函数仅仅是acquire(1)的简便写法。

  • 预分配授权数量,此函数返回需要等待的时间,可能为0;
  • 根据等待时间进行休眠;
  • 以秒为单位,返回获取授权消耗的时间。
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数
  double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
  • 该函数用于获取requiredPermits个令牌,并返回需要等待到的时间点
  • 其中,storedPermitsToSpend为桶中可以消费的令牌数,freshPermits为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到nextFreeTicketMicros

  • 需要注意的是,该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗来讲,本次请求需要为上次请求的预消费行为埋单,这也是RateLimiter可以预消费(处理突发)的原理所在。若需要禁止预消费,则修改此处返回更新后的nextFreeTicketMicros值。
SmoothBursty的构造函数
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // 最大存储maxBurstSeconds秒生成的令牌
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // 计算最大存储令牌数
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don\'t special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}
  • 桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
  • 该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。
抽象函数分析
coolDownIntervalMicros函数

SmoothBursty类中对于coolDownIntervalMicros函数的实现如下:

@Override
double coolDownIntervalMicros() {
  return stableIntervalMicros;
}

SmoothWarmingUp类中对于coolDownIntervalMicros函数的实现如下:

@Override
double coolDownIntervalMicros() {
  return warmupPeriodMicros / maxPermits;
}
  • 其中maxPermits属性上文已经出现过,表示当前令牌桶的最大容量。
  • warmupPeriodMicros属性属于SmoothWarmingUp类的特有属性,表示令牌桶中令牌从0到maxPermits需要经过的时间,故warmupPeriodMicros / maxPermits表示在令牌数量达到maxPermits之前的令牌产生时间间隔。
storedPermitsToWaitTime函数

SmoothBursty类中对于storedPermitsToWaitTime函数的实现如下:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}

SmoothBursty类中对于storedPermitsToWaitTime函数的实现如下:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // measuring the integral on the right part of the function (the climbing line)
  if (availablePermitsAboveThreshold > 0.0) {
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // TODO(cpovirk): Figure out a good name for this variable.
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // measuring the integral on the left part of the function (the horizontal line)
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}
  • 实现较为复杂,其核心思想在于计算消耗当前存储令牌时需要根据预热设置区别对待。其中涉及到新变量thresholdPermits,该变量为令牌阈值,当当前存储的令牌数大于该值时,消耗(storedPermits-thresholdPermits)范围的令牌需要有预热的过程(即消耗每个令牌的间隔时间慢慢减小),而消耗0~thresholdPermits个数的以存储令牌,每个令牌消耗时间为固定值,即stableIntervalMicros。

  • 而thresholdPermits取值需要考虑预热时间及令牌产生速度两个属性,即thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;。可见阈值为预热时间中能够产生的令牌数的一半,并且根据注释计算消耗阈值以上的令牌的时间可以转换为计算预热图的梯形面积(实际为积分),本处不详细展开。

  • 使用此种设计可以保证在上次请求间隔时间较长时,令牌桶中存储了较多的令牌,当消耗这些令牌时,最开始的令牌消耗时间较长,后续时间慢慢缩短直到达到stableIntervalMicros的状态,产生预热的效果。

实现总结

  • 根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时需要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?

    • 一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。

    • 在实现限流器的过程中,基于令牌桶的思想,并且增加了带有预热器的令牌桶限流器实现。被限流的线程使用其自带的SleepingStopwatch工具类,最终使用的是Thread.sleep(ms, ns);方法,而线程使用sleep休眠时其持有的锁并不会释放,在多线程编程时此处需要注意。

    • 最后,限流器触发算法采用的是预定令牌的方式,即当前请求需要的令牌数不会对当前请求的等待时间造成影响,而是会影响下一次请求的等待时间。

以上是关于Java技术指南「并发编程专题」Guava RateLimiter限流器入门到精通(源码分析)的主要内容,如果未能解决你的问题,请参考以下文章

Java技术专题「Guava技术系列」Guava-Collections实战使用相关Guava不一般的集合框架

Java技术专题「提升篇」Guava Collections实战指南—挑战Guava不一般的集合框架

Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇

高并发编程专题说明

Java技术指南「并发编程专题」Fork/Join框架基本使用和原理探究(基础篇)

Day857.高性能限流器Guava RateLimiter -Java 并发编程实战