sync:与golang的并发息息相关的包

Posted traditional

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sync:与golang的并发息息相关的包相关的知识,希望对你有一定的参考价值。

楔子

我们知道golang除了兼顾了开发速度和运行效率之外,最大的亮点就是在语言层面原生支持并发,也就是通过所谓的goroutine。不过既然是并发,那么就势必会面临很多问题。比如:资源竞争,多个goroutine同时访问一个资源会发生竞争从而产生意想不到的结果,那么这时候我们会通过加锁来解决;主goroutine不能先退出,这时候我们会等待子goroutine。还有单例模式,以及对象池等等。那么golang是如何实现的呢?就是通过下面我们要介绍的sync包。

sync.Mutex

sync.Mutex称为互斥锁,主要就是解决资源竞争的问题,这个时候我们通过对会发生资源竞争的部分进行加锁来解决。因为锁只有一把,只能一个goroutine执行完毕把锁释放,其他的goroutine才有机会执行。

我们先来看看不加锁的版本

package main

import (
    "fmt"
    "time"
)

var num = 0

func add() {
    i := 0
    for ; i < 100000; i++ {
        num++
    }
}

func sub() {
    i := 0
    for ; i < 100000; i++ {
        num--
    }
}

func main() {

    go add()
    go sub()
    time.Sleep(time.Second * 3)
    fmt.Println(num)  //1314
}
//如果发现结果正常(为0的话),那就多执行即便

此时由于没有加锁,那么两个goroutine都访问num,那么得到的结果是会有误差的,理论上应该是0才对。下面我们就来加锁:

package main

import (
    "fmt"
    "sync"
    "time"
)

var num = 0

func add(lock *sync.Mutex) {
    i := 0
    for ; i < 100000; i++ {
        //表示上锁,在该锁为释放的话,那么其他goroutine使用lock.Lock()的时候就会阻塞
        lock.Lock()
        num++
        //不要忘记把锁释放,否则其他goroutine在尝试获取锁的时候都会阻塞
        lock.Unlock()
    }
}

func sub(lock *sync.Mutex) {
    i := 0
    for ; i < 100000; i++ {
        lock.Lock()
        num--
        lock.Unlock()
    }
}

func main() {

    //这个sync.Mutex是一个结构体,传递的时候是只传递,因此我们需要传递指针
    var lock = new(sync.Mutex)
    go add(lock)
    go sub(lock)
    time.Sleep(time.Second * 3)
    fmt.Println(num)  //0
}

sync.RWMutex

Mutex表示互斥锁,RWMutex表示读写互斥锁(简称:"读写锁")。为什么会有读写互斥锁,因为我们不希望多个goroutine对同一个资源进行修改,但如果是读取的话还是可以的。而Mutex比较严格,只要我上锁了,那么其它人就无法访问了,无论你是读还是写,必须等我完事之后你才可以开始。于是就有了RWMutex,读写锁的话,就可以解决这一问题。读写锁,可以设置为读锁,也可以设置为写锁。

  • 写锁:设置为写锁,那么此时就等同于互斥锁,其他goroutine不可读也不可写
  • 读锁:设置为读锁,那么其它goroutine可以读,但是不可以写。
// Lock 将 rw 设置为写锁定状态,禁止其他goroutine读取或写入。
func (rw *RWMutex) Lock()

// Unlock 解除 rw 的写锁定状态,如果 rw 未被写锁定,则该操作会引发 panic。
func (rw *RWMutex) Unlock()

// RLock 将 rw 设置为读锁定状态,禁止其他goroutine写入,但可以读取。
func (rw *RWMutex) RLock()

// Runlock 解除 rw 的读锁定状态,如果 rw 未被读锁顶,则该操作会引发 panic。
func (rw *RWMutex) RUnlock()

先设置为写锁

package main

import (
    "fmt"
    "sync"
    "time"
)

var num = 0

func add(lock *sync.RWMutex) {
    defer lock.Unlock()
    lock.Lock()
    time.Sleep(1e9)
    fmt.Println("add")
}

func sub(lock *sync.RWMutex) {
    defer lock.Unlock()
    lock.Lock()
    fmt.Println("sub")
}

func main() {

    var lock = new(sync.RWMutex)
    go add(lock)
    //确保add先执行
    time.Sleep(1000)
    go sub(lock)
    time.Sleep(time.Second * 2)
    /*
    add
    sub
     */
}

设置为写锁,那么由于是add函数先执行,尽管里面出现了sleep,但是add函数先将锁获取了,所以必须要等add先执行完,才可以执行sub。

设置为读锁

package main

import (
    "fmt"
    "sync"
    "time"
)

var num = 0

func add(lock *sync.RWMutex) {
    defer lock.RUnlock()
    lock.RLock()
    time.Sleep(1e9)
    fmt.Println("add")
}

func sub(lock *sync.RWMutex) {
    defer lock.RUnlock()
    lock.RLock()
    fmt.Println("sub")
}

func main() {

    var lock = new(sync.RWMutex)
    go add(lock)
    //确保add先执行
    time.Sleep(1000)
    go sub(lock)
    time.Sleep(time.Second * 2)
    /*
    sub
    add
     */
}

我们看到设置为读锁,那么sub就不会等待add了,因为大家都是读锁。

sync.Cond

sync.Cond用在goroutine之间,用于协程的挂起和唤醒。这个Cond是需要通过锁才能实现,也就是底层还是使用了锁。调用cond.L.Lock()会进行上锁,但是其它的goroutine同时也是可以获取锁的,因此锁不是唯一的,而一旦调用cond.Wait(),那么程序会阻塞在这里(将当前goroutine加入到等待队列里),比如使用另一个goroutine将其唤醒。唤醒的方式有两种:cond.Signal,唤醒等待队列里面的一个goroutine;cond.Broadcase,唤醒等待队列里面的所有goroutine。

package main

import (
    "fmt"
    "sync"
    "time"
)


func add1(cond *sync.Cond) {
    // 获取锁
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add1已成功获取锁")
    //此时程序会卡在这个地方,直到另一个goroutine唤醒
    cond.Wait()
    fmt.Println("add1醒了")
}

func add2(cond *sync.Cond) {
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add2已成功获取锁")
    cond.Wait()
    fmt.Println("add2醒了")
}



func main() {

    //我们同样需要传递指针
    var cond = new(sync.Cond)
    //Cond是需要搭配锁来执行的
    cond.L = new(sync.Mutex)
    //或者我们在创建cond的时候直接通过 var cond = sync.NewCond(new(sync.Mutex))
    go add1(cond)
    go add2(cond)

    time.Sleep(time.Second)
    //唤醒一个goroutine
    cond.Signal()
    time.Sleep(time.Second)
    /*
    add1已成功获取锁
    add2已成功获取锁
    add1醒了
    */
}
package main

import (
    "fmt"
    "sync"
    "time"
)


func add1(cond *sync.Cond) {
    // 获取锁
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add1已成功获取锁")
    cond.Wait()
    fmt.Println("add1醒了")
}

func add2(cond *sync.Cond) {
    cond.L.Lock()
    defer cond.L.Unlock()
    fmt.Println("add2已成功获取锁")
    cond.Wait()
    fmt.Println("add2醒了")
}



func main() {

    var cond = new(sync.Cond)
    cond.L = new(sync.Mutex)
    go add1(cond)
    go add2(cond)

    time.Sleep(time.Second)
    cond.Broadcast()
    time.Sleep(time.Second)
    /*
    add1已成功获取锁
    add2已成功获取锁
    add1醒了
    add2醒了
     */
}

sync.WaitGroup

我们看一下上面写的代码,是不是很low呢?因为我们希望主线程等待子协程执行完毕之后再退出,使用的方式是time.Sleep,这是很低级的,当然在介绍语法的时候很方便。但是在项目开发中肯一般不会这么写,而且你也不知道子协程什么时候执行完毕。于是就有了组的概念,sync.WaitGroup是一个结构体,有以下三个方法。

// 计数器增加 delta,delta 可以是负数。
func (wg *WaitGroup) Add(delta int)

// 计数器减少 1
func (wg *WaitGroup) Done()

// 等待直到计数器归零。如果计数器小于 0,则该操作会引发 panic。
func (wg *WaitGroup) Wait()

//所以只要计数器不为0,那么Wait会阻塞
//Add会使计数器增加指定的数值
//Done会使计数器减一

//那么你想到了什么?
//对,假设我们要开20个协程,那么就Add(20)
//每执行一个协程Done()一下
//Wait()不就会等待所有的子协程全部执行完毕吗
package main

import (
    "fmt"
    "sync"
)


func add(wg *sync.WaitGroup, value int) {
    fmt.Printf("satori %d号
", value)
    wg.Done()
}


func main() {

    //我们同样需要传递指针
    var wg = new(sync.WaitGroup)
    wg.Add(10)
    for i:=0;i<10;i++{
        go add(wg, i)
    }
    wg.Wait()
    /*
    satori 4号
    satori 0号
    satori 1号
    satori 3号
    satori 9号
    satori 6号
    satori 5号
    satori 7号
    satori 8号
    satori 2号
     */
}

sync.Once

sync.Once可以保证一些对象只被实例化一次,或者某个函数只被执行一次。经常用于单例模式、系统初始化等等。再比如channel的Close,对一个通道进行多次Close会引发panic,那么我们通过sync.Once就可以保证channel只会被Close一次。

package main

import (
    "fmt"
    "sync"
)


func foo(){
    fmt.Println(123)
}

func mmp(once *sync.Once){
    once.Do(foo)
}



func main() {
    var once = new(sync.Once)
    for i:=0;i<10;i++{
        once.Do(foo)
        /*
        123
         */
    }
    //once.Do里面函数只会被执行一次。
    //这里的也不会被执行,因为我们传递的是指针,如果传值的话会进行拷贝,那么还是会执行的,因为不是一个sync.Once对象了
    mmp(once)
}

另外我们发现once.Do里面的函数,是一个函数名,而且参数类型也指明了是一个无参无返回值的函数。那如果需要参数呢?很简单,使用闭包即可。

package main

import (
    "fmt"
    "sync"
)

func girl(name string) func() {
    return func() {
        fmt.Println("i'm a girl named", name)
    }
}

func main() {
    var once = new(sync.Once)
    once.Do(girl("mashiro1"))
    once.Do(girl("mashiro2"))
    once.Do(girl("mashiro3"))
    /*
    i'm a girl named mashiro1
     */
}

sync.Pool

从名字也能看出来,sync.Pool指的是临时对象池,为了减少GC的负担,我们对于那些可能会后续使用、但是暂时不用的对象放到池子里,当使用的时候,再从池子里面拿出来。

package main

import (
    "bytes"
    "fmt"
    "sync"
)

func main() {
    //Pool是一个结构体,里面有一个New字段,接收一个无参、返回值为interface{}类型的函数
    var pool = sync.Pool{
        New: func() interface{} {
            return &bytes.Buffer{}
        },
    }

    //如果我们创建Pool的时候,指定了函数,那么池子里面就有东西了,就是函数的返回值
    //可以直接使用Get函数获取,但它是一个interface{},所以我们要转换成相应的类型
    //如果初始胡没有指定,那么获取的结果就是nil
    buf := pool.Get().(*bytes.Buffer)
    buf.WriteString("哈哈")
    pool.Put(buf) //调用put函数,可以将对象放回去
    //然后我们再取出来
    buf = pool.Get().(*bytes.Buffer)
    // 成功打印我们写入的内容
    fmt.Println(buf.String())  //哈哈

    //这个pool不一定非要放相同的对象
    var num  = 123
    pool.Put(num)
    //我方进去一个int也是可以的
    num = pool.Get().(int)
    fmt.Println(num) // 123
}

如果初始化的时候,不指定函数。

package main

import (
    "fmt"
    "sync"
)

func main() {
    //Pool是一个结构体,里面有一个New字段,接收一个无参、返回值为interface{}类型的函数
    var pool = sync.Pool{}
    //不指定的话,是一个nil
    fmt.Println(pool.Get()) // <nil>

    //这个时候可以直接put
    //但是我们也可以指定函数
    pool.New = func() interface{} {
        return 123
    }
    fmt.Println(pool.Get().(int)) // 123
}

sync.Map

golang在1.9的时候,引用了sync.Map,它是原生支持并发安全的map。对于普通的map,我们一般够用了,尽管它并不是线程安全的。但是有时我们需要涉及到线程安全的时候,我们可以使用sync.Map。sync.Map和原生的map语法差别较大,但是很好理解。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map
    //设置key  value
    //参数类型都是interface{}
    m.Store("name", "satori")
    m.Store("age", 17)
    m.Store("gender", "f")

    //Load:查找一个key,如果存在那么返回 值和true,否则返回 nil和false
    if value, ok := m.Load("name"); ok {
        fmt.Println(value) // satori
    }

    //LoadOrStore:查找一个key的同时指定一个value
    //如果查找的key存在,那么返回 对应的值和true; 不存在就将该key和指定的value设置进去,返回 设置的值和false
    fmt.Println(m.Load("where"))  // <nil> false
    //设置成功
    fmt.Println(m.LoadOrStore("where", "japan"))  //japan false

    //再次获取,设置成功
    fmt.Println(m.Load("where"))  // japan true
    fmt.Println(m.LoadOrStore("where", "America"))  //japan true

    //遍历,接收一个函数,参数是两个interface{},返回一个bool
    m.Range(func(key, value interface{}) bool {
        fmt.Println(key, value)
        return true
        /*
        name satori
        age 17
        gender f
        where japan
         */
    })

    //删除一个元素
    fmt.Println(m.Load("gender"))  // f true
    m.Delete("gender")
    fmt.Println(m.Load("gender"))  // <nil> false
}

下面我们来测试一下同时写和删除有什么表现,一个goroutine往map里面写,一个从map里面删除。

package main

import (
    "time"
)

func readMap(m map[int]int){
    for i:=0;i<5;i++{
        time.Sleep(1000)
        delete(m, i)
    }
}

func writeMap(m map[int]int){
    for i:=0;i<6;i++{
        time.Sleep(1000)
        m[i] = 1
    }
}

func main() {
    var m = make(map[int]int)
    go readMap(m)
    go writeMap(m)
    time.Sleep(time.Second)
    /*
    fatal error: concurrent map writes

    goroutine 18 [running]:
    runtime.throw(0x47d496, 0x15)
    ...
    ...
     */
}

我们看到对于普通map直接报错了,我们再来试试sync.Map

package main

import (
    "fmt"
    "sync"
    "time"
)

func readMap(m *sync.Map){
    for i:=0;i<5;i++{
        time.Sleep(1000)
        m.Delete(i)
    }
}

func writeMap(m *sync.Map){
    for i:=0;i<6;i++{
        time.Sleep(1000)
        m.Store(i, 0)
    }
}

func main() {
    var m = new(sync.Map)
    go readMap(m)
    go writeMap(m)
    time.Sleep(time.Second)

    //此时则没有任何问题,因此sync.Map是线程安全的。
    m.Range(func(key, value interface{}) bool {
        fmt.Println(key, value)
        return true
    })
    /*
    2 0
    4 0
    5 0
     */
    //但是打印的结果貌似不正常,因为我们写了6个,但是删了5个,应该剩下一个啊
    //其实goroutine的执行顺序不确定,有可能删的时候还没有这个key呢,但是等到有的时候就进入下一层循环了。
    //所以上面的结果是正常的。
}

以上是关于sync:与golang的并发息息相关的包的主要内容,如果未能解决你的问题,请参考以下文章

golang sync.pool对象复用 并发原理 缓存池

golang goroutine例子[golang并发代码片段]

Golang中sync.Map的实现原理

golang代码片段(摘抄)

Golang Cond源码分析

go语言中sync包和channel机制