并发编程Pool 基本用法和如何实现

Posted @了凡

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程Pool 基本用法和如何实现相关的知识,希望对你有一定的参考价值。

博主介绍:

我是了 凡 微信公众号【了凡银河系】期待你的关注。未来大家一起加油啊~


前言

Go是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收,所以我们一般都是想用就用,没有考虑如何提高性能的问题。但是,如果想要使用Go开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响

但是具体都带来什么影响呢?Go的自动垃圾回收机制有一个STW(stop-the-world,程序暂停)的时间,还有另一个耗时的问题,就是在大量的创建在堆上的对象,也会影响垃圾回收标记的时间。

一般情况下做性能优化的时候,都选择将不用的对象手动回收起来,避免被垃圾回收掉,这样下一次使用的时候就不必在堆上重新创建了。并且,比如数据库连接、TCP的长连接等都可以考虑将连接保存下来,避免每次使用的时候都重新创建,不仅可以大大减少业务的耗时,还能提高应用程序的整体性能。

所以为了解决以上问题,GO标准库中提供了一个通用的Pool数据结构,也就是sync.Pool,使用它可以创建池化的对象。但是这个类型有一个缺点,就是它池化的对象可能会被垃圾回收掉,这对于数据库长连接等场景是不合适的。


sync.Pool

sync.Pool数据类型用来保存一组可独立访问的临时对象。临时是指:池化的对象会在未来的某个时候被毫无预兆地移除掉。而且,如果没有别的对象引用这个被移除的对象的化,这个被移除的对象就会被垃圾回收掉。

知识点:

  • sync.Pool本身就是线程安全的,多个goroutine可以并发地调用它的方法存取对象;
  • sync.Pool不可在使用之后再复制使用。

sync.Pool的使用方法

这个数据类型提供了三个对外的方法:New、Get和Put。

  • New
    这个字段的类型是函数func() interface。当调用Pool的Get方法从池中获取元素,没有更多的空闲元素可返回时,就会调用这个New方法来创建新的的元素。如果没有设置New字段,没有更多的空闲元素可返回时,Get方法将返回nil,表明当前没有可用的元素。
  • Get
    如果调用Get方法,就会从Pool取走一个元素,取走指会从Pool中移除,返回给调用者。不过,除了返回值是正常实例化的元素,Get方法的返回值还可能会是一个nil(Pool.New字段没有设置,又没有空闲元素可以返回),所以在使用的时候需要判断。
  • Put
    Put用于将一个元素存入Pool,Pool会把这个元素保存到池中,并且可以复用。但如果Put一个nil值,Pool就会忽略这个值。
  • 缓冲池
    buffer池(缓冲池)是sync.Pool最常用的一个场景。
    因为byte slice是经常被创建销毁的一类对象,使用buffer池可以缓存已经创建的byte slice,例如,著名的静态网站生成工具Hugo中,包含了这样的实现bufpool。
var buffers = sync.Pool
    New: func() interface
        return new(bytes.Buffer)
    ,


func GetBuffer() *bytes.Buffer 
    return buffers.Get().(*bytes.Buffer)


func PutBuffer(buf *bytes.Buffer) 
    buf.Reset()
    buffers.Put(buf)

此段代码,很常见,但是存在一个问题,请不要使用在项目中。这段代码可能存在内存泄漏问题。

实现原理

  1. 每次GC都会回收创建的对象。
    如果缓存元素数量太多,就会导致STW耗时变长;缓存元素都被回收后,会导致Get命中率下降,Get方法不得不新创建很多对象。

  2. 底层实现使用了Mutex,对这个锁并发请求竞争记录的时候,会导致性能的下降。
    sync.Pool做了大量的优化。
    提高并发程序性能的优化点是尽量不要使用锁,如果不得已使用了锁,就把锁Go的粒度降到最低。Go对Pool的优化就是避免使用锁,同时将加锁的queue改成lock-free的queue的实现,给即将移除的元素再多一次“复活”的机会。

sync.Pool的数据结构如下图所示:

Pool最重要的两个字段是local和victim,因为它们两个主要作用就是存储空闲的元素。

每一次垃圾回收的时候,Pool会讲victim中的对象回收,然后把local的数据给victim,这样的话,local就会被清空,而victim就会像一个垃圾分拣站,里面的东西可能会被当作垃圾丢弃了,但是里面有用的东西也可能被捡回来重新使用。

victim中的元素如果被Get取走,那么这个元素就会很幸运,因为它再一次“复活”了。但是,如果这个时候Get的并发不是很大,元素没有被Get取走,那么就会被移除掉,因为没有别人引用它的话,就会被垃圾回收掉。

此段代码就是垃圾回收时sync.Pool的处理逻辑:

func poolCleanup()
    // 丢弃当前victim,STW所以不用加锁
    for _, p := range oldPools 
        p.victim = nil
        p.victimSize = 0
    
    
    // 将local复制给victim,并将原local置为nil
    for _, p := allPools 
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    
    
    oldPools, allPools = allPools, nil

在这段代码中,你需要关注一下local字段,因为所有当前主要的空闲可用的元素都存放在local字段中查找可用的元素。local字段包含一个poolLocallnternal字段,并提供CPU缓存对齐,从而避免false sharing。

poolLocallnternal包含两个字段:private和shared。

  • private,代表一个缓存的元素,而且只能由相应的一个P存取。因为一个P同时只能执行一个goroutine,所以不会有并发的问题。
  • shared,可以由任意的P访问,但是只有本地的P才能pushHead/popHead,其他P可以jpopTail,相当于只有一个本地的P作为生产者(Producer),多个P作为消费者,它是使用一个local-free 的queue列表实现的。

Get方法

func (p *Pool) Get() interface 
    // 把当前goroutine固定在当前的P上
    l, pid := p.pin()
    x := .private // 优先从local的private字段取,快速
    l.private = nil
    if x == nil 
        // 从当前的local.shared弹出一个,注意是从head读取并移除
        x, _ = l.shared.popHead()
        if x == nil  // 如果没有,则去偷一个
            x = p.getSlow(pid)
        
    
    runtime_procUnpin()
    // 如果没有获取到,尝试使用New函数生成一个新的
    if x == nil && p.New != nil 
        x = p.New()
    
    return x

从本地的private字段中获取可用元素,因为没有锁,获取元素的过程会非常快,如果么有获取到,就尝试从本地的shared获取一个,如果还没有,会使用getSlow方法去其他的shared中“偷”一个。最后,如果没有获取到,就尝试使用New函数创建一个新的。

getSlow方法,表示耗时可能比较长。首先要遍历所有的local,尝试从它们的shared他拿出一个元素。如果还没有找到一个,那么,就开始堆victim下手了。

在vintim中查询可用元素的逻辑还是一样的,先从对应的victim的private查找,如果查找不到,就再从其他victim的shared中查找。

getSlow源码逻辑:

func (p *Pool) getSlow(pid int) interface 
   // See the comment in pin regarding ordering of the loads.
   size := atomic.LoadUintptr(&p.localSize) // load-acquire
   locals := p.local                        // load-consume
   // Try to steal one element from other procs.
   for i := 0; i < int(size); i++ 
      l := indexLocal(locals, (pid+i+1)%int(size))
      if x, _ := l.shared.popTail(); x != nil 
         return x
      
   

   // Try the victim cache. We do this after attempting to steal
   // from all primary caches because we want objects in the
   // victim cache to age out if at all possible.
   size = atomic.LoadUintptr(&p.victimSize)
   if uintptr(pid) >= size 
      return nil
   
   locals = p.victim
   l := indexLocal(locals, pid)
   if x := l.private; x != nil 
      l.private = nil
      return x
   
   for i := 0; i < int(size); i++ 
      l := indexLocal(locals, (pid+i)%int(size))
      if x, _ := l.shared.popTail(); x != nil 
         return x
      
   

   // Mark the victim cache as empty for future gets don't bother
   // with it.
   atomic.StoreUintptr(&p.victimSize, 0)

   return nil

Put方法

func (p *Pool) Put(x interface) 
    if x == nil  // nil值直接丢弃
        return
    
    l, _ := p.pin()
    if l.private == nil  // 如果本地private没有值,直接设置这个值即可
        l.private = x
        x = nil
    
    if x != nil  // 否则加入到本地队列中
        l.shared.pushHead(x)
    
    runtime_procUnpin()

Put 的逻辑相对简单,优先设置本地private,如果private字段已经有值了,那么就把此元素push到本地队列中。

总结

本次共分享了Pool是什么?要如何使用,并且了解了sync.Pool的数据结构以及执行流程和实现过程,相信对于Pool已经了解了基础用法和认识,如果想要了解更多可以从Pool的一些坑入手,例如内存泄露,以及内存浪费等。后续会写一下关于Go语言的GC以及变量逃逸等问题。


创作不易,点个赞吧!
如果需要后续再看点个收藏!
如果对我的文章有兴趣给个关注!
如果有问题,可以关注公众号【了凡银河系】点击联系我私聊。


以上是关于并发编程Pool 基本用法和如何实现的主要内容,如果未能解决你的问题,请参考以下文章

并发编程Context 基本用法和如何实现

并发编程Context 基本用法和如何实现

并发编程WaitGroup 基本用法和如何实现以及常见错误

并发编程Once 基本用法和如何实现以及常见错误

并发编程Cond 基本用法和如何实现以及常见错误

并发编程map 基本用法和常见错误以及如何实现线程安全的map类型