k8s之限流机制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了k8s之限流机制相关的知识,希望对你有一定的参考价值。


一、限流算法

1、漏桶leaky bucket

漏桶的概念是一个底部有孔的桶,无论水进入桶的速度是多少,它都会以恒定的速度通过孔从桶中泄漏出来。如果桶中没有水,则流速为零,如果桶已满,则多余的水溢出并丢失。其实就是将突发流量入桶转换为恒定流量发送。

k8s之限流机制_kubernetes

  2、令牌桶token bucket

qps设置为100, burst设置为200, 就会以每10ms一个的速度往令牌桶中生成令牌,而令牌桶的容量为200。

每一个request被处理需要首先从桶中获取一个令牌,桶里没有令牌可取时,则拒绝服务(延迟、丢弃、标记)

k8s之限流机制_限流_02

漏桶算法能够强行限制数据的传输速率,令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。
比如一秒来了200个request,QPS是100,如果是漏铜算法,200个处理不了,多余的100个扔掉。但如果是令牌桶,因为桶里本身有100个,在加上QPS也是100,那这200个就处理了。

二、kube-apiserver限流

kube-apiserver组件是外部操作集群资源的入口,作为一个完善的api类组件的实现,为保障api组件平稳运行,限流功能自然是必不可少的。

3个限流参数:
--max-requests-inflight=400(非mutating操作:get,list,watch等查询操作,默认是400,0表示无限制)
inflight是k8s apiserver从接受一个请求到一直将这个请求处理完,即所有apiserver已经接受但是还没有处理完的请求都叫inflight;同时,inflight的200和400不是qps,具体实现是申请一个200容量的缓存channel,当请求到达时占用一个channel的坑位,请求处理完后释放一个channel的坑位;

--max-mutating-requests-inflight=200(mutating操作:update, delete等操作,默认是200,0表示无限制);

k8s之限流机制_APF_03

--enable-priority-and-fairness=true(APF,API Priority and Fairness,即请求分优先级且同级别请求被公平对待);

k8s 1.8之前的限流只是将请求分成了mutating和非mutating请求,当某个客户端错误的向kube-apiserver发起大量的请求时,必然会触发kube-apiserver的限流操作,影响其他的客户端的请求,因此高阶的限流APF就诞生了;

在k8s 1.18 apiserver中默认开启APF,启用 APF 特性后,服务器的总并发量限制将设置为 ​​--max-requests-inflight​​​ 和 ​​--max-mutating-requests-inflight​​ 之和。 可变请求和不可变请求之间不再有任何区别。并且这些并发数被分配给各个 PL,分配方式是根据 PriorityLevelConfiguration.Spec.Limited.AssuredConcurrencyShares 的数值按比例分配,PL 的 AssuredConcurrencyShare 越大,分配到的并发份额越大。

APF的优点

  1. APF以更细粒度的方式对请求进行分类和隔离;
  2. APF引入了空间有限的排队机制,因此在非常短暂的突发情况下,APIServer不会拒绝任何请求;
  3. 通过公平排队从队列中分发请求,这样一个行为不佳的控制器就不会饿死其他控制器;

通俗一点就是:

多等级:其实就是按照不同用户,不同场景,不同的组件,用不同的等级队列 Flow1,Flow2.........,每一个等级队列它限流完全不是一个,也就有多个通道,不同通道它的限流机制是一样的,但是它限流不是共享单一队列的,它们完全是独立的限流队列,这样一个队列阻塞了,那么其他的队列还是通的。

多队列:并且在每个等级,如果也是一个单队列,假设flow1对应了一个队列,那么一样会有问题,万一flow1里面有坏用户发了很多的请求,那么整个flow流里面,所有的请求都在后面排了,它的破坏力就很大的,那么APF里面也有多队列的概念,就是在这个flow里面提供多个队列,即使是同一个flow,即使我是系统组件发的请求,不同的组件发送到的队列也可能是不一样的。

这样,即使有一个坏的用户造成了大量的请求堆积,那么它也只塞某个等级的某个队列,如果你的请求是在别的队列里面,那也是不影响的。 也就是通过多等级,多队列的方式,来实现精细化的限流。

APF的实现
APF限流主要依赖两种资源,PriorityLevelConfigurations定义限制策略和队列的容量,FlowSchemas对请求进行分类并能指定用户或用户组,并与一个PriorityLevelConfigurations相匹配。

kubectl get flowschemas.flowcontrol.apiserver.k8s.io kube-controller-manager -oyaml

kubectl get flowschemas.flowcontrol.apiserver.k8s.io kube-controller-manager -oyaml

apiVersion: flowcontrol.apiserver.k8s.io/v1beta1
kind: FlowSchema
metadata:
name: kube-controller-manager
spec:
# 如何把与该模式匹配的请求分散到各个流中。ByUser,把不同用户的请求分散到不同的流中;ByNamespace把不同ns的请求分散到不同流中;为空,与此 FlowSchema 匹配的请求将被视为单个流的一部分;
distinguisherMethod:
type: ByNamespace
matchingPrecedence: 800 # 匹配到请求的优先级,1~10000,越小优先级越高
priorityLevelConfiguration: # 关联的PriorityLevelConfigurations
name: workload-high
rules: # 匹配规则,如果为空,则不会有请求匹配到该FS
- nonResourceRules: # 非资源规则,health、metrics等
- nonResourceURLs:
- *
verbs:
- *
- resourceRules: # 资源规则
- apiGroups:
- *
clusterScope: true
namespaces:
- *
resources:
- *
verbs:
- *
subjects: # 匹配请求主体,可以是用户、组、sa
- kind: User
user:
name: system:kube-controller-manager

kubectl get PriorityLevelConfiguration workload-high -oyaml

kubectl get PriorityLevelConfiguration workload-high -oyaml

apiVersion: flowcontrol.apiserver.k8s.io/v1beta1
kind: PriorityLevelConfiguration
metadata:
name: workload-high
spec:
limited:
# 启用 APF 特性后,服务器的总并发量限制将设置为 --max-requests-inflight 和 --max-mutating-requests-inflight 之和。这些并发数被分配给各个 PL,分配方式是根据AssuredConcurrencyShares 的数值按比例分配,PL 的 AssuredConcurrencyShare 越大,分配到的并发份额越大。
assuredConcurrencyShares: 40
limitResponse: # 定义如何处理当前无法被处理的请求
queuing:
handSize: 6 # shuffle sharding即将请求指派给某个队列的技术,比哈希取模稍好
queueLengthLimit: 50 #队列长度
queues: 128 # 当前PL的队列数
type: Queue #类型,Queue或者Reject,Reject直接返回429并拒绝,Queue将请求加入队列
type: Limited #限制策略,Limited或Exempt, Exempt即不限制

APF流程图

k8s之限流机制_优先级_04

每一个请求分类对应一个FlowSchema,FS 内的请求又会根据 Flow Distinguisher进一步划分为不同的Flow。

FS会设置一个优先级(Priority Level),不同优先级的并发资源是隔离的。所以不同优先级的资源不会相互排挤。特定优先级的请求可以被高优处理。

总之,通过 FS,可以根据请求的主体 (User, Group, ServiceAccout)、动作 (Get, List, Create, Delete …)、资源类型 (pod, deployment …)、namespace、url 对请求进行分类。

三、client-go的限流

1、client-go的client限流,限流器源码目录"k8s.io/client-go/util/flowcontrol",主要有三个参数:

- QPS是指发令牌到令牌桶的速率,默认值是5;

- Burst是指令牌桶的最大桶容量,默认值是10;

- RateLimiter是自定义限流器,参数会覆盖QPS、Burst两个参数。

2、Clientset默认使用的限流器是令牌桶算法,即client-go的flowcontrol.NewTokenBucketRateLimiter(),且需要注意controller-runtime的ctrl.GetConfigOrDie() 默认QPS=20,Burst=30。

3、client-go发出请求时,限流的位置是"k8s.io/client-go/rest/request.go"的tryThrottle()。

四、Controller的workQueue限流

1、限流器实现了对于队列元素的重试规则,源码目录"k8s.io/client-go/util/workqueue",包括三个函数:

- When 获取某个元素应该等待的时间,

- Forget 释放某个元素不再监测,

- NumRequeues 返回该元素已经失败重试的次数。

2、限流器主要有四种类型,主要行为表现在当某一事件元素失败后,等待时间的计算规则不一致。

- 令牌桶算法限速,当某一事件元素失败后,等待时间的计算规则不一致 flowcontrol.NewTokenBucketRateLimiter(qps float32, burst int)
- 排队指数限速,根据失败次数按指数延时, workqueue.NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration)。其中,baseDelay 基础限速时间,maxDelay 最大限速时间
- 计数器模式,先快后慢,  workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int)。其中,fastDelay 快限速时间,slowDelay 慢限速时间,maxFastAttempts 快限速元素个数
- 混合模式,多种限速算法同时使用,返回所有RateLimiter的最坏值, NewMaxOfRateLimiter()

3、默认情况下限流器是MaxOfRateLimiter,即(令牌桶算法qps=10,burst=100) +( 根据失败次数按指数延时baseDelay =5ms,maxDelay=1000s),且等待时间=min(1000s,5ms*2^n)。

因此默认的WorkQueue的重试等待时间是5ms*2的n次方,n是重试次数,但是不超过1000s,并且还有一个令牌桶算法处理尖峰流量实现平滑限流,所以流量暴增时等待时间可能比1000s更长。

参考​​K8s之ControllerRateLimiter简单理解_傅里叶、

五、EventRateLimit准入控制器

EventRateLimiter准入控制是目的是限制 Event 请求的速度,缓解了 APIServer 的压力。

启用EventRateLimiter准入控制器:在k8s apiserver的命令行标志 ​​--admission-control-config-file​​​ 设置的文件中, 引用 ​​EventRateLimit​​ 配置文件:

apiVersion: apiserver.config.k8s.io/v1
kind: AdmissionConfiguration
plugins:
- name: EventRateLimit
path: eventconfig.yaml
...

  ​​eventconfig.yaml​​ 的示例:

apiVersion: eventratelimit.admission.k8s.io/v1alpha1
kind: Configuration
limits:
- type: Namespace
qps: 50
burst: 100
cacheSize: 2000
- type: User
qps: 10
burst: 50
  • ​Server​​: API 服务器收到的所有事件请求共享一个桶。
  • ​Namespace​​: 每个名字空间都对应一个专用的桶。
  • ​User​​: 为每个用户分配一个桶。
  • ​SourceAndObject​​: 根据事件的来源和涉及对象的各种组合分配桶。

每个eventratelimit 配置使用一个单独的令牌桶限速器,每次event操作,遍历每个匹配的限速器检查是否能获取令牌,如果不允许请求则返回429。

六、问题排查

1、如果请求不多但依然触发了限流,大概率是因为某个客户端的不恰当使用,可以借助审计日志来分析异常请求。
2、出现throttle时,可能是发送request的client对自己进行了限流(k8s.io/client-go/rest.RESTClientFor(rest.Config))

高并发之限流实现

本次样例从单机层面上,采用拦截器的方式对请求限流。

资源:https://github.com/xiaozhuanfeng/rateLimiterProj

 工程结构:

技术图片

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 热启动:项目修改,即时生效
          完整的打包环境下运行的时候会被禁用
          java -jar启动应用或者用一个特定的classloader启动 认为生产环境
          -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

        <!-- 引入guava 包-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
        </dependency>

        <!-- 引入fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

#端口号设置
server.port=8007
#日志的输出路径
logging.path=/logs/rateLimiterProj
logging.config=classpath:logback-spring.xml

#自定义限流方式,默认当请求超过指定QPS,放弃请求
rateLimit.qps = 1
rateLimit.type=acquire
rateLimit.tryAcquire.permits=1
rateLimit.tryAcquire.timeOut=100

1、新建抽象拦截器

package com.example.demo.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.util.IOUtils;
import com.example.demo.constant.ResponseEnum;
import com.example.demo.dto.ResponseDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

public abstract class AbstractInterceptor extends HandlerInterceptorAdapter {
    public final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * 具体拦截方法
     *
     * @param req
     * @return
     */
    protected abstract ResponseEnum handleRequest(HttpServletRequest req) throws Exception;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        ResponseEnum result = ResponseEnum.SERVER_ERROR;
        try {
            result = handleRequest(request);
        } catch (Exception e) {
            log.error("handleRequest catch an Exception>>>", e);
        }

        if (ResponseEnum.OK == result) {
            //成功
            return true;
        }

        handleFailResponse(response, result);
        return false;
    }

    public void handleFailResponse(HttpServletResponse resp, ResponseEnum result) {
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);

        ResponseDTO dto = new ResponseDTO();
        dto.setCode(result.getCode());
        dto.setMesg(result.getMesg());
        PrintWriter pw = null;
        try {
            pw = resp.getWriter();
            pw.write(JSON.toJSONString(dto));
        } catch (IOException e) {
            log.error("handleFailResponse catch an IOException>>>", e);
        } finally {
            IOUtils.close(pw);
        }
    }
}

2、新建RateLimiter Bean

package com.example.demo.configuration;

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RateLimitConfig {

    @Value("${rateLimit.qps}")
    private double qps;

    @Bean
    public RateLimiter getRateLimiter() {

        if(0 == this.qps){
            this.qps = 1.0D;
        }
        return RateLimiter.create(qps);
    }

}

3、新建返回枚举类和响应实体类

技术图片
package com.example.demo.constant;

/**
 * 自定义响应码
 */
public enum ResponseEnum {
    OK(200, "成功"),
    RATE_LIMIT(403, "访问次数受限"),
    AUTHENTICATION_FAIL(401, "未授权"),
    SERVER_ERROR(500, "服务器错误");

    private int code;

    private String mesg;

    ResponseEnum(int code, String mesg) {
        this.code = code;
        this.mesg = mesg;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMesg() {
        return mesg;
    }

    public void setMesg(String mesg) {
        this.mesg = mesg;
    }

    @Override
    public String toString() {
        return "ResponseEnum{" +
                "code=" + code +
                ", mesg=‘" + mesg + ‘‘‘ +
                ‘}‘;
    }
}
View Code
技术图片
package com.example.demo.dto;

public class ResponseDTO {
    private int code;

    private String mesg;

    public ResponseDTO() {
    }

    public ResponseDTO(int code, String mesg) {
        this.code = code;
        this.mesg = mesg;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMesg() {
        return mesg;
    }

    public void setMesg(String mesg) {
        this.mesg = mesg;
    }

    @Override
    public String toString() {
        return "ResponseDTO{" +
                "code=" + code +
                ", mesg=‘" + mesg + ‘‘‘ +
                ‘}‘;
    }
}
View Code

4、创建请求拦截器

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

@Component("rateLimitInterceptor")
public class RateLimitInterceptor extends AbstractInterceptor{

    private static final String RATELIMIT_TYPE_ACQUIRE = "acquire";

    @Value("${rateLimit.type}")
    private String rateLimitType;

    @Value("${rateLimit.tryAcquire.permits}")
    private int permits;

    @Value("${rateLimit.tryAcquire.timeOut}")
    private long timeout;

    @Autowired
    private RateLimiter rateLimiter;

    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        if(RATELIMIT_TYPE_ACQUIRE.equals(rateLimitType)){
            return passWaitReq(req);
        }else{
            return giveUpReqWhenExceedLimit(req);
        }
    }

    /**
     * 阻塞线程直到请求可以再授予许可
     * @return
     */
    private ResponseEnum passWaitReq(HttpServletRequest req){
        //permits,默认1,从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求数
        double waitTime = rateLimiter.acquire();
        log.info("请求成功......waitTime="+waitTime);
        return ResponseEnum.OK;
    }

    /**
     * 判断是否可以立即获取许可,否则放弃请求
     * @param req
     * @return
     */
    private ResponseEnum giveUpReqWhenExceedLimit(HttpServletRequest req){
        //timeout 时间内获取令牌,默认timeout=0L,如果可以则挂起等待相应时间并返回true,否则立即返回false
        if(0 == permits || 0L == timeout){
            if(!rateLimiter.tryAcquire()){
                log.warn("限流中......");
                return ResponseEnum.RATE_LIMIT;
            }
        }else{
            if(!rateLimiter.tryAcquire(permits,timeout, TimeUnit.MICROSECONDS)){
                log.warn("permits="+permits+",timeout="+timeout+",限流中......");
                return ResponseEnum.RATE_LIMIT;
            }
        }

        log.info("请求成功......");
        return ResponseEnum.OK;
    }
}


5、创建Controller,测试用

技术图片
package com.example.demo.controller;

import org.apache.juli.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @RequestMapping(value="/rateLimit/getHello",method = RequestMethod.GET)
    public String getHello(){
        log.info("rateLimit Hello ........");

        try {
            //发呆1s
            Thread.sleep(100);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
        }

        return "Get:Hello,World";
    }

    @RequestMapping(value="/calculate/getHello",method = RequestMethod.GET)
    public String postHello(){
        log.info("calculate Hello ........");

        try {
            //发呆1s
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
        }

        return "Post:Hello,World";
    }
}
View Code

6、配置请求拦截器

package com.example.demo.configuration;

import com.example.demo.interceptor.CalculateReqInterceptor;
import com.example.demo.interceptor.RateLimitInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;

@Configuration
public class WebInterceptorConfig extends WebMvcConfigurationSupport {

    @Bean
    public RateLimitInterceptor getRateLimitInterceptor() {
        return new RateLimitInterceptor();
    }

    @Bean
    public CalculateReqInterceptor getCalculateReqInterceptor() {
        return new CalculateReqInterceptor();
    }

    @Override
    protected void addInterceptors(InterceptorRegistry registry) {
        InterceptorRegistration rateLimitIcpt = registry.addInterceptor(getRateLimitInterceptor());
        // 拦截配置
        rateLimitIcpt.addPathPatterns("/rateLimit/**");

        //需要计算请求时间的请求
        //InterceptorRegistration calculateIcpt = registry.addInterceptor(getCalculateReqInterceptor());
        //calculateIcpt.addPathPatterns("/calculate/**");
    }
}

测试:

  @Test
    public void mulitplyReq() throws Exception {
        String url = "/rateLimit/getHello";
        //String url = "/calculate/getHello";
        for(int i = 0;i < 10;i++){
            sendReq(i,url);
        }
    }
     private void sendReq(int batchNo,String url) throws Exception {
        /**
         * 1、mockMvc.perform执行一个请求。
         * 2、MockMvcRequestBuilders.get("XXX")构造一个请求。
         * 3、ResultActions.param添加请求传值
         * 4、ResultActions.accept(MediaType.TEXT_HTML_VALUE))设置返回类型
         * 5、ResultActions.andExpect添加执行完成后的断言。
         * 6、ResultActions.andDo添加一个结果处理器,表示要对结果做点什么事情
         *   比如此处使用MockMvcResultHandlers.print()输出整个响应结果信息。
         * 5、ResultActions.andReturn表示执行完成后返回相应的结果。
         */

        MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.get(url)
                .accept(MediaType.TEXT_HTML_VALUE))
                // .andExpect(MockMvcResultMatchers.status().isOk())             //等同于Assert.assertEquals(200,status);
                // .andExpect(MockMvcResultMatchers.content().string("hello lvgang"))    //等同于 Assert.assertEquals
                // ("hello lvgang",content);
                //.andDo(MockMvcResultHandlers.print())//打印
                .andReturn();
        MockHttpServletResponse result = mvcResult.getResponse();
        System.out.println("Thread"+Thread.currentThread().getName()+">>>batchNo="+batchNo+":"+result.getContentAsString());
    }

结果:

15:31:43.428 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.0
15:31:43.435 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=0:Get:Hello,World
15:31:43.629 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.0
15:31:43.629 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=1:Get:Hello,World
15:31:44.545 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.812705
15:31:44.545 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=2:Get:Hello,World
15:31:45.544 [main] INFO c.e.d.i.RateLimitInterceptor - 请求成功......waitTime=0.89766
15:31:45.544 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........

...

切换下rateLimit.type=tryAcquire,qps=10 ,qps一定要设置恰当,否则用户体验将不太每秒。运行:

@Test
    public void threadReq() throws Exception {
        String url = "/rateLimit/getHello";
        new Thread(()->{
            for(int i = 0;i < 500;i++){
                try {
                    sendReq(i,url);
                } catch (Exception e) {
                }
            }
        }).start();

        new Thread(()->{
            for(int i = 0;i < 500;i++){
                try {
                    sendReq(i,url);
                } catch (Exception e) {
                }
            }
        }).start();

        while(true){
            Thread.sleep(60 * 1000);
            break;
        }
    }

技术图片


下面简单介绍其他应用级限流:

(1)限流总并发/连接/请求数

对于一个应用系统来说一定会有极限并发/请求数,即总有一个TPS/QPS阀值,如果超了阀值则系统就会不响应用户请求或响应的非常慢,因此我们最好进行过载保护,防止大量请求涌入击垮系统。

如果你使用过Tomcat,其Connector 其中一种配置有如下几个参数:

acceptCount:如果Tomcat的线程都忙于响应,新来的连接会进入队列排队,如果超出排队大小,则拒绝连接;

maxConnections: 瞬时最大连接数,超出的会排队等待;

maxThreads:Tomcat能启动用来处理请求的最大线程数,如果请求处理量一直远远大于最大线程数则可能会僵死。

详细的配置请参考官方文档。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都会有类似的限制连接数的配置。

Springboot内置tomcat属性在org.springframework.boot.autoconfigure.web.ServerProperties,可在application.properties配置属性。

Tomcat特有配置都以”server.tomcat”作为前缀

如上面的:

#端口号设置

server.port=8007

(2)限流总资源数

如果有的资源是稀缺资源(如数据库连接、线程),而且可能有多个系统都会去使用它,那么需要限制应用;可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是100,那么本应用最多可以使用100个资源,超出了可以等待或者抛异常。

(3)限流某个接口的总并发/请求数

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.atomic.AtomicLong;

@Service("limitReqNumInterceptor")
public class LimitReqNumInterceptor extends AbstractInterceptor{

    @Autowired
    private AtomicLong atomic;

    private static final long LIMIT_NUM = 10L;

    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        //AtomicLong 对请求数限流  注意:非线程安全(理论上超出限制是可以接受的),请求定制需要在拦截器配置,将会非常多,不好用
        if(atomic.incrementAndGet() > LIMIT_NUM) {
            //拒绝请求
            log.warn("超出请求数>>>,LIMIT_NUM="+LIMIT_NUM);
            return ResponseEnum.RATE_LIMIT;
        }
        log.warn("请求通过....");
        return ResponseEnum.OK;
    }

    //@Scheduled(cron="*/6 * * * * ?")
   /* private void process(){
        if(atomic.get() > LIMIT_NUM) {
            log.warn("请求达到上限,定时清理请求数>>>"+atomic.get());
            //启动定时任务,定时清空请求数
            atomic.set(0L);
            log.warn("定时清理后>>>"+atomic.get());
        }
    }*/

}

限流某个接口的时间窗请求数

利用guava catche,缓存uri路径的方式

/**
     * 过期时间
     */
    private static final long EXPIRE_SEC = 20L;

    /**
     * 定制  过期时间
     */
    private LoadingCache<String, AtomicLong> LoadingCache =
            CacheBuilder.newBuilder().expireAfterWrite(EXPIRE_SEC, TimeUnit.SECONDS).build(new CacheLoader<String,
                    AtomicLong>() {
                //本地缓存没有命中时,调用load,并将结果缓存
                @Override
                public AtomicLong load(String aLong) throws Exception {
                    return new AtomicLong(0L);
                }
            });

    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        String uri = req.getRequestURI();
        AtomicLong atomic = LoadingCache.get(uri);
        System.out.println("uri="+uri+",当前请求数,number="+atomic.get());

        if(atomic.incrementAndGet() > LIMIT_NUM) {
            //拒绝请求
            log.warn("uri="+uri+"超出请求数>>>,LIMIT_NUM="+LIMIT_NUM);
            return ResponseEnum.RATE_LIMIT;
        }
        log.warn("请求通过....");
        return ResponseEnum.OK;
    }

Controller,拦截器设置略

技术图片
@RequestMapping(value="/numLimit/getHello",method = RequestMethod.GET)
    public String sayHello(){
        log.info("numLimit Hello ........");

        try {
            //发呆1s
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
        }
        return "Say:Hello,World";
    }

    @RequestMapping(value="/numLimit/getHello2",method = RequestMethod.GET)
    public String sayHello2(){
        log.info("numLimit Hello2 ........");

        try {
            //发呆1s
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
        }
        return "Say:Hello,World";
    }
View Code

测试:

@Test
    public void mulitplyReq2() throws Exception {
        String url = "/numLimit/getHello";
        //String url = "/calculate/getHello";
        for (int i = 0; i < 100; i++) {
            sendReq(i, url);
        }

        String url2 = "/numLimit/getHello2";
        for (int i = 0; i < 15; i++) {
            sendReq(i, url2);
        }

        Thread.sleep(15 * 1000L);
        for (int i = 0; i < 15; i++) {
            sendReq(i, url);
        }
    }

参考:

https://www.cnblogs.com/xuwc/p/9123078.html

以上是关于k8s之限流机制的主要内容,如果未能解决你的问题,请参考以下文章

Dubbo之限流TpsLimitFilter源码分析

高并发之限流实现

rest framework之限流组件

秒杀链路兜底方案之限流&降级实战

Spring Cloud Gateway 之限流操作

Spring Cloud Gateway 之限流操作