微服务之服务限流

Posted 猿呈志

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务之服务限流相关的知识,希望对你有一定的参考价值。

服务限流

服务限流是指当系统资源不够,不足以应对大量请求,即系统资源与访问量出现矛盾的时候,我们为了保证有限的资源能够正常服务,因此对系统按照预设的规则进行流量限制或功能限制的一种方法。

服务限流常用方法:

1、计数器方法

系统维护一个计数器,来一个请求就加1,请求处理完成就减1,当计数器大于指定的阈值,就拒绝新的请求。

2、队列方法

就是基于FIFO队列,所有请求都进入队列,后端程序从队列中取出待处理的请求依次处理。

3、令牌桶方法

首先还是要基于一个队列,请求放到队列里面。但除了队列以外,还要设置一个令牌桶,另外有一个脚本以持续恒定的速度往令牌桶里面放令牌,后端处理程序每处理一个请求就必须从桶里拿出一个令牌,如果令牌拿完了,那就不能处理请求了。我们可以控制脚本放令牌的速度来达到控制后端处理的速度,以实现动态流控。

nginx限流

1、控制速率

http { limit_req_zone $binary_remote_addr zone=myRateLimit:10m rate=10r/s; }

配置 server,使用 limit_req 指令应用限流。

server { location / { limit_req zone=myRateLimit; proxy_pass http://my_upstream; } }

2、控制并发连接数

ngx_http_limit_conn_module 提供了限制连接数的能力,利用 limit_conn_zone 和 limit_conn 两个指令即可。

limit_conn_zone $binary_remote_addr zone=perip:10m;limit_conn_zone $server_name zone=perserver:10m;
server { ... limit_conn perip 10; limit_conn perserver 100;}

SpringBoot限流

SpringBoot可通过实现HandlerInterceptor接口实现统一限流。

public class LimiterInterceptor implements HandlerInterceptor {
@Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //实现限流算法 return false; }
@Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { }
@Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { }
}

也可以基于自定义注解的方式实现限流。

1、简单限流

采用Guava RateLimiter限流

@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (rateLimiterBuilder == null) { BeanFactory factory = WebApplicationContextUtils .getRequiredWebApplicationContext(request.getServletContext()); rateLimiterBuilder = (RateLimiterBuilder) factory.getBean("rateLimiterBuilder"); } String url = request.getRequestURI(); RateLimiter rateLimiter = rateLimiterBuilder.build(url); // 能否在一秒内获取令牌 if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) { System.out.println("短期无法获取令牌,真不幸,排队也瞎排"); return false; } return true; }

如果tryAcquire不能获取令牌的情况下,可抛出异常,利用SpringBoot的全局异常处理,实现前端的友好提示。

2、分布式限流

分布式限流的实现方式与简单限流一样,只需要将RateLimiter替换成分布式限流实现即可。

public class RedisLimiterBucket { /** * maxPermits 最大存储令牌数 */ private Long maxPermits; /** * storedPermits 当前存储令牌数 */ private Long storedPermits; /** * intervalMillis 添加令牌时间间隔 */ private Long intervalMillis; /** * nextFreeTicketMillis 下次请求可以获取令牌的起始时间,默认当前系统时间 */ private Long nextFreeTicketMillis;
public RedisLimiterBucket() {
}
/** * @param permitsPerSecond * 每秒放入的令牌数 * @param maxBurstSeconds * maxPermits由此字段计算,最大存储maxBurstSeconds秒生成的令牌 * */ public RedisLimiterBucket(Double permitsPerSecond, Integer maxBurstSeconds) { if (null == maxBurstSeconds) { maxBurstSeconds = 60; } this.maxPermits = (long) (permitsPerSecond * maxBurstSeconds); this.storedPermits = permitsPerSecond.longValue(); this.intervalMillis = (long) (TimeUnit.SECONDS.toMillis(1) / permitsPerSecond); this.nextFreeTicketMillis = System.currentTimeMillis(); }
/** * redis的过期时长 * * @return */ public long expires() { long now = System.currentTimeMillis(); return 2 * TimeUnit.MINUTES.toSeconds(1) + TimeUnit.MILLISECONDS.toSeconds(Math.max(nextFreeTicketMillis, now) - now); }
/** * 异步更新当前持有的令牌数 * 若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据 * * @param now * @return */ public boolean reSync(long now) { if (now > nextFreeTicketMillis) { storedPermits = Math.min(maxPermits, storedPermits + (now - nextFreeTicketMillis) / intervalMillis); nextFreeTicketMillis = now; return true; } return false; }
public Long getMaxPermits() { return maxPermits; }
public void setMaxPermits(Long maxPermits) { this.maxPermits = maxPermits; }
public Long getStoredPermits() { return storedPermits; }
public void setStoredPermits(Long storedPermits) { this.storedPermits = storedPermits; }
public Long getIntervalMillis() { return intervalMillis; }
public void setIntervalMillis(Long intervalMillis) { this.intervalMillis = intervalMillis; }
public Long getNextFreeTicketMillis() { return nextFreeTicketMillis; }
public void setNextFreeTicketMillis(Long nextFreeTicketMillis) { this.nextFreeTicketMillis = nextFreeTicketMillis; }
@Override public String toString() { return JsonUtil.toJson(this); }
}
public class RedisLimiter {
/** * redis key */ private String key; /** * redis分布式锁的key * * @return */ private String lockKey; /** * 每秒存入的令牌数 */ private Double permitsPerSecond; /** * 最大存储maxBurstSeconds秒生成的令牌 */ private Integer maxBurstSeconds; /** * 分布式同步锁 */ private RedisDistributedLock syncLock;
/** * */ private IRedisService redisService;
RedisLimiter(String key, Double permitsPerSecond, Integer maxBurstSeconds, RedisDistributedLock syncLock, IRedisService redisService) { this.key = key; this.lockKey = "DISTRIBUTED_LOCK:" + key; this.permitsPerSecond = permitsPerSecond; this.maxBurstSeconds = maxBurstSeconds; this.syncLock = syncLock; this.redisService = redisService; }
private RedisLimiterBucket putDefaultBucket() { this.lock(); try { String value = redisService.get(key); if (StringUtils.isEmpty(value)) { RedisLimiterBucket bucket = new RedisLimiterBucket(permitsPerSecond, maxBurstSeconds); redisService.set(key, bucket.toString(), bucket.expires()); return bucket; } else { return JsonUtil.fromJson(value, RedisLimiterBucket.class); } } finally { this.unlock(); }
}
/** * 获取令牌桶 * * @return */ public RedisLimiterBucket getBucket() { String value = redisService.get(key); if (StringUtils.isEmpty(value)) { return putDefaultBucket(); } return JsonUtil.fromJson(value, RedisLimiterBucket.class); }
/** * 更新令牌桶 * * @param permits */ public void setBucket(RedisLimiterBucket bucket) { redisService.set(key, bucket.toString(), bucket.expires()); }
/** * 等待直到获取指定数量的令牌 * * @param tokens * @return * @throws InterruptedException */ public Long acquire(Long tokens) { long milliToWait = this.reserve(tokens); try { Thread.sleep(milliToWait); } catch (InterruptedException e) { } return milliToWait; }
/** * 获取1一个令牌 * * @return * @throws InterruptedException */ public long acquire() { return acquire(1L); }
public Boolean tryAcquire(Long timeout) { return tryAcquire(1L, timeout); }
/** * * @param tokens * 要获取的令牌数 * @param timeout * 获取令牌等待的时间,负数被视为0 * @param unit * @return * @throws InterruptedException */ public Boolean tryAcquire(Long tokens, Long timeout) { TimeUnit unit = TimeUnit.SECONDS; long timeoutMicros = Math.max(unit.toMillis(timeout), 0); checkTokens(tokens); Long milliToWait; try { this.lock(); if (!this.canAcquire(tokens, timeoutMicros)) { return false; } else { milliToWait = this.reserveAndGetWaitLength(tokens); } } finally { this.unlock(); } try { Thread.sleep(milliToWait); } catch (InterruptedException e) { } return true; }
/** * 获取令牌n个需要等待的时间 * * @param tokens * @return */ private long reserve(Long tokens) { this.checkTokens(tokens); try { this.lock(); return this.reserveAndGetWaitLength(tokens); } finally { this.unlock(); } }
/** * 在等待的时间内是否可以获取到令牌 * * @param tokens * @param timeoutMillis * @return */ private Boolean canAcquire(Long tokens, Long timeoutMillis) { return queryEarliestAvailable(tokens) - timeoutMillis <= 0; }
/** * 返回获取{tokens}个令牌最早可用的时间 * * @param tokens * @return */ private Long queryEarliestAvailable(Long tokens) { long n = System.currentTimeMillis(); RedisLimiterBucket bucket = this.getBucket(); bucket.reSync(n); // 可以消耗的令牌数 long storedPermitsToSpend = Math.min(tokens, bucket.getStoredPermits()); // 需要等待的令牌数 long freshPermits = tokens - storedPermitsToSpend; // 需要等待的时间 long waitMillis = freshPermits * bucket.getIntervalMillis(); return LongMath.saturatedAdd(bucket.getNextFreeTicketMillis() - n, waitMillis); }
/** * 预定@{tokens}个令牌并返回所需要等待的时间 * * @param tokens * @return */ private Long reserveAndGetWaitLength(Long tokens) { long n = System.currentTimeMillis(); RedisLimiterBucket bucket = this.getBucket(); bucket.reSync(n); // 可以消耗的令牌数 long storedPermitsToSpend = Math.min(tokens, bucket.getStoredPermits()); // 需要等待的令牌数 long freshPermits = tokens - storedPermitsToSpend; // 需要等待的时间 long waitMillis = freshPermits * bucket.getIntervalMillis(); bucket.setNextFreeTicketMillis(LongMath.saturatedAdd(bucket.getNextFreeTicketMillis(), waitMillis)); bucket.setStoredPermits(bucket.getStoredPermits() - storedPermitsToSpend); this.setBucket(bucket); return bucket.getNextFreeTicketMillis() - n; }
/** * 加锁 */ private void lock() { syncLock.lock(lockKey); }
/** * 解锁 */ private void unlock() { syncLock.unlock(lockKey); }
/** * 校验token值 * * @param tokens */ private void checkTokens(Long tokens) { if (tokens < 0) { throw new RuntimeException("Requested tokens " + tokens + " must be positive"); } }
}

zuul限流

抽时间再写

dubbo限流

抽时间再写

以上是关于微服务之服务限流的主要内容,如果未能解决你的问题,请参考以下文章

鹰眼跟踪限流降级,EDAS的微服务解决之道

Spring Cloud微服务安全实战_6-1_微服务之间的通讯安全之概述

微服务之间的通讯安全-JWT优化之日志错误处理限流及JWT改造后执行流程梳理

微服务治理之如何优雅应对突发流量洪峰

spring cloud微服务快速教程之 Spring Cloud Alibaba--sentinel-限流熔断降级

架构设计之「服务限流」