Go語言的互斥鎖的詳細使用
前言
當提到并發(fā)編程、多線程編程時,都會在第一時間想到鎖,鎖是并發(fā)編程中的同步原語,他可以保證多線程在訪問同一片內(nèi)存時不會出現(xiàn)競爭來保證并發(fā)安全;在
Go語言中更推崇由channel通過通信的方式實現(xiàn)共享內(nèi)存,這個設計點與許多主流編程語言不一致,但是Go語言也在sync包中提供了互斥鎖、讀寫鎖,畢竟channel也不能滿足所有場景,互斥鎖、讀寫鎖的使用與我們是分不開的,所以接下來我會分兩篇來分享互斥鎖、讀寫鎖是怎么實現(xiàn)的,本文我們先來看看互斥鎖的實現(xiàn)。
本文基于Golang版本:1.18
Go語言互斥鎖設計實現(xiàn)
mutex介紹
sync 包下的mutex就是互斥鎖,其提供了三個公開方法:調(diào)用Lock()獲得鎖,調(diào)用Unlock()釋放鎖,在Go1.18新提供了TryLock()方法可以非阻塞式的取鎖操作:
Lock():調(diào)用Lock方法進行加鎖操作,使用時應注意在同一個goroutine中必須在鎖釋放時才能再次上鎖,否則會導致程序panic。Unlock():調(diào)用UnLock方法進行解鎖操作,使用時應注意未加鎖的時候釋放鎖會引起程序panic,已經(jīng)鎖定的 Mutex 并不與特定的 goroutine 相關聯(lián),這樣可以利用一個 goroutine 對其加鎖,再利用其他 goroutine 對其解鎖。tryLock():調(diào)用TryLock方法嘗試獲取鎖,當鎖被其他 goroutine 占有,或者當前鎖正處于饑餓模式,它將立即返回 false,當鎖可用時嘗試獲取鎖,獲取失敗不會自旋/阻塞,也會立即返回false;
mutex的結(jié)構比較簡單只有兩個字段:
type Mutex struct {
state int32
sema uint32
}
state:表示當前互斥鎖的狀態(tài),復合型字段;sema:信號量變量,用來控制等待goroutine的阻塞休眠和喚醒
初看結(jié)構你可能有點懵逼,互斥鎖應該是一個復雜東西,怎么就兩個字段就可以實現(xiàn)?那是因為設計使用了位的方式來做標志,state的不同位分別表示了不同的狀態(tài),使用最小的內(nèi)存來表示更多的意義,其中低三位由低到高分別表示mutexed、mutexWoken 和 mutexStarving,剩下的位則用來表示當前共有多少個goroutine在等待鎖:
const ( mutexLocked = 1 << iota // 表示互斥鎖的鎖定狀態(tài) mutexWoken // 表示從正常模式被從喚醒 mutexStarving // 當前的互斥鎖進入饑餓狀態(tài) mutexWaiterShift = iota // 當前互斥鎖上等待者的數(shù)量 )

mutex最開始的實現(xiàn)只有正常模式,在正常模式下等待的線程按照先進先出的方式獲取鎖,但是新創(chuàng)建的gouroutine會與剛被喚起的 goroutine競爭,會導致剛被喚起的 goroutine獲取不到鎖,這種情況的出現(xiàn)會導致線程長時間被阻塞下去,所以Go語言在1.9中進行了優(yōu)化,引入了饑餓模式,當goroutine超過1ms沒有獲取到鎖,就會將當前互斥鎖切換到饑餓模式,在饑餓模式中,互斥鎖會直接交給等待隊列最前面的goroutine,新的 goroutine 在該狀態(tài)下不能獲取鎖、也不會進入自旋狀態(tài),它們只會在隊列的末尾等待。如果一個 goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會切換回正常模式。
mutex的基本情況大家都已經(jīng)掌握了,接下來我們從加鎖到解鎖來分析mutex是如何實現(xiàn)的;
Lock加鎖
從Lock方法入手:
func (m *Mutex) Lock() {
// 判斷當前鎖的狀態(tài),如果鎖是完全空閑的,即m.state為0,則對其加鎖,將m.state的值賦為1
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
上面的代碼主要兩部分邏輯:
- 通過
CAS判斷當前鎖的狀態(tài),也就是state字段的低1位,如果鎖是完全空閑的,即m.state為0,則對其加鎖,將m.state的值賦為1 - 若當前鎖已經(jīng)被其他
goroutine加鎖,則進行lockSlow方法嘗試通過自旋或饑餓狀態(tài)下饑餓goroutine競爭方式等待鎖的釋放,我們在下面介紹lockSlow方法;
lockSlow代碼段有點長,主體是一個for循環(huán),其主要邏輯可以分為以下三部分:
- 狀態(tài)初始化
- 判斷是否符合自旋條件,符合條件進行自旋操作
- 搶鎖準備期望狀態(tài)
- 通過
CAS操作更新期望狀態(tài)
初始化狀態(tài)
在locakSlow方法內(nèi)會先初始化5個字段:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
........
}
waitStartTime用來計算waiter的等待時間starving是饑餓模式標志,如果等待時長超過1ms,starving置為true,后續(xù)操作會把Mutex也標記為饑餓狀態(tài)。awoke表示協(xié)程是否喚醒,當goroutine在自旋時,相當于CPU上已經(jīng)有在等鎖的協(xié)程。為避免Mutex解鎖時再喚醒其他協(xié)程,自旋時要嘗試把Mutex置為喚醒狀態(tài),Mutex處于喚醒狀態(tài)后 要把本協(xié)程的 awoke 也置為true。iter用于記錄協(xié)程的自旋次數(shù),old記錄當前鎖的狀態(tài)
自旋
自旋的判斷條件非常苛刻:
for {
// 判斷是否允許進入自旋 兩個條件,條件1是當前鎖不能處于饑餓狀態(tài)
// 條件2是在runtime_canSpin內(nèi)實現(xiàn),其邏輯是在多核CPU運行,自旋的次數(shù)小于4
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// !awoke 判斷當前goroutine不是在喚醒狀態(tài)
// old&mutexWoken == 0 表示沒有其他正在喚醒的goroutine
// old>>mutexWaiterShift != 0 表示等待隊列中有正在等待的goroutine
// atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 嘗試將當前鎖的低2位的Woken狀態(tài)位設置為1,表示已被喚醒, 這是為了通知在解鎖Unlock()中不要再喚醒其他的waiter了
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 設置當前goroutine喚醒成功
awoke = true
}
// 進行自旋
runtime_doSpin()
// 自旋次數(shù)
iter++
// 記錄當前鎖的狀態(tài)
old = m.state
continue
}
}
自旋這里的條件還是很復雜的,我們想讓當前goroutine進入自旋轉(zhuǎn)的原因是我們樂觀的認為當前正在持有鎖的goroutine能在較短的時間內(nèi)歸還鎖,所以我們需要一些條件來判斷,mutex的判斷條件我們在文字描述一下:
old&(mutexLocked|mutexStarving) == mutexLocked 用來判斷鎖是否處于正常模式且加鎖,為什么要這么判斷呢?
mutexLocked 二進制表示為 0001
mutexStarving 二進制表示為 0100
mutexLocked|mutexStarving 二進制為 0101. 使用0101在當前狀態(tài)做 &操作,如果當前處于饑餓模式,低三位一定會是1,如果當前處于加鎖模式,低1位一定會是1,所以使用該方法就可以判斷出當前鎖是否處于正常模式且加鎖;
runtime_canSpin()方法用來判斷是否符合自旋條件:
// / go/go1.18/src/runtime/proc.go
const active_spin = 4
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
自旋條件如下:
- 自旋的次數(shù)要在4次以內(nèi)
CPU必須為多核GOMAXPROCS>1- 當前機器上至少存在一個正在運行的處理器 P 并且處理的運行隊列為空;
判斷當前goroutine可以進自旋后,調(diào)用runtime_doSpin方法進行自旋:
const active_spin_cnt = 30
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
循環(huán)次數(shù)被設置為30次,自旋操作就是執(zhí)行30次PAUSE指令,通過該指令占用CPU并消費CPU時間,進行忙等待;
這就是整個自旋操作的邏輯,這個就是為了優(yōu)化 等待阻塞->喚醒->參與搶占鎖這個過程不高效,所以使用自旋進行優(yōu)化,在期望在這個過程中鎖被釋放。
搶鎖準備期望狀態(tài)
自旋邏輯處理好后開始根據(jù)上下文計算當前互斥鎖最新的狀態(tài),根據(jù)不同的條件來計算mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:
首先計算mutexLocked的值:
// 基于old狀態(tài)聲明到一個新狀態(tài)
new := old
// 新狀態(tài)處于非饑餓的條件下才可以加鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
計算mutexWaiterShift的值:
//如果old已經(jīng)處于加鎖或者饑餓狀態(tài),則等待者按照FIFO的順序排隊
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
計算mutexStarving的值:
// 如果當前鎖處于饑餓模式,并且已被加鎖,則將低3位的Starving狀態(tài)位設置為1,表示饑餓
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
計算mutexWoken的值:
// 當前goroutine的waiter被喚醒,則重置flag
if awoke {
// 喚醒狀態(tài)不一致,直接拋出異常
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 新狀態(tài)清除喚醒標記,因為后面的goroutine只會阻塞或者搶鎖成功
// 如果是掛起狀態(tài),那就需要等待其他釋放鎖的goroutine來喚醒。
// 假如其他goroutine在unlock的時候發(fā)現(xiàn)Woken的位置不是0,則就不會去喚醒,那該goroutine就無法在被喚醒后加鎖
new &^= mutexWoken
}
通過CAS操作更新期望狀態(tài)
上面我們已經(jīng)得到了鎖的期望狀態(tài),接下來通過CAS將鎖的狀態(tài)進行更新:
// 這里嘗試將鎖的狀態(tài)更新為期望狀態(tài)
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果原來鎖的狀態(tài)是沒有加鎖的并且不處于饑餓狀態(tài),則表示當前goroutine已經(jīng)獲取到鎖了,直接推出即可
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 到這里就表示goroutine還沒有獲取到鎖,waitStartTime是goroutine開始等待的時間,waitStartTime != 0就表示當前goroutine已經(jīng)等待過了,則需要將其放置在等待隊列隊頭,否則就排到隊列隊尾
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 被信號量喚醒后檢查當前goroutine是否應該表示為饑餓
// 1. 當前goroutine已經(jīng)饑餓
// 2. goroutine已經(jīng)等待了1ms以上
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 再次獲取當前鎖的狀態(tài)
old = m.state
// 如果當前處于饑餓模式,
if old&mutexStarving != 0 {
// 如果當前鎖既不是被獲取也不是被喚醒狀態(tài),或者等待隊列為空 這代表鎖狀態(tài)產(chǎn)生了不一致的問題
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 當前goroutine已經(jīng)獲取了鎖,等待隊列-1
delta := int32(mutexLocked - 1<<mutexWaiterShift
// 當前goroutine非饑餓狀態(tài) 或者 等待隊列只剩下一個waiter,則退出饑餓模式(清除饑餓標識位)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新狀態(tài)值并中止for循環(huán),拿到鎖退出
atomic.AddInt32(&m.state, delta)
break
}
// 設置當前goroutine為喚醒狀態(tài),且重置自璇次數(shù)
awoke = true
iter = 0
} else {
// 鎖被其他goroutine占用了,還原狀態(tài)繼續(xù)for循環(huán)
old = m.state
}
這塊的邏輯很復雜,通過CAS來判斷是否獲取到鎖,沒有通過 CAS 獲得鎖,會調(diào)用 runtime.sync_runtime_SemacquireMutex通過信號量保證資源不會被兩個 goroutine 獲取,runtime.sync_runtime_SemacquireMutex會在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當前 goroutine 可以獲取信號量,它就會立刻返回,如果是新來的goroutine,就需要放在隊尾;如果是被喚醒的等待鎖的goroutine,就放在隊頭,整個過程還需要啃代碼來加深理解。
解鎖
相對于加鎖操作,解鎖的邏輯就沒有那么復雜了,接下來我們來看一看UnLock的邏輯:
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
使用AddInt32方法快速進行解鎖,將m.state的低1位置為0,然后判斷新的m.state值,如果值為0,則代表當前鎖已經(jīng)完全空閑了,結(jié)束解鎖,不等于0說明當前鎖沒有被占用,會有等待的goroutine還未被喚醒,需要進行一系列喚醒操作,這部分邏輯就在unlockSlow方法內(nèi):
func (m *Mutex) unlockSlow(new int32) {
// 這里表示解鎖了一個沒有上鎖的鎖,則直接發(fā)生panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式的釋放鎖邏輯
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有等待者則直接返回即可
// 如果鎖處于加鎖的狀態(tài),表示已經(jīng)有goroutine獲取到了鎖,可以返回
// 如果鎖處于喚醒狀態(tài),這表明有等待的goroutine被喚醒了,不用嘗試獲取其他goroutine了
// 如果鎖處于饑餓模式,鎖之后會直接給等待隊頭goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 搶占喚醒標志位,這里是想要把鎖的狀態(tài)設置為被喚醒,然后waiter隊列-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 搶占成功喚醒一個goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 執(zhí)行搶占不成功時重新更新一下狀態(tài)信息,下次for循環(huán)繼續(xù)處理
old = m.state
}
} else {
// 饑餓模式釋放鎖邏輯,直接喚醒等待隊列goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
我們在喚醒goroutine時正常模式/饑餓模式都調(diào)用func runtime_Semrelease(s *uint32, handoff bool, skipframes int),這兩種模式在第二個參數(shù)的傳參上不同,如果handoff is true, pass count directly to the first waiter.。
非阻塞加鎖
Go語言在1.18版本中引入了非阻塞加鎖的方法TryLock(),其實現(xiàn)就很簡潔:
func (m *Mutex) TryLock() bool {
// 記錄當前狀態(tài)
old := m.state
// 處于加鎖狀態(tài)/饑餓狀態(tài)直接獲取鎖失敗
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// 嘗試獲取鎖,獲取失敗直接獲取失敗
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}
TryLock的實現(xiàn)就比較簡單了,主要就是兩個判斷邏輯:
- 判斷當前鎖的狀態(tài),如果鎖處于加鎖狀態(tài)或饑餓狀態(tài)直接獲取鎖失敗
- 嘗試獲取鎖,獲取失敗直接獲取鎖失敗
TryLock并不被鼓勵使用,至少我還沒想到有什么場景可以使用到它。
總結(jié)
通讀源碼后你會發(fā)現(xiàn)互斥鎖的邏輯真的十分復雜,代碼量雖然不多,但是很難以理解,一些細節(jié)點還需要大家多看看幾遍才能理解其為什么這樣做,文末我們再總結(jié)一下互斥鎖的知識點:
- 互斥鎖有兩種模式:正常模式、饑餓模式,饑餓模式的出現(xiàn)是為了優(yōu)化正常模式下剛被喚起的
goroutine與新創(chuàng)建的goroutine競爭時長時間獲取不到鎖,在Go1.9時引入饑餓模式,如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為饑餓模式,如果一個goroutine獲得了鎖,并且他在等待隊列隊尾 或者 他等待小于1ms,則會將Mutex的模式切換回正常模式 - 加鎖的過程:
- 鎖處于完全空閑狀態(tài),通過CAS直接加鎖
- 當鎖處于正常模式、加鎖狀態(tài)下,并且符合自旋條件,則會嘗試最多4次的自旋
- 若當前
goroutine不滿足自旋條件時,計算當前goroutine的鎖期望狀態(tài) - 嘗試使用CAS更新鎖狀態(tài),若更新鎖狀態(tài)成功判斷當前
goroutine是否可以獲取到鎖,獲取到鎖直接退出即可,若獲取不到鎖則陷入睡眠,等待被喚醒 - goroutine被喚醒后,如果鎖處于饑餓模式,則直接拿到鎖,否則重置自旋次數(shù)、標志喚醒位,重新走for循環(huán)自旋、獲取鎖邏輯;
- 解鎖的過程
- 原子操作mutexLocked,如果鎖為完全空閑狀態(tài),直接解鎖成功
- 如果鎖不是完全空閑狀態(tài),,那么進入
unlockedslow邏輯 - 如果解鎖一個未上鎖的鎖直接panic,因為沒加鎖
mutexLocked的值為0,解鎖時進行mutexLocked - 1操作,這個操作會讓整個互斥鎖混亂,所以需要有這個判斷 - 如果鎖處于饑餓模式直接喚醒等待隊列隊頭的waiter
- 如果鎖處于正常模式下,沒有等待的goroutine可以直接退出,如果鎖已經(jīng)處于鎖定狀態(tài)、喚醒狀態(tài)、饑餓模式則可以直接退出,因為已經(jīng)有被喚醒的
goroutine獲得了鎖.
- 使用互斥鎖時切記拷貝
Mutex,因為拷貝Mutex時會連帶狀態(tài)一起拷貝,因為Lock時只有鎖在完全空閑時才會獲取鎖成功,拷貝時連帶狀態(tài)一起拷貝后,會造成死鎖 - TryLock的實現(xiàn)邏輯很簡單,主要判斷當前鎖處于加鎖狀態(tài)、饑餓模式就會直接獲取鎖失敗,嘗試獲取鎖失敗直接返回;
到此這篇關于Go語言的互斥鎖的詳細使用的文章就介紹到這了,更多相關Go語言 互斥鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Go語言模型:string的底層數(shù)據(jù)結(jié)構與高效操作詳解
這篇文章主要介紹了Go語言模型:string的底層數(shù)據(jù)結(jié)構與高效操作詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理
這篇文章主要介紹了詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-08-08

