并发编程Pool 基本用法和如何实现
Posted @了凡
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程Pool 基本用法和如何实现相关的知识,希望对你有一定的参考价值。
博主介绍:
– 我是了 凡 微信公众号【了凡银河系】期待你的关注。未来大家一起加油啊~
前言
Go是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收,所以我们一般都是想用就用,没有考虑如何提高性能的问题。但是,如果想要使用Go开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响。
但是具体都带来什么影响呢?Go的自动垃圾回收机制有一个STW(stop-the-world,程序暂停)的时间,还有另一个耗时的问题,就是在大量的创建在堆上的对象,也会影响垃圾回收标记的时间。
一般情况下做性能优化的时候,都选择将不用的对象手动回收起来,避免被垃圾回收掉,这样下一次使用的时候就不必在堆上重新创建了。并且,比如数据库连接、TCP的长连接等都可以考虑将连接保存下来,避免每次使用的时候都重新创建,不仅可以大大减少业务的耗时,还能提高应用程序的整体性能。
所以为了解决以上问题,GO标准库中提供了一个通用的Pool数据结构,也就是sync.Pool,使用它可以创建池化的对象。但是这个类型有一个缺点,就是它池化的对象可能会被垃圾回收掉,这对于数据库长连接等场景是不合适的。
sync.Pool
sync.Pool数据类型用来保存一组可独立访问的临时对象。临时是指:池化的对象会在未来的某个时候被毫无预兆地移除掉。而且,如果没有别的对象引用这个被移除的对象的化,这个被移除的对象就会被垃圾回收掉。
知识点:
- sync.Pool本身就是线程安全的,多个goroutine可以并发地调用它的方法存取对象;
- sync.Pool不可在使用之后再复制使用。
sync.Pool的使用方法
这个数据类型提供了三个对外的方法:New、Get和Put。
- New
这个字段的类型是函数func() interface。当调用Pool的Get方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个New方法来创建新的的元素。如果没有设置New字段,没有更多的空闲元素可返回时,Get方法将返回nil,表明当前没有可用的元素。 - Get
如果调用Get方法,就会从Pool取走一个元素,取走指会从Pool中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get方法的返回值还可能会是一个nil(Pool.New字段没有设置,又没有空闲元素可以返回),所以在使用的时候需要判断。 - Put
Put用于将一个元素存入Pool,Pool会把这个元素保存到池中,并且可以复用。但如果Put一个nil值,Pool就会忽略这个值。 - 缓冲池
buffer池(缓冲池)是sync.Pool最常用的一个场景。
因为byte slice是经常被创建销毁的一类对象,使用buffer池可以缓存已经创建的byte slice,例如,著名的静态网站生成工具Hugo中,包含了这样的实现bufpool。
var buffers = sync.Pool
New: func() interface
return new(bytes.Buffer)
,
func GetBuffer() *bytes.Buffer
return buffers.Get().(*bytes.Buffer)
func PutBuffer(buf *bytes.Buffer)
buf.Reset()
buffers.Put(buf)
此段代码,很常见,但是存在一个问题,请不要使用在项目中。这段代码可能存在内存泄漏问题。
实现原理
-
每次GC都会回收创建的对象。
如果缓存元素数量太多,就会导致STW耗时变长;缓存元素都被回收后,会导致Get命中率下降,Get方法不得不新创建很多对象。 -
底层实现使用了Mutex,对这个锁并发请求竞争记录的时候,会导致性能的下降。
sync.Pool做了大量的优化。
提高并发程序性能的优化点是尽量不要使用锁,如果不得已使用了锁,就把锁Go的粒度降到最低。Go对Pool的优化就是避免使用锁,同时将加锁的queue改成lock-free的queue的实现,给即将移除的元素再多一次“复活”的机会。
sync.Pool的数据结构如下图所示:
Pool最重要的两个字段是local和victim,因为它们两个主要作用就是存储空闲的元素。
每一次垃圾回收的时候,Pool会讲victim中的对象回收,然后把local的数据给victim,这样的话,local就会被清空,而victim就会像一个垃圾分拣站,里面的东西可能会被当作垃圾丢弃了,但是里面有用的东西也可能被捡回来重新使用。
victim中的元素如果被Get取走,那么这个元素就会很幸运,因为它再一次“复活”了。但是,如果这个时候Get的并发不是很大,元素没有被Get取走,那么就会被移除掉,因为没有别人引用它的话,就会被垃圾回收掉。
此段代码就是垃圾回收时sync.Pool的处理逻辑:
func poolCleanup()
// 丢弃当前victim,STW所以不用加锁
for _, p := range oldPools
p.victim = nil
p.victimSize = 0
// 将local复制给victim,并将原local置为nil
for _, p := allPools
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
oldPools, allPools = allPools, nil
在这段代码中,你需要关注一下local字段,因为所有当前主要的空闲可用的元素都存放在local字段中查找可用的元素。local字段包含一个poolLocallnternal字段,并提供CPU缓存对齐,从而避免false sharing。
poolLocallnternal包含两个字段:private和shared。
- private,代表一个缓存的元素,而且只能由相应的一个P存取。因为一个P同时只能执行一个goroutine,所以不会有并发的问题。
- shared,可以由任意的P访问,但是只有本地的P才能pushHead/popHead,其他P可以jpopTail,相当于只有一个本地的P作为生产者(Producer),多个P作为消费者,它是使用一个local-free 的queue列表实现的。
Get方法
func (p *Pool) Get() interface
// 把当前goroutine固定在当前的P上
l, pid := p.pin()
x := .private // 优先从local的private字段取,快速
l.private = nil
if x == nil
// 从当前的local.shared弹出一个,注意是从head读取并移除
x, _ = l.shared.popHead()
if x == nil // 如果没有,则去偷一个
x = p.getSlow(pid)
runtime_procUnpin()
// 如果没有获取到,尝试使用New函数生成一个新的
if x == nil && p.New != nil
x = p.New()
return x
从本地的private字段中获取可用元素,因为没有锁,获取元素的过程会非常快,如果么有获取到,就尝试从本地的shared获取一个,如果还没有,会使用getSlow方法去其他的shared中“偷”一个。最后,如果没有获取到,就尝试使用New函数创建一个新的。
getSlow方法,表示耗时可能比较长。首先要遍历所有的local,尝试从它们的shared他拿出一个元素。如果还没有找到一个,那么,就开始堆victim下手了。
在vintim中查询可用元素的逻辑还是一样的,先从对应的victim的private查找,如果查找不到,就再从其他victim的shared中查找。
getSlow源码逻辑:
func (p *Pool) getSlow(pid int) interface
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil
return x
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size
return nil
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil
l.private = nil
return x
for i := 0; i < int(size); i++
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil
return x
// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
return nil
Put方法
func (p *Pool) Put(x interface)
if x == nil // nil值直接丢弃
return
l, _ := p.pin()
if l.private == nil // 如果本地private没有值,直接设置这个值即可
l.private = x
x = nil
if x != nil // 否则加入到本地队列中
l.shared.pushHead(x)
runtime_procUnpin()
Put 的逻辑相对简单,优先设置本地private,如果private字段已经有值了,那么就把此元素push到本地队列中。
总结
本次共分享了Pool是什么?要如何使用,并且了解了sync.Pool的数据结构以及执行流程和实现过程,相信对于Pool已经了解了基础用法和认识,如果想要了解更多可以从Pool的一些坑入手,例如内存泄露,以及内存浪费等。后续会写一下关于Go语言的GC以及变量逃逸等问题。
创作不易,点个赞吧!
如果需要后续再看点个收藏!
如果对我的文章有兴趣给个关注!
如果有问题,可以关注公众号【了凡银河系】点击联系我私聊。
以上是关于并发编程Pool 基本用法和如何实现的主要内容,如果未能解决你的问题,请参考以下文章