Go sync.Pool池化的学习
Posted 技术能量站
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go sync.Pool池化的学习相关的知识,希望对你有一定的参考价值。
一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。
一.前言
Go 语言标准库也大量使用了 sync.Pool,例如 fmt 和 encoding/json。
1.1 要解决的问题
一个新技术亦或是一个新名词,总是为了解决一些问题才出现的,所以先搞明白解决什么问题是第一步。
核心来说,我们的代码中会各种创建对象,比如new一个结构体、创建一个连接、甚至创建一个int都属于对象。那么假设在某些场景下,你的代码会频繁的创建某一种对象,那么这种操作可能会影响你程序的性能,原因是什么呢?
- 我们知道创建对象肯定是需要申请内存的
- 频繁的创建对象,会对GC造成较大的压力,其实主要是GC压力较大,golang的官方库sync.pool就是为了解决它,看名字就是池的方法。
1.2 池和缓存
sync.pool的思想很简单就是对象池,由于最近一直在做相关的事情,这里我们说个题外话,关于池和缓存说下我的一些看法。
- 工作中遇到过很多池:连接池,线程池,协程池,内存池等,会发现这些所谓池,都是解决同一个类型的问题,创建连接、线程等比较消耗资源,所以用池化的思想来解决这些问题,直接复用已经创建好的。
- 其实缓存也是,用到缓存的地方比如说,本地缓存、容灾缓存,性能缓存等名词,这些缓存的思想无非就是把计算好的存起来,真正的流量过来的时候,直接使用缓存好的内容,能提服务响应高速度。
1.3 总结下来就是
- 复用之前的内容,不用每次新建
- 提前准备好,不用临时创建
- 采用性能高的存储做缓存,更加提高响应速度 其实看下来跟我们的对象池,没什么区别,我们对象池也就是复用之前创建好的对象。
最后发散下思想,影响我们的程序性能的有以下几个,存储、计算、网络等,其实都可以做缓存,或者提前准备好,亦或者复用之前的结果。我们程序中很多init的东西不就是提前准备好的存储吗,我们很多做的local cache其实就是减少网络传输时间等,以后优化服务性能可以从这个角度考虑。
二. 工作原理
我们先看下如何使用如下结构体
package main
import (
"fmt"
"sync"
)
type item struct
value int
func main()
pool := sync.Pool
New: func() interface
return item
,
pool.Put(itemvalue: 1)
data := pool.Get()
fmt.Println("data = ",data)
看起来使用方式很简单,创建一个对象池的方式传进去一个New对象的函数,然后就是两个函数获取对象Get和放入对象Put。
想彻底搞明白原理,莫过于直接去读源码。 先瞅一眼结构体
2.1 Pool
Pool是一个可以分别存取的临时对象的集合。Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。
Pool可以安全的被多个线程同时使用。Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。
Pool的合理用法是管理一组被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。
Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。**这时应该由这些对象自己实现空闲列表。第一次使用后不得复制池。
// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
type Pool struct
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface
- local :指向的是poolLocal的数组的指针,[P]poolLocal 数组长度是调度器中P的数量,也就是说每一个P有自己独立的poolLocal。通过P.id来获取每个P自己独立的poolLocal。在poolLocal中有一个poolChain
- localSize:local数组的长度
- victim :victim 和 victimSize 这个是在 poolCleanup 流程里赋值了,赋值的内容就是 local 和 localSize 。victim 机制是把 Pool 池的清理由一轮 GC 改成 两轮 GC,进而提高对象的复用率,减少抖动;
- New func() interface:当池中没有对象时,get方法将使用New方法来返回一个对象,如果你不设置New方法的话,将会返回nil
那么看下这个本地资源池的结构.一层套一层
// Local per-P Pool appendix.
type poolLocalInternal struct
private interface // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
type poolLocal struct
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal)%128]byte
- poolChain:是一个双端队列链,缓存对象。 1.12版本中对于这个字段的并发安全访问是通过mutex加锁实现的;1.14优化后通过poolChain(无锁化)实现的。
2.2 pool.put
// Put adds x to the pool.
func (p *Pool) Put(x interface)
if x == nil
return
if race.Enabled
if fastrand()%4 == 0
// Randomly drop x on floor.
return
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
l, _ := p.pin()
if l.private == nil
l.private = x
x = nil
if x != nil
l.shared.pushHead(x)
runtime_procUnpin()
if race.Enabled
race.Enable()
Put函数主要逻辑是
(1)先调用p.pin() 函数,这个函数会将当前 goroutine与P绑定,并设置当前g不可被抢占(也就不会出现多个协程并发读写当前P上绑定的数据);
- 在p.pin() 函数里面还会check per P的[P]poolLocal数组是否发生了扩容(P 扩张)。
- 如果发生了扩容,需要调用pinSlow()来执行具体扩容。扩容获取一个调度器全局大锁allPoolsMu,然后根据当前最新的P的数量去执行新的扩容。这里的成本很高,所以尽可能避免手动增加P的数量。
(2)拿到per P的poolLocal后,优先将val put到private,如果private已经存在,就通过调用shared.pushHead(x) 塞到poolLocal里面的无锁双端队列的chain中。Put函数对于双端队列来说是作为一个Producer角色,所以这里的调用是无锁的。
(3)最后解除当前goroutine的禁止抢占。
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int)
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s
return indexLocal(l, pid), pid
return p.pinSlow()
func (p *Pool) pinSlow() (*poolLocal, int)
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s
return indexLocal(l, pid), pid
if p.local == nil
allPools = append(allPools, p)
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
2.3 pool.get
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface
if race.Enabled
race.Disable()
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil
x = p.getSlow(pid)
runtime_procUnpin()
if race.Enabled
race.Enable()
if x != nil
race.Acquire(poolRaceAddr(x))
if x == nil && p.New != nil
x = p.New()
return x
Get函数主要逻辑:
- 设置当前 goroutine 禁止抢占(race竞态检查);
- 从 poolLocal的private取,如果private不为空直接return;
- 从 poolLocal.shared这个双端队列chain里面无锁调用去取,如果取得到也直接return;
- 上面都去不到,调用getSlow(pid)去取
a. 首先会通过 steal 算法,去别的P里面的poolLocal去取,这里的实现是无锁的cas。如果能够steal一个过来,就直接return;
b. 如果steal不到,则从 victim 里找,和 poolLocal 的逻辑类似。最后,实在没找到,就把 victimSize 置 0,防止后来的“人”再到 victim 里找。 - 最后还拿不到,就通过New函数来创建一个新的对象。
这里是一个很明显的多层级缓存优化 + GPM调度结合起来。
private -> shared -> steal from other P -> victim cache -> New
func (p *Pool) getSlow(pid int) interface
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
for i := 0; i < int(size); i++
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil
return x
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size
return nil
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil
l.private = nil
return x
for i := 0; i < int(size); i++
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil
return x
// Mark the victim cache as empty for future gets don't bother
// with it.
atomic.StoreUintptr(&p.victimSize, 0)
return nil
2.4 victim cache优化与GC
对于Pool来说并不能够无上限的扩展,否则对象占用内存太多了,会引起内存溢出。
几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?
这里是使用GC。在pool.go里面的init函数 会注册清理函数:
func init()
runtime_registerPoolCleanup(poolCleanup)
// mgc.go
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func())
poolcleanup = f
编译器会把 runtime_registerPoolCleanup 函数调用链接到 mgc.go 里面的 sync_runtime_registerPoolCleanup函数调用,实际上注册到poolcleanup函数。
在 Golang GC 开始的时候 gcStart 调用 clearpools() 函数就会调用到 poolCleanup 函数。也就是说,每一轮 GC 都是对所有的 Pool 做一次清理。
整个调用链如下:
- gcStart() -> clearpools() -> poolcleanup() 也就是每一轮GC开始都会执行pool的清除操作。
这个是定期执行的,在 sync package init 的时候注册,由 runtime 后台执行,内容就是批量清理 allPools 里的元素。
func poolCleanup()
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
for _, p := range oldPools
p.victim = nil
p.victimSize = 0
// Move primary cache to victim cache.
for _, p := range allPools
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
poolCleanup 会在 STW 阶段被调用。整体看起来,比较简洁。主要是将 local 和 victim 作交换,这样也就不致于让 GC 把所有的 Pool 都清空了,有 victim 在“兜底”。
- 如果 sync.Pool 的获取、释放速度稳定,那么就不会有新的池对象进行分配。如果获取的速度下降了,那么对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。
在Go1.13之前的poolCleanup比较粗暴,直接清空了所有 Pool 的 p.local 和poolLocal.shared。
通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。所以 p.victim 的作用其实就是次级缓存。
2.5 poolChain
这里我们先重点分析一下poolChain 是怎么实现并发无锁编程的。
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
type poolChainElt struct
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
poolChain是一个动态大小的双向链接列表的双端队列。每个出站队列的大小是前一个队列的两倍。也就是说poolChain里面每个元素poolChainElt都是一个双端队列。
- head指向的poolChainElt,是用于Producer去Push元素的,不需要做同步处理。
- tail指向的poolChainElt,是用于Consumer从tail去pop元素的,这里的读写需要保证原子性
简单来说,poolChain是一个单Producer,多Consumer并发访问的双端队列链。
对于poolChain中的每一个双端队列 poolChainElt,包含了双端队列实体poolDequeue 一起前后链接的指针。
poolChain 主要方法有:
- popHead()(interface,bool);
- pushHead(val interface)
- popTail()(interface,bool)
其中,popHead和pushHead函数是给Producer调用的;popTail是给Consumer并发调用的。
2.5.1 poolChain.popHead()
前面我们说了,poolChain的head 指针的操作是单Producer的。
func (c *poolChain) popHead() (interface, bool)
d := c.head
for d != nil
if val, ok := d.popHead(); ok
return val, ok
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
return nil, false
poolChain要求,popHead函数只能被Producer调用。看一下逻辑:
- 获取头结点 head;
- 如果头结点非空就从头节点所代表的双端队列poolDequeue中调用popHead函数。注意这里poolDequeue的popHead函数和poolChain的popHead函数并不一样。poolDequeue是一个固定size的ring buffer。
- 如果从head中拿到了value,就直接返回;
- 如果从head中拿不到value,就从head.prev再次尝试获取;
- 最后都获取不到,就返回nil。
2.5.2 poolChain.pushHead()
func (c *poolChain) pushHead(val interface)
d := c.head
if d == nil
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
if d.pushHead(val)
return
// The current dequeue is full. Allocate a new one of twice
// the size.
newSize := len(d.vals) * 2
if newSize >= dequeueLimit
// Can't make it any bigger.
newSize = dequeueLimit
d2 := &poolChainEltprev: d
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
poolChain要求,pushHead函数同样只能被Producer调用。看一下逻辑:
- 首先还是获取头结点 head;
- 如果头结点为空,需要初始化chain
- 创建poolChainElt 节点,作为head, 当然也是tail。
- poolChainElt 其实也是固定size的双端队列poolDequeue,size必须是2的n次幂。
- 调用poolDequeue的pushHead函数将 val push进head的双端队列poolDequeue。
- 如果push失败了,说明双端队列满了,需要重新创建一个双端队列d2,新的双端队列的size是前一个双端队列size的2倍;
- 更新poolChain的head指向最新的双端队列,并且建立双链关系;
- 然后将val push到最新的双端队列。
这里需要注意一点的是head其实是指向最后chain中最后一个结点(poolDequeue),chain执行push操作是往最后一个节点push。 所以这里的head的语义不是针对链表结构,而是针对队列结构。
2.5.3 poolChain.popTail()
func (c *poolChain) popTail() (interface, bool)
d := loadPoolChainElt(&c.tail)
if d == nil
return nil, false
for
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok
return val, ok
if d2 == nil
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2))
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
d = d2
2.6 poolDequeue
poolChain中每一个结点都是一个双端队列poolDequeue。
poolDequeue是一个无锁的、固定size的、单Producer、多Consumer的deque。只有一个Producer可以从head去push或则pop;多个Consumer可以从tail去pop。
type poolDequeue struct
// 用高32位和低32位分别表示head和tail
// head是下一个fill的slot的index;
// tail是deque中最老的一个元素的index
// 队列中有效元素是[tail, head)
headTail uint64
vals []eface
type eface struct
typ, val unsafe.Pointer
这里通过一个字段 headTail 来表示head和tail的index。headTail是8个字节64位。
- 高32位表示head;
- 低32位表示tail。
- head和tail自加溢出时是安全的。
vals是一个固定size的slice,其实也就是一个 ring buffer,size必须是2的次幂(为了做位运算);
三. 总结
今天,我们一起讨论了另一个比较有用的同步工具——sync.Pool类型,它的值被我称为临时对象池。
临时对象池有一个New字段,我们在初始化这个池的时候最好给定它。临时对象池还拥有两个方法,即:Put和Get,它们分别被用于向池中存放临时对象,和从池中获取临时对象。
临时对象池中存储的每一个值都应该是独立的、平等的和可重用的。我们应该既不用关心从池中拿到的是哪一个值,也不用在意这个值是否已经被使用过。
要完全做到这两点,可能会需要我们额外地写一些代码。不过,这个代码量应该是微乎其微的,就像fmt包对临时对象池的用法那样。所以,在选用临时对象池的时候,我们必须要把它将要存储的值的特性考虑在内。
在临时对象池的内部,有一个多层的数据结构支撑着对临时对象的存储。它的顶层是本地池列表,其中包含了与某个 P 对应的那些本地池,并且其长度与 P 的数量总是相同的。
在每个本地池中,都包含一个私有的临时对象和一个共享的临时对象列表。前者只能被其对应的 P 所关联的那个 goroutine 中的代码访问到,而后者却没有这个约束。从另一个角度讲,前者用于临时对象的快速存取,而后者则用于临时对象的池内共享。
正因为有了这样的数据结构,临时对象池才能够有效地分散存储压力和性能压力。同时,又因为临时对象池的Get方法对这个数据结构的妙用,才使得其中的临时对象能够被高效地利用。比如,该方法有时候会从其他的本地池的共享临时对象列表中,“偷取”一个临时对象。
以上是关于Go sync.Pool池化的学习的主要内容,如果未能解决你的问题,请参考以下文章