基于redis的分布式RateLimiter(限流)实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于redis的分布式RateLimiter(限流)实现相关的知识,希望对你有一定的参考价值。

参考技术A

系统需要对接某IM厂商rest接口,向客户端推送消息(以及其他IM业务)
该厂商对rest接口调用有频率限制:总rest调用9000次/30s;消息推送600次/30s
系统为分布式集群,需要控制整个分布式集群总的接口调用频率满足以上限制

上篇文章 《 Guava RateLimiter源码解析 》中介绍了Guava RateLimiter的用法及原理,但为什么不直接使用Guava RateLimiter?原因有二:

为什么说选用redis是合理的?

这里完全参考Guava RateLimiter实现思路,不同的是,Guava将令牌桶数据存放于对象(内存)中,这里讲令牌桶数据存放在redis中,奉上源码 https://github.com/manerfan/m...

首先创建令牌桶数据模型

reSync函数同样是为了解决令牌桶数据更新问题,在每次获取令牌之前调用,这里不多介绍
expires函数计算redis数据过期时间。同样的例子,某接口需要分别对每个用户做访问频率限制,假设系统中存在6W用户,则至多需要在redis中创建6W条数据,对于长期运行的系统,这个数字会只增不减,这对于redis来说也是一个不小的挑战(虽然示例中的数字相对较小)。为了减轻redis压力,需要对令牌桶数据做过期处理,对于使用频率不是很高的业务场景,可以及时清理。

putDefaultPermits用于生成默认令牌桶并存入redis
permits的getter setter方法实现了redis中令牌桶的获取及更新
now用于获取redis服务器的时间,这样便能保证分布式集群中各节点对数据处理的一致性

该函数与reserveAndGetWaitLength等同,只是为了避免并发问题而添加了同步锁

需要注意,这里与Guava RateLimiter不同的是, Guava中的返回是更新前的(上次请求计算的)nextFreeTicketMicros,本次请求通过为上次请求的预消费行为埋单而实现突发请求的处理;这里返回的是由于桶中令牌不足而需要真真切切等待的时间

分布式架构(10)---基于Redis组件的特性,实现一个分布式限流

分布式---基于Redis进行接口IP限流

场景 为了防止我们的接口被人恶意访问,比如有人通过JMeter工具频繁访问我们的接口,导致接口响应变慢甚至崩溃,所以我们需要对一些特定的接口进行IP限流,即一定时间内同一IP访问的次数是有限的。

实现原理 用Redis作为限流组件的核心的原理,将用户的IP地址当Key,一段时间内访问次数为value,同时设置该Key过期时间。

比如某接口设置相同IP10秒内请求5次,超过5次不让访问该接口。

1. 第一次该IP地址存入redis的时候,key值为IP地址,value值为1,设置key值过期时间为10秒。
2. 第二次该IP地址存入redis时,如果key没有过期,那么更新value为2。
3. 以此类推当value已经为5时,如果下次该IP地址在存入redis同时key还没有过期,那么该Ip就不能访问了。
4. 当10秒后,该key值过期,那么该IP地址再进来,value又从1开始,过期时间还是10秒,这样反反复复。

说明从上面的逻辑可以看出,是一时间段内访问次数受限,不是完全不让该IP访问接口。

技术框架 SpringBoot + RedisTemplate (采用自定义注解完成)

这个可以用于真实项目开发场景。

一、代码

1、自定义注解

这边采用自定义注解的目的就是,在接口上使用自定义注解,让代码看去非常整洁。

IpLimiter

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface IpLimiter 
    /**
     * 限流ip
     */
    String ipAdress() ;
    /**
     * 单位时间限制通过请求数
     */
    long limit() default 10;
    /**
     * 单位时间,单位秒
     */
    long time() default 1;
    /**
     * 达到限流提示语
     */
    String message();

2、测试接口

在接口上使用了自定义注解@IpLimiter

@Controller
public class IpController 
    
    private static final Logger LOGGER = LoggerFactory.getLogger(IpController.class);
    private static final String MESSAGE = "请求失败,你的IP访问太频繁";

    //这里就不获取请求的ip,而是写死一个IP
    @ResponseBody
    @RequestMapping("iplimiter")
    @IpLimiter(ipAdress = "127.198.66.01", limit = 5, time = 10, message = MESSAGE)
    public String sendPayment(HttpServletRequest request) throws Exception 
        return "请求成功";
    
    @ResponseBody
    @RequestMapping("iplimiter1")
    @IpLimiter(ipAdress = "127.188.145.54", limit = 4, time = 10, message = MESSAGE)
    public String sendPayment1(HttpServletRequest request) throws Exception 
        return "请求成功";
    

3、处理IpLimter注解的AOP

这边采用切面的方式处理自定义注解。同时为了保证原子性,这边写了redis脚本ipLimiter.lua来执行redis命令,来保证操作原子性。

@Aspect
@Component
public class IpLimterHandler 

    private static final Logger LOGGER = LoggerFactory.getLogger(IpLimterHandler.class);

    @Autowired
    RedisTemplate redisTemplate;

    /**
     * getRedisScript 读取脚本工具类
     * 这里设置为Long,是因为ipLimiter.lua 脚本返回的是数字类型
     */
    private DefaultRedisScript<Long> getRedisScript;

    @PostConstruct
    public void init() 
        getRedisScript = new DefaultRedisScript<>();
        getRedisScript.setResultType(Long.class);
        getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("ipLimiter.lua")));
        LOGGER.info("IpLimterHandler[分布式限流处理器]脚本加载完成");
    

    /**
     * 这个切点可以不要,因为下面的本身就是个注解
     */
//    @Pointcut("@annotation(com.jincou.iplimiter.annotation.IpLimiter)")
//    public void rateLimiter() 

    /**
     * 如果保留上面这个切点,那么这里可以写成
     * @Around("rateLimiter()&&@annotation(ipLimiter)")
     */
    @Around("@annotation(ipLimiter)")
    public Object around(ProceedingJoinPoint proceedingJoinPoint, IpLimiter ipLimiter) throws Throwable 
        if (LOGGER.isDebugEnabled()) 
            LOGGER.debug("IpLimterHandler[分布式限流处理器]开始执行限流操作");
        
        Signature signature = proceedingJoinPoint.getSignature();
        if (!(signature instanceof MethodSignature)) 
            throw new IllegalArgumentException("the Annotation @IpLimter must used on method!");
        
        /**
         * 获取注解参数
         */
        // 限流模块IP
        String limitIp = ipLimiter.ipAdress();
        Preconditions.checkNotNull(limitIp);
        // 限流阈值
        long limitTimes = ipLimiter.limit();
        // 限流超时时间
        long expireTime = ipLimiter.time();
        if (LOGGER.isDebugEnabled()) 
            LOGGER.debug("IpLimterHandler[分布式限流处理器]参数值为-limitTimes=,limitTimeout=", limitTimes, expireTime);
        
        // 限流提示语
        String message = ipLimiter.message();
        /**
         * 执行Lua脚本
         */
        List<String> ipList = new ArrayList();
        // 设置key值为注解中的值
        ipList.add(limitIp);
        /**
         * 调用脚本并执行
         */
        Long result = (Long) redisTemplate.execute(getRedisScript, ipList, expireTime, limitTimes);
        if (result == 0) 
            String msg = "由于超过单位时间=" + expireTime + "-允许的请求次数=" + limitTimes + "[触发限流]";
            LOGGER.debug(msg);
            // 达到限流返回给前端信息
            return message;
        
        if (LOGGER.isDebugEnabled()) 
            LOGGER.debug("IpLimterHandler[分布式限流处理器]限流执行结果-result=,请求[正常]响应", result);
        
        return proceedingJoinPoint.proceed();
    

4、RedisCacheConfig(配置类)

@Configuration
public class RedisCacheConfig 

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class);

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) 
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);

        template.setValueSerializer(serializer);
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        LOGGER.info("Springboot RedisTemplate 加载完成");
        return template;
    

5、ipLimiter.lua 脚本

优点
减少网络的开销: 脚本只执行一次,不需要发送多次请求, 减少网络传输;
保证原子操作: 整个脚本作为一个原子执行, 就不用担心并发问题;

--获取KEY
local key1 = KEYS[1]

local val = redis.call('incr', key1)
local ttl = redis.call('ttl', key1)

--获取ARGV内的参数并打印
local expire = ARGV[1]
local times = ARGV[2]

redis.log(redis.LOG_DEBUG,tostring(times))
redis.log(redis.LOG_DEBUG,tostring(expire))

redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val);
if val == 1 then
    redis.call('expire', key1, tonumber(expire))
else
    if ttl == -1 then
        redis.call('expire', key1, tonumber(expire))
    end
end

if val > tonumber(times) then
    return 0
end
return 1

6、application.properties

#redis
spring.redis.hostName=
spring.redis.host=
spring.redis.port=6379
spring.redis.jedis.pool.max-active=8
spring.redis.jedis.pool.max-wait=
spring.redis.jedis.pool.max-idle=8
spring.redis.jedis.pool.min-idle=10
spring.redis.timeout=100ms
spring.redis.password=

logging.path= /Users/xub/log
logging.level.com.jincou.iplimiter=DEBUG
server.port=8888

7、SpringBoot启动类

@SpringBootApplication
public class Application 

    public static void main(String[] args) 
        SpringApplication.run(Application.class, args);
    

8、测试

技术图片

完美上面这个测试非常符合我们的预期,前五次访问接口是成功的,后面就失败了,直到10秒后才可以重新访问,这样反反复复。

其它的这边就不一一展示了,附上该项目源码。

Github地址 https://github.com/yudiandemingzi/ipLimiter


参考

这个设计是我在刷github的时候看到确实很好,我这边只是在它的基础上做了点小改动,非常感谢该作者的分享。
github地址:https://github.com/TaXueWWL/shleld-ratelimter

有关AOP有篇文章讲的不错:spring aop 中@annotation()的使用




只要自己变优秀了,其他的事情才会跟着好起来(中将1)

以上是关于基于redis的分布式RateLimiter(限流)实现的主要内容,如果未能解决你的问题,请参考以下文章

一个轻量级的基于RateLimiter的分布式限流实现

分布式环境下限流方案的实现redis RateLimiter Guava,Token Bucket, Leaky Bucket

Redisson限流器(RateLimiter)

最近学习了限流与RateLimiter

SpringBoot基于RateLimiter+AOP动态的为不同接口限流

常用限流算法与Guava RateLimiter源码解析