Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实

Posted 李浩宇Alex

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实相关的知识,希望对你有一定的参考价值。

并发编程的三剑客

  • 缓存 缓存的目的是提升系统访问速度和增大系统处理容量。
  • 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开。
  • 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。

限流的思想

溢出思想:

速度控制

限流的算法

计数限流算法

固定窗口计数

实现原理
  • 固定窗口计数法思想比较简单,只需要确定两个参数:计数周期T及周期内最大访问(调用)数N。请求到达时使用以下流程进行操作:

  • 固定窗口计数实现简单,并且只需要记录上一个周期起始时间与周期内访问总数,几乎不消耗额外的存储空间。
算法缺陷

令牌桶算法

  • 令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。
  • 桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。
  • 当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。
  • 如果获取不到,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。
优点

漏桶算法

  • 漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

  • 如上图就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。
  • 当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。

令牌桶和漏桶对比

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;

  • 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;
  • 漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;

  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;

信号量的应用

  • 操作系统的信号量是个很重要的概念,Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

  • 信号量的本质是控制某个资源可被同时访问的个数,在一定程度上可以控制某资源的访问频率,但不能精确控制。

限流的思想

Guava中的RateLimiter可以限制单进程中某个方法的速率,本文主要介绍如何使用,实现原理请参考文档:推荐:超详细的Guava RateLimiter限流原理解析和推荐:RateLimiter 源码分析(Guava 和 Sentinel 实现)。

Guava RateLimiter

原理:Guava RateLimiter基于令牌桶算法,

  • RateLimiter系统限制QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌。
  • 然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。

Guava RateLimiter 控制操作

Guava RateLimiter 限速手段

  • RateLimiter从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。
  • RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。

Maven配置

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.0-jre</version>
</dependency>

Java简单案例

public class RateLimiterService {
    // 每秒发出5个令牌
    RateLimiter rateLimiter = RateLimiter.create(5);
    /**
     * 尝试获取令牌
     */
    public boolean tryAcquire() {
        return rateLimiter.tryAcquire();
    }
    public  void acquire() {
        rateLimiter.acquire();
    }
   public static void main(String[] args){
        if (accessLimitService.tryAcquire()) {
            log.info("start");
            // 模拟业务执行500毫秒
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "access success [" + LocalDateTime.now() + "]";
        } else {
            //log.warn("限流");
            return "access limit [" + LocalDateTime.now() + "]";
        }
      }
}

public void testMethod(){
    ExecutorService pool = Executors.newFixedThreadPool(10);
        RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
        IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
            if (rateLimiter.tryAcquire()) {
                try {
                    log.info("start");
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
            } else {
                log.warn("限流");
            }
        }));

public void testMethod2(){
    ExecutorService pool = Executors.newFixedThreadPool(10);
        RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
        IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
            rateLimiter.acquire();
            log.info("start");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
        pool.shutdown();
      }
  }
}
public class GuavaRateLimiter {
    public static ConcurrentHashMap<String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap<String, RateLimiter>();
    //初始化限流工具RateLimiter
    static {
        createResourceRateLimiter("order", 50);
    }
    public static void createResourceRateLimiter(String resource, double qps) {
        if (resourceRateLimiter.contains(resource)) {
            resourceRateLimiter.get(resource).setRate(qps);
        } else {
            //创建限流工具,每秒发出50个令牌指令
            RateLimiter rateLimiter = RateLimiter.create(qps);
            resourceRateLimiter.putIfAbsent(resource, rateLimiter);
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < 5000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //如果获得令牌指令,则执行业务逻辑
                    if (resourceRateLimiter.get("order").tryAcquire(10, TimeUnit.MICROSECONDS)) {
                        System.out.println("执行业务逻辑");
                    } else {
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}

方法摘要

限流及创建方法


create方法
public static RateLimiter create(double permitsPerSecond)
返回的RateLimiter
  • 确保了在平均情况下,每秒发布的许可数不会超过permitsPerSecond,每秒钟会持续发送请求。

  • 当传入请求速率超过permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了permitsPerSecond为1.0) 。

  • 当速率限制器闲置时,允许许可数暴增到permitsPerSecond,随后的请求会被平滑地限制在稳定速率permitsPerSecond中。
参数:
  • permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
  • 抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)
参数:
  • permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
  • warmupPeriod – 在这段时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前
  • unit – 参数warmupPeriod 的时间单位
抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0

限流及阻塞方法

acquire



public double acquire()
返回:
acquire
public double acquire(int permits)
参数:
  • permits – 需要获取的许可数
返回:
  • 执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
抛出:
  • IllegalArgumentException – 如果请求的许可数为负数或者为0

tryAcquire

public boolean tryAcquire(long timeout,TimeUnit unit)
参数:
  • timeout – 等待许可的最大时间,负数以0处理
  • unit – 参数timeout 的时间单位
返回:
  • true表示获取到许可,反之则是false
抛出:
  • IllegalArgumentException – 如果请求的许可数为负数或者为0

tryAcquire

public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
参数:
  • permits – 需要获取的许可数
  • timeout – 等待许可数的最大时间,负数以0处理
  • unit – 参数timeout 的时间单位
返回:
  • true表示获取到许可,反之则是false

限流及状态设置


public final void setRate(double permitsPerSecond)
参数:
  • permitsPerSecond – RateLimiter的新的稳定速率
抛出:
  • IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public final double getRate()

以上是关于Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实的主要内容,如果未能解决你的问题,请参考以下文章

#私藏项目实操分享#Java技术开发专题系列之Guava RateLimiter针对于限流器的入门到实战(含源码分析介绍)

Java技术专题「Guava技术系列」Guava-Collections实战使用相关Guava不一般的集合框架

Java技术指南「并发编程专题」Fork/Join框架基本使用和原理探究(原理及源码篇)

Java技术专题「提升篇」Guava Collections实战指南—挑战Guava不一般的集合框架

Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇

高并发编程专题说明