pulsar 实现的一种 RateLimiter

Posted allenwas3

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pulsar 实现的一种 RateLimiter相关的知识,希望对你有一定的参考价值。

pulsar 实现了一个 RateLimiter 来限制 dispatch 的速率。


大体思路是:初始有 n 个令牌,当令牌被申请完了后,其他人就无法获得令牌了,每隔一段时间 t 会清零已分配的令牌数。
所以,记住这 2 个参数即可。

通过一个测试用例,观察 RateLimiter 的用法。

// org.apache.pulsar.common.util.RateLimiterTest#testMultipleAcquire
public void testMultipleAcquire() throws Exception {
    // 每过 1000ms 重置令牌数
    final long rateTimeMSec = 1000;
    // 令牌总数为 100
    final int permits = 100;
    final int acquirePermist = 50;
    RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
    long start = System.currentTimeMillis();
    for (int i = 0; i < permits / acquirePermist; i++) {
        // 1 次获取 50 个令牌,2 次申请完 100 个令牌
        rate.acquire(acquirePermist);
    }
    long end = System.currentTimeMillis();
    assertTrue((end - start) < rateTimeMSec);
    // 时间还不到 1000ms,令牌没有重置,则可用令牌仍为 0
    assertEquals(rate.getAvailablePermits(), 0);
    rate.close();
}

接下来看下实现:

org.apache.pulsar.common.util.RateLimiter
  // 令牌总数
  private long permits;
  // 当前已分配的令牌数
  private long acquiredPermits;
  
  // 清理已分配令牌数的定时任务 1. 线程池 2. 定时任务 3. 间隔时间
  private final ScheduledExecutorService executorService;
  private ScheduledFuture<?> renewTask;
  private long rateTime;
  // 提供了一个接口用来修改令牌总数
  private Supplier<Long> permitUpdater; 

申请 acquirePermit 个令牌,自旋模式

public synchronized void acquire(long acquirePermit) throws InterruptedException {
    checkArgument(!isClosed(), "Rate limiter is already shutdown");
    checkArgument(acquirePermit <= this.permits,
            "acquiring permits must be less or equal than initialized rate =" + this.permits);

    // 如果还没创建清理定时任务,则创建它
    if (renewTask == null) {
        renewTask = createTask();
    }

    boolean canAcquire = false;
    do {
        // 如果已分配令牌数小于总令牌数,可以分配
        canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
        if (!canAcquire) {
            // 阻塞当前线程
            wait();
        } else {
            // 增加已分配令牌数
            acquiredPermits += acquirePermit;
        }
    } while (!canAcquire);
}

申请 acquirePermit 个令牌,快速失败模式

public synchronized boolean tryAcquire(long acquirePermit) {
    checkArgument(!isClosed(), "Rate limiter is already shutdown");
    // lazy init and start task only once application start using it
    if (renewTask == null) {
        renewTask = createTask();
    }

    // acquired-permits can‘t be larger than the rate
    if (acquirePermit > this.permits) {
        acquiredPermits = this.permits;
        return false;
    }
    // 这里并没有严格限制,无所谓,没必要太精确
    boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
    if (canAcquire) {
        acquiredPermits += acquirePermit;
    }
    return canAcquire;
}

创建清理分配令牌数的定时任务

protected ScheduledFuture<?> createTask() {
    return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit);
}

清理已分配令牌数

synchronized void renew() {
    // 直接重置为 0
    acquiredPermits = 0;
    // 如果提供了这个对象,则用它的值来设置总令牌数
    if (permitUpdater != null) {
        long newPermitRate = permitUpdater.get();
        if (newPermitRate > 0) {
            setRate(newPermitRate);
        }
    }
    // 唤醒所有等待当前对象的线程
    notifyAll();
}

 

以上是关于pulsar 实现的一种 RateLimiter的主要内容,如果未能解决你的问题,请参考以下文章

微服务架构实战篇:Spring boot2.x + Guava 并使用RateLimiter实现秒杀限流demo

RateLimiter令牌桶算法浅析

RateLimiter 的底层实现是啥?

JAVA中限制接口流量的方法

RateLimiter 的底层实现是啥?

Pulsar 客户端线程平衡