四种限流算法图解

Posted rayylee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了四种限流算法图解相关的知识,希望对你有一定的参考价值。

前言

会使用分布式,那么就说明业务量是较大的,如果不对流量进行限制,很大几率就导致高峰期时机器宕机。而常用的限流算法有四种,这也是所有限流框架实现限流的基础,接下来就通过本文来认识下它们。

1. 计数器限流算法

计数器是限流中最简单的,规定为:在指定周期内累加访问次数,当访问次数达到设定的阈值时,出发限流策略,当进入下一个时间周期时会将访问次数清零
在这里插入图片描述

  • 优点:实现简单
  • 临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量

在这里插入图片描述

  • 突刺现象:如果在单位时间10秒内的前100ms,通过了500个请求,则后面的990ms都无法接受任何请求,也就无法应对短时间高并发。
package main

import (
	"fmt"
	"sync"
	"time"
)

type RequestLimiter struct {
	Interval time.Duration // 重新计数时间
	MaxCount int           // 最大计数
	Lock     sync.Mutex
	ReqCount int // 目前的请求数
}

func (reqLimiter *RequestLimiter) IsAvailable() bool {
	reqLimiter.Lock.Lock()
	defer reqLimiter.Lock.Unlock()

	return reqLimiter.ReqCount < reqLimiter.MaxCount
}

// 非阻塞
func (reqLimiter *RequestLimiter) AddRequestCount() bool {
	reqLimiter.Lock.Lock()
	defer reqLimiter.Lock.Unlock()
	if reqLimiter.ReqCount < reqLimiter.MaxCount {
		reqLimiter.ReqCount += 1
		return true
	}
	return false
}

func NewRequestLimitService(interval time.Duration, maxCount int) *RequestLimiter {
	reqLimit := &RequestLimiter{
		Interval: interval,
		MaxCount: maxCount,
	}
	go func() {
		ticker := time.NewTicker(interval)
		for true {
			<-ticker.C
			reqLimit.Lock.Lock()
			fmt.Println("reset Count ...")
			reqLimit.ReqCount = 0
			reqLimit.Lock.Unlock()
		}
	}()

	return reqLimit
}

/**
	原理:协程不阻塞 + 死循环 + ticker 定时执行 ReqCount 归零
	ticker := time.NewTicker(2*time.Second)
	for {
		currentTime := <-ticker.C
		fmt.Println("当前时间为:", currentTime)
	}

	输出:
		当前时间为: 2021-05-19 11:02:12.3603475 +0800 CST m=+2.002147201
		当前时间为: 2021-05-19 11:02:14.3611806 +0800 CST m=+4.002980301
		当前时间为: 2021-05-19 11:02:16.360625 +0800 CST m=+6.002424701
 */
func main() {
	service := NewRequestLimitService(time.Second, 2)
	for true {
		hasToken := service.AddRequestCount()
		if hasToken {
			fmt.Println(time.Now())
		}
	}
}

2. 滑动窗口限流算法

为了避免计数器中的临界问题,让限制更加平滑,将固定窗口中分割出多个小时间窗口,分别在每个小的时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口。

  • 优点:实现相对简单,且没有计数器算法的临界问题
  • 缺点:无法应对短时间高并发(突刺现象)
    在这里插入图片描述
    如图所示,最终只需要统计每个小时间窗口不超过阈值/n 以及 在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值即可
package main

import (
	"fmt"
	"sync"
	"time"
)

type WindowLimiter struct {
	Interval    time.Duration // 总计数时间
	WinCount    []int         // 每个窗口的访问数量
	TicketSize  int           // 窗口最大容量
	TicketCount int           // 窗口数量
	Lock        sync.Mutex
	CurIndex    int // 目前使用哪个窗口
}

func (reqLimiter *WindowLimiter) IsAvailable() bool {
	reqLimiter.Lock.Lock()
	defer reqLimiter.Lock.Unlock()
	return reqLimiter.WinCount[reqLimiter.CurIndex] < reqLimiter.TicketSize
}

func (reqLimiter *WindowLimiter) AddRequestCount() bool {
	reqLimiter.Lock.Lock()
	defer reqLimiter.Lock.Unlock()
	if reqLimiter.WinCount[reqLimiter.CurIndex] < reqLimiter.TicketSize {
		reqLimiter.WinCount[reqLimiter.CurIndex]++
		return true
	}
	return false
}

func NewRequestLimitService(interval time.Duration, ticketCount int, ticketSize int) *WindowLimiter {
	reqLimit := &WindowLimiter{
		Interval:    interval,
		WinCount:    make([]int, ticketCount, ticketCount),
		TicketSize:  ticketSize,
		TicketCount: ticketCount,
		CurIndex:    0,
	}
	go func() {
		ticker := time.NewTicker(time.Duration(interval.Nanoseconds() / int64(ticketCount)))
		for true {
			<-ticker.C
			reqLimit.Lock.Lock()
			reqLimit.CurIndex = (reqLimit.CurIndex + 1) % reqLimit.TicketCount
			reqLimit.WinCount[reqLimit.CurIndex] = 0
			fmt.Println("reset Count ...")
			reqLimit.Lock.Unlock()
		}
	}()

	return reqLimit
}

func main() {
	service := NewRequestLimitService(time.Second, 2, 1)
	for true {
		hasToken := service.AddRequestCount()
		if hasToken {
			fmt.Println(time.Now())
		}
	}
}

3. 漏桶限流算法

漏桶限流算法的核心就是, 不管上面的水流速度有多块, 漏桶水滴的流出速度始终保持不变

  • 实际应用:消息中间件采用的就是漏桶限流的思想
  • 主要作用:控制数据注入网络的速度;平滑网络上的突发流量(类似于电容整流)
  • 不足:无法应对突发的并发流量,因为流出速率一直都是恒定的

在这里插入图片描述
如图所示,就是一个固定的桶,桶上半部分没有限制流入速率(满了会溢出),但出水速率受限于孔的大小,也就是不管多少请求,最后给服务的请求数量的速率是恒定的,多余的请求将无法通过在桶内等待

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

type BucketLimiter struct {
	Timestamp time.Time // 当前注水的时间戳
	Capacity  float64   // 桶的容量
	Rate      float64   // 速度
	Water     float64   // 当前水量
	Lock      sync.Mutex
}

func AddWater(bucket *BucketLimiter) bool {
	now := time.Now()
	leftWater := math.Max(0, bucket.Water-now.Sub(bucket.Timestamp).Seconds()*bucket.Rate)
	bucket.Lock.Lock()
	defer bucket.Lock.Unlock()
	if leftWater+1 < bucket.Capacity {
		// 尝试加水,此时水桶未满
		bucket.Timestamp = now
		bucket.Water = leftWater + 1
		return true
	} else {
		// 水满了,拒绝访问
		return false
	}
}

func main() {
	service := &BucketLimiter{
		Timestamp: time.Now(),
		Capacity:  2,
		Rate:      1,
		Water:     0,
	}
	for true {
		hasToken := AddWater(service)
		if hasToken {
			fmt.Println(time.Now())
		}

	}
}

4. 令牌桶限流算法

令牌桶是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法
速度恒定、令牌桶大小固定,如果令牌桶被填满,则会丢弃生成的令牌,如果桶内没有令牌则出现限流策略

  • 优点:可以像漏桶那样匀速,也可以像计数器那样突发处理
    在这里插入图片描述

  • 基本过程:每进来一个请求,都在桶里获取一个令牌;如果有令牌,则拿着令牌通过;如果没有令牌,则不允许请求通过

  • 几种情况:
    请求速度 > 令牌生成速度:当令牌被取空后会被限流
    请求速度 = 令牌生成速度:流量处于平稳状态
    请求速度 < 令牌生成速度:请求可被正常处理,多余的令牌直接丢弃

package main

import (
	"math"
	"sync"
	"time"
)

// 定义令牌桶结构
type tokenBucket struct {
	timestamp time.Time // 当前时间戳
	capacity  float64   // 桶的容量(存放令牌的最大量)
	rate      float64   // 令牌放入速度
	tokens    float64   // 当前令牌总量
	lock      sync.Mutex
}

// 判断是否获取令牌(若能获取,则处理请求)
func getToken(bucket tokenBucket) bool {
	now := time.Now()
	bucket.lock.Lock()
	defer bucket.lock.Unlock()
	// 先添加令牌
	leftTokens := math.Max(bucket.capacity, bucket.tokens+now.Sub(bucket.timestamp).Seconds()*bucket.rate)
	if leftTokens < 1 {
		// 若桶中一个令牌都没有了,则拒绝
		return false
	} else {
		// 桶中还有令牌,领取令牌
		bucket.tokens -= 1
		bucket.timestamp = now
		return true
	}
}

总结

不同的限流算法都有其应用场景,并不是说令牌桶算法就最好,计数器算法就最差,具体可以依据业务的并发情况进行选择,比如业务是否会有突发流量?是否对流量恒定有要求?最好是根据业务特点选择不同的限流算法。

以上是关于四种限流算法图解的主要内容,如果未能解决你的问题,请参考以下文章

太强了!主流的四种限流策略,都可以通过redis实现

主流的四种限流策略,我都可以通过redis实现,干货仅此一篇

5种限流算法,7种限流方式,挡住突发流量?

5种限流算法,7种限流方式,挡住突发流量?

5种限流算法,7种限流方式,挡住突发流量?

常见的几种限流算法代码实现(JAVA)