四种限流算法图解
Posted rayylee
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了四种限流算法图解相关的知识,希望对你有一定的参考价值。
前言
会使用分布式,那么就说明业务量是较大的,如果不对流量进行限制,很大几率就导致高峰期时机器宕机。而常用的限流算法有四种,这也是所有限流框架实现限流的基础,接下来就通过本文来认识下它们。
1. 计数器限流算法
计数器是限流中最简单的,规定为:在指定周期内累加访问次数,当访问次数达到设定的阈值时,出发限流策略,当进入下一个时间周期时会将访问次数清零
- 优点:实现简单
- 临界问题:如图所示,当在8-10秒和10-12秒内分别并发500,虽然没有超过阈值,但如果算8-12秒,则并发数高达1000,已经超过了原先定义的10秒内不超过500的并发量
- 突刺现象:如果在单位时间10秒内的前100ms,通过了500个请求,则后面的990ms都无法接受任何请求,也就无法应对短时间高并发。
package main
import (
"fmt"
"sync"
"time"
)
type RequestLimiter struct {
Interval time.Duration // 重新计数时间
MaxCount int // 最大计数
Lock sync.Mutex
ReqCount int // 目前的请求数
}
func (reqLimiter *RequestLimiter) IsAvailable() bool {
reqLimiter.Lock.Lock()
defer reqLimiter.Lock.Unlock()
return reqLimiter.ReqCount < reqLimiter.MaxCount
}
// 非阻塞
func (reqLimiter *RequestLimiter) AddRequestCount() bool {
reqLimiter.Lock.Lock()
defer reqLimiter.Lock.Unlock()
if reqLimiter.ReqCount < reqLimiter.MaxCount {
reqLimiter.ReqCount += 1
return true
}
return false
}
func NewRequestLimitService(interval time.Duration, maxCount int) *RequestLimiter {
reqLimit := &RequestLimiter{
Interval: interval,
MaxCount: maxCount,
}
go func() {
ticker := time.NewTicker(interval)
for true {
<-ticker.C
reqLimit.Lock.Lock()
fmt.Println("reset Count ...")
reqLimit.ReqCount = 0
reqLimit.Lock.Unlock()
}
}()
return reqLimit
}
/**
原理:协程不阻塞 + 死循环 + ticker 定时执行 ReqCount 归零
ticker := time.NewTicker(2*time.Second)
for {
currentTime := <-ticker.C
fmt.Println("当前时间为:", currentTime)
}
输出:
当前时间为: 2021-05-19 11:02:12.3603475 +0800 CST m=+2.002147201
当前时间为: 2021-05-19 11:02:14.3611806 +0800 CST m=+4.002980301
当前时间为: 2021-05-19 11:02:16.360625 +0800 CST m=+6.002424701
*/
func main() {
service := NewRequestLimitService(time.Second, 2)
for true {
hasToken := service.AddRequestCount()
if hasToken {
fmt.Println(time.Now())
}
}
}
2. 滑动窗口限流算法
为了避免计数器中的临界问题,让限制更加平滑,将固定窗口中分割出多个小时间窗口,分别在每个小的时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小时间窗口。
- 优点:实现相对简单,且没有计数器算法的临界问题
- 缺点:无法应对短时间高并发(突刺现象)
如图所示,最终只需要统计每个小时间窗口不超过阈值/n
以及在滑动窗口范围内的所有的小时间窗口总的计数不超过阈值
即可
package main
import (
"fmt"
"sync"
"time"
)
type WindowLimiter struct {
Interval time.Duration // 总计数时间
WinCount []int // 每个窗口的访问数量
TicketSize int // 窗口最大容量
TicketCount int // 窗口数量
Lock sync.Mutex
CurIndex int // 目前使用哪个窗口
}
func (reqLimiter *WindowLimiter) IsAvailable() bool {
reqLimiter.Lock.Lock()
defer reqLimiter.Lock.Unlock()
return reqLimiter.WinCount[reqLimiter.CurIndex] < reqLimiter.TicketSize
}
func (reqLimiter *WindowLimiter) AddRequestCount() bool {
reqLimiter.Lock.Lock()
defer reqLimiter.Lock.Unlock()
if reqLimiter.WinCount[reqLimiter.CurIndex] < reqLimiter.TicketSize {
reqLimiter.WinCount[reqLimiter.CurIndex]++
return true
}
return false
}
func NewRequestLimitService(interval time.Duration, ticketCount int, ticketSize int) *WindowLimiter {
reqLimit := &WindowLimiter{
Interval: interval,
WinCount: make([]int, ticketCount, ticketCount),
TicketSize: ticketSize,
TicketCount: ticketCount,
CurIndex: 0,
}
go func() {
ticker := time.NewTicker(time.Duration(interval.Nanoseconds() / int64(ticketCount)))
for true {
<-ticker.C
reqLimit.Lock.Lock()
reqLimit.CurIndex = (reqLimit.CurIndex + 1) % reqLimit.TicketCount
reqLimit.WinCount[reqLimit.CurIndex] = 0
fmt.Println("reset Count ...")
reqLimit.Lock.Unlock()
}
}()
return reqLimit
}
func main() {
service := NewRequestLimitService(time.Second, 2, 1)
for true {
hasToken := service.AddRequestCount()
if hasToken {
fmt.Println(time.Now())
}
}
}
3. 漏桶限流算法
漏桶限流算法的核心就是, 不管上面的水流速度有多块, 漏桶水滴的流出速度始终保持不变。
- 实际应用:消息中间件采用的就是漏桶限流的思想
- 主要作用:控制数据注入网络的速度;平滑网络上的突发流量(类似于电容整流)
- 不足:无法应对突发的并发流量,因为流出速率一直都是恒定的
如图所示,就是一个固定的桶,桶上半部分没有限制流入速率(满了会溢出),但出水速率受限于孔的大小,也就是不管多少请求,最后给服务的请求数量的速率是恒定的,多余的请求将无法通过在桶内等待
package main
import (
"fmt"
"math"
"sync"
"time"
)
type BucketLimiter struct {
Timestamp time.Time // 当前注水的时间戳
Capacity float64 // 桶的容量
Rate float64 // 速度
Water float64 // 当前水量
Lock sync.Mutex
}
func AddWater(bucket *BucketLimiter) bool {
now := time.Now()
leftWater := math.Max(0, bucket.Water-now.Sub(bucket.Timestamp).Seconds()*bucket.Rate)
bucket.Lock.Lock()
defer bucket.Lock.Unlock()
if leftWater+1 < bucket.Capacity {
// 尝试加水,此时水桶未满
bucket.Timestamp = now
bucket.Water = leftWater + 1
return true
} else {
// 水满了,拒绝访问
return false
}
}
func main() {
service := &BucketLimiter{
Timestamp: time.Now(),
Capacity: 2,
Rate: 1,
Water: 0,
}
for true {
hasToken := AddWater(service)
if hasToken {
fmt.Println(time.Now())
}
}
}
4. 令牌桶限流算法
令牌桶是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法
速度恒定、令牌桶大小固定,如果令牌桶被填满,则会丢弃生成的令牌,如果桶内没有令牌则出现限流策略
-
优点:可以像漏桶那样匀速,也可以像计数器那样突发处理
-
基本过程:每进来一个请求,都在桶里获取一个令牌;如果有令牌,则拿着令牌通过;如果没有令牌,则不允许请求通过
-
几种情况:
请求速度 > 令牌生成速度:当令牌被取空后会被限流
请求速度 = 令牌生成速度:流量处于平稳状态
请求速度 < 令牌生成速度:请求可被正常处理,多余的令牌直接丢弃
package main
import (
"math"
"sync"
"time"
)
// 定义令牌桶结构
type tokenBucket struct {
timestamp time.Time // 当前时间戳
capacity float64 // 桶的容量(存放令牌的最大量)
rate float64 // 令牌放入速度
tokens float64 // 当前令牌总量
lock sync.Mutex
}
// 判断是否获取令牌(若能获取,则处理请求)
func getToken(bucket tokenBucket) bool {
now := time.Now()
bucket.lock.Lock()
defer bucket.lock.Unlock()
// 先添加令牌
leftTokens := math.Max(bucket.capacity, bucket.tokens+now.Sub(bucket.timestamp).Seconds()*bucket.rate)
if leftTokens < 1 {
// 若桶中一个令牌都没有了,则拒绝
return false
} else {
// 桶中还有令牌,领取令牌
bucket.tokens -= 1
bucket.timestamp = now
return true
}
}
总结
不同的限流算法都有其应用场景,并不是说令牌桶算法就最好,计数器算法就最差,具体可以依据业务的并发情况进行选择,比如业务是否会有突发流量?是否对流量恒定有要求?最好是根据业务特点选择不同的限流算法。
以上是关于四种限流算法图解的主要内容,如果未能解决你的问题,请参考以下文章