高并发之限流实现

Posted xiaozhuanfeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发之限流实现相关的知识,希望对你有一定的参考价值。

本次样例从单机层面上,采用拦截器的方式对请求限流。

资源:https://github.com/xiaozhuanfeng/rateLimiterProj

 工程结构:

技术图片

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the rate-limiting demo: a Spring Boot web application
     that additionally depends on Guava and fastjson. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Hot reload (devtools): project changes take effect immediately.
          It is disabled when running from a fully packaged build:
          an application started with "java -jar" or through a special classloader
          is treated as a production environment.
          -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- NOTE(review): "RELEASE" is a floating meta-version, so the resolved artifact can
             change between builds; consider pinning an explicit version. -->
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

        <!-- Guava: provides the RateLimiter and the cache classes used by the interceptors -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
        </dependency>

        <!-- fastjson: used to serialize the failure response to JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

# Server port
server.port=8007
# Log output directory
logging.path=/logs/rateLimiterProj
logging.config=classpath:logback-spring.xml

# Custom rate-limiting settings.
# rateLimit.qps  : permits per second handed to the Guava RateLimiter bean.
# rateLimit.type : "acquire" makes RateLimitInterceptor block until a permit is granted;
#                  any other value makes it use tryAcquire and reject the request when
#                  no permit is obtained (i.e. requests above the configured QPS are dropped).
# rateLimit.tryAcquire.permits / timeOut : arguments of the timed tryAcquire call; when either
#                  one is 0 the interceptor uses the immediate, single-permit tryAcquire().
rateLimit.qps = 1
rateLimit.type=acquire
rateLimit.tryAcquire.permits=1
rateLimit.tryAcquire.timeOut=100

1、新建抽象拦截器

package com.example.demo.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.util.IOUtils;
import com.example.demo.constant.ResponseEnum;
import com.example.demo.dto.ResponseDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Template for request interceptors: subclasses decide whether a request may proceed,
 * and this class turns a negative decision into a JSON failure response.
 */
public abstract class AbstractInterceptor extends HandlerInterceptorAdapter {
    public final Logger log = LoggerFactory.getLogger(getClass());

    /**
     * Performs the concrete interception check.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} to let the request through, any other value to reject it
     * @throws Exception on failure; {@link #preHandle} logs it and answers with
     *                   {@link ResponseEnum#SERVER_ERROR}
     */
    protected abstract ResponseEnum handleRequest(HttpServletRequest req) throws Exception;

    /**
     * Delegates to {@link #handleRequest}; anything other than {@link ResponseEnum#OK}
     * is written to the response and stops the handler chain.
     *
     * @return {@code true} only when the check returned {@link ResponseEnum#OK}
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        ResponseEnum outcome;
        try {
            outcome = handleRequest(request);
        } catch (Exception e) {
            log.error("handleRequest catch an Exception>>>", e);
            // A failing check is reported to the client as a server error.
            outcome = ResponseEnum.SERVER_ERROR;
        }

        if (ResponseEnum.OK != outcome) {
            handleFailResponse(response, outcome);
            return false;
        }
        return true;
    }

    /**
     * Writes {@code result} to the response as a JSON-serialized {@link ResponseDTO}.
     * The HTTP status is always 200; the outcome is carried in the body's code field.
     *
     * @param resp   response to write to
     * @param result outcome whose code and message are copied into the body
     */
    public void handleFailResponse(HttpServletResponse resp, ResponseEnum result) {
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);

        ResponseDTO payload = new ResponseDTO(result.getCode(), result.getMesg());
        // The writer is closed once the body has been written.
        try (PrintWriter out = resp.getWriter()) {
            out.write(JSON.toJSONString(payload));
        } catch (IOException e) {
            log.error("handleFailResponse catch an IOException>>>", e);
        }
    }
}

2、新建RateLimiter Bean

package com.example.demo.configuration;

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Exposes the application-wide Guava {@link RateLimiter} as a Spring bean.
 */
@Configuration
public class RateLimitConfig {

    /** Rate (permits per second) used when the configured value is not a positive number. */
    private static final double DEFAULT_QPS = 1.0D;

    /** Configured permits per second, read from {@code rateLimit.qps}. */
    @Value("${rateLimit.qps}")
    private double qps;

    /**
     * Creates the shared rate limiter.
     *
     * @return a limiter issuing {@code rateLimit.qps} permits per second, or
     *         {@link #DEFAULT_QPS} when the configured value is zero, negative or NaN
     */
    @Bean
    public RateLimiter getRateLimiter() {
        // RateLimiter.create rejects non-positive and NaN rates with an
        // IllegalArgumentException, so every unusable value falls back to the default
        // (the comparison is false for NaN as well). The injected field is left untouched.
        double effectiveQps = (qps > 0.0D) ? qps : DEFAULT_QPS;
        return RateLimiter.create(effectiveQps);
    }

}

3、新建返回枚举类和响应实体类

技术图片
package com.example.demo.constant;

/**
 * Application-level response codes, each paired with a human-readable message.
 */
public enum ResponseEnum {
    OK(200, "成功"),
    RATE_LIMIT(403, "访问次数受限"),
    AUTHENTICATION_FAIL(401, "未授权"),
    SERVER_ERROR(500, "服务器错误");

    private int code;

    private String mesg;

    ResponseEnum(int code, String mesg) {
        this.code = code;
        this.mesg = mesg;
    }

    /** @return the numeric response code */
    public int getCode() {
        return code;
    }

    /**
     * Replaces the code of this constant. Enum constants are shared singletons,
     * so the change is visible to every user of the constant.
     */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the message associated with the code */
    public String getMesg() {
        return mesg;
    }

    /**
     * Replaces the message of this constant. Enum constants are shared singletons,
     * so the change is visible to every user of the constant.
     */
    public void setMesg(String mesg) {
        this.mesg = mesg;
    }

    /** @return a string of the form {@code ResponseEnum{code=<code>, mesg='<mesg>'}} */
    @Override
    public String toString() {
        // The quote characters must be plain ASCII char literals; typographic quotes do not compile.
        return "ResponseEnum{" +
                "code=" + code +
                ", mesg='" + mesg + '\'' +
                '}';
    }
}
View Code
技术图片
package com.example.demo.dto;

/**
 * Response body carrying a numeric code and a message.
 */
public class ResponseDTO {
    private int code;

    private String mesg;

    /** Creates an empty DTO (code {@code 0}, message {@code null}). */
    public ResponseDTO() {
    }

    /**
     * @param code numeric response code
     * @param mesg message describing the code
     */
    public ResponseDTO(int code, String mesg) {
        this.code = code;
        this.mesg = mesg;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMesg() {
        return mesg;
    }

    public void setMesg(String mesg) {
        this.mesg = mesg;
    }

    /** @return a string of the form {@code ResponseDTO{code=<code>, mesg='<mesg>'}} */
    @Override
    public String toString() {
        // The quote characters must be plain ASCII char literals; typographic quotes do not compile.
        return "ResponseDTO{" +
                "code=" + code +
                ", mesg='" + mesg + '\'' +
                '}';
    }
}
View Code

4、创建请求拦截器

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

/**
 * Interceptor that throttles requests with a shared Guava {@link RateLimiter}.
 * Depending on {@code rateLimit.type} a request either waits for a permit
 * ("acquire") or is rejected when no permit can be obtained (any other value).
 */
@Component("rateLimitInterceptor")
public class RateLimitInterceptor extends AbstractInterceptor{

    /** Value of {@code rateLimit.type} that selects the blocking strategy. */
    private static final String RATELIMIT_TYPE_ACQUIRE = "acquire";

    @Value("${rateLimit.type}")
    private String rateLimitType;

    /** Number of permits requested by the timed {@code tryAcquire}. */
    @Value("${rateLimit.tryAcquire.permits}")
    private int permits;

    /** Maximum wait of the timed {@code tryAcquire}, in milliseconds. */
    @Value("${rateLimit.tryAcquire.timeOut}")
    private long timeout;

    @Autowired
    private RateLimiter rateLimiter;

    /**
     * Applies the configured strategy to the request.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} when a permit was obtained,
     *         {@link ResponseEnum#RATE_LIMIT} when the request is rejected
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        if (RATELIMIT_TYPE_ACQUIRE.equals(rateLimitType)) {
            return passWaitReq(req);
        }
        return giveUpReqWhenExceedLimit(req);
    }

    /**
     * Blocks the calling thread until a single permit is granted.
     *
     * @return always {@link ResponseEnum#OK}
     */
    private ResponseEnum passWaitReq(HttpServletRequest req){
        // acquire() takes one permit and returns the time spent waiting, in seconds.
        double waitTime = rateLimiter.acquire();
        log.info("请求成功......waitTime={}", waitTime);
        return ResponseEnum.OK;
    }

    /**
     * Tries to obtain a permit and gives the request up when that is not possible.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} when a permit was obtained,
     *         otherwise {@link ResponseEnum#RATE_LIMIT}
     */
    private ResponseEnum giveUpReqWhenExceedLimit(HttpServletRequest req){
        // Without a usable permits/timeout pair, fall back to the immediate single-permit
        // attempt. Non-positive permits would make the timed call throw, so they are
        // routed here as well.
        if (permits <= 0 || timeout <= 0L) {
            if (!rateLimiter.tryAcquire()) {
                log.warn("限流中......");
                return ResponseEnum.RATE_LIMIT;
            }
        } else if (!rateLimiter.tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) {
            // The timeout is interpreted as milliseconds: a wait measured in microseconds
            // would be practically indistinguishable from the immediate attempt above.
            log.warn("permits={},timeout={},限流中......", permits, timeout);
            return ResponseEnum.RATE_LIMIT;
        }

        log.info("请求成功......");
        return ResponseEnum.OK;
    }
}


5、创建Controller,测试用

技术图片
package com.example.demo.controller;

import org.apache.juli.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Demo endpoints used to exercise the request interceptors.
 */
@RestController
public class HelloController {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Handles {@code GET /rateLimit/getHello}.
     *
     * @return a fixed greeting after a short simulated delay
     */
    @RequestMapping(value="/rateLimit/getHello",method = RequestMethod.GET)
    public String getHello(){
        log.info("rateLimit Hello ........");

        try {
            // Simulate 100 ms of work.
            Thread.sleep(100);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }

        return "Get:Hello,World";
    }

    /**
     * Handles {@code GET /calculate/getHello}.
     *
     * @return a fixed greeting after a simulated one-second delay
     */
    @RequestMapping(value="/calculate/getHello",method = RequestMethod.GET)
    public String postHello(){
        log.info("calculate Hello ........");

        try {
            // Simulate 1 s of work.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }

        return "Post:Hello,World";
    }
}
View Code

6、配置请求拦截器

package com.example.demo.configuration;

import com.example.demo.interceptor.CalculateReqInterceptor;
import com.example.demo.interceptor.RateLimitInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;

/**
 * MVC configuration that declares the interceptor beans and maps them to URL patterns.
 */
@Configuration
public class WebInterceptorConfig extends WebMvcConfigurationSupport {

    // NOTE(review): RateLimitInterceptor is also annotated with @Component, so this factory
    // method registers a second bean of the same type; confirm that this is intended.
    @Bean
    public RateLimitInterceptor getRateLimitInterceptor() {
        return new RateLimitInterceptor();
    }

    @Bean
    public CalculateReqInterceptor getCalculateReqInterceptor() {
        return new CalculateReqInterceptor();
    }

    /**
     * Registers the rate-limit interceptor for every path below {@code /rateLimit/}.
     * The request-timing interceptor for {@code /calculate/**} is currently not registered.
     */
    @Override
    protected void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(getRateLimitInterceptor())
                .addPathPatterns("/rateLimit/**");
    }
}

测试:

  @Test
    public void mulitplyReq() throws Exception {
        // Sends ten sequential requests; use "/calculate/getHello" to target the other endpoint.
        final String target = "/rateLimit/getHello";
        final int requestCount = 10;

        int batchNo = 0;
        while (batchNo < requestCount) {
            sendReq(batchNo, target);
            batchNo++;
        }
    }
     private void sendReq(int batchNo,String url) throws Exception {
        /*
         * Issues one GET request through MockMvc and prints the response body.
         *
         * MockMvc building blocks used (or available) here:
         * 1. mockMvc.perform executes a request.
         * 2. MockMvcRequestBuilders.get("XXX") builds the request.
         * 3. param(...) adds request parameters.
         * 4. accept(MediaType.TEXT_HTML_VALUE) sets the accepted response type.
         * 5. andExpect(...) adds an assertion evaluated after execution, e.g.
         *    MockMvcResultMatchers.status().isOk() or
         *    MockMvcResultMatchers.content().string("hello lvgang").
         * 6. andDo(...) adds a result handler, e.g. MockMvcResultHandlers.print()
         *    to dump the whole response.
         * 7. andReturn() returns the result once execution has finished.
         */
        MockHttpServletResponse response = mockMvc
                .perform(MockMvcRequestBuilders.get(url).accept(MediaType.TEXT_HTML_VALUE))
                .andReturn()
                .getResponse();

        String threadName = Thread.currentThread().getName();
        String body = response.getContentAsString();
        System.out.println("Thread" + threadName + ">>>batchNo=" + batchNo + ":" + body);
    }

结果:

15:31:43.428 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.0
15:31:43.435 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=0:Get:Hello,World
15:31:43.629 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.0
15:31:43.629 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=1:Get:Hello,World
15:31:44.545 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.812705
15:31:44.545 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=2:Get:Hello,World
15:31:45.544 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.89766
15:31:45.544 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........

...

将配置切换为 rateLimit.type=tryAcquire、rateLimit.qps=10。QPS 一定要设置恰当,否则用户体验将不太美妙。运行:

@Test
    public void threadReq() throws Exception {
        // Two workers hit the same endpoint concurrently, 500 requests each.
        final String url = "/rateLimit/getHello";

        // Both workers run the same loop, so it is defined once.
        Runnable requestLoop = () -> {
            for (int i = 0; i < 500; i++) {
                try {
                    sendReq(i, url);
                } catch (Exception e) {
                    // Report the failure instead of silently dropping it; keep sending the rest.
                    System.err.println("threadReq: request batchNo=" + i + " failed: " + e);
                }
            }
        };

        Thread[] workers = {new Thread(requestLoop), new Thread(requestLoop)};
        for (Thread worker : workers) {
            worker.start();
        }

        // Wait until both workers are done, but never longer than the 60 s the test
        // used to sleep unconditionally.
        long deadline = System.currentTimeMillis() + 60 * 1000L;
        for (Thread worker : workers) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining > 0) {
                // join(0) would wait forever, hence the guard above.
                worker.join(remaining);
            }
        }
    }

技术图片


下面简单介绍其他应用级限流:

(1)限流总并发/连接/请求数

对于一个应用系统来说,一定会有极限并发/请求数,即总有一个 TPS/QPS 阈值;如果超过该阈值,系统就会不响应用户请求或响应得非常慢,因此我们最好进行过载保护,防止大量请求涌入击垮系统。

如果你使用过Tomcat,其Connector 其中一种配置有如下几个参数:

acceptCount:如果Tomcat的线程都忙于响应,新来的连接会进入队列排队,如果超出排队大小,则拒绝连接;

maxConnections: 瞬时最大连接数,超出的会排队等待;

maxThreads:Tomcat能启动用来处理请求的最大线程数,如果请求处理量一直远远大于最大线程数则可能会僵死。

详细的配置请参考官方文档。另外,MySQL(如 max_connections)、Redis(如 tcp-backlog)也都有类似的限制连接数的配置。

Springboot内置tomcat属性在org.springframework.boot.autoconfigure.web.ServerProperties,可在application.properties配置属性。

Tomcat 特有的配置都以 "server.tomcat" 作为前缀

如上面的:

#端口号设置

server.port=8007

(2)限流总资源数

如果有的资源是稀缺资源(如数据库连接、线程),而且可能有多个系统都会去使用它,那么需要限制应用;可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是100,那么本应用最多可以使用100个资源,超出了可以等待或者抛异常。

(3)限流某个接口的总并发/请求数

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Interceptor that rejects requests once a shared request counter exceeds a fixed limit.
 */
@Service("limitReqNumInterceptor")
public class LimitReqNumInterceptor extends AbstractInterceptor{

    /** Highest counter value that is still let through. */
    private static final long LIMIT_NUM = 10L;

    /** Shared request counter, injected from the application context. */
    @Autowired
    private AtomicLong atomic;

    /**
     * Counts the request and decides whether it may pass.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} while the counter is within {@code LIMIT_NUM},
     *         otherwise {@link ResponseEnum#RATE_LIMIT}
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        // Every request increments the counter, rejected ones included. Nothing in this
        // class decrements or resets it (the scheduled reset below is disabled), and the
        // limit applies to all paths this interceptor is mapped to rather than per URI.
        final long seen = atomic.incrementAndGet();
        if (seen <= LIMIT_NUM) {
            log.warn("请求通过....");
            return ResponseEnum.OK;
        }

        // Over the limit: reject the request.
        log.warn("超出请求数>>>,LIMIT_NUM="+LIMIT_NUM);
        return ResponseEnum.RATE_LIMIT;
    }

    // Disabled scheduled task that would reset the counter once it has passed the limit:
    //@Scheduled(cron="*/6 * * * * ?")
   /* private void process(){
        if(atomic.get() > LIMIT_NUM) {
            log.warn("请求达到上限,定时清理请求数>>>"+atomic.get());
            // reset the request counter
            atomic.set(0L);
            log.warn("定时清理后>>>"+atomic.get());
        }
    }*/

}

限流某个接口的时间窗请求数

利用 Guava Cache,以 URI 路径为键缓存请求计数的方式

/**
     * Lifetime of a per-URI counter, in seconds.
     */
    private static final long EXPIRE_SEC = 20L;

    /**
     * Request counters keyed by request URI. A counter is created at zero on first access
     * and its cache entry expires {@code EXPIRE_SEC} seconds after it was written.
     */
    private final LoadingCache<String, AtomicLong> uriCounters =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(EXPIRE_SEC, TimeUnit.SECONDS)
                    .build(new CacheLoader<String, AtomicLong>() {
                        // Called on a cache miss; the returned counter is then cached.
                        @Override
                        public AtomicLong load(String uri) {
                            return new AtomicLong(0L);
                        }
                    });

    /**
     * Counts the request against its URI and rejects it once that URI's counter
     * exceeds {@code LIMIT_NUM}.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} while the URI is within its limit,
     *         otherwise {@link ResponseEnum#RATE_LIMIT}
     * @throws Exception if the counter cannot be loaded from the cache
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        String uri = req.getRequestURI();
        AtomicLong counter = uriCounters.get(uri);
        // Diagnostics go through the logger like every other message of the interceptor.
        log.info("uri={},当前请求数,number={}", uri, counter.get());

        if (counter.incrementAndGet() > LIMIT_NUM) {
            // Over the limit for this URI: reject the request.
            log.warn("uri={}超出请求数>>>,LIMIT_NUM={}", uri, LIMIT_NUM);
            return ResponseEnum.RATE_LIMIT;
        }
        log.warn("请求通过....");
        return ResponseEnum.OK;
    }

Controller,拦截器设置略

技术图片
/**
     * Handles {@code GET /numLimit/getHello}.
     *
     * @return a fixed greeting after a simulated one-second delay
     */
    @RequestMapping(value="/numLimit/getHello",method = RequestMethod.GET)
    public String sayHello(){
        log.info("numLimit Hello ........");

        try {
            // Simulate 1 s of work.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return "Say:Hello,World";
    }

    /**
     * Handles {@code GET /numLimit/getHello2}.
     *
     * @return a fixed greeting after a simulated one-second delay
     */
    @RequestMapping(value="/numLimit/getHello2",method = RequestMethod.GET)
    public String sayHello2(){
        log.info("numLimit Hello2 ........");

        try {
            // Simulate 1 s of work.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return "Say:Hello,World";
    }
View Code

测试:

@Test
    public void mulitplyReq2() throws Exception {
        final String firstUri = "/numLimit/getHello";
        final String secondUri = "/numLimit/getHello2";

        // Phase 1: 100 sequential requests against the first URI.
        int batchNo = 0;
        while (batchNo < 100) {
            sendReq(batchNo, firstUri);
            batchNo++;
        }

        // Phase 2: 15 sequential requests against the second URI.
        batchNo = 0;
        while (batchNo < 15) {
            sendReq(batchNo, secondUri);
            batchNo++;
        }

        // Phase 3: pause for 15 s, then hit the first URI again.
        // Presumably the pause is meant to let the first URI's counter expire; confirm against the cache expiry.
        Thread.sleep(15 * 1000L);
        batchNo = 0;
        while (batchNo < 15) {
            sendReq(batchNo, firstUri);
            batchNo++;
        }
    }

参考:

https://www.cnblogs.com/xuwc/p/9123078.html

以上是关于高并发之限流实现的主要内容,如果未能解决你的问题,请参考以下文章

聊聊高并发系统之限流特技

聊聊高并发系统之限流特技-1 开涛

大流量 高并发系统之限流特技

Spring Cloud Gateway 之限流操作

Spring Cloud Gateway 之限流操作

高可用之限流