Go的sync

Posted lijianming180

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go的sync相关的知识,希望对你有一定的参考价值。

关于 pool 的由来可以参考:

sync.Pool 的作用及为什么要用到它

Rob Pike 扩展了sync.pool 类型的文档,并且将其目的描述得更清楚:

Pool设计用意是在全局变量里维护的释放链表,尤其是被多个 goroutine 同时访问的全局变量。使用Pool代替自己写的释放链表,可以让程序运行的时候,在恰当的场景下从池里重用某项值。sync.Pool一种合适的方法是,为临时缓冲区创建一个池,多个客户端使用这个缓冲区来共享全局资源。另一方面,如果释放链表是某个对象的一部分,并由这个对象维护,而这个对象只由一个客户端使用,在这个客户端工作完成后释放链表,那么用Pool实现这个释放链表是不合适的。

“临时对象”的意思是:不需要持久使用的某一类值。

这类值对于程序来说可有可无,但如果有的话会明显更好。它们的创建和销毁可以在任何时候发生,并且完全不会影响到程序的功能。

sync.Pool主要是为了重用对象,一方面缩短了申请空间的时间,另一方面,还减轻了GC的压力。

不过它是一个临时对象池,为什么这么说呢?因为对象池中的对象会被GC回收。所以说,有状态的对象,比如数据库连接是不能够用sync.Pool来实现的。

gc触发的时机:2分钟或者内存占用达到一个阈值(当前堆内存占用是上次gc后对内存占用的两倍,当GOGC=100时)




Pool 类型数据结构及其两个方法

如下是 pool 类型的数据结构:

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal ;[P]poolLocal 数组指针 
	localSize uintptr        // size of the local array,数组大小

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{}   // Can be used only by the respective P.;私有缓存区
	shared  []interface{} // Can be used by any P.;公共缓存区
	Mutex                 // Protects shared.
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

Pool是提供给外部使用的对象。其中的local成员的真实类型是一个poolLocal数组,localSize是数组长度。poolLocal是真正保存数据的地方。priveate保存了一个临时对象,shared是保存临时对象的数组。

为什么Pool中需要这么多poolLocal对象呢?实际上,Pool是给每个线程分配了一个poolLocal对象。也就是说local数组的长度,就是工作线程的数量(size := runtime.GOMAXPROCS(0))。当多线程在并发读写的时候,通常情况下都是在自己线程的poolLocal中存取数据。当自己线程的poolLocal中没有数据时,才会尝试加锁去其他线程的poolLocal中“偷”数据。

sync.Pool 类型只有两个方法——Put 和 Get。

Put 是将临时对象放回当前池中,用于存放的作用。

Get 用于从当前的池中获取临时对象,返回一个Interface{}。

func (p *Pool) Put(x interface{}){
    ......
}

func (p *Pool) Get() interface{}{
    ......
}

其中特别说一下这个Get方法,它是去池中获取临时对象,从池中选择一个任意项,将其从池中删除,然后将其返回给调用者。

所以 Get 把返回的对象从池子里面删除。所以用完了的对象,还是得重新放回池子。

如果我们在使用Get申请新对象时pool中没有可用的对象,那么就会返回nil,除非设置了sync.Pool的New func。

我们来看看具体Get的源码解释:

func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l := p.pin()
	x := l.private
	l.private = nil
	runtime_procUnpin()
	if x == nil {
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil {
			x = p.getSlow()
		}
	}
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

func (p *Pool) getSlow() (x interface{}) {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local                         // load-consume
	// Try to steal one element from other procs.
	pid := runtime_procPin()
	runtime_procUnpin()
	for i := 0; i < int(size); i++ {
		l := indexLocal(local, (pid+i+1)%int(size))
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}
  1. Get()获取对象时:优先从 private 空间获取 -> 没有则加锁从 share 空间获取 ( 从尾部开始获取)-> 没有再 new func 新的对象 (此对象不会放回池中)直接返回给调用者

为什么这里要锁住。答案在getSlow中。因为当shared中没有数据的时候,会尝试去其他的poolLocal的shared中偷数据。

  1. 注意:Get 操作后 (在返回之前就会将它从池中删除),缓存对象彻底与 Pool 失去引用关联,需要自行 Put 放回。

放回到池中的对象会被GC回收。

但当你Get 的时候,是任意获取对象的,也就是说有可能是当前池中被放回的资源,也有可能是最后被New 出来的资源。

也就是说我们不能对从对象池申请到的对象值做任何假设,可能是New新生成的,可能是被某个协程修改过放回来的这当中会被入坑。

在来看看Put具体源码解释:

// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		l.Lock()
		l.shared = append(l.shared, x)
		l.Unlock 大专栏  Go的sync()
	}
	if race.Enabled {
		race.Enable()
	}
}

Put() 优先放入 private 空间 -> 其次再考虑 share 空间

Pool 高效的设计的地方就在于将数据分散在了各个真正并发的线程中,每个线程优先从自己的poolLocal中获取数据,很大程度上降低了锁竞争。 




sync.Pool实际调用:

1. 最简单的数据存放:

package main
 
import (
	"fmt"
	"sync"
)
func main() {
	p:=&sync.Pool{
		New: func() interface{}{
			return 0
		},
	}
	p.Put("jiangzhou")
	p.Put(123456)
	fmt.Println(p.Get())
	fmt.Println(p.Get())
	fmt.Println(p.Get())
}

输出:

jiangzhou
123456
0

2.临时使用一些大型结构体,可以用Pool来减少GC。

package main
import (
	"sync"
	"time"
	"fmt"
)
 
type structR6 struct {
	B1 [100000]int
}
var r6Pool = sync.Pool{
	New: func() interface{} {
		return new(structR6)
	},
}
func usePool() {
	startTime := time.Now()
	for i := 0; i < 10000; i++ {
		sr6 := r6Pool.Get().(*structR6)
		sr6.B1[0] = 0
		r6Pool.Put(sr6)
	}
	fmt.Println("pool Used:", time.Since(startTime))
}
func standard() {
	startTime := time.Now()
	for i := 0; i < 10000; i++ {
		var sr6 structR6
		sr6.B1[0] = 0
	}
	fmt.Println("standard Used:", time.Since(startTime))
}
func main() {
	standard()
	usePool()
}

输出:

standard Used: 263.24691ms
pool Used: 733.61μs

很明显,运用临时池存放再调用要省事的多。

一个含有100000个int值的结构体,在标准方法中,每次均新建,重复10000次,一共需要耗费263.24691ms;

如果用完的struct可以废物利用,放回pool中。需要新的结构体的时候,尝试去pool中取,而不是重新生成,重复10000次仅需要733.61μs。注意单位哦。

这样简单的操作,却节约了99.75%的时间,也节约了各方面的资源。最重要的是它可以有效减少GC CPU和GC Pause。

func main() {
	//我们创建一个Pool,并实现New()函数
	sp := sync.Pool{
		//New()函数的作用是当我们从Pool中Get()对象时,如果Pool为空,则先通过New创建一个对象,插入Pool中,然后返回对象。
		New: func() interface{} {
			return make([]int, 16)
		},
	}
	item := sp.Get()
	//打印可以看到,我们通过New返回的大小为16的[]int
	fmt.Println("item : ", item)
 
	//然后我们对item进行操作
	//New()返回的是interface{},我们需要通过类型断言来转换
	for i := 0; i < len(item.([]int)); i++ {
		item.([]int)[i] = i
	}
	fmt.Println("item : ", item)
 
	//使用完后,我们把item放回池中,让对象可以重用
	sp.Put(item)
 
	//再次从池中获取对象
	item2 := sp.Get()
	//注意这里获取的对象就是上面我们放回池中的对象
	fmt.Println("item2 : ", item2)
	//我们再次获取对象
	item3 := sp.Get()
	//因为池中的对象已经没有了,所以又重新通过New()创建一个新对象,放入池中,然后返回
	//所以item3是大小为16的空[]int
	fmt.Println("item3 : ", item3)
 
	//测试sync.Pool保存socket长连接池
	//testTcpConnPool()
}

输出如下:

item :  [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
item :  [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item2 :  [0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
item3 :  [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]

注意 Pool不适用于有状态的数据

上面提到过:

因为对象池中的对象会被GC回收。所以说,有状态的对象,比如数据库连接、socket长连接等 是不能够用sync.Pool来实现的

package main
 
import (
	"sync"
	"net"
	"fmt"
	"runtime"
)
 
func main() {
	sp2 := sync.Pool{
		New: func() interface{} {
			conn, err := net.Dial("tcp", "127.0.0.1:8888");
			if err != nil {
				return nil
			}
			return conn
		},
	}
	buf := make([]byte, 1024)
	//获取对象
	conn := sp2.Get().(net.Conn)
	//使用对象
	conn.Write([]byte("GET / HTTP/1.1 rnrn"))
	n, _ := conn.Read(buf)
	fmt.Println("conn read : ", string(buf[:n]))
	//打印conn的地址
	fmt.Println("coon地址:",conn)
	//把对象放回池中
	sp2.Put(conn)
	//我们人为的进行一次垃圾回收
	runtime.GC()
	//再次获取池中的对象
	conn2 := sp2.Get().(net.Conn)
	//这时发现conn2的地址与上面的conn的地址不一样了
	//说明池中我们之前放回的对象被全部清除了,显然这并不是我们想看到的
	//所以sync.Pool不适合用于scoket长连接或数据库连接池
	fmt.Println("coon2地址",conn2)
}

推荐一开源的pool库:

go-commons-pool

另外,当理解这篇文章后,又看过源码解释,我建议去看看

极客时间 Go语言核心36讲 第33 sync.Pool,这篇文章,结合源码理解看看会又有新的理解,但不适合没看过源码的朋友,不然你会觉得:这是讲的什么鬼。。。


以上是关于Go的sync的主要内容,如果未能解决你的问题,请参考以下文章

45. sync.Mutex 互斥和互斥锁 | 厚土Go学习笔记

go sync.once用法

go sync.once用法

Go 源码分析:sync.Once

Go语言sync包中的WaitGroup使用实例

Go 实现线程安全 map 读写(sync.RWMutex)