微服务之服务限流
Posted 猿呈志
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务之服务限流相关的知识,希望对你有一定的参考价值。
服务限流
服务限流是指当系统资源不够,不足以应对大量请求,即系统资源与访问量出现矛盾的时候,我们为了保证有限的资源能够正常服务,因此对系统按照预设的规则进行流量限制或功能限制的一种方法。
服务限流常用方法:
1、计数器方法
系统维护一个计数器,来一个请求就加1,请求处理完成就减1,当计数器大于指定的阈值,就拒绝新的请求。
2、队列方法
就是基于FIFO队列,所有请求都进入队列,后端程序从队列中取出待处理的请求依次处理。
3、令牌桶方法
首先还是要基于一个队列,请求放到队列里面。但除了队列以外,还要设置一个令牌桶,另外有一个脚本以持续恒定的速度往令牌桶里面放令牌,后端处理程序每处理一个请求就必须从桶里拿出一个令牌,如果令牌拿完了,那就不能处理请求了。我们可以控制脚本放令牌的速度来达到控制后端处理的速度,以实现动态流控。
nginx限流
1、控制速率
http {
limit_req_zone $binary_remote_addr zone=myRateLimit:10m rate=10r/s;
}
配置 server,使用 limit_req 指令应用限流。
server {
location / {
limit_req zone=myRateLimit;
proxy_pass http://my_upstream;
}
}
2、控制并发连接数
ngx_http_limit_conn_module 提供了限制连接数的能力,利用 limit_conn_zone 和 limit_conn 两个指令即可。
limit_conn_zone $binary_remote_addr zone=perip:10m;
limit_conn_zone $server_name zone=perserver:10m;
server {
...
limit_conn perip 10;
limit_conn perserver 100;
}
SpringBoot限流
SpringBoot可通过实现HandlerInterceptor接口实现统一限流。
public class LimiterInterceptor implements HandlerInterceptor {
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
//实现限流算法
return false;
}
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) throws Exception {
}
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
throws Exception {
}
}
也可以基于自定义注解的方式实现限流。
1、简单限流
采用Guava RateLimiter限流
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
if (rateLimiterBuilder == null) {
BeanFactory factory = WebApplicationContextUtils
.getRequiredWebApplicationContext(request.getServletContext());
rateLimiterBuilder = (RateLimiterBuilder) factory.getBean("rateLimiterBuilder");
}
String url = request.getRequestURI();
RateLimiter rateLimiter = rateLimiterBuilder.build(url);
// 能否在一秒内获取令牌
if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) {
System.out.println("短期无法获取令牌,真不幸,排队也瞎排");
return false;
}
return true;
}
如果tryAcquire不能获取令牌的情况下,可抛出异常,利用SpringBoot的全局异常处理,实现前端的友好提示。
2、分布式限流
分布式限流的实现方式与简单限流一样,只需要将RateLimiter替换成分布式限流实现即可。
public class RedisLimiterBucket {
/**
* maxPermits 最大存储令牌数
*/
private Long maxPermits;
/**
* storedPermits 当前存储令牌数
*/
private Long storedPermits;
/**
* intervalMillis 添加令牌时间间隔
*/
private Long intervalMillis;
/**
* nextFreeTicketMillis 下次请求可以获取令牌的起始时间,默认当前系统时间
*/
private Long nextFreeTicketMillis;
public RedisLimiterBucket() {
}
/**
* @param permitsPerSecond
* 每秒放入的令牌数
* @param maxBurstSeconds
* maxPermits由此字段计算,最大存储maxBurstSeconds秒生成的令牌
*
*/
public RedisLimiterBucket(Double permitsPerSecond, Integer maxBurstSeconds) {
if (null == maxBurstSeconds) {
maxBurstSeconds = 60;
}
this.maxPermits = (long) (permitsPerSecond * maxBurstSeconds);
this.storedPermits = permitsPerSecond.longValue();
this.intervalMillis = (long) (TimeUnit.SECONDS.toMillis(1) / permitsPerSecond);
this.nextFreeTicketMillis = System.currentTimeMillis();
}
/**
* redis的过期时长
*
* @return
*/
public long expires() {
long now = System.currentTimeMillis();
return 2 * TimeUnit.MINUTES.toSeconds(1)
+ TimeUnit.MILLISECONDS.toSeconds(Math.max(nextFreeTicketMillis, now) - now);
}
/**
* 异步更新当前持有的令牌数
* 若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
*
* @param now
* @return
*/
public boolean reSync(long now) {
if (now > nextFreeTicketMillis) {
storedPermits = Math.min(maxPermits, storedPermits + (now - nextFreeTicketMillis) / intervalMillis);
nextFreeTicketMillis = now;
return true;
}
return false;
}
public Long getMaxPermits() {
return maxPermits;
}
public void setMaxPermits(Long maxPermits) {
this.maxPermits = maxPermits;
}
public Long getStoredPermits() {
return storedPermits;
}
public void setStoredPermits(Long storedPermits) {
this.storedPermits = storedPermits;
}
public Long getIntervalMillis() {
return intervalMillis;
}
public void setIntervalMillis(Long intervalMillis) {
this.intervalMillis = intervalMillis;
}
public Long getNextFreeTicketMillis() {
return nextFreeTicketMillis;
}
public void setNextFreeTicketMillis(Long nextFreeTicketMillis) {
this.nextFreeTicketMillis = nextFreeTicketMillis;
}
public String toString() {
return JsonUtil.toJson(this);
}
}
public class RedisLimiter {
/**
* redis key
*/
private String key;
/**
* redis分布式锁的key
*
* @return
*/
private String lockKey;
/**
* 每秒存入的令牌数
*/
private Double permitsPerSecond;
/**
* 最大存储maxBurstSeconds秒生成的令牌
*/
private Integer maxBurstSeconds;
/**
* 分布式同步锁
*/
private RedisDistributedLock syncLock;
/**
*
*/
private IRedisService redisService;
RedisLimiter(String key, Double permitsPerSecond, Integer maxBurstSeconds, RedisDistributedLock syncLock,
IRedisService redisService) {
this.key = key;
this.lockKey = "DISTRIBUTED_LOCK:" + key;
this.permitsPerSecond = permitsPerSecond;
this.maxBurstSeconds = maxBurstSeconds;
this.syncLock = syncLock;
this.redisService = redisService;
}
private RedisLimiterBucket putDefaultBucket() {
this.lock();
try {
String value = redisService.get(key);
if (StringUtils.isEmpty(value)) {
RedisLimiterBucket bucket = new RedisLimiterBucket(permitsPerSecond, maxBurstSeconds);
redisService.set(key, bucket.toString(), bucket.expires());
return bucket;
} else {
return JsonUtil.fromJson(value, RedisLimiterBucket.class);
}
} finally {
this.unlock();
}
}
/**
* 获取令牌桶
*
* @return
*/
public RedisLimiterBucket getBucket() {
String value = redisService.get(key);
if (StringUtils.isEmpty(value)) {
return putDefaultBucket();
}
return JsonUtil.fromJson(value, RedisLimiterBucket.class);
}
/**
* 更新令牌桶
*
* @param permits
*/
public void setBucket(RedisLimiterBucket bucket) {
redisService.set(key, bucket.toString(), bucket.expires());
}
/**
* 等待直到获取指定数量的令牌
*
* @param tokens
* @return
* @throws InterruptedException
*/
public Long acquire(Long tokens) {
long milliToWait = this.reserve(tokens);
try {
Thread.sleep(milliToWait);
} catch (InterruptedException e) {
}
return milliToWait;
}
/**
* 获取1一个令牌
*
* @return
* @throws InterruptedException
*/
public long acquire() {
return acquire(1L);
}
public Boolean tryAcquire(Long timeout) {
return tryAcquire(1L, timeout);
}
/**
*
* @param tokens
* 要获取的令牌数
* @param timeout
* 获取令牌等待的时间,负数被视为0
* @param unit
* @return
* @throws InterruptedException
*/
public Boolean tryAcquire(Long tokens, Long timeout) {
TimeUnit unit = TimeUnit.SECONDS;
long timeoutMicros = Math.max(unit.toMillis(timeout), 0);
checkTokens(tokens);
Long milliToWait;
try {
this.lock();
if (!this.canAcquire(tokens, timeoutMicros)) {
return false;
} else {
milliToWait = this.reserveAndGetWaitLength(tokens);
}
} finally {
this.unlock();
}
try {
Thread.sleep(milliToWait);
} catch (InterruptedException e) {
}
return true;
}
/**
* 获取令牌n个需要等待的时间
*
* @param tokens
* @return
*/
private long reserve(Long tokens) {
this.checkTokens(tokens);
try {
this.lock();
return this.reserveAndGetWaitLength(tokens);
} finally {
this.unlock();
}
}
/**
* 在等待的时间内是否可以获取到令牌
*
* @param tokens
* @param timeoutMillis
* @return
*/
private Boolean canAcquire(Long tokens, Long timeoutMillis) {
return queryEarliestAvailable(tokens) - timeoutMillis <= 0;
}
/**
* 返回获取{tokens}个令牌最早可用的时间
*
* @param tokens
* @return
*/
private Long queryEarliestAvailable(Long tokens) {
long n = System.currentTimeMillis();
RedisLimiterBucket bucket = this.getBucket();
bucket.reSync(n);
// 可以消耗的令牌数
long storedPermitsToSpend = Math.min(tokens, bucket.getStoredPermits());
// 需要等待的令牌数
long freshPermits = tokens - storedPermitsToSpend;
// 需要等待的时间
long waitMillis = freshPermits * bucket.getIntervalMillis();
return LongMath.saturatedAdd(bucket.getNextFreeTicketMillis() - n, waitMillis);
}
/**
* 预定@{tokens}个令牌并返回所需要等待的时间
*
* @param tokens
* @return
*/
private Long reserveAndGetWaitLength(Long tokens) {
long n = System.currentTimeMillis();
RedisLimiterBucket bucket = this.getBucket();
bucket.reSync(n);
// 可以消耗的令牌数
long storedPermitsToSpend = Math.min(tokens, bucket.getStoredPermits());
// 需要等待的令牌数
long freshPermits = tokens - storedPermitsToSpend;
// 需要等待的时间
long waitMillis = freshPermits * bucket.getIntervalMillis();
bucket.setNextFreeTicketMillis(LongMath.saturatedAdd(bucket.getNextFreeTicketMillis(), waitMillis));
bucket.setStoredPermits(bucket.getStoredPermits() - storedPermitsToSpend);
this.setBucket(bucket);
return bucket.getNextFreeTicketMillis() - n;
}
/**
* 加锁
*/
private void lock() {
syncLock.lock(lockKey);
}
/**
* 解锁
*/
private void unlock() {
syncLock.unlock(lockKey);
}
/**
* 校验token值
*
* @param tokens
*/
private void checkTokens(Long tokens) {
if (tokens < 0) {
throw new RuntimeException("Requested tokens " + tokens + " must be positive");
}
}
}
zuul限流
抽时间再写
dubbo限流
抽时间再写
以上是关于微服务之服务限流的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud微服务安全实战_6-1_微服务之间的通讯安全之概述
微服务之间的通讯安全-JWT优化之日志错误处理限流及JWT改造后执行流程梳理