限流和常见的三种算法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了限流和常见的三种算法相关的知识,希望对你有一定的参考价值。

参考技术A 限流的三种算法
https://www.cnblogs.com/forezp/p/10140316.html

限流要解决的问题

典型限流的应用场景:

如何限流?
一般网关都有这种功能。 gateway、nginx、zuul等

限流:一定时间内,只允许N次请求。
从计算机友好的角度出发,是希望能在单位时间内均摊掉请求,使用漏斗算法可以达到这种效果。
但是漏斗算法有个弊端,就是先快后慢的这种请求,那么峰值的请求也只能排队等待被消费。实际上计算机是具备一定的高并发处理能力的,只要不是一直处于高并发下即可。所以 计数器限流和 漏洞限流折中的算法,令牌限流成为现在最主流的算法。

(Redis 结合expire方案可以实现)
第一次请求开始计时,例如1s以内,达到100次请求就拒绝访问了,直到1s过后,重新开始计数。

优点:

缺点:短暂的峰值过高对服务器不友好。服务器希望能把请求尽量的均摊开来,这样可以充分利用计算机资源。

消费的速度是恒定的,对于服务器而言是最友好的。
在算法实现方面,可以准备一个队列,用来保存请求,另外通过一个线程池(ScheduledExecutorService)来定期从队列中获取请求并执行,可以一次性获取多个并发执行。
参数:消费速度、桶容量(超过就抛弃,可以避免内存过大,有过多的等待的任务)

优点:

缺点:

令牌桶算法是比较常见的限流算法之一,大概描述如下:
1)所有的请求在处理之前都需要拿到一个可用的令牌才会被处理;
2)根据限流大小,设置按照一定的速率往桶里添加令牌;
3)桶设置最大的放置令牌限制,当桶满时、新添加的令牌就被丢弃活着拒绝;
4)请求达到后首先要获取令牌桶中的令牌,拿着令牌才可以进行其他的业务逻辑,处理完业务逻辑之后,将令牌直接删除;
5)令牌桶有最低限额,当桶中的令牌达到最低限额的时候,请求处理完之后将不会删除令牌,以此保证足够的限流;

这种算法,既可以保证系统由一定的高并发能力,比如当前令牌桶容量是100,一开始直接消费掉100个请求。有保证服务器不会因为短暂的爆发,而导致server端空闲,因为令牌桶还会持续的生产令牌。

既有一定的并发能力,又不至于完全失去控制,可控的兼具并发力和流量控制的限流算法.是计数器算法(一定的并发处理能力)和漏洞限流(高峰过后仍然会持续的产生令牌)的折中算法。

常见的限流算法与实现

限流的实现

常见的限流算法:

限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。

常见的限流算法有三种:

计数器限流(固定窗口)

原理:

  • 时间线划分为多个独立且固定大小窗口;
  • 落在每一个时间窗口内的请求就将计数器加1;
  • 如果计数器超过了限流阈值,则后续落在该窗口的请求都会被拒绝。但时间达到下一个时间窗口时,计数器会被重置为0。

案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: FixedWindow
 * @Description: 固定窗口算法
 * @Author: Ze WANG
 * @Date: 2022/9/26
 * @Version 1.0
 **/
public class FixedWindow 

    /**
     * 阈值
     */
    private static Integer QPS = 2;
    
    /**
     * 时间窗口(毫秒)
     */
    private static long TIME_WINDOWS = 1000;
    
    /**
     * 计数器
     */
    private static AtomicInteger REQ_COUNT = new AtomicInteger();

    /**
     * 窗口开始时间
     */
    private static long START_TIME = System.currentTimeMillis();

    public synchronized static boolean tryAcquire() 
        //超时窗口
        if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) 
            REQ_COUNT.set(0);
            START_TIME = System.currentTimeMillis();
        
        return REQ_COUNT.incrementAndGet() <= QPS;
    

    public static void main(String[] args) throws InterruptedException 
        for (int i = 0; i < 10; i++) 
            Thread.sleep(250);
            LocalTime now = LocalTime.now();
            if (!tryAcquire()) 
                System.out.println(now + " 被限流");
             else 
                System.out.println(now + " 做点什么");
            
        
    


问题:

虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次。

滑动窗口

滑动窗口算法是对固定窗口算法的改进

原理:

将单位时间划分为多个区间,一般都是均分为多个小的时间段;

每一个区间内都有一个计数器,有一个请求落在该区间内,则该区间内的计数器就会加一;

每过一个时间段,时间窗口就会往右滑动一格,抛弃最老的一个区间,并纳入新的一个区间;

计算整个时间窗口内的请求总数时会累加所有的时间片段内的计数器,计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。

上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题

代码案例:

package com.example.studyproject.algorithm;

import lombok.Data;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: SlidingWindow
 * @Description: 滑动窗口
 * @Author: Ze WANG
 * @Date:  2022/9/26
 * @Version 1.0
 **/
public class SlidingWindow 
    /**
     * 阈值
     */
    private int qps = 2;
    /**
     * 时间窗口总大小(毫秒)
     */
    private long windowSize = 1000;
    /**
     * 多少个子窗口
     */
    private Integer windowCount = 10;
    /**
     * 窗口列表
     */
    private WindowInfo[] windowArray = new WindowInfo[windowCount];

    public SlidingWindow(int qps) 
        this.qps = qps;
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < windowArray.length; i++) 
            windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
        
    

    /**
     * 1. 计算当前时间窗口
     * 2. 更新当前窗口计数 & 重置过期窗口计数
     * 3. 当前 QPS 是否超过限制
     * @return 是否被限流
     */
    public synchronized boolean tryAcquire() 
        long currentTimeMillis = System.currentTimeMillis();
        // 1. 计算当前时间窗口
        int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
        // 2.  更新当前窗口计数 & 重置过期窗口计数
        int sum = 0;
        for (int i = 0; i < windowArray.length; i++) 
            WindowInfo windowInfo = windowArray[i];
            if ((currentTimeMillis - windowInfo.getTime()) > windowSize) 
                windowInfo.getNumber().set(0);
                windowInfo.setTime(currentTimeMillis);
            
            if (currentIndex == i && windowInfo.getNumber().get() < qps) 
                windowInfo.getNumber().incrementAndGet();
            
            sum = sum + windowInfo.getNumber().get();
        
        // 3. 当前 QPS 是否超过限制
        return sum <= qps;
    

    @Data
    private class WindowInfo 
        // 窗口开始时间
        private Long time;
        // 计数器
        private AtomicInteger number;

        public WindowInfo(long time, AtomicInteger number) 
            this.time = time;
            this.number = number;
        
        // get...set...
    

    public static void main(String[] args) throws InterruptedException 
        int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
        System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success));
        success = 0;
        SlidingWindow myRateLimiter = new SlidingWindow(qps);
        for (int i = 0; i < count; i++) 
            Thread.sleep(sleep);
            if (myRateLimiter.tryAcquire()) 
                success++;
                if (success % qps == 0) 
                    System.out.println(LocalTime.now() + ": success, ");
                 else 
                    System.out.print(LocalTime.now() + ": success, ");
                
             else 
                System.out.println(LocalTime.now() + ": fail");
            
        
        System.out.println();
        System.out.println("实际测试成功次数:" + success);
    

输出结果:

已连接到目标 VM, 地址: ''127.0.0.1:50101',传输: '套接字''
当前QPS限制为:2,当前测试次数:20,间隔:300ms,预计成功次数:12
14:20:38.833: success, 14:20:39.142: success, 
14:20:39.455: success, 14:20:39.766: success, 
14:20:40.077: fail
14:20:40.377: fail
14:20:40.678: success, 14:20:40.992: success, 
14:20:41.307: fail
14:20:41.621: fail
14:20:41.922: success, 14:20:42.229: success, 
14:20:42.539: fail
14:20:42.840: fail
14:20:43.140: success, 14:20:43.455: success, 
14:20:43.756: fail
14:20:44.070: fail
14:20:44.386: success, 14:20:44.687: success, 

实际测试成功次数:12
与目标 VM 断开连接, 地址为: ''127.0.0.1:50101',传输: '套接字''

进程已结束,退出代码0

漏桶算法

漏桶算法思路很简单,我们把水比作是请求,漏桶比作是系统处理能力极限,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流。

由介绍可以知道,漏桶模式中的消费处理总是能以恒定的速度进行,可以很好的保护自身系统不被突如其来的流量冲垮;但是这也是漏桶模式的缺点,假设 QPS 为 2,同时 2 个请求进来,2 个请求并不能同时进行处理响应,因为每 1s / 2= 500ms 只能处理一个请求。

代码案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName: LeakyBucket
 * @Description: 漏桶算法
 * @Author: Ze WANG
 * @Date: 2022/9/26
 * @Version 1.0
 **/
public class LeakyBucket 

    /**
     * 水桶的大小
     */
    private final int bucket;

    /**
     * qps,水露出的速度
     */
    private int qps;

    /**
     * 当前水量
     */
    private long water;

    private long timeStamp = System.currentTimeMillis();

    public LeakyBucket(int bucket, int qps) 
        this.bucket = bucket;
        this.qps = qps;
    

    /**
     * 桶是否已经满了
     * @return true未满
     */
    public boolean tryAcquire()
        //1.计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp)/1000;
        water = Math.max(0,water-timeGap*qps);
        timeStamp = now;

        // 如果未满,放行
        if(water< bucket)
            water += 1;
            return true;
        
        return false;
    

    public static void main(String[] args) throws InterruptedException 
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        ExecutorService singleThread = Executors.newSingleThreadExecutor();

        LeakyBucket rateLimiter = new LeakyBucket(20, 2);
        // 存储流量的队列
        Queue<Integer> queue = new LinkedList<>();
        // 模拟请求  不确定速率注水
        singleThread.execute(() -> 
            int count = 0;
            while (true) 
                count++;
                boolean flag = rateLimiter.tryAcquire();
                if (flag) 
                    queue.offer(count);
                    System.out.println(count + "--------流量被放行--------");
                 else 
                    System.out.println(count + "流量被限制");
                
                try 
                    Thread.sleep((long) (Math.random() * 1000));
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        );

        // 模拟处理请求 固定速率漏水
        scheduledExecutorService.scheduleAtFixedRate(() -> 
            if (!queue.isEmpty()) 
                System.out.println(queue.poll() + "被处理");
            
        , 0, 100, TimeUnit.MILLISECONDS);

        // 保证主线程不会退出
        while (true) 
            Thread.sleep(10000);
        
    




令牌桶算法

令牌桶算法的原理也比较简单,我们可以理解成医院的挂号看病,只有拿到号以后才可以进行诊病。

系统会维护一个令牌(token)桶,以一个恒定的速度往桶里放入令牌(token),这时如果有请求进来想要被处理,则需要先从桶里获取一个令牌(token),当桶里没有令牌(token)可取时,则该请求将被拒绝服务。令牌桶算法通过控制桶的容量、发放令牌的速率,来达到对请求的限制。

原理:

  • 令牌桶的实现思路类似于生产者和消费之间的关系。
  • 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 2,每 500ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
  • 请求执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝本次请求,由此达到限流目的。

思考:

  • 1s / 阈值(QPS) = 令牌添加时间间隔。
  • 桶的容量等于限流的阈值,令牌数量达到阈值时,不再添加。
  • 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理。
  • 有启动过程,令牌桶启动时桶中无令牌,然后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这类问题。

代码案例

使用Google封装的令牌桶RateLimiter   

/**
     * 代码中限制 QPS 为 2,也就是每隔 500ms 生成一个令牌,但是程序每隔 250ms 获取一次令牌,所以两次获取中只有一次会成功。
     */
    public static void main(String[] args) throws InterruptedException 
        RateLimiter rateLimiter = RateLimiter.create(2);

        for (int i = 0; i < 10; i++) 
            String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
            System.out.println(time + ":" + rateLimiter.tryAcquire());
            Thread.sleep(250);
        
    

限流的实现

限流有很多种方法实现:

  • 基于Guava工具类实现限流
  • 基于AOP实现限流
  • 基于Redis实现限流(适用于分布式)
  • 使用Redisson实现限流(适用于分布式)
  • Sentinel限流(适用于分布式)
  • Nginx、Gateway限流…(适用于分布式)

以上是关于限流和常见的三种算法的主要内容,如果未能解决你的问题,请参考以下文章

Nginx限流和黑名单配置

常见限流算法分析

Java限流策略

三种常见的限流算法

限流消峰的三种办法

Redis 实现限流的三种方式