Go sync.Pool池化的学习

Posted 技术能量站

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go sync.Pool池化的学习相关的知识,希望对你有一定的参考价值。

一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。

一.前言

Go 语言标准库也大量使用了 sync.Pool,例如 fmt 和 encoding/json。

1.1 要解决的问题

一个新技术亦或是一个新名词,总是为了解决一些问题才出现的,所以先搞明白解决什么问题是第一步。

核心来说,我们的代码中会各种创建对象,比如new一个结构体、创建一个连接、甚至创建一个int都属于对象。那么假设在某些场景下,你的代码会频繁的创建某一种对象,那么这种操作可能会影响你程序的性能,原因是什么呢?

  1. 我们知道创建对象肯定是需要申请内存的
  2. 频繁的创建对象,会对GC造成较大的压力,其实主要是GC压力较大,golang的官方库sync.pool就是为了解决它,看名字就是池的方法。

1.2 池和缓存

sync.pool的思想很简单就是对象池,由于最近一直在做相关的事情,这里我们说个题外话,关于池和缓存说下我的一些看法。

  1. 工作中遇到过很多池:连接池,线程池,协程池,内存池等,会发现这些所谓池,都是解决同一个类型的问题,创建连接、线程等比较消耗资源,所以用池化的思想来解决这些问题,直接复用已经创建好的。
  2. 其实缓存也是,用到缓存的地方比如说,本地缓存、容灾缓存,性能缓存等名词,这些缓存的思想无非就是把计算好的存起来,真正的流量过来的时候,直接使用缓存好的内容,能提服务响应高速度。

1.3 总结下来就是

  1. 复用之前的内容,不用每次新建
  2. 提前准备好,不用临时创建
  3. 采用性能高的存储做缓存,更加提高响应速度 其实看下来跟我们的对象池,没什么区别,我们对象池也就是复用之前创建好的对象。

最后发散下思想,影响我们的程序性能的有以下几个,存储、计算、网络等,其实都可以做缓存,或者提前准备好,亦或者复用之前的结果。我们程序中很多init的东西不就是提前准备好的存储吗,我们很多做的local cache其实就是减少网络传输时间等,以后优化服务性能可以从这个角度考虑。

二. 工作原理

我们先看下如何使用如下结构体

package main

import (
	"fmt"
	"sync"
)

type item struct 
	value int


func main() 
	pool := sync.Pool
		New: func() interface 
			return item
		,
	
	pool.Put(itemvalue: 1)
	data := pool.Get()
	
	fmt.Println("data = ",data)

看起来使用方式很简单,创建一个对象池的方式传进去一个New对象的函数,然后就是两个函数获取对象Get和放入对象Put。

想彻底搞明白原理,莫过于直接去读源码。 先瞅一眼结构体

2.1 Pool

Pool是一个可以分别存取的临时对象的集合。Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。

Pool可以安全的被多个线程同时使用。Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。

Pool的合理用法是管理一组被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。

Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。**这时应该由这些对象自己实现空闲列表。第一次使用后不得复制池。

// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
type Pool struct 
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface

  • local :指向的是poolLocal的数组的指针,[P]poolLocal 数组长度是调度器中P的数量,也就是说每一个P有自己独立的poolLocal。通过P.id来获取每个P自己独立的poolLocal。在poolLocal中有一个poolChain
  • localSize:local数组的长度
  • victim :victim 和 victimSize 这个是在 poolCleanup 流程里赋值了,赋值的内容就是 local 和 localSize 。victim 机制是把 Pool 池的清理由一轮 GC 改成 两轮 GC,进而提高对象的复用率,减少抖动;
  • New func() interface:当池中没有对象时,get方法将使用New方法来返回一个对象,如果你不设置New方法的话,将会返回nil

那么看下这个本地资源池的结构.一层套一层

// Local per-P Pool appendix.
type poolLocalInternal struct 
	private interface // Can be used only by the respective P.
	shared  poolChain   // Local P can pushHead/popHead; any P can popTail.


type poolLocal struct 
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal)%128]byte

  • poolChain:是一个双端队列链,缓存对象。 1.12版本中对于这个字段的并发安全访问是通过mutex加锁实现的;1.14优化后通过poolChain(无锁化)实现的。

2.2 pool.put

// Put adds x to the pool.
func (p *Pool) Put(x interface) 
	if x == nil 
		return
	
	if race.Enabled 
		if fastrand()%4 == 0 
			// Randomly drop x on floor.
			return
		
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	
	l, _ := p.pin()
	if l.private == nil 
		l.private = x
		x = nil
	
	if x != nil 
		l.shared.pushHead(x)
	
	runtime_procUnpin()
	if race.Enabled 
		race.Enable()
	

Put函数主要逻辑是
(1)先调用p.pin() 函数,这个函数会将当前 goroutine与P绑定,并设置当前g不可被抢占(也就不会出现多个协程并发读写当前P上绑定的数据);

  1. 在p.pin() 函数里面还会check per P的[P]poolLocal数组是否发生了扩容(P 扩张)。
  2. 如果发生了扩容,需要调用pinSlow()来执行具体扩容。扩容获取一个调度器全局大锁allPoolsMu,然后根据当前最新的P的数量去执行新的扩容。这里的成本很高,所以尽可能避免手动增加P的数量。

(2)拿到per P的poolLocal后,优先将val put到private,如果private已经存在,就通过调用shared.pushHead(x) 塞到poolLocal里面的无锁双端队列的chain中。Put函数对于双端队列来说是作为一个Producer角色,所以这里的调用是无锁的。

(3)最后解除当前goroutine的禁止抢占。

// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) 
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s 
		return indexLocal(l, pid), pid
	
	return p.pinSlow()


func (p *Pool) pinSlow() (*poolLocal, int) 
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s 
		return indexLocal(l, pid), pid
	
	if p.local == nil 
		allPools = append(allPools, p)
	
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid

2.3 pool.get

// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface 
	if race.Enabled 
		race.Disable()
	
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil 
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil 
			x = p.getSlow(pid)
		
	
	runtime_procUnpin()
	if race.Enabled 
		race.Enable()
		if x != nil 
			race.Acquire(poolRaceAddr(x))
		
	
	if x == nil && p.New != nil 
		x = p.New()
	
	return x

Get函数主要逻辑:

  1. 设置当前 goroutine 禁止抢占(race竞态检查);
  2. 从 poolLocal的private取,如果private不为空直接return;
  3. 从 poolLocal.shared这个双端队列chain里面无锁调用去取,如果取得到也直接return;
  4. 上面都去不到,调用getSlow(pid)去取
    a. 首先会通过 steal 算法,去别的P里面的poolLocal去取,这里的实现是无锁的cas。如果能够steal一个过来,就直接return;
    b. 如果steal不到,则从 victim 里找,和 poolLocal 的逻辑类似。最后,实在没找到,就把 victimSize 置 0,防止后来的“人”再到 victim 里找。
  5. 最后还拿不到,就通过New函数来创建一个新的对象。

这里是一个很明显的多层级缓存优化 + GPM调度结合起来。

private -> shared -> steal from other P -> victim cache -> New

func (p *Pool) getSlow(pid int) interface 
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ 
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil 
			return x
		
	

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size 
		return nil
	
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil 
		l.private = nil
		return x
	
	for i := 0; i < int(size); i++ 
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil 
			return x
		
	

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil

2.4 victim cache优化与GC

对于Pool来说并不能够无上限的扩展,否则对象占用内存太多了,会引起内存溢出。

几乎所有的池技术中,都会在某个时刻清空或清除部分缓存对象,那么在 Go 中何时清理未使用的对象呢?

这里是使用GC。在pool.go里面的init函数 会注册清理函数:

func init() 
	runtime_registerPoolCleanup(poolCleanup)


// mgc.go
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) 
	poolcleanup = f

编译器会把 runtime_registerPoolCleanup 函数调用链接到 mgc.go 里面的 sync_runtime_registerPoolCleanup函数调用,实际上注册到poolcleanup函数。
在 Golang GC 开始的时候 gcStart 调用 clearpools() 函数就会调用到 poolCleanup 函数。也就是说,每一轮 GC 都是对所有的 Pool 做一次清理。

整个调用链如下:

  • gcStart() -> clearpools() -> poolcleanup() 也就是每一轮GC开始都会执行pool的清除操作。

这个是定期执行的,在 sync package init 的时候注册,由 runtime 后台执行,内容就是批量清理 allPools 里的元素。

func poolCleanup() 
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.

	// Because the world is stopped, no pool user can be in a
	// pinned section (in effect, this has all Ps pinned).

	// Drop victim caches from all pools.
	for _, p := range oldPools 
		p.victim = nil
		p.victimSize = 0
	

	// Move primary cache to victim cache.
	for _, p := range allPools 
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	

	// The pools with non-empty primary caches now have non-empty
	// victim caches and no pools have primary caches.
	oldPools, allPools = allPools, nil

poolCleanup 会在 STW 阶段被调用。整体看起来,比较简洁。主要是将 local 和 victim 作交换,这样也就不致于让 GC 把所有的 Pool 都清空了,有 victim 在“兜底”。

  • 如果 sync.Pool 的获取、释放速度稳定,那么就不会有新的池对象进行分配。如果获取的速度下降了,那么对象可能会在两个 GC 周期内被释放,而不是以前的一个 GC 周期。

在Go1.13之前的poolCleanup比较粗暴,直接清空了所有 Pool 的 p.local 和poolLocal.shared。
通过两者的对比发现,新版的实现相比 Go 1.13 之前,GC 的粒度拉大了,由于实际回收的时间线拉长,单位时间内 GC 的开销减小。所以 p.victim 的作用其实就是次级缓存。

2.5 poolChain

这里我们先重点分析一下poolChain 是怎么实现并发无锁编程的。

// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct 
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt


type poolChainElt struct 
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt

poolChain是一个动态大小的双向链接列表的双端队列。每个出站队列的大小是前一个队列的两倍。也就是说poolChain里面每个元素poolChainElt都是一个双端队列。

  • head指向的poolChainElt,是用于Producer去Push元素的,不需要做同步处理。
  • tail指向的poolChainElt,是用于Consumer从tail去pop元素的,这里的读写需要保证原子性

简单来说,poolChain是一个单Producer,多Consumer并发访问的双端队列链。

对于poolChain中的每一个双端队列 poolChainElt,包含了双端队列实体poolDequeue 一起前后链接的指针。

poolChain 主要方法有:

  • popHead()(interface,bool);
  • pushHead(val interface)
  • popTail()(interface,bool)

其中,popHead和pushHead函数是给Producer调用的;popTail是给Consumer并发调用的。

2.5.1 poolChain.popHead()

前面我们说了,poolChain的head 指针的操作是单Producer的。

func (c *poolChain) popHead() (interface, bool) 
	d := c.head
	for d != nil 
		if val, ok := d.popHead(); ok 
			return val, ok
		
		// There may still be unconsumed elements in the
		// previous dequeue, so try backing up.
		d = loadPoolChainElt(&d.prev)
	
	return nil, false

poolChain要求,popHead函数只能被Producer调用。看一下逻辑:

  1. 获取头结点 head;
  2. 如果头结点非空就从头节点所代表的双端队列poolDequeue中调用popHead函数。注意这里poolDequeue的popHead函数和poolChain的popHead函数并不一样。poolDequeue是一个固定size的ring buffer。
  3. 如果从head中拿到了value,就直接返回;
  4. 如果从head中拿不到value,就从head.prev再次尝试获取;
  5. 最后都获取不到,就返回nil。

2.5.2 poolChain.pushHead()

func (c *poolChain) pushHead(val interface) 
	d := c.head
	if d == nil 
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	

	if d.pushHead(val) 
		return
	

	// The current dequeue is full. Allocate a new one of twice
	// the size.
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit 
		// Can't make it any bigger.
		newSize = dequeueLimit
	

	d2 := &poolChainEltprev: d
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)

poolChain要求,pushHead函数同样只能被Producer调用。看一下逻辑:

  1. 首先还是获取头结点 head;
  2. 如果头结点为空,需要初始化chain
  3. 创建poolChainElt 节点,作为head, 当然也是tail。
  4. poolChainElt 其实也是固定size的双端队列poolDequeue,size必须是2的n次幂。
  5. 调用poolDequeue的pushHead函数将 val push进head的双端队列poolDequeue。
  6. 如果push失败了,说明双端队列满了,需要重新创建一个双端队列d2,新的双端队列的size是前一个双端队列size的2倍;
  7. 更新poolChain的head指向最新的双端队列,并且建立双链关系;
  8. 然后将val push到最新的双端队列。

这里需要注意一点的是head其实是指向最后chain中最后一个结点(poolDequeue),chain执行push操作是往最后一个节点push。 所以这里的head的语义不是针对链表结构,而是针对队列结构。

2.5.3 poolChain.popTail()

func (c *poolChain) popTail() (interface, bool) 
	d := loadPoolChainElt(&c.tail)
	if d == nil 
		return nil, false
	

	for 
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok 
			return val, ok
		

		if d2 == nil 
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) 
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			storePoolChainElt(&d2.prev, nil)
		
		d = d2
	

2.6 poolDequeue

poolChain中每一个结点都是一个双端队列poolDequeue。

poolDequeue是一个无锁的、固定size的、单Producer、多Consumer的deque。只有一个Producer可以从head去push或则pop;多个Consumer可以从tail去pop。

type poolDequeue struct 
	// 用高32位和低32位分别表示head和tail
	// head是下一个fill的slot的index;
	// tail是deque中最老的一个元素的index
	// 队列中有效元素是[tail, head)
	headTail uint64

	vals []eface


type eface struct 
	typ, val unsafe.Pointer

这里通过一个字段 headTail 来表示head和tail的index。headTail是8个字节64位。

  1. 高32位表示head;
  2. 低32位表示tail。
  3. head和tail自加溢出时是安全的。
    vals是一个固定size的slice,其实也就是一个 ring buffer,size必须是2的次幂(为了做位运算);

三. 总结

今天,我们一起讨论了另一个比较有用的同步工具——sync.Pool类型,它的值被我称为临时对象池。

临时对象池有一个New字段,我们在初始化这个池的时候最好给定它。临时对象池还拥有两个方法,即:Put和Get,它们分别被用于向池中存放临时对象,和从池中获取临时对象。

临时对象池中存储的每一个值都应该是独立的、平等的和可重用的。我们应该既不用关心从池中拿到的是哪一个值,也不用在意这个值是否已经被使用过。

要完全做到这两点,可能会需要我们额外地写一些代码。不过,这个代码量应该是微乎其微的,就像fmt包对临时对象池的用法那样。所以,在选用临时对象池的时候,我们必须要把它将要存储的值的特性考虑在内。

在临时对象池的内部,有一个多层的数据结构支撑着对临时对象的存储。它的顶层是本地池列表,其中包含了与某个 P 对应的那些本地池,并且其长度与 P 的数量总是相同的。

在每个本地池中,都包含一个私有的临时对象和一个共享的临时对象列表。前者只能被其对应的 P 所关联的那个 goroutine 中的代码访问到,而后者却没有这个约束。从另一个角度讲,前者用于临时对象的快速存取,而后者则用于临时对象的池内共享。

正因为有了这样的数据结构,临时对象池才能够有效地分散存储压力和性能压力。同时,又因为临时对象池的Get方法对这个数据结构的妙用,才使得其中的临时对象能够被高效地利用。比如,该方法有时候会从其他的本地池的共享临时对象列表中,“偷取”一个临时对象。

以上是关于Go sync.Pool池化的学习的主要内容,如果未能解决你的问题,请参考以下文章

go语言学习--go的临时对象池--sync.Pool

golang sync.pool对象复用 并发原理 缓存池

Go sync.Pool 浅析

Go36-33-临时对象池(sync.Pool)

golang sync.Pool的用法及实现

[Go] sync.Pool 的实现原理 和 适用场景