并发编程Cond 基本用法和如何实现以及常见错误

Posted 了 凡

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程Cond 基本用法和如何实现以及常见错误相关的知识,希望对你有一定的参考价值。

博主介绍:

我是了 凡,喜欢每日在简书上投稿日更的读书感悟笔名:三月_刘超。专注于 Go Web 后端,了解过一些Python、Java、算法、前端等领域。微信公众号【了凡银河系】期待你的关注。未来大家一起加油啊~


前言


一个常见的面试问题就是关于等待/通知(wait/notify)机制:例如请实现一个限定容量的队列(queue),当队列满或者空的时候,利用等待/通知机制实现阻塞或者唤醒。

Go语言中提供了一个类似的限定容量的队列,只要用条件变量(Cond)并发原语就可以。Cond并发原语相对来说不是那么常用,但在特定的场景使用会事半功倍,比如需要在唤醒一个或者所有的等待者做一些检查操作的时候。


Cond是什么?

Cond是Go标准库提供的原语,是为了给等待/通知场景下的并发问题提供支持。 Cond通常应用于等待某个条件的一组goroutine,等条件变为true的时候,其中一个goroutine或者所有的goroutine都会被唤醒执行。

顾名思义,Cond是和某个条件相关,这个条件需要一组goroutine写作共同完成,在条件还没有满足的时候,所有等待这个条件的groutine都会被阻塞住,只有这一组goroutine通过协议达到了这个条件,等待的goroutine次啊可以继续进行下去。

这有个问题,等待的条件是什么?条件是某个变量达到了某个阈值或者某个时间点,也可以是一组变量分别都达到了某个阈值,还可以是某个对象的状态满足了待定的条件。

总结:等待的条件是一种可以用来计算结果是true还是false的条件


Cond基础用法

标准库中的Cond并发原语初始化的时候,需要关联一个Locker接口的实例,一般Mutex或者RWMutex。
Cond实现:

type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()

Cond关联的Locker实例可以通过c.L访问,它内部维护着一个先入先出的等待队列。

看一下以上的三个方法Signal、Broadcast、Wait方法。(我只在操作系统里见到了Signal和Wait,没有见过Broadcast方法,接下来看下都有什么功能吧)

  • Signal方法:允许调用者Caller唤醒一个等待此Cond和goroutine。如果此时没有等待的goroutine,显然无需通知waiter;如果Cond等待队列中有一个或者多个等待的goroutine,则需要从等待队列中移除第一个goroutine并把它唤醒。在Java语言中,Signal方法也叫做notify方法。调用Signal方法时,不强求你一定要持有c.L的锁
  • Broadcast方法,允许调用者Caller唤醒所有等待此Cond的goroutine。如果此时没有等待的goroutine,显然无需通知waiter;如果Cond等待队列中有一个或者多个等待的goroutine,则清空所有等待的goroutine,并全部唤醒。在Java语言中,Broadcast方法也被叫做notifyAll方法。同样地,调用Broadcast方法时,也不强求你一定持有c.L的锁
  • Wait方法,会把调用者Caller放入Cond的等待队列中并阻塞,直到被Signal或者Broadcast的方法从等待队列中移除并唤醒。调用Wait方法时必须要持有c.L的锁

Go实现的sync.Cond的方法名是Wait、Signal和Broadcast,这是计算机科学中条件变量的通用方法名。C语言对应的方法名是pthread_cond_wait、pthread_cond_signal和pthread_cond_broadcast。

接下来,看一个百米赛跑开始时的例子,来学习Cond的使用方法。10个运动员进入赛场之后需要先做拉伸活动活动筋骨,向观众和粉丝招手致敬,在自己的赛道上做好准备;等所有的运动员都准好之后,裁判员才会打响发令枪。

每个运动员做好准备之后,将ready加一,表明自己做好准备了,同时调用Broadcast方法通知裁判员。因为裁判员只有一个,所以这里可以直接替换成Signal方法调用。

裁判员会等待运动员都准备好(第22行)。虽然每个运动员准备好之后都唤醒了裁判员,但是裁判员被唤醒之后需要检查等待条件是否满足(运动员都准备好了)。可以看到,裁判员被唤醒之后一定要检查等待条件,如果条件不满足还是要继续等待。

func main(){
   c := sync.NewCond(&sync.Mutex{})
   var ready int

   for i := 0; i < 10; i++ {
      go func(i int) {
         time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

         // 加锁更改等待条件
         c.L.Lock()
         ready ++
         c.L.Unlock()

         log.Printf("运动员#%d 已准备就绪\\n", i)
         // 广播唤醒所有的等待者
         c.Broadcast()
      }(i)
   }

   c.L.Lock()
   for ready != 10 {
      c.Wait()
      log.Println("裁判员被唤醒一次")
   }
   c.L.Unlock()

   // 所有的运动员是否就绪
   log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

执行结果


Cond的使用其实没那么简单。
复杂在于:

  • 一,这段代码有时候需要加锁,有时候可以不加;
  • 二,Wait唤醒后需要检查条件;
  • 三,条件变量的更改,其实是需要原子操作或者互斥锁保护的。

Cond实现原理

Cond本身实现还是比较简单的,复杂的逻辑都在Locker或者runtime的等待队列实现了。(由于为了更加清晰一点,有一些源码都拉出来会比较多,耐心看完)

type noCopy struct{}

type Locker interface {
   Lock()
   Unlock()
}

type notifyList struct {
   wait   uint32
   notify uint32
   lock   uintptr
   head   unsafe.Pointer
   tail   unsafe.Pointer
}

type copyChecker uintptr

type Cond struct {
   noCopy noCopy

   // 当观察或者修改等待条件的时候需要加锁
   L Locker

   // 等待队列
   notify notifyList
   checker copyChecker
}

func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}

func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

func (c *Cond) Wait() {
   c.checker.check()
   // 增加到等待队列中
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   // 阻塞休眠直到被唤醒
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}

func (c *Cond) Signal()  {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast()  {
   c.checker.check()
   runtime_notifyListNotifyAll(&c.notify)
}
  1. runtime_notifyListXXX
    是运行时实现的方法(之前的文章应该有看到过),实现了一个等待/通知的队列。如果想深入学习这部分,可以去看看runtime/sema.go代码中。

  2. copyChecker是一个辅助结构,可以在运行时检查Cond是否复制使用。

  3. Signal和Broadcast只涉及到notifyList数据结构,不涉及到锁。

  4. Wait把调用者加入到等待队列时会释放锁,在被唤醒之后还会请求锁。在阻塞休眠期间,调用者是不持有锁的,这样能让其他goroutine有机会检查或者过呢更新等待变量。


Cond的常见错误

第一个是调用Wait的时候没有加锁

以百米赛跑为例,在调用cond.Wait时,把前后的Lock/Unlock注释掉,比如以下代码第20行和第25行

func main(){
   c := sync.NewCond(&sync.Mutex{})
   var ready int

   for i := 0; i < 10; i++ {
      go func(i int) {
         time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

         // 加锁更改等待条件
         c.L.Lock()
         ready ++
         c.L.Unlock()

         log.Printf("运动员#%d 已准备就绪\\n", i)
         // 广播唤醒所有的等待者
         c.Broadcast()
      }(i)
   }

   //c.L.Lock()
   for ready != 10 {
      c.Wait()
      log.Println("裁判员被唤醒一次")
   }
   //c.L.Unlock()

   // 所有的运动员是否就绪
   log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")

执行结果:

会发现上面报了Unlock of unlocked mutex 未加锁的panic,这个panic的原因在于,cond.Wait方法的实现是,把当前调用者加入到notify队列之中后会释放锁(如果不释放锁,其他Wait的调用者就没有机会加入到notify队列中了),然后一直等待;等调用者被唤醒之后,又会去争抢这把锁。如果调用Wait之前不加锁的话,就有可能Unlock一个未加锁的Locker。所以,调用cond.Wait方法之前一定要加锁

第二个是没有检查条件是否满足程序就继续执行

这个问题原因是,误以为Cond的使用,就像WaitGroup那样调用一下Wait方法等待那么简单。例如还是上个问题,把21行和24行注释掉:

func main(){
   c := sync.NewCond(&sync.Mutex{})
   var ready int

   for i := 0; i < 10; i++ {
      go func(i int) {
         time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)

         // 加锁更改等待条件
         c.L.Lock()
         ready ++
         c.L.Unlock()

         log.Printf("运动员#%d 已准备就绪\\n", i)
         // 广播唤醒所有的等待者
         c.Broadcast()
      }(i)
   }

   c.L.Lock()
   //for ready != 10 {
      c.Wait()
      log.Println("裁判员被唤醒一次")
   //}
   c.L.Unlock()

   // 所有的运动员是否就绪
   log.Println("所有运动员都准备就绪。比赛开始,3,2,1, ......")
}

运行结果:


可以看到,没有将所有的运动员都执行完,不是我们想的的那样所有运动员都准备好才进行下一步。原因在于,每一个运动员准备好之后都会唤醒所有的等待者,也就是这里的裁判员,比如第一个运动员准备好后就唤醒了裁判员,结果这个裁判员傻傻地没做任何检查,以为所有的运动员都准备好了,就继续执行了。

所以,waiter goroutine被唤醒不等于等待条件满足,只用goroutine把它唤醒了而已,等待条件有可能已经满足了,也有可能不满足,需要进一步检查。也可以说,等待者被唤醒,知识的呢到了一次检查的机会而已。

总结

时刻记住调用cond.Wait方法前一定要加锁,以及waiter goroutine被唤醒不等于等待条件被满足。


这次就先讲到这里,如果想要了解更多的golang语言内容一键三连后序每周持续更新!

以上是关于并发编程Cond 基本用法和如何实现以及常见错误的主要内容,如果未能解决你的问题,请参考以下文章

并发编程Once 基本用法和如何实现以及常见错误

并发编程map 基本用法和常见错误以及如何实现线程安全的map类型

Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现

Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现

Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现

Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现