Go-Mutex互斥量
Posted straylesley
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go-Mutex互斥量相关的知识,希望对你有一定的参考价值。
先来看一段go1.12.5中Mutex的源码:
// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package sync provides basic synchronization primitives such as mutual // exclusion locks. Other than the Once and WaitGroup types, most are intended // for use by low-level library routines. Higher-level synchronization is // better done via channels and communication. // // Values containing the types defined in this package should not be copied. package sync import ( "internal/race" "sync/atomic" "unsafe" ) func throw(string) // provided by runtime // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sema uint32 } // A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota // Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don‘t try to acquire the mutex even if it appears // to be unlocked, and don‘t try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency. starvationThresholdNs = 1e6 ) // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don‘t spin in starvation mode, ownership is handed off to waiters // so we won‘t be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // Don‘t try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don‘t do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } // Unlock unlocks m. // It is a run-time error if m is not locked on entry to Unlock. // // A locked Mutex is not associated with a particular goroutine. // It is allowed for one goroutine to lock a Mutex and then // arrange for another goroutine to unlock it. func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won‘t acquire it. runtime_Semrelease(&m.sema, true) } }
Mutex结构体
type Mutex struct { state int32 //互斥锁的状态 sema uint32 //信号量,协程阻塞等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。 }
查资料的时候找到一张很好的展示Mutex的内存布局的图:
- Locked:表示该Mutex是否已被锁定,0代表未被锁定,1代表已被锁定。
- Woken:表示是否有协程已被唤醒,0代表没有协程被唤醒,1代表已有协程被唤醒,正在加锁。
- Starving:表示该Mutex是否处于饥饿状态,0代表不处于饥饿状态,1代表处于饥饿状态,即阻塞超过1ms。
- Waiter:表示阻塞等待锁的协程个数,协程解锁时根据此值来判断是否需要释放信号量。
协程之间抢锁实际上是抢给Locked赋值的权利,能给Locked域置1,就说明抢锁成功。抢不到就阻塞等待Mutex.sema信号量,一旦持有锁的协程解锁,等待的协程会依次被唤醒。
(这个作者写的真好,原文链接我放到文末了~)
(有时候很恍惚,自己到底有没有写博客的必要,大部分时候都是互联网的搬运工,总有一天,我也能写出纯原创的技术博客~)
两种操作模式
go的互斥有两种操作模式:正常模式和饥饿模式
在正常模式下,等待状态的协程(等待者)按照FIFO顺序排队。一个由沉睡(sleep)醒转(就绪)来的协程不拥有互斥锁,并且它对于互斥锁的竞争并不如新到来的协程有优势。新到来的协程可能已经在CPU上运行,并且可能数量还很多。但是,如果醒转来的等待者在队列中阻塞等待互斥锁超过1毫秒,正常模式将会切换到饥饿模式。
在饥饿模式下,互斥锁的所有权直接从释放锁的协程传递到队列最前的等待者。新到来的协程即使有竞争优势,也不会去争取互斥锁,相反,它们会到队列尾部排队等候。
如果醒转来的等待者获得互斥锁的所有权并且发现(1)它是队列中的最后一个等待者,或者(2)它等待不到1ms,它便会把饥饿模式切换到正常模式。
正常模式具有相当好的性能,因为即使存在阻塞的等待者,协程也可以连续多次获取互斥锁。
而饥饿模式可以预防到达协程迟迟得不到处理(反而可能被远远排在新协程之后),说白了,饥饿模式就是为防止协程进入饥饿状态忙等而设。
两种方法
Mutex只提供了两种方法,即Lock()加锁和Unlock()解锁。
加锁
以上面那张Mutex内存布局图为例,最简单毫无阻塞的加锁就是将Locked置为1,当锁已被占用,则将Waiter++,协程进入阻塞,直到Locked值变为0后被唤醒。
解锁
仍是以上面那张Mutex内存布局图为例,没有其他协程阻塞等待加锁(即Waiter为0),则只要将Locked置为0,不需要释放信号量。若解锁时,有1个或多个协程阻塞(即Waiter>0),则需要在将Locked置为0后,释放信号量唤醒阻塞的协程。
自旋
基本概念
上面我们说到,加锁时可能被阻塞,此时,协程并不是立即进入阻塞,而是会持续检测Locked是否变为0,这个过程即为自旋(spin)过程。从源码mutex源码中runtime_canSpin()和runtime_doSpin()两个方法,它们就是用来判断是否可以自旋(即是否符合自旋条件)和执行自旋的。
自旋必须满足一下所有条件:
- 自旋次数要足够小,通常为4,即每个协程最多自旋4次。
- CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁。
- 协程调度机制中的Process数量要大于1,比如使用GOMAXPROCS()将处理器设置为1就不能启动自旋。
- 协程调度机制中的可运行队列必须为空,否则会延迟协程调度。
限制自旋次数很好理解,关于CPU核数,一开始我不理解为什么要限制这个,不妨来设想一下协程自旋的场景,假设一个协程主动释放了锁并释放了信号量,我们的协程在对处理器的竞争中惨败,因此进入短暂自旋,以期寻找其他门路,即看看其他处理器是不是正有协程准备解锁,试想,假如只有1核,刚刚在和我们的竞争中获取取得锁控制权的协程,怎么可能在短期内释放锁,因此只能直接进入阻塞。
至于什么是GOMAXPROCS呢?就是逻辑CPU数量,它可以被设置为如下几种数值:
- <1:不修改任何数值。
- =1:单核心执行。
- >1:多核并发执行
一般情况下,可以使用runtime.NumCPU查询CPU数量,并使用runtime.GOMAXPROCS()进行设置,如:runtime.GOMAXPROCS(runtime.NumCPU),将逻辑CPU数量设置为物理CPU数量。
现在想想,对Process进行限制,是不是显而易见的事。
至于可运行队列为什么必须为空,我的理解,就是当前只能有这一条就绪线程,也就是说同时只能有一条自旋。
自旋的好处
可以更充分的利用CPU。
自旋的坏处
如果协程通过自旋获得锁,那么之前被阻塞的协程将无法获得锁,如果加锁的协程特别多,每次都通过自旋获得锁,那么之前被阻塞的协程将很难获得锁,从而进入饥饿状态。因此,在1.8版本以后,饥饿状态(即Starving为1)下不允许自旋。
补充
自旋和模式切换是有区别的,自旋发生在阻塞之前,模式切换发生在阻塞之后。
整个互斥过程是围绕着Mutex量进行的,即争夺对Mutex内存的修改权,Mutex可以看作是处理器的钥匙,争夺到Mutex的协程可以被处理。
Woken的作用:
Woken用于加锁和解锁过程的通信,譬如,同一时刻,有两个协程,一个在加锁,一个在解锁,在加锁的协程可能在自旋,此时把Woken置为1,通知解锁协程不必释放信号量了。也就是说Woken标志着当前有就绪状态的进程,不用解锁协程去通知。
参考链接
https://my.oschina.net/renhc/blog/2876211
另外还有一篇详解mutex源码的博客:
https://blog.csdn.net/qq_31967569/article/details/80987352
以上是关于Go-Mutex互斥量的主要内容,如果未能解决你的问题,请参考以下文章
LockSupport.java 中的 FIFO 互斥代码片段
临界区(critical section 每个线程中访问 临界资源 的那段代码)和互斥锁(mutex)的区别(进程间互斥量共享内存虚拟地址)