(十七)ATP应用测试平台——Redis实现API接口访问限流(固定窗口限流算法)

Posted 北溟溟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(十七)ATP应用测试平台——Redis实现API接口访问限流(固定窗口限流算法)相关的知识,希望对你有一定的参考价值。

前言

开始正文之前,大多数情况下应该有这样一段场景。面试官:说说平常在项目中,你是如何使用redis的?我:我们就很简单啦,比如前后端分离token的存储、短信验证码的存储,权限列表的存储,一些热点数据的存储。再高大上一点,分布式锁。面试官:还有呢?心中顿时一万只草泥马奔过,这还不够吗?还能干什么?不好意思,实在是想不出了。面试官,好了,回去等通知吧。

其实对于redis的使用万变不离其踪,归根结底是对redis的某些特性的利用,例如其所有操作都是的原子性、数据可以设置过期时间、低延迟高吞吐量。redis分布式锁正是基于redis的这些基本特性实现的。本节内容我们使用redis实现接口API的访问限流,这里提供spring的interceptor拦截器和aop切面俩种方式实现接口api的细粒度限流,根据实际情况,选择一种方式即可。说到限流我们前面已经介绍过在微服务中使用阿里巴巴的产品sentinel实现限流,sentinel是功能更加强大限流产品。本节内容我们是基于redis自身的一些特性实现限流,相对来说还是一种比较通俗易懂的实现限流方式,包括在springcloud-gateway网关服务中也可以使用redis限流,其已内置到网关中,具体的实现步骤我们后面有时间在叙。

正文

  • 引入redis的pom依赖
<!--redis启动器-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

  • application.yml中配置redis服务
spring:
  redis:
    #默认数据分区
    database: 0
    #redis集群节点配置
    cluster:
      nodes:
        - 192.168.56.10:6379
        - 192.168.56.10:6380
        - 192.168.56.10.6381
      max-redirects: 3
    #超时时间
    timeout: 10000
    #哨兵节点配置
    sentinel:
      master: mymaster
      nodes:
        - "192.168.56.10:26379"
        - "192.168.56.10:26380"
        - "192.168.56.10:26381"
    #redis密码
    password: root
    #redis 客户端工具
    lettuce:
      pool:
        # 连接池最大连接数(使用负值表示没有限制) 默认为8
        max-active: 8
        # 连接池中的最小空闲连接 默认为 0
        min-idle: 1
        # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认为-1
        max-wait: 1000
        # 连接池中的最大空闲连接 默认为8
        max-idle: 8

  • 在类路径下创建限流的lua脚本
local flow_key = KEYS[1]
local flow_count = tonumber(ARGV[1])
local flow_time = tonumber(ARGV[2])
local current = redis.call('get', flow_key)
if current and tonumber(current) > flow_count then
    return tonumber(current)
end
current = redis.call('incr', flow_key)
if tonumber(current) == 1 then
    redis.call('expire', flow_key, flow_time)
end
return tonumber(current)

  •  lua脚本说明
#限流传入的key
local flow_key = KEYS[1]
#限流传入的达到访问请求的上限限流次数
local flow_count = tonumber(ARGV[1])
#限流传入的达到访问请求的限流时间范围,默认是秒
local flow_time = tonumber(ARGV[2])
#判断当前限流的key是否存在
local current = redis.call('get', flow_key)
#如果存在,并且达到了上限限流次数,返回当前的访问次数,tonumber将value值转换为int值
if current and tonumber(current) > flow_count then
    return tonumber(current)
end
#将限流的key值加1操作
current = redis.call('incr', flow_key)
if tonumber(current) == 1 then
	#如果是第一次访问,设置其过期时间,即限流时间范围
    redis.call('expire', flow_key, flow_time)
end
#返回当前的限流次数
return tonumber(current)
  • redis缓存工具类创建与lua脚本组件注入
@Configuration
public class RedisConfig 
    /**
     * redis缓存工具类创建
     * @param redissonConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() 
        // 缓存序列化配置,避免存储乱码
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance ,
                ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    

   

    /***
     * lua脚本组件初始化
     * @return
     */
    @Bean
    public DefaultRedisScript<Long> defaultRedisScript() 
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/flow_limit.lua")));
        redisScript.setResultType(Long.class);
        return redisScript;
    

  •  创建限流的枚举类FlowLimitType
package com.yundi.atp.platform.common;

public enum FlowLimitType 
    /**
     * 默认限流策略(全局限流)
     */
    GLOBAL,
    /**
     * 根据IP地址限流
     */
    IP,
    /**
     * 根据用户ID限流
     */
    USER

  • 创建限流注解,实现基于注解的定制化限流
package com.yundi.atp.platform.annotation;

import com.yundi.atp.platform.common.FlowLimitType;

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface FlowLimiterRate 
    /**
     * 限流策略:默认使用IP
     */
    FlowLimitType flowLimitType() default FlowLimitType.IP;

    /**
     * 限流key前缀:key_flow_limit_rate
     */
    String flowLimitKey() default "key_flow_limit_rate:";

    /**
     * 限流时间段,单位秒
     */
    int flowLimitTime() default 5;

    /**
     * 限流次数
     */
    int flowLimitNumber() default 10;

  •  创建一个限流拦截器FlowLimiterHandlerInterceptor用于限流处理
package com.yundi.atp.platform.interceptor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.yundi.atp.platform.annotation.FlowLimiterRate;
import com.yundi.atp.platform.common.FlowLimitType;
import com.yundi.atp.platform.common.Result;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Arrays;

@Slf4j
@Component
public class FlowLimiterHandlerInterceptor implements HandlerInterceptor 
    /**
     * 执行controller方法前
     *
     * @param request
     * @param response
     * @param handler
     * @return
     * @throws Exception
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception 
        log.info("开始流量监控咯");
        if (handler instanceof HandlerMethod) 
            FlowLimiterRate methodAnnotation = ((HandlerMethod) handler).getMethodAnnotation(FlowLimiterRate.class);
            if (methodAnnotation != null) 
                //限流类型
                FlowLimitType flowLimitType = methodAnnotation.flowLimitType();
                //限流key
                String flowLimitKey = methodAnnotation.flowLimitKey();
                //限流时间
                Integer flowLimitTime = methodAnnotation.flowLimitTime();
                //限流次数
                Integer flowLimitNumber = methodAnnotation.flowLimitNumber();
                log.info("限流类型:,限流key:,限流时间:,限流次数:", flowLimitType, flowLimitKey, flowLimitTime, flowLimitNumber);
                //判断限流规则:IP
                if (FlowLimitType.IP.equals(flowLimitType)) 
                    //查询url
                    String requestURI = request.getRequestURI();
                    //查询ip
                    String ipAddr = getIpAddr(request);
                    //拼接key
                    String key = flowLimitKey + ipAddr + ":" + requestURI;
                    log.info("key:", key);
                    //获取RedisTemplate实例
                    RedisTemplate redisTemplate = (RedisTemplate)this.getBean("redisTemplate", request);
                    //RedisScript实例
                    DefaultRedisScript<Long> defaultRedisScript = this.getBean(DefaultRedisScript.class, request);
                    Long number = (Long) redisTemplate.execute(defaultRedisScript, Arrays.asList(key), flowLimitNumber, flowLimitTime);
                    if (number == null || number.intValue() > flowLimitNumber) 
                        log.info("访问过于频繁,请稍候再试");
                        limitResult(response);
                        return false;
                    
                    log.info("限制请求'',当前请求'',缓存key''", flowLimitNumber, number.intValue(), key);
                
                //todo 根据访问用户限流

            
        
        return true;
    

    /**
     * 达到限流要求的响应结果
     * @param response
     * @throws IOException
     */
    public void limitResult(HttpServletResponse response) throws IOException 
        response.setCharacterEncoding("UTF-8");
        response.setContentType("application/json; charset=utf-8");
        ObjectMapper objectMapper = new ObjectMapper();
        response.getWriter().println(objectMapper.writeValueAsString(Result.fail("请求访问频繁,请稍后再试!")));
        return;
    

    /**
     * 获取ip地址
     *
     * @param request
     * @return
     */
    private String getIpAddr(HttpServletRequest request) 
        String ip = request.getHeader("X-Real-IP");
        if (ip != null && !"".equals(ip) && !"unknown".equalsIgnoreCase(ip)) 
            return ip;
        
        ip = request.getHeader("X-Forwarded-For");
        if (ip != null && !"".equals(ip) && !"unknown".equalsIgnoreCase(ip)) 
            int index = ip.indexOf(',');
            if (index != -1) 
                return ip.substring(0, index);
             else 
                return ip;
            
         else 
            return request.getRemoteAddr();
        
    

    /**
     * 根据类型获取容器对象实例
     *
     * @param clazz
     * @param request
     * @param <T>
     * @return
     */
    private <T> T getBean(Class<T> clazz, HttpServletRequest request) 
        WebApplicationContext applicationContext = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
        return applicationContext.getBean(clazz);
    

    /**
     * 根据名称获取容器对象实例
     *
     * @param beanName
     * @param request
     * @return
     */
    private Object getBean(String beanName, HttpServletRequest request) 
        WebApplicationContext applicationContext = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
        return applicationContext.getBean(beanName);
    

  •  注册限流拦截器
package com.yundi.atp.platform.config;

import com.yundi.atp.platform.interceptor.AuthHandlerInterceptor;
import com.yundi.atp.platform.interceptor.FlowLimiterHandlerInterceptor;
import com.yundi.atp.platform.resolver.ParamHandlerMethodArgumentResolver;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import java.util.List;

/**
 * @Author: 北溟溟
 * @Description: 添加静态资源文件,外部可以直接访问地址
 * @Date: 2021/5/18 10:54
 * @Version: 1.0.0
 */
@Configuration
public class MyWebMvcConfig implements WebMvcConfigurer 

   

    /**
     * 注册拦截器
     * @param registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) 
        registry.addInterceptor(new AuthHandlerInterceptor()).addPathPatterns("/**").order(1);
        //注册限流拦截器
        registry.addInterceptor(new FlowLimiterHandlerInterceptor()).addPathPatterns("/**").order(2);
    

  •  测试限流controller
@Slf4j
@Api(tags = "用户管理")
@RestController
@RequestMapping("/sys/user")
public class UserController 
    @Autowired
    private UserService userService;


    @FlowLimiterRate(flowLimitType = FlowLimitType.IP,flowLimitKey = "key_flow_limit_rate_user:",flowLimitTime = 5,flowLimitNumber = 3)
    @ApiOperation(value = "查询用户详情")
    @PostMapping(value = "/info")
    public Result info(@RequestParam(value = "id") String id) 
        User user = userService.findUserInfoById(id);
        return Result.success(user);
     

  • 测试结果

 这里设置的是在5秒内,如果访问次数超过3次,就会发生限流

  • aop的实现方式与拦截器基本一致,这里只提供aop的实现代码,其它同上,实际在使用过程中任选一种方式即可,aop的方式更加灵活,既可以通过注解的方式也可以指定切面去做限流。
package com.yundi.atp.platform.aspect;

import com.yundi.atp.platform.annotation.FlowLimiterRate;
import com.yundi.atp.platform.common.FlowLimitType;
import com.yundi.atp.platform.exception.BizException;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;


@Aspect
@Slf4j
@Configuration
public class FlowLimiterAspect 
    @Autowired
    RedisTemplate redisTemplate;
    @Autowired
    DefaultRedisScript defaultRedisScript;

    /**
     * 切点:注解方式
     */
    @Pointcut("@annotation(com.yundi.atp.platform.annotation.FlowLimiterRate)")
    public void flowLimiterPointCut() 

    

     /**
     * 切点:固定切面方式
     */
    @Pointcut(value = "execution(* com.yundi.atp.platform.module.test.controller.*.*(..))")
    public void flowLimiterPackagePointCut() 

    

    /**
     * 注解版:针对特定接口的限流规则
     *
     * @param joinPoint
     * @param flowLimiterRate
     */
    @Before(value = "flowLimiterPointCut() && @annotation(flowLimiterRate)")
    public void flowLimiter(JoinPoint joinPoint, FlowLimiterRate flowLimiterRate) 
        //限流类型
        FlowLimitType flowLimitType = flowLimiterRate.flowLimitType();
        //限流key
        String flowLimitKey = flowLimiterRate.flowLimitKey();
        //限流时间
        Integer flowLimitTime = flowLimiterRate.flowLimitTime();
        //限流次数
        Integer flowLimitNumber = flowLimiterRate.flowLimitNumber();
        log.info("限流类型:,限流key:,限流时间:,限流次数:", flowLimitType, flowLimitKey, flowLimitTime, flowLimitNumber);
        //判断限流规则:IP
        if (FlowLimitType.IP.equals(flowLimitType)) 
            ServletRequestAttributes requestAttributes = ServletRequestAttributes.class.
                    cast(RequestContextHolder.getRequestAttributes());
            HttpServletRequest request = requestAttributes.getRequest();
            //查询url
            String requestURI = request.getRequestURI();
            //查询ip
            String ipAddr = getIpAddr(request);
            //拼接key
            String key = flowLimitKey + ipAddr + ":" + requestURI;
            log.info("key:", key);
            //获取RedisTemplate实例
            Long number = (Long) redisTemplate.execute(defaultRedisScript, Arrays.asList(key), flowLimitNumber, flowLimitTime);
            if (number == null || number.intValue() > flowLimitNumber) 
                log.info("访问过于频繁,请稍候再试");
                throw new BizException(0, "访问过于频繁,请稍候再试");
            
            log.info("限制请求'',当前请求'',缓存key''", flowLimitNumber, number.intValue(), key);
        
    

    /**
     * 特定切面:针对特定接口的限流规则
     *
     * @param joinPoint
     */
    @Around(value = "flowLimiterPackagePointCut()")
    public void flowLimiterPackage(JoinPoint joinPoint) 
        //限流时间
        Integer flowLimitTime = 10;
        //限流次数
        Integer flowLimitNumber = 5;
        //限流key
        ServletRequestAttributes requestAttributes = ServletRequestAttributes.class.
                cast(RequestContextHolder.getRequestAttributes());
        HttpServletRequest request = requestAttributes.getRequest();
        String key = request.getRequestURI();
        log.info("限流类型:,限流key:,限流时间:,限流次数:", "全局限流", "url", flowLimitTime, flowLimitNumber);
        //获取RedisTemplate实例
        Long number = (Long) redisTemplate.execute(defaultRedisScript, Arrays.asList(key), flowLimitNumber, flowLimitTime);
        if (number == null || number.intValue() > flowLimitNumber) 
            log.info("访问过于频繁,请稍候再试");
            throw new BizException(0, "访问过于频繁,请稍候再试");
        
        log.info("限制请求'',当前请求'',缓存key''", flowLimitNumber, number.intValue(), key);
    


    /**
     * 获取ip地址
     *
     * @param request
     * @return
     */
    private String getIpAddr(HttpServletRequest request) 
        String ip = request.getHeader("X-Real-IP");
        if (ip != null && !"".equals(ip) && !"unknown".equalsIgnoreCase(ip)) 
            return ip;
        
        ip = request.getHeader("X-Forwarded-For");
        if (ip != null && !"".equals(ip) && !"unknown".equalsIgnoreCase(ip)) 
            int index = ip.indexOf(',');
            if (index != -1) 
                return ip.substring(0, index);
             else 
                return ip;
            
         else 
            return request.getRemoteAddr();
        
    

结语

关于Redis实现API接口访问限流的实战内容到这里就结束了,我们下期见。。。

以上是关于(十七)ATP应用测试平台——Redis实现API接口访问限流(固定窗口限流算法)的主要内容,如果未能解决你的问题,请参考以下文章

(十七)ATP应用测试平台——自定义实现一个springboot2的线程池启动器starter

(十七)ATP应用测试平台——自定义实现一个springboot2的线程池启动器starter

(二十五)ATP应用测试平台——springboot集成knife4j实现API接口文档说明

(二十二)ATP应用测试平台——swagger2集成swagger-bootstrap-ui实现API文档访问

(十四)ATP应用测试平台——使用docker-compose一键式安装ATP应用测试平台的依赖服务

(二十三)ATP应用测试平台——阿里云短信发送功能集成