一份golang令牌桶攻略(juju/ratelimit)
Posted 落霜风
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一份golang令牌桶攻略(juju/ratelimit)相关的知识,希望对你有一定的参考价值。
一份golang令牌桶使用攻略(juju/ratelimit)
使用场景
令牌桶的一个主要使用场景是限流。
程序以一定的速率生产令牌加入到令牌桶中。
每个请求到达时都会尝试从令牌桶中获取一块令牌, 如果获取令牌失败(令牌桶为空)则不处理该请求, 以此达到限流的目的。
juju/ratelimit使用
juju/ratelimit是开源的、golang语言实现的高效令牌桶,代码简洁,在本文写作时该项目有2.4k的start。
项目地址是 github.com/juju/ratelimit
api介绍
创建令牌锁
先看三个创建令牌锁的方法及它们的区别:
- NewBucket
创建一个令牌桶, 设置填充频率(fillInterval)和初始容量(capacity), 每填充频率的时间会向令牌桶中加入1块令牌。
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
//令牌桶容量为1,每10ms填充一块令牌
bucket := ratelimit.NewBucket(10 * time.Millisecond, 1)
- NewBucketWithQuantum
创建一个令牌桶, 设置填充频率(fillInterval)、初始容量(capacity)、每秒填充的令牌数(quantum), 每填充频率的时间会向令牌桶中加入quantum块令牌。
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
//令牌桶容量为3, 每10ms填充3块令牌
bucket := ratelimit.NewBucketWithQuantum(10 * time.Millisecond, 3, 3)
- NewBucketWithRate
创建一个令牌桶, 设置每秒速率(rate)、初始容量(capacity)。
func NewBucketWithRate(rate float64, capacity int64) *Bucket
//令牌桶容量为1, 每秒限速100次
bucket := ratelimit.NewBucketWithRate(100, 1)
获取令牌
-
Take 非阻塞, 返回需等待的时间
func (tb *Bucket) Take(count int64) time.Duration
-
TakeAvailable 非阻塞, 令牌数不满足需求时, 返回可用的令牌数
func (tb *Bucket) TakeAvailable(count int64) int64
-
TakeMaxDuration 非阻塞, 返回需等待时间, 超过最大时间返回 0, false
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
-
Wait 阻塞, 直到拿到令牌
func (tb *Bucket) Wait(count int64)
-
WaitMaxDuration 阻塞 若在最大等待时间内能拿到令牌则阻塞, 否则立即返回false
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
其它方法
-
Available 返回当前可用令牌数
func (tb *Bucket) Available() int64
-
Rate 返回每秒限流速率
func (tb *Bucket) Rate() float64
关键源码分析
令牌桶结构是怎样的
type Bucket struct
//Clock提供了获取当前时间、sleep指定时间的方法
clock Clock
//桶被创建的时间, 当前属于第几个tick也是基于这个起始时间来计算
startTime time.Time
// 桶容量
capacity int64
// 每个tick向桶填充的令牌数
quantum int64
// 填充间隔
fillInterval time.Duration
// 桶方法使用的互斥锁, 保障线程安全
mu sync.Mutex
// 桶内可用的令牌数
// 当有操作等待令牌的时候, 这个数值会变成负数
availableTokens int64
// 上一个填充tick
// 计算当前tick与上一个tick的差值 得出需要填充的令牌数
latestTick int64
按一定的速率向桶中填充令牌是如何实现的?
juju/ratelimit没有启用额外的线程定时向桶中填充令牌, 而是在外部调用令牌锁的方法时触发一次填充方法,根据当前时间和令牌锁的创建时间的差值计算出是否需要填充、需要填充的数量。
填充令牌的方法, 外部调用令牌锁的方法时会触发
func (tb *Bucket) adjustavailableTokens(tick int64)
//令牌桶结构中记录了上一个填充周期的值
lastTick := tb.latestTick
tb.latestTick = tick
//如果桶是满的直接返回
if tb.availableTokens >= tb.capacity
return
//需要填充的数量是 (本周期数 - 上次填充周期数) * 单周期填充数
tb.availableTokens += (tick - lastTick) * tb.quantum
//填充数量不得超过桶的容量
if tb.availableTokens > tb.capacity
tb.availableTokens = tb.capacity
return
获取令牌的关键代码
Take(), TakeMaxDuration(), Wait(), WaitMaxDuration()这几个方法都是通过调用take()这个内部方法实现的
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool)
if count <= 0
return 0, true
//计算当前tick, 调用adjustavailableTokens填充令牌
tick := tb.currentTick(now)
tb.adjustavailableTokens(tick)
//用可用令牌数(availableTokens)减去需要获取的令牌数(count),这里计算出的avail可能为负值
avail := tb.availableTokens - count
//令牌充足, 返回0(不需要等待), true(获取令牌成功)
if avail >= 0
tb.availableTokens = avail
return 0, true
//计算出一个endTick, 在未来的endTick到达时,令牌数将不再是负的
endTick := tick + (-avail+tb.quantum-1)/tb.quantum
//计算endTick的时间点
endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
//需要等待的时间时endTime - now
waitTime := endTime.Sub(now)
if waitTime > maxWait
return 0, false
//更新availableTokens, 可能为负值
tb.availableTokens = avail
//返回等待时间, 获取成功
return waitTime, true
使用令牌锁的简单例子
代码:
import (
"github.com/juju/ratelimit"
"time"
)
func main()
//创建一个令牌桶初始容量为1, 每10ms填充3个令牌
bucket := ratelimit.NewBucketWithQuantum(10 * time.Millisecond, 1, 3)
//程序运行长3秒
endTime := time.Now().Add(3 * time.Second)
//打印桶的每秒限速频率(预期300/s)
println("bucket rate:" , bucket.Rate(), "/s")
//使用一个变量记录获取令牌的总数
var tockensCount int64 = 0
for
//每次拿1块令牌, 成功返回1, 失败返回0
tocken := bucket.TakeAvailable(1)
tockensCount += tocken
if(time.Now().After(endTime))
println("tockensCount: ", tockensCount)
return;
time.Sleep(5 * time.Millisecond)
程序运行结果:
bucket rate: +3.000000e+002 /s
tockensCount: 301
tockensCount为什么是301而不是300:
因为创建的令牌桶初始容量为1。桶初始化完成后里面已经有一块令牌了, 可以立即拿到这块令牌不需要等待填充。
在后面的3000ms共填充了300块令牌。
使用令牌桶时要注意, 由于令牌桶是有"容量"的, 允许一定的瞬时流量, 对限制速率有严格要求的时候要小心设置容量与填充速度, 并进行实测验证。
使用golang实现令牌桶限流和时间窗口控制
这篇文章不是讲令牌桶算法原理,关于原理,请参考 https://blog.csdn.net/lzw_2006/article/details/51768935
我这里只是使用golang语言来实现令牌桶算法,以及时间窗口限流。
针对接口进行并发控制
如果担心接口某个时刻并发量过大了,可以细粒度地限制每个接口的 总并发/请求数
以下代码golang实现
package main
import (
"fmt"
"net"
"os"
"sync/atomic"
"time"
)
var (
limiting int32 = 1 // 这就是我的令牌桶
)
func main()
tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr
checkError(err)
listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口
checkError(err)
defer listener.Close()
for
conn, err := listener.Accept() // 在此处阻塞,每次来一个请求才往下运行handle函数
if err != nil
fmt.Println(err)
continue
go handle(&conn) // 起一个单独的协程处理,有多少个请求,就起多少个协程,协程之间共享同一个全局变量limiting,对其进行原子操作。
func handle(conn *net.Conn)
defer (*conn).Close()
n := atomic.AddInt32(&limiting, -1) // dcr 1 by atomic,获取一个令牌,总数减1。这是一个原子性的操作,并发情况下,数据不会写错。
if n < 0
// 令牌不够用了,限流,抛弃此次请求。
(*conn).Write([]byte("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\nError, too many request, please try again."))
else
// 还有剩余令牌可用
time.Sleep(1 * time.Second) // 假设我们的应用处理业务用了1s的时间
(*conn).Write([]byte("HTTP/1.1 200 OK\\r\\n\\r\\nI can change the world!")) // 业务处理结束后,回复200成功。
atomic.AddInt32(&limiting, 1) // add 1 by atomic,业务处理完毕,放回令牌
// 异常报错的处理
func checkError(err error)
if err != nil
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
limiting这个变量就是我用来限流的,把它看做令牌桶的池子吧。初始池中只有1个令牌,每一条处理请求,sleep了1秒。看看并发的效果。在一个终端中启动
go run example1.go
另外起一个终端,用golang的boom来做压测。要提前安装boom工具
go get github.com/rakyll/hey
go install github.com/rakyll/hey
然后压测
$ hey -c 10 -n 50 http://localhost:9090
Summary:
Total: 5.0246 secs
Slowest: 1.0066 secs
Fastest: 0.0008 secs
Average: 0.1023 secs
Requests/sec: 9.9510
Response time histogram:
0.001 [1] |■
0.101 [44] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.202 [0] |
0.303 [0] |
0.403 [0] |
0.504 [0] |
0.604 [0] |
0.705 [0] |
0.805 [0] |
0.906 [0] |
1.007 [5] |■■■■■
Latency distribution:
10% in 0.0011 secs
25% in 0.0013 secs
50% in 0.0014 secs
75% in 0.0044 secs
90% in 1.0021 secs
95% in 1.0061 secs
0% in 0.0000 secs
Details (average, fastest, slowest):
DNS+dialup: 0.0016 secs, 0.0008 secs, 1.0066 secs
DNS-lookup: 0.0010 secs, 0.0003 secs, 0.0022 secs
req write: 0.0002 secs, 0.0000 secs, 0.0008 secs
resp wait: 0.1022 secs, 0.0000 secs, 1.0050 secs
resp read: 0.0001 secs, 0.0000 secs, 0.0002 secs
Status code distribution:
[200] 5 responses
[404] 45 responses
hey命令-c表示并发数,我设为10,-n表示总共发送多少条,我发50条。
结果是只有5条返回http成功的状态码200,其他45条都失败了。这说明有得线程能竞争资源成功,有的线程竞争资源失败,这里只有5个竞争成功的。总共用时也就5.0246秒,平均速率1r/s。这种结果这和代码中令牌池只有1个令牌,而每个请求要花1s的时间的要求相吻合。说明我们现在将请求限流在1r/s,超过这个速度涌进来的请求都会被抛弃404。
注意:这里使用的是golang的协程,和线程还是有区别的,不过在这里不影响我们做测试,只要把它理解为并发就行了,协程的原理可以去搜下看看。
修改一下结果,把limiting改成10,再测试
......
Status code distribution:
[200] 50 responses
这回是恰到好处啊,刚好满足10r/s的QPS,所有的请求都成功了。
当然,这种并发控制方式简单粗暴,没有平滑处理,慎用。
针对时间窗口进行并发控制
如果某个基础服务调用量很大,我们害怕它被突然的大流量打挂,所以需要限制一个窗口期内接口的请求量。下面是一种实现窗口时间并发控制的方法
我们使用缓存来存储计数器,秒数作为Key,Value代表这一秒有多少个请求。这样就限制了一秒内的并发数,过期时间设置长一些,比如两秒,保证一秒内的数据是存在的。
package main
import (
"fmt"
"net"
"os"
"time"
cache "github.com/UncleBig/goCache"
)
var (
limit int = 10
c *cache.Cache
)
func main()
c = cache.New(10*time.Minute, 30*time.Second)
tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr
checkError(err)
listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口
checkError(err)
defer listener.Close()
for
conn, err := listener.Accept()
if err != nil
fmt.Println(err)
continue
go handle(&conn)
func handle(conn *net.Conn)
defer (*conn).Close()
t := time.Now().Unix()
key := fmt.Sprintf("%d", t)
if n, found := c.Get(key); found
num := n.(int)
fmt.Printf("key:%d num:%d\\n", t, num)
if num >= limit
(*conn).Write([]byte("HTTP/1.1 404 NOT FOUND\\r\\n\\r\\nError, too many request, please try again."))
else
(*conn).Write([]byte("HTTP/1.1 200 OK\\r\\n\\r\\nI can change the world!"))
c.Increment(key, 1)
else
(*conn).Write([]byte("HTTP/1.1 200 OK\\r\\n\\r\\nI can change the world!"))
c.Set(key, 1, 2 * time.Second)
func checkError(err error)
if err != nil
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
这段代码用了缓存,所以要先下载库
go get -u github.com/UncleBig/goCache
同样的方式启动测试,先来个小测试,服务端打印日志
[root@VM_195_216_centos ~]# go run example2.go
key:1510229724 num:1 success
key:1510229724 num:2 success
key:1510229724 num:3 success
key:1510229724 num:4 success
key:1510229724 num:5 success
key:1510229724 num:6 success
key:1510229724 num:7 success
key:1510229724 num:8 success
key:1510229724 num:9 success
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
key:1510229724 num:10 failed
再看看我们测试用的命令
$ hey -c 10 -n 30 http://localhost:9090
......
Status code distribution:
[200] 10 responses
[404] 20 responses
结果是10条成功20条失败。看服务端 的日志发现,所有的日志都是打印的同一秒(1510229724)内的请求。当累计处理完10条限流要求的请求之后(num从1打印到10),再往后在这一秒内的请求都直接返回失败了,在这一秒内的限流取得了成功。
接下来再看看,大量持续请求的情况下,限流效果。
[root@VM_195_216_centos ~]# go run example2.go
key:1510229933 num:1 success
key:1510229933 num:2 success
key:1510229933 num:3 success
key:1510229933 num:4 success
key:1510229933 num:5 success
key:1510229933 num:6 success
key:1510229933 num:7 success
key:1510229933 num:8 success
key:1510229933 num:9 success
key:1510229933 num:10 failed
key:1510229933 num:10 failed
......
key:1510229933 num:10 failed
key:1510229933 num:10 failed
key:1510229934 num:1 success
key:1510229934 num:2 success
key:1510229934 num:3 success
key:1510229934 num:4 success
key:1510229934 num:5 success
key:1510229934 num:6 success
key:1510229934 num:7 success
key:1510229934 num:8 success
key:1510229934 num:9 success
key:1510229934 num:10 failed
key:1510229934 num:10 failed
......
key:1510229934 num:10 failed
key:1510229934 num:10 failed
key:1510229935 num:1 success
key:1510229935 num:2 success
key:1510229935 num:3 success
key:1510229935 num:4 success
key:1510229935 num:5 success
key:1510229935 num:6 success
key:1510229935 num:7 success
key:1510229935 num:8 success
key:1510229935 num:9 success
key:1510229935 num:10 failed
key:1510229935 num:10 failed
......
key:1510229935 num:10 failed
key:1510229935 num:10 failed
key:1510229936 num:1 success
key:1510229936 num:2 success
key:1510229936 num:3 success
key:1510229936 num:4 success
key:1510229936 num:5 success
key:1510229936 num:6 success
key:1510229936 num:7 success
key:1510229936 num:8 success
key:1510229936 num:9 success
key:1510229936 num:10 failed
key:1510229936 num:10 failed
......
测试命令
$ hey -c 10 -n 10000 http://localhost:9090
Summary:
Total: 2.9792 secs
......
Status code distribution:
[200] 40 responses
[404] 9937 responses
这次总共花了近3秒时间,发了1w条请求,由于日志打印太多了,截取部分有代表性的。可以看到经历了3秒,每1秒内都只成功10条,接下来到下一秒之前的请求都是失败的。3秒总共成功了40条,按理说应该30条,可能边界值那几毫秒控制的不是很精准,这个误差可以容忍,还是能达到限流的理想效果。
创建于 2018-09-08 北京,更新于 2019-05-23 北京
该文章在以下平台同步
以上是关于一份golang令牌桶攻略(juju/ratelimit)的主要内容,如果未能解决你的问题,请参考以下文章