golang sync.Pool在1.14中的优化
Posted 惜暮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang sync.Pool在1.14中的优化相关的知识,希望对你有一定的参考价值。
golang sync.Pool在1.14中的优化
本文基于golang 1.14 对sync.Pool进行分析;
sync.Pool在1.12中实现的原理简述
参考 Golang 的 sync.Pool设计思路与原理,这篇文章基于1.12版本对golang的sync.Pool实现原理进行了分析。关于sync.Pool的使用场景、基本概念的理解可以参考前面的文章。
在1.12中sync.Pool的设计思路简单总结一下:
- 将 G 和 P 绑定,设置与P绑定的M禁止抢占以防止 G 被抢占。在绑定期间,GC 无法清理缓存的对象。
- 每个p都有独享的缓存队列,当g进行sync.pool操作时,先找到所属p的private,如果没有对象可用,加锁从 shared切片里获取数据。如果还没有拿到缓存对象,那么到其他P的poolLocal进行偷数据,如果偷不到,那么就创建新对象。
1.12 sync.pool的源码,可以发现sync.pool里会有各种的锁逻辑,从自己的shared拿数据加锁。getSlow()偷其他P缓存,也是需要给每个p加锁。put归还缓存的时候,还是会mutex加一次锁。
go mutex锁的实现原理简单说,他开始也是atomic cas自旋,默认是4次尝试,当还没有拿到锁的时候进行waitqueue gopack休眠调度处理,等待其他协程释放锁时进行goready调度唤醒。
Go 1.13之后,Go 团队对sync.Pool的锁竞争这块进行了很多优化,这里还改变了shared的数据结构,以前的版本用切片做缓存,现在换成了poolChain双端链表。这个双端链表的设计很有意思,你看sync.pool源代码会发现跟redis quicklist相似,都是链表加数组的设计。
1.14 Pool 数据结构
type Pool struct
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface
// Local per-P Pool appendix.
type poolLocalInternal struct
private interface // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
type poolLocal struct
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal)%128]byte
Pool.local
实际上是一个类型 [P]poolLocal
数组,数组长度是调度器中P的数量,也就是说每一个P有自己独立的poolLocal。通过P.id来获取每个P自己独立的poolLocal。在poolLocal中有一个poolChain。
这里我们先忽略其余的机构,重点关注poolLocalInternal.shared
这个字段。poolChain是一个双端队列链,缓存对象。 1.12版本中对于这个字段的并发安全访问是通过mutex加锁实现的;1.14优化后通过poolChain(无锁化)实现的。
这里我们先重点分析一下poolChain 是怎么实现并发无锁编程的。
poolChain
type poolChain struct
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
type poolChainElt struct
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
poolChain
是一个动态大小的双向链接列表的双端队列。每个出站队列的大小是前一个队列的两倍。也就是说poolChain里面每个元素poolChainElt都是一个双端队列
。
head指向的poolChainElt,是用于Producer去Push元素的,不需要做同步处理。
tail指向的poolChainElt,是用于Consumer从tail去pop元素的,这里的读写需要保证原子性。
简单来说,poolChain
是一个单Producer,多Consumer并发访问的双端队列链。
对于poolChain
中的每一个双端队列 poolChainElt,包含了双端队列实体poolDequeue
一起前后链接的指针。
poolChain
主要方法有:
popHead() (interface, bool);
pushHead(val interface)
popTail() (interface, bool)
popHead
和pushHead
函数是给Producer调用的;popTail
是给Consumer并发调用的。
poolChain.popHead()
前面我们说了,poolChain
的head 指针的操作是单Producer的。
func (c *poolChain) popHead() (interface, bool)
d := c.head
for d != nil
if val, ok := d.popHead(); ok
return val, ok
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
return nil, false
poolChain要求,popHead
函数只能被Producer调用。看一下逻辑:
- 获取头结点 head;
- 如果头结点非空就从头节点所代表的双端队列
poolDequeue
中调用popHead
函数。注意这里poolDequeue
的popHead
函数和poolChain
的popHead
函数并不一样。poolDequeue
是一个固定size的ring buffer。 - 如果从head中拿到了value,就直接返回;
- 如果从head中拿不到value,就从head.prev再次尝试获取;
- 最后都获取不到,就返回nil。
poolChain.pushHead()
func (c *poolChain) pushHead(val interface)
d := c.head
if d == nil
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
if d.pushHead(val)
return
// The current dequeue is full. Allocate a new one of twice
// the size.
newSize := len(d.vals) * 2
if newSize >= dequeueLimit
// Can't make it any bigger.
newSize = dequeueLimit
d2 := &poolChainEltprev: d
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
poolChain要求,pushHead
函数同样只能被Producer调用。看一下逻辑:
- 首先还是获取头结点 head;
- 如果头结点为空,需要初始化chain
- 创建
poolChainElt
节点,作为head, 当然也是tail。 poolChainElt
其实也是固定size的双端队列poolDequeue
,size必须是2的n次幂。
- 创建
- 调用
poolDequeue
的pushHead
函数将 val push进head的双端队列poolDequeue
。 - 如果push失败了,说明双端队列满了,需要重新创建一个双端队列d2,新的双端队列的size是前一个双端队列size的2倍;
- 更新poolChain的head指向最新的双端队列,并且建立双链关系;
- 然后将val push到最新的双端队列。
这里需要注意一点的是head其实是指向最后chain中最后一个结点(poolDequeue),chain执行push操作是往最后一个节点push。 所以这里的head的语义不是针对链表结构,而是针对队列结构。
poolChain.popTail()
func (c *poolChain) popTail() (interface, bool)
d := loadPoolChainElt(&c.tail)
if d == nil
return nil, false
for
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok
return val, ok
if d2 == nil
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2))
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
d = d2
poolChain要求,popTail
函数能被任何P调用,也就是所有的P都是Consumer。这里总结下,当前G所对应的P在Pool里面是Producer角色,任何P都是Consumer角色。
popTail
函数是并发调用的,所以需要特别注意。
- 首先需要原子的load chain的tail指向的双端队列d(
poolDequeue
); - 如果d为空,pool还是空,所以直接return nil
- 下面就是典型的无锁原子化编程:进入一个for循环
- 首先就是获取tail的next结点d2。这里需要强调一下为什么需要在tail执行popTail之前先load tail 的next结点。
- tail有可能存在短暂性为空的场景。比如head和tail实际指向同一个结点(双端队列)时候,可能tail为空只是暂时的,因为存在有线程往head push数据的情况。
- 如果因为tail 执行popTail()时因为tail为空而失败了,然后再load tail.next,发现 tail.next非空,再将tail原子切换到tail.next,这个时候就会出现错误了。假设tail和head指向同一个结点,在判断tail是空之后,head往里面插入了很多个数据,直接将tail结点打满,然后head指向下一个结点了。这时候tail.next也非空了。然后就将tail更新到tail.next,就会导致丢数据了。
- 所以必须在:1)tail执行popTail之前tail.next是非空的,2)tail执行popTail时发现tail是空的。满足这两个条件才能说明tail是永久性是空的。也就是需要提前load tail.next指针。
- 如果从tail里面pop数据成功,就直接返回val。
- 如果从tail里面pop数据失败,并且d2也是空,说明当前chain里面只有一个结点,并且是空。直接返回nil
- 如果从tail里面pop数据失败并且d2非空,说明tail已经被drain干净了,原子的tail到tail.next,并清除双向链表关系。
- 从d2开始新的一轮for循环。
- 首先就是获取tail的next结点d2。这里需要强调一下为什么需要在tail执行popTail之前先load tail 的next结点。
上面的流程是典型的的无锁并发编程。
poolDequeue
poolChain中每一个结点都是一个双端队列poolDequeue
。
poolDequeue
是一个无锁的、固定size的、单Producer、多Consumer的deque。只有一个Producer可以从head去push或则pop;多个Consumer可以从tail去pop。
数据结构
type poolDequeue struct
// 用高32位和低32位分别表示head和tail
// head是下一个fill的slot的index;
// tail是deque中最老的一个元素的index
// 队列中有效元素是[tail, head)
headTail uint64
vals []eface
type eface struct
typ, val unsafe.Pointer
这里通过一个字段 headTail
来表示head和tail的index。headTail
是8个字节64位。
- 高32位表示head;
- 低32位表示tail。
- head和tail自加溢出时是安全的。
vals
是一个固定size的slice,其实也就是一个 ring buffer
,size必须是2的次幂(为了做位运算);
pack/unpack
一个字段 headTail
来表示head和tail的index,所以需要有具体的pack和unpack逻辑:
const dequeueBits = 32
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32)
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
func (d *poolDequeue) pack(head, tail uint32) uint64
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
pack:
- 首先拿到mask,这里实际上就是 0xffffffff(2^32-1)
- head左移32位 | tail&0xffffffff 就可以得到head和tail pack之后的值。
unpack:
- 首先拿到mask,这里实际上就是 0xffffffff(2^32-1)
- ptrs右移32位拿到高32位然后 & mask 就可以得到head;
- ptrs直接 & mask 就可以得到低32位,也就是tail。
poolDequeue.pushHead
pushHead
将val push到head指向的位置,如果deque满了,就返回false。
func (d *poolDequeue) pushHead(val interface) bool
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head
// Queue is full.
return false
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
// The head slot is free, so we own it.
if val == nil
val = dequeueNil(nil)
*(*interface)(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
主要逻辑:
- 原子load head和tail,
- 如果tail + len(vals) == head 说明deque已经满了。
- 拿到head在vals中index的slot
- 如果slot的type非空,说明该slot还没有被popTail release,实际上deque还是满的;所以直接return false;
- 更新val到slot的指针指向的值。
- 原子的自加head
需要注意的是,pushHead不是并发安全的,只能有一个Producer去执行;只有slot的的type指针为空时候slot才是空。
poolDequeue.popHead
popHead
将head指向的前一个位置弹出,如果deque是空,就返回false。
func (d *poolDequeue) popHead() (interface, bool)
var slot *eface
for
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head
// Queue is empty.
return nil, false
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
val := *(*interface)(unsafe.Pointer(slot))
if val == dequeueNil(nil)
val = nil
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*slot = eface
return val, true
主要逻辑:
- 由于从head前一个位置pop元素,可能会与tail位置pop冲突,所以不可避免的需要cas操作。所以最开始进入就是一个for循环;
- 原子load
poolDequeue.headTail
然后unpack拿到head和tail - 如果head == tail,表示deque是空,直接return nil.
- head –
- 根据新的head和老的tail, 重新pack出ptrs2;
- 原子cas更新
poolDequeue.headTail
,atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
, - 如果更新成功,就拿到head执行的slot,并获取到实际的value,并return;
- 如果原子更新失败了,重新进入for循环再次执行。
poolDequeue.popTail
这个函数是可以被Consumer并发访问的。
func (d *poolDequeue) popTail() (interface, bool)
var slot *eface
for
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head
// Queue is empty.
return nil, false
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
// We now own slot.
val := *(*interface)(unsafe.Pointer(slot))
if val == dequeueNil(nil)
val = nil
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
// At this point pushHead owns the slot.
return val, true
主要逻辑:
- 并发访问,所以与cas相关的for循环不可少;
- 原子load,拿到head和tail值;
- 将(tail+1)和head重新pack成ptrs2;
- CAS:
atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2)
; 如果更新成功,就拿到vals[tail]t的指针。如果失败就再次返回1的for循环。 - 拿到slot对应的val。
- 将slot的val和type都清为nil, 告诉pushHead, slot我们已经使用完了,pushHead可以往里面填充数据了。
数据结构总结
用一张图完整描述sync.Pool的数据结构:
强调一点:
- head的操作只能是local P;
- tail的操作是任意P;
参考网上一张图来看更加清晰:
Pool 并没有直接使用 poolDequeue,因为它是fixed size的,而 Pool 的大小是没有限制的。因此,在 poolDequeue 之上包装了一下,变成了一个 poolChainElt 的双向链表,可以动态增长。
Pool.Put
func (p *Pool) Put(x interface)
if x == nil
return
l, _ := p.pin()
if l.private == nil
l.private = x
x = nil
if x != nil
l.shared.pushHead(x)
runtime_procUnpin()
func (p *Pool) pin() (*poolLocal, int)
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s
return indexLocal(l, pid), pid
return p.pinSlow()
func (p *Pool) pinSlow() (*poolLocal, int)
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s
return indexLocal(l, pid), pid
if p.local == nil
allPools = append(allPools, p)
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
Put函数主要逻辑:
- 先调用
p.pin()
函数,这个函数会将当前 goroutine与P绑定,并设置当前g不可被抢占(也就不会出现多个协程并发读写当前P上绑定的数据);
1. 在p.pin()
函数里面还会check per P的[P]poolLocal数组是否发生了扩容(P 扩张)。
2. 如果发生了扩容,需要调用pinSlow()
来执行具体扩容。扩容获取一个调度器全局大锁allPoolsMu
,然后根据当前最新的P的数量去执行新的扩容。这里的成本很高,所以尽可能避免手动增加P的数量。 - 拿到per P的poolLocal后,优先将val put到private,如果private已经存在,就通过调用
shared.pushHead(x)
塞到poolLocal里面的无锁双端队列的chain中。Put函数对于双端队列来说是作为一个Producer角色,所以这里的调用是无锁的。 - 最后解除当前goroutine的禁止抢占。
Pool.Get
func (p *Pool) Get() interface
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil
x = p.getSlow(pid)
runtime_procUnpin()
if x == 以上是关于golang sync.Pool在1.14中的优化的主要内容,如果未能解决你的问题,请参考以下文章