常见的限流算法与实现

Posted 结构化思维wz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了常见的限流算法与实现相关的知识,希望对你有一定的参考价值。

限流的实现

常见的限流算法:

限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。

常见的限流算法有三种:

计数器限流(固定窗口)

原理:

  • 时间线划分为多个独立且固定大小窗口;
  • 落在每一个时间窗口内的请求就将计数器加1;
  • 如果计数器超过了限流阈值,则后续落在该窗口的请求都会被拒绝。但时间达到下一个时间窗口时,计数器会被重置为0。

案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: FixedWindow
 * @Description: 固定窗口算法
 * @Author: Ze WANG
 * @Date: 2022/9/26
 * @Version 1.0
 **/
public class FixedWindow 

    /**
     * 阈值
     */
    private static Integer QPS = 2;
    
    /**
     * 时间窗口(毫秒)
     */
    private static long TIME_WINDOWS = 1000;
    
    /**
     * 计数器
     */
    private static AtomicInteger REQ_COUNT = new AtomicInteger();

    /**
     * 窗口开始时间
     */
    private static long START_TIME = System.currentTimeMillis();

    public synchronized static boolean tryAcquire() 
        //超时窗口
        if ((System.currentTimeMillis() - START_TIME) > TIME_WINDOWS) 
            REQ_COUNT.set(0);
            START_TIME = System.currentTimeMillis();
        
        return REQ_COUNT.incrementAndGet() <= QPS;
    

    public static void main(String[] args) throws InterruptedException 
        for (int i = 0; i < 10; i++) 
            Thread.sleep(250);
            LocalTime now = LocalTime.now();
            if (!tryAcquire()) 
                System.out.println(now + " 被限流");
             else 
                System.out.println(now + " 做点什么");
            
        
    


问题:

虽然我们限制了 QPS 为 2,但是当遇到时间窗口的临界突变时,如 1s 中的后 500 ms 和第 2s 的前 500ms 时,虽然是加起来是 1s 时间,却可以被请求 4 次。

滑动窗口

滑动窗口算法是对固定窗口算法的改进

原理:

将单位时间划分为多个区间,一般都是均分为多个小的时间段;

每一个区间内都有一个计数器,有一个请求落在该区间内,则该区间内的计数器就会加一;

每过一个时间段,时间窗口就会往右滑动一格,抛弃最老的一个区间,并纳入新的一个区间;

计算整个时间窗口内的请求总数时会累加所有的时间片段内的计数器,计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。

上图的示例中,每 500ms 滑动一次窗口,可以发现窗口滑动的间隔越短,时间窗口的临界突变问题发生的概率也就越小,不过只要有时间窗口的存在,还是有可能发生时间窗口的临界突变问题

代码案例:

package com.example.studyproject.algorithm;

import lombok.Data;

import java.time.LocalTime;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: SlidingWindow
 * @Description: 滑动窗口
 * @Author: Ze WANG
 * @Date:  2022/9/26
 * @Version 1.0
 **/
public class SlidingWindow 
    /**
     * 阈值
     */
    private int qps = 2;
    /**
     * 时间窗口总大小(毫秒)
     */
    private long windowSize = 1000;
    /**
     * 多少个子窗口
     */
    private Integer windowCount = 10;
    /**
     * 窗口列表
     */
    private WindowInfo[] windowArray = new WindowInfo[windowCount];

    public SlidingWindow(int qps) 
        this.qps = qps;
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < windowArray.length; i++) 
            windowArray[i] = new WindowInfo(currentTimeMillis, new AtomicInteger(0));
        
    

    /**
     * 1. 计算当前时间窗口
     * 2. 更新当前窗口计数 & 重置过期窗口计数
     * 3. 当前 QPS 是否超过限制
     * @return 是否被限流
     */
    public synchronized boolean tryAcquire() 
        long currentTimeMillis = System.currentTimeMillis();
        // 1. 计算当前时间窗口
        int currentIndex = (int)(currentTimeMillis % windowSize / (windowSize / windowCount));
        // 2.  更新当前窗口计数 & 重置过期窗口计数
        int sum = 0;
        for (int i = 0; i < windowArray.length; i++) 
            WindowInfo windowInfo = windowArray[i];
            if ((currentTimeMillis - windowInfo.getTime()) > windowSize) 
                windowInfo.getNumber().set(0);
                windowInfo.setTime(currentTimeMillis);
            
            if (currentIndex == i && windowInfo.getNumber().get() < qps) 
                windowInfo.getNumber().incrementAndGet();
            
            sum = sum + windowInfo.getNumber().get();
        
        // 3. 当前 QPS 是否超过限制
        return sum <= qps;
    

    @Data
    private class WindowInfo 
        // 窗口开始时间
        private Long time;
        // 计数器
        private AtomicInteger number;

        public WindowInfo(long time, AtomicInteger number) 
            this.time = time;
            this.number = number;
        
        // get...set...
    

    public static void main(String[] args) throws InterruptedException 
        int qps = 2, count = 20, sleep = 300, success = count * sleep / 1000 * qps;
        System.out.println(String.format("当前QPS限制为:%d,当前测试次数:%d,间隔:%dms,预计成功次数:%d", qps, count, sleep, success));
        success = 0;
        SlidingWindow myRateLimiter = new SlidingWindow(qps);
        for (int i = 0; i < count; i++) 
            Thread.sleep(sleep);
            if (myRateLimiter.tryAcquire()) 
                success++;
                if (success % qps == 0) 
                    System.out.println(LocalTime.now() + ": success, ");
                 else 
                    System.out.print(LocalTime.now() + ": success, ");
                
             else 
                System.out.println(LocalTime.now() + ": fail");
            
        
        System.out.println();
        System.out.println("实际测试成功次数:" + success);
    

输出结果:

已连接到目标 VM, 地址: ''127.0.0.1:50101',传输: '套接字''
当前QPS限制为:2,当前测试次数:20,间隔:300ms,预计成功次数:12
14:20:38.833: success, 14:20:39.142: success, 
14:20:39.455: success, 14:20:39.766: success, 
14:20:40.077: fail
14:20:40.377: fail
14:20:40.678: success, 14:20:40.992: success, 
14:20:41.307: fail
14:20:41.621: fail
14:20:41.922: success, 14:20:42.229: success, 
14:20:42.539: fail
14:20:42.840: fail
14:20:43.140: success, 14:20:43.455: success, 
14:20:43.756: fail
14:20:44.070: fail
14:20:44.386: success, 14:20:44.687: success, 

实际测试成功次数:12
与目标 VM 断开连接, 地址为: ''127.0.0.1:50101',传输: '套接字''

进程已结束,退出代码0

漏桶算法

漏桶算法思路很简单,我们把水比作是请求,漏桶比作是系统处理能力极限,水先进入到漏桶里,漏桶里的水按一定速率流出,当流出的速率小于流入的速率时,由于漏桶容量有限,后续进入的水直接溢出(拒绝请求),以此实现限流。

由介绍可以知道,漏桶模式中的消费处理总是能以恒定的速度进行,可以很好的保护自身系统不被突如其来的流量冲垮;但是这也是漏桶模式的缺点,假设 QPS 为 2,同时 2 个请求进来,2 个请求并不能同时进行处理响应,因为每 1s / 2= 500ms 只能处理一个请求。

代码案例:

package com.example.studyproject.algorithm;

import java.time.LocalTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName: LeakyBucket
 * @Description: 漏桶算法
 * @Author: Ze WANG
 * @Date: 2022/9/26
 * @Version 1.0
 **/
public class LeakyBucket 

    /**
     * 水桶的大小
     */
    private final int bucket;

    /**
     * qps,水露出的速度
     */
    private int qps;

    /**
     * 当前水量
     */
    private long water;

    private long timeStamp = System.currentTimeMillis();

    public LeakyBucket(int bucket, int qps) 
        this.bucket = bucket;
        this.qps = qps;
    

    /**
     * 桶是否已经满了
     * @return true未满
     */
    public boolean tryAcquire()
        //1.计算剩余水量
        long now = System.currentTimeMillis();
        long timeGap = (now - timeStamp)/1000;
        water = Math.max(0,water-timeGap*qps);
        timeStamp = now;

        // 如果未满,放行
        if(water< bucket)
            water += 1;
            return true;
        
        return false;
    

    public static void main(String[] args) throws InterruptedException 
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        ExecutorService singleThread = Executors.newSingleThreadExecutor();

        LeakyBucket rateLimiter = new LeakyBucket(20, 2);
        // 存储流量的队列
        Queue<Integer> queue = new LinkedList<>();
        // 模拟请求  不确定速率注水
        singleThread.execute(() -> 
            int count = 0;
            while (true) 
                count++;
                boolean flag = rateLimiter.tryAcquire();
                if (flag) 
                    queue.offer(count);
                    System.out.println(count + "--------流量被放行--------");
                 else 
                    System.out.println(count + "流量被限制");
                
                try 
                    Thread.sleep((long) (Math.random() * 1000));
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        );

        // 模拟处理请求 固定速率漏水
        scheduledExecutorService.scheduleAtFixedRate(() -> 
            if (!queue.isEmpty()) 
                System.out.println(queue.poll() + "被处理");
            
        , 0, 100, TimeUnit.MILLISECONDS);

        // 保证主线程不会退出
        while (true) 
            Thread.sleep(10000);
        
    




令牌桶算法

令牌桶算法的原理也比较简单,我们可以理解成医院的挂号看病,只有拿到号以后才可以进行诊病。

系统会维护一个令牌(token)桶,以一个恒定的速度往桶里放入令牌(token),这时如果有请求进来想要被处理,则需要先从桶里获取一个令牌(token),当桶里没有令牌(token)可取时,则该请求将被拒绝服务。令牌桶算法通过控制桶的容量、发放令牌的速率,来达到对请求的限制。

原理:

  • 令牌桶的实现思路类似于生产者和消费之间的关系。
  • 系统服务作为生产者,按照指定频率向桶(容器)中添加令牌,如 QPS 为 2,每 500ms 向桶中添加一个令牌,如果桶中令牌数量达到阈值,则不再添加。
  • 请求执行作为消费者,每个请求都需要去桶中拿取一个令牌,取到令牌则继续执行;如果桶中无令牌可取,就触发拒绝策略,可以是超时等待,也可以是直接拒绝本次请求,由此达到限流目的。

思考:

  • 1s / 阈值(QPS) = 令牌添加时间间隔。
  • 桶的容量等于限流的阈值,令牌数量达到阈值时,不再添加。
  • 可以适应流量突发,N 个请求到来只需要从桶中获取 N 个令牌就可以继续处理。
  • 有启动过程,令牌桶启动时桶中无令牌,然后按照令牌添加时间间隔添加令牌,若启动时就有阈值数量的请求过来,会因为桶中没有足够的令牌而触发拒绝策略,不过如 RateLimiter 限流工具已经优化了这类问题。

代码案例

使用Google封装的令牌桶RateLimiter   

/**
     * 代码中限制 QPS 为 2,也就是每隔 500ms 生成一个令牌,但是程序每隔 250ms 获取一次令牌,所以两次获取中只有一次会成功。
     */
    public static void main(String[] args) throws InterruptedException 
        RateLimiter rateLimiter = RateLimiter.create(2);

        for (int i = 0; i < 10; i++) 
            String time = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
            System.out.println(time + ":" + rateLimiter.tryAcquire());
            Thread.sleep(250);
        
    

限流的实现

限流有很多种方法实现:

  • 基于Guava工具类实现限流
  • 基于AOP实现限流
  • 基于Redis实现限流(适用于分布式)
  • 使用Redisson实现限流(适用于分布式)
  • Sentinel限流(适用于分布式)
  • nginx、Gateway限流…(适用于分布式)

以上是关于常见的限流算法与实现的主要内容,如果未能解决你的问题,请参考以下文章

三种常见的限流算法

基于redis实现的四种常见的限流策略

基于redis实现的四种常见的限流策略

基于redis实现的四种常见的限流策略

java并发系列 - 第29天:高并发中常见的限流方式

限流策略