Go sync.Cond条件变量的学习

Posted 技术能量站

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go sync.Cond条件变量的学习相关的知识,希望对你有一定的参考价值。

1. 前言

1.1 sync.cond 可以用来干什么

Golang 的 sync 包中的 Cond 实现了一种条件变量,可以使用多个 Reader 等待公共资源。

每个 Cond 都会关联一个 Lock ,当修改条件或者调用 Wait 方法,必须加锁,保护 Condition。 有点类似 Java 中的 Wait 和 NotifyAll。

sync.Cond 条件变量是用来协调想要共享资源的那些 goroutine, 当共享资源的状态发生变化时,可以被用来通知被互斥锁阻塞的 gorountine。

1.2 条件变量与互斥锁

我们常常会把条件变量这个同步工具拿来与互斥锁一起讨论。实际上,条件变量是基于互斥锁的,它必须有互斥锁的支撑才能发挥作用。

条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。

你知道怎么使用条件变量吗?所以,我们今天的问题就是:条件变量怎样与互斥锁配合使用?

这道题的典型回答是:条件变量的初始化离不开互斥锁,并且它的方法有的也是基于互斥锁的。

条件变量提供的方法有三个:等待通知(wait)、单发通知(signal)和广播通知(broadcast)。

1、条件变量怎样与互斥锁配合使用?

(1)sync.Cond 基于互斥锁/读写锁,它和互斥锁的区别是什么呢?

  • 互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine。
  • sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。

我们想象一个非常简单的场景:

有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。

这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。

Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。

2. 夯实基础

2.1 条件变量的Wait方法

2.2 条件变量的Signal方法和Broadcast方法

3. 应用实践

一句话总结:sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。

4. 源码学习

4.1 sync.Cond 的四个方法

sync.Cond 内部维护了一个等待队列,队列中存放的是所有在等待这个 sync.Cond 的 Go 程,即保存了一个通知列表。sync.Cond 可以用来唤醒一个或所有因等待条件变量而阻塞的 Go 程,以此来实现多个 Go 程间的同步。

sync.Cond 的定义如下:

// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct 
        noCopy noCopy

        // L is held while observing or changing the condition
        L Locker

        notify  notifyList
        checker copyChecker

每个 Cond 实例都会关联一个锁 L(互斥锁 *Mutex,或读写锁 *RWMutex),当修改条件或者调用 Wait 方法时,必须加锁。

和 sync.Cond 相关的有如下几个方法:

1. NewCond 创建实例

NewCond 创建 Cond 实例时,需要关联一个锁。

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond 
	return &CondL: l

2. Broadcast 广播唤醒所有

Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() 
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)

3. Signal 唤醒一个协程

Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() 
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)

4. Wait 等待

调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() 
//        c.Wait()
//    
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait() 
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()

对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for 能够确保条件符合要求后,再执行后续的代码。

c.L.Lock()
for !condition() 
    c.Wait()

... make use of condition ...
c.L.Unlock()

4.2 使用实例

1. 3个wait1个Broadcast()

接下来我们实现一个简单的例子,三个协程调用 Wait() 等待,另一个协程调用 Broadcast() 唤醒所有等待的协程。

var done = false

func read(name string, c *sync.Cond) 
	c.L.Lock()
	for !done 
		c.Wait()
	
	log.Println(name, "starts reading")
	c.L.Unlock()


func write(name string, c *sync.Cond) 
	log.Println(name, "starts writing")
	time.Sleep(time.Second)
	c.L.Lock()
	done = true
	c.L.Unlock()
	log.Println(name, "wakes all")
	c.Broadcast()


func main() 
	cond := sync.NewCond(&sync.Mutex)

	go read("reader1", cond)
	go read("reader2", cond)
	go read("reader3", cond)
	write("writer", cond)

	time.Sleep(time.Second * 3)

  • done 即互斥锁需要保护的条件变量。
  • read() 调用 Wait() 等待通知,直到 done 为 true。
  • write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
  • write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕。

运行结果如下:

$ go run main.go
2021/01/14 23:18:20 writer starts writing
2021/01/14 23:18:21 writer wakes all
2021/01/14 23:18:21 reader2 starts reading
2021/01/14 23:18:21 reader3 starts reading
2021/01/14 23:18:21 reader1 starts reading

writer 接收数据花费了 1s,同步通知所有等待的协程。

5. 总结

我们今天主要讲了条件变量,它是基于互斥锁的一种同步工具。在 Go 语言中,我们需要用sync.NewCond函数来初始化一个sync.Cond类型的条件变量。

sync.NewCond函数需要一个sync.Locker类型的参数值。sync.Mutex类型的值以及sync.RWMutex类型的值都可以满足这个要求。另外,后者的RLocker方法可以返回这个值中的读锁,也同样可以作为sync.NewCond函数的参数值,如此就可以生成与读写锁中的读锁对应的条件变量了。

条件变量的Wait方法需要在它基于的互斥锁保护下执行,否则就会引发不可恢复的 panic。此外,我们最好使用for语句来检查共享资源的状态,并包裹对条件变量的Wait方法的调用。

不要用if语句,因为它不能重复地执行“检查状态 - 等待通知 - 被唤醒”的这个流程。重复执行这个流程的原因是,一个“因为等待通知,而被阻塞”的 goroutine,可能会在共享资源的状态不满足其要求的情况下被唤醒。

条件变量的Signal方法只会唤醒一个因等待通知而被阻塞的 goroutine,而它的Broadcast方法却可以唤醒所有为此而等待的 goroutine。后者比前者的适应场景要多得多。

这两个方法并不需要受到互斥锁的保护,我们也最好不要在解锁互斥锁之前调用它们。还有,条件变量的通知具有即时性。当通知被发送的时候,如果没有任何 goroutine 需要被唤醒,那么该通知就会立即失效。

再强调一下。对于同一个锁,多个 goroutine 对它重复锁定时只会有一个成功,其余的会阻塞;多个 goroutine 对它重复解锁时也只会有一个成功,但其余的会抛 panic。

相关资料

  1. go sync.cond 源码
  2. https://geektutu.com/ go sync.cond
  3. 关于 sync.Cond 的讨论可参考 How to correctly use sync.Cond? - StackOverflow

以上是关于Go sync.Cond条件变量的学习的主要内容,如果未能解决你的问题,请参考以下文章

手摸手Go 深入理解sync.Cond

go条件变量同步机制

Go中多协程协作之sync.Cond

go语言sync.Cond

Golang Cond源码分析

Go语言基础-sync包