Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实
Posted 李浩宇Alex
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实相关的知识,希望对你有一定的参考价值。
并发编程的三剑客
- 缓存 缓存的目的是提升系统访问速度和增大系统处理容量。
- 降级 降级是当服务出现问题或者影响到核心流程时,需要暂时屏蔽掉,待高峰或者问题解决后再打开。
- 限流 限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理。
限流的思想
溢出思想:
速度控制
限流的算法
计数限流算法
固定窗口计数
实现原理
-
固定窗口计数法思想比较简单,只需要确定两个参数:计数周期T及周期内最大访问(调用)数N。请求到达时使用以下流程进行操作:
- 固定窗口计数实现简单,并且只需要记录上一个周期起始时间与周期内访问总数,几乎不消耗额外的存储空间。
算法缺陷
令牌桶算法
- 令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。
- 桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。
- 当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。
- 如果获取不到,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。
优点
漏桶算法
-
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
- 如上图就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。
- 当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。
令牌桶和漏桶对比
-
令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
- 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;
-
漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
- 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;
信号量的应用
-
操作系统的信号量是个很重要的概念,Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
- 信号量的本质是控制某个资源可被同时访问的个数,在一定程度上可以控制某资源的访问频率,但不能精确控制。
限流的思想
Guava中的RateLimiter可以限制单进程中某个方法的速率,本文主要介绍如何使用,实现原理请参考文档:推荐:超详细的Guava RateLimiter限流原理解析和推荐:RateLimiter 源码分析(Guava 和 Sentinel 实现)。
Guava RateLimiter
原理:Guava RateLimiter基于令牌桶算法,
- RateLimiter系统限制QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌。
- 然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
Guava RateLimiter 控制操作
Guava RateLimiter 限速手段
- RateLimiter从概念上来讲,速率限制器会在可配置的速率下分配许可证。如果必要的话,每个acquire() 会阻塞当前线程直到许可证可用后获取该许可证。一旦获取到许可证,不需要再释放许可证。
- RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
Maven配置
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0-jre</version>
</dependency>
Java简单案例
public class RateLimiterService {
// 每秒发出5个令牌
RateLimiter rateLimiter = RateLimiter.create(5);
/**
* 尝试获取令牌
*/
public boolean tryAcquire() {
return rateLimiter.tryAcquire();
}
public void acquire() {
rateLimiter.acquire();
}
public static void main(String[] args){
if (accessLimitService.tryAcquire()) {
log.info("start");
// 模拟业务执行500毫秒
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "access success [" + LocalDateTime.now() + "]";
} else {
//log.warn("限流");
return "access limit [" + LocalDateTime.now() + "]";
}
}
}
public void testMethod(){
ExecutorService pool = Executors.newFixedThreadPool(10);
RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
if (rateLimiter.tryAcquire()) {
try {
log.info("start");
Thread.sleep(500);
} catch (InterruptedException e) {
}
} else {
log.warn("限流");
}
}));
public void testMethod2(){
ExecutorService pool = Executors.newFixedThreadPool(10);
RateLimiter rateLimiter = RateLimiter.create(5); // rate is "5 permits per second"
IntStream.range(0, 10).forEach(i -> pool.submit(() -> {
rateLimiter.acquire();
log.info("start");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
pool.shutdown();
}
}
}
public class GuavaRateLimiter {
public static ConcurrentHashMap<String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap<String, RateLimiter>();
//初始化限流工具RateLimiter
static {
createResourceRateLimiter("order", 50);
}
public static void createResourceRateLimiter(String resource, double qps) {
if (resourceRateLimiter.contains(resource)) {
resourceRateLimiter.get(resource).setRate(qps);
} else {
//创建限流工具,每秒发出50个令牌指令
RateLimiter rateLimiter = RateLimiter.create(qps);
resourceRateLimiter.putIfAbsent(resource, rateLimiter);
}
}
public static void main(String[] args) {
for (int i = 0; i < 5000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//如果获得令牌指令,则执行业务逻辑
if (resourceRateLimiter.get("order").tryAcquire(10, TimeUnit.MICROSECONDS)) {
System.out.println("执行业务逻辑");
} else {
System.out.println("限流");
}
}
}).start();
}
}
}
方法摘要
限流及创建方法
create方法
public static RateLimiter create(double permitsPerSecond)
返回的RateLimiter
-
确保了在平均情况下,每秒发布的许可数不会超过permitsPerSecond,每秒钟会持续发送请求。
-
当传入请求速率超过permitsPerSecond,速率限制器会每秒释放一个许可(1.0 / permitsPerSecond 这里是指设定了permitsPerSecond为1.0) 。
- 当速率限制器闲置时,允许许可数暴增到permitsPerSecond,随后的请求会被平滑地限制在稳定速率permitsPerSecond中。
参数:
- permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
- 抛出:
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit)
参数:
- permitsPerSecond – 返回的RateLimiter的速率,意味着每秒有多少个许可变成有效。
- warmupPeriod – 在这段时间内RateLimiter会增加它的速率,在抵达它的稳定速率或者最大速率之前
- unit – 参数warmupPeriod 的时间单位
抛出:
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
限流及阻塞方法
acquire
public double acquire()
返回:
acquire
public double acquire(int permits)
参数:
- permits – 需要获取的许可数
返回:
- 执行速率的所需要的睡眠时间,单位为妙;如果没有则返回0
抛出:
- IllegalArgumentException – 如果请求的许可数为负数或者为0
tryAcquire
public boolean tryAcquire(long timeout,TimeUnit unit)
参数:
- timeout – 等待许可的最大时间,负数以0处理
- unit – 参数timeout 的时间单位
返回:
- true表示获取到许可,反之则是false
抛出:
- IllegalArgumentException – 如果请求的许可数为负数或者为0
tryAcquire
public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
参数:
- permits – 需要获取的许可数
- timeout – 等待许可数的最大时间,负数以0处理
- unit – 参数timeout 的时间单位
返回:
- true表示获取到许可,反之则是false
限流及状态设置
public final void setRate(double permitsPerSecond)
参数:
- permitsPerSecond – RateLimiter的新的稳定速率
抛出:
- IllegalArgumentException – 如果permitsPerSecond为负数或者为0
public final double getRate()
以上是关于Java技术指南「并发编程专题」针对于Guava RateLimiter限流器的入门到精通(含实的主要内容,如果未能解决你的问题,请参考以下文章
#私藏项目实操分享#Java技术开发专题系列之Guava RateLimiter针对于限流器的入门到实战(含源码分析介绍)
Java技术专题「Guava技术系列」Guava-Collections实战使用相关Guava不一般的集合框架
Java技术指南「并发编程专题」Fork/Join框架基本使用和原理探究(原理及源码篇)
Java技术专题「提升篇」Guava Collections实战指南—挑战Guava不一般的集合框架