通关分布式系统「限流问题」?来一篇源码实战

Posted 搜云库技术团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通关分布式系统「限流问题」?来一篇源码实战相关的知识,希望对你有一定的参考价值。

在分布式领域,我们难免会遇到并发量突增,对后端服务造成高压力,严重甚至会导致系统宕机。为避免这种问题,我们通常会为接口添加限流、降级、熔断等能力,从而使接口更为健壮。Java领域常见的开源组件有Netflix的hystrix,阿里系开源的sentinel等,都是蛮不错的限流熔断框架。

注 意
文末有:3625页互联网大厂面试题

今天我们就基于Redis组件的特性,实现一个分布式限流组件,名字就定为shield-ratelimiter。

原理

首先解释下为何采用Redis作为限流组件的核心。

通俗地讲,假设一个用户(用IP判断)每秒访问某服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并设置键的过期时间为60秒。

当一个用户对此服务接口发起一次访问就把键值加1,在单位时间(此处为1s)内当键值增加到10的时候,就禁止访问服务接口。PS:在某种场景中添加访问时间间隔还是很有必要的。我们本次不考虑间隔时间,只关注单位时间内的访问次数。

需求

原理已经讲过了,说下需求。

1、基于Redis的incr及过期机制开发 2、调用方便,声明式 3、Spring支持

基于上述需求,我们决定基于注解方式进行核心功能开发,基于Spring-boot-starter作为基础环境,从而能够很好的适配Spring环境。

另外,在本次开发中,我们不通过简单的调用Redis的java类库API实现对Redis的incr操作。

原因在于,我们要保证整个限流的操作是原子性的,如果用Java代码去做操作及判断,会有并发问题。这里我决定采用Lua脚本进行核心逻辑的定义。

为何使用Lua

在正式开发前,我简单介绍下对Redis的操作中,为何推荐使用Lua脚本。

1、减少网络开销: 不使用 Lua 的代码需要向 Redis 发送多次请求, 而脚本只需一次即可, 减少网络传输; 2、原子操作: Redis 将整个脚本作为一个原子执行, 无需担心并发, 也就无需事务; 3、复用: 脚本会永久保存 Redis 中, 其他客户端可继续使用.

正式开发

到这里,我们正式开始手写限流组件的进程。

1. 工程定义

项目基于maven构建,主要依赖Spring-boot-starter,我们主要在springboot上进行开发,因此自定义的开发包可以直接依赖下面这个坐标,方便进行包管理。版本号自行选择稳定版。

 
   
   
 
  1. <dependency>

  2. <groupId>org.springframework.boot</groupId>

  3. <artifactId>spring-boot-starter</artifactId>

  4. <version>1.4.2.RELEASE</version>

  5. </dependency>

2. Redis整合

由于我们是基于Redis进行的限流操作,因此需要整合Redis的类库,上面已经讲到,我们是基于Springboot进行的开发,因此这里可以直接整合RedisTemplate。

2.1 坐标引入

这里我们引入spring-boot-starter-redis的依赖。

 
   
   
 
  1. <dependency>

  2. <groupId>org.springframework.boot</groupId>

  3. <artifactId>spring-boot-starter-redis</artifactId>

  4. <version>1.4.2.RELEASE</version>

  5. </dependency>

2.2 注入CacheManager及RedisTemplate

新建一个Redis的配置类,命名为RedisCacheConfig,使用javaconfig形式注入CacheManager及RedisTemplate。为了操作方便,我们采用了Jackson进行序列化。代码如下

 
   
   
 
  1. @Configuration

  2. @EnableCaching

  3. public class RedisCacheConfig {


  4. private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class);


  5. @Bean

  6. public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) {

  7. CacheManager cacheManager = new RedisCacheManager(redisTemplate);

  8. if (LOGGER.isDebugEnabled()) {

  9. LOGGER.debug("Springboot Redis cacheManager 加载完成");

  10. }

  11. return cacheManager;

  12. }


  13. @Bean

  14. public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

  15. RedisTemplate<String, Object> template = new RedisTemplate<>();

  16. template.setConnectionFactory(factory);


  17. //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)

  18. Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);


  19. ObjectMapper mapper = new ObjectMapper();

  20. mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

  21. mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

  22. serializer.setObjectMapper(mapper);


  23. template.setValueSerializer(serializer);

  24. //使用StringRedisSerializer来序列化和反序列化redis的key值

  25. template.setKeySerializer(new StringRedisSerializer());

  26. template.afterPropertiesSet();

  27. LOGGER.info("Springboot RedisTemplate 加载完成");

  28. return template;

  29. }

  30. }

注意 要使用 @Configuration 标注此类为一个配置类,当然你可以使用 @Component, 但是不推荐,原因在于 @Component 注解虽然也可以当作配置类,但是并不会为其生成CGLIB代理Class,而使用@Configuration,CGLIB会为其生成代理类,进行性能的提升。

2.3 调用方application.propertie需要增加Redis配置

我们的包开发完毕之后,调用方的application.properties需要进行相关配置如下:

 
   
   
 
  1. #单机模式redis

  2. spring.redis.host=127.0.0.1

  3. spring.redis.port=6379

  4. spring.redis.pool.maxActive=8

  5. spring.redis.pool.maxWait=-1

  6. spring.redis.pool.maxIdle=8

  7. spring.redis.pool.minIdle=0

  8. spring.redis.timeout=10000

  9. spring.redis.password=

如果有密码的话,配置password即可。

这里为单机配置,如果需要支持哨兵集群,则配置如下,Java代码不需要改动,只需要变动配置即可。注意 两种配置不能共存!

 
   
   
 
  1. #哨兵集群模式

  2. # database name

  3. spring.redis.database=0

  4. # server password 密码,如果没有设置可不配

  5. spring.redis.password=

  6. # pool settings ...池配置

  7. spring.redis.pool.max-idle=8

  8. spring.redis.pool.min-idle=0

  9. spring.redis.pool.max-active=8

  10. spring.redis.pool.max-wait=-1

  11. # name of Redis server 哨兵监听的Redis server的名称

  12. spring.redis.sentinel.master=mymaster

  13. # comma-separated list of host:port pairs 哨兵的配置列表

  14. spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579

3. 定义注解

为了调用方便,我们定义一个名为RateLimiter 的注解,内容如下

 
   
   
 
  1. /**

  2. * @className RateLimiter

  3. * @desc 限流注解

  4. */

  5. @Target(ElementType.METHOD)

  6. @Retention(RetentionPolicy.RUNTIME)

  7. @Documented

  8. public @interface RateLimiter {


  9. /**

  10. * 限流key

  11. * @return

  12. */

  13. String key() default "rate:limiter";

  14. /**

  15. * 单位时间限制通过请求数

  16. * @return

  17. */

  18. long limit() default 10;


  19. /**

  20. * 过期时间,单位秒

  21. * @return

  22. */

  23. long expire() default 1;

  24. }

该注解明确只用于方法,主要有三个属性。

1、key–表示限流模块名,指定该值用于区分不同应用,不同场景,推荐格式为:应用名:模块名:ip:接口名:方法名 2、limit–表示单位时间允许通过的请求数 3、expire–incr的值的过期时间,业务中表示限流的单位时间。

4. 解析注解

定义好注解后,需要开发注解使用的切面,这里我们直接使用aspectj进行切面的开发。先看代码

 
   
   
 
  1. @Aspect

  2. @Component

  3. public class RateLimterHandler {

  4. private static final Logger LOGGER = LoggerFactory.getLogger(RateLimterHandler.class);@

  5. Autowired

  6. RedisTemplate redisTemplate;

  7. private DefaultRedisScript < Long > getRedisScript;@

  8. PostConstruct

  9. public void init() {

  10. getRedisScript = new DefaultRedisScript < > ();

  11. getRedisScript.setResultType(Long.class);

  12. getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimter.lua")));

  13. LOGGER.info("RateLimterHandler[分布式限流处理器]脚本加载完成");

  14. }

  15. }

这里是注入了RedisTemplate,使用其API进行Lua脚本的调用。

init() 方法在应用启动时会初始化DefaultRedisScript,并加载Lua脚本,方便进行调用。

PS: Lua脚本放置在classpath下,通过ClassPathResource进行加载。

 
   
   
 
  1. @Pointcut("@annotation(com.snowalker.shield.ratelimiter.core.annotation.RateLimiter)")

  2. public void rateLimiter() {}

这里我们定义了一个切点,表示只要注解了 @RateLimiter 的方法,均可以触发限流操作。

 
   
   
 
  1. @Around("@annotation(rateLimiter)")

  2. public Object around(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLimiter) throws Throwable {

  3. if (LOGGER.isDebugEnabled()) {

  4. LOGGER.debug("RateLimterHandler[分布式限流处理器]开始执行限流操作");

  5. }

  6. Signature signature = proceedingJoinPoint.getSignature();

  7. if (!(signature instanceof MethodSignature)) {

  8. throw new IllegalArgumentException("the Annotation @RateLimter must used on method!");

  9. }

  10. /**

  11. * 获取注解参数

  12. */

  13. // 限流模块key

  14. String limitKey = rateLimiter.key();

  15. Preconditions.checkNotNull(limitKey);

  16. // 限流阈值

  17. long limitTimes = rateLimiter.limit();

  18. // 限流超时时间

  19. long expireTime = rateLimiter.expire();

  20. if (LOGGER.isDebugEnabled()) {

  21. LOGGER.debug("RateLimterHandler[分布式限流处理器]参数值为-limitTimes={},limitTimeout={}", limitTimes, expireTime);

  22. }

  23. /**

  24. * 执行Lua脚本

  25. */

  26. List < String > keyList = new ArrayList();

  27. // 设置key值为注解中的值

  28. keyList.add(limitKey);

  29. /**

  30. * 调用脚本并执行

  31. */

  32. Long result = (Long) redisTemplate.execute(getRedisScript, keyList, expireTime, limitTimes);

  33. if (result == 0) {

  34. String msg = "由于超过单位时间=" + expireTime + "-允许的请求次数=" + limitTimes + "[触发限流]";

  35. LOGGER.debug(msg);

  36. return "false";

  37. }

  38. if (LOGGER.isDebugEnabled()) {

  39. LOGGER.debug("RateLimterHandler[分布式限流处理器]限流执行结果-result={},请求[正常]响应", result);

  40. }

  41. return proceedingJoinPoint.proceed();

  42. }

这段代码的逻辑为,获取 @RateLimiter 注解配置的属性:key、limit、expire,并通过 redisTemplate.execute(RedisScriptscript,Listkeys,Objectargs)方法传递给Lua脚本进行限流相关操作,逻辑很清晰。

这里我们定义如果脚本返回状态为0则为触发限流,1表示正常请求。

5. Lua脚本

这里是我们整个限流操作的核心,通过执行一个Lua脚本进行限流的操作。脚本内容如下

 
   
   
 
  1. --获取KEY

  2. local key1 = KEYS[1]


  3. local val = redis.call('incr', key1)

  4. local ttl = redis.call('ttl', key1)


  5. --获取ARGV内的参数并打印

  6. local expire = ARGV[1]

  7. local times = ARGV[2]


  8. redis.log(redis.LOG_DEBUG,tostring(times))

  9. redis.log(redis.LOG_DEBUG,tostring(expire))


  10. redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val);

  11. if val == 1 then

  12. redis.call('expire', key1, tonumber(expire))

  13. else

  14. if ttl == -1 then

  15. redis.call('expire', key1, tonumber(expire))

  16. end

  17. end


  18. if val > tonumber(times) then

  19. return 0

  20. end


  21. return 1

逻辑很通俗,我简单介绍下。

1、首先脚本获取Java代码中传递而来的要限流的模块的key,不同的模块key值一定不能相同,否则会覆盖!

2、redis.call(‘incr’, key1)对传入的key做incr操作,如果key首次生成,设置超时时间ARGV[1];(初始值为1)

3、ttl是为防止某些key在未设置超时时间并长时间已经存在的情况下做的保护的判断;

当过期后,又是新的一轮循环,整个过程是一个原子性的操作,能够保证单位时间不会超过我们预设的请求阈值。

到这里我们便可以在项目中进行测试。

测试

这里我贴一下核心代码,我们定义一个接口,并注解 @RateLimiter(key=ratedemo:1.0.0”,limit=5,expire=100) 表示模块 ratedemo:sendPayment:1.0.0在100s内允许通过5个请求,这里的参数设置是为了方便看结果。实际中,我们通常会设置1s内允许通过的次数。

 
   
   
 
  1. @Controller

  2. public class TestController {

  3. private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);@

  4. ResponseBody@ RequestMapping("ratelimiter")@ RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100)

  5. public String sendPayment(HttpServletRequest request) throws Exception {

  6. return "正常请求";

  7. }

  8. }

我们通过RestClient请求接口,日志返回如下:

 
   
   
 
  1. 2018-10-28 00:00:00.602 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler :

  2. RateLimterHandler[分布式限流处理器]开始执行限流操作

  3. 2018-10-28 00:00:00.688 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler :

  4. RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应


  5. 2018-10-28 00:00:00.860 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler :

  6. RateLimterHandler[分布式限流处理器]开始执行限流操作

  7. 2018-10-28 00:00:01.183 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :

  8. RateLimterHandler[分布式限流处理器]开始执行限流操作

  9. 2018-10-28 00:00:01.520 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler :

  10. RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应

  11. 2018-10-28 00:00:01.521 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :

  12. RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应


  13. 2018-10-28 00:00:01.557 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler :

  14. RateLimterHandler[分布式限流处理器]开始执行限流操作

  15. 2018-10-28 00:00:01.558 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler :

  16. RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应


  17. 2018-10-28 00:00:01.774 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler :

  18. RateLimterHandler[分布式限流处理器]开始执行限流操作

  19. 2018-10-28 00:00:02.111 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler :

  20. RateLimterHandler[分布式限流处理器]开始

  21. 2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler :

  22. RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应


  23. 2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler :

  24. 由于超过单位时间=100-允许的请求次数=5[触发限流]

  25. 2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler :

  26. RateLimterHandler[分布式限流处理器]开始执行限流操作

  27. 2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler :

  28. RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100

  29. 2018-10-28 00:00:02.278 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler :

  30. 由于超过单位时间=100-允许的请求次数=5[触发限流]

  31. 2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler :

  32. RateLimterHandler[分布式限流处理器]开始执行限流操作

  33. 2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler :

  34. RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100

  35. 2018-10-28 00:00:02.446 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler :

  36. 由于超过单位时间=100-允许的请求次数=5[触发限流]

  37. 2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :

  38. RateLimterHandler[分布式限流处理器]开始执行限流操作

  39. 2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :

  40. RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100

  41. 2018-10-28 00:00:02.629 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler :

  42. 由于超过单位时间=100-允许的请求次数=5[触发限流]

总结

我们通过Redis的incr及expire功能特性,开发定义了一套基于注解的分布式限流操作,核心逻辑基于Lua保证了原子性。达到了很好的限流的目的,生产上,可以基于该特点进行定制自己的限流组件,当然你可以参考本文的代码,相信你写的一定比我的demo更好!

https://github.com/TaXueWWL/shleld-ratelimter


往期推荐




第2版:互联网大厂面试题

包括 Java 集合、JVM、多线程、并发编程、设计模式、算法调优Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、mysql、RabbitMQ、Kafka、Linux、Netty、Tomcat、Python、html、CSS、Vue、React、javascriptandroid 大数据、阿里巴巴等大厂面试题等、等技术栈!

阅读原文: 高清 3625页大厂面试题  PDF

以上是关于通关分布式系统「限流问题」?来一篇源码实战的主要内容,如果未能解决你的问题,请参考以下文章

如何利用redis来进行分布式集群系统的限流设计

实战:使用 Nginx 限流

高并发服务限流实践

重学SpringCloud系列八之分布式系统流量卫兵sentinel

实战如何亲手搭建一个分布式 IM(即时通讯) 系统

理论+算法+实战,教你如何实现亿级流量下的分布式限流