Golang Mutex實現(xiàn)互斥的具體方法
Mutex是Golang常見的并發(fā)原語,不僅在開發(fā)過程中經(jīng)常使用到,如channel這種具有g(shù)olang特色的并發(fā)結(jié)構(gòu)也依托于Mutex從而實現(xiàn)
type Mutex struct { // 互斥鎖的狀態(tài),比如是否被鎖定 state int32 // 表示信號量。堵塞的協(xié)程會等待該信號量,解鎖的協(xié)程會釋放該信號量 sema int32 }
const ( ? // 當前是否已經(jīng)上鎖 ? mutexLocked = 1 << iota // 1 ? // 當前是否有喚醒的goroutine ? mutexWoken // 2 ? // 當前是否為饑餓狀態(tài) ? mutexStarving // 4 ? // state >> mutexWaiterShift 得到等待者數(shù)量 ? mutexWaiterShift = iota // 3 ? starvationThresholdNs = 1e6 // 判斷是否要進入饑餓狀態(tài)的閾值 )
Mutex有正常饑餓模式。
- 正常模式:等待者會入隊,但一個喚醒的等待者不能持有鎖,以及與新到來的goroutine進行競爭。新來的goroutine有一個優(yōu)勢——他們已經(jīng)運行在CPU上。
超過1ms沒有獲取到鎖,就會進入饑餓模式 - 饑餓模式:鎖的所有權(quán)直接移交給隊列頭goroutine,新來的goroutine不會嘗試獲取互斥鎖,即使互斥鎖看起來已經(jīng)解鎖,也不會嘗試旋轉(zhuǎn)。相反,他們自己排在等待隊列的末尾。
若等待者是最后一個,或者等待小于1ms就會切換回正常模式
獲取鎖
未鎖——直接獲取
func (m *Mutex) Lock() { // 快路徑。直接獲取未鎖的mutex // 初始狀態(tài)為0,所以只要狀態(tài)存在其他任何狀態(tài)位都是無法直接獲取的 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() }
在不饑餓且旋的不多的情況下,嘗試自旋
// 只要原狀態(tài)已鎖且不處于饑餓狀態(tài),并滿足自旋條件 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 在當前goroutine沒有喚醒,且沒有其他goroutine在嘗試喚醒,且存在等待的情況下,cas標記存在goroutine正在嘗試喚醒。若標記成功就設(shè)置當前goroutine已經(jīng)喚醒了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 自旋 runtime_doSpin() // 自旋次數(shù)加一 iter++ // 更新原狀態(tài) old = m.state continue }
具體的自旋條件如下
- 自旋次數(shù)小于4
- 多核CPU
- p數(shù)量大于1
- 至少存在一個p的隊列為空
const ( ?? ?locked uintptr = 1 ?? ?active_spin ? ? = 4 ?? ?active_spin_cnt = 30 ?? ?passive_spin ? ?= 1 ) func sync_runtime_canSpin(i int) bool { ?? ?// sync.Mutex is cooperative, so we are conservative with spinning. ?? ?// Spin only few times and only if running on a multicore machine and ?? ?// GOMAXPROCS>1 and there is at least one other running P and local runq is empty. ?? ?// As opposed to runtime mutex we don't do passive spinning here, ?? ?// because there can be work on global runq or on other Ps. ?? ?if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { ?? ??? ?return false ?? ?} ?? ?if p := getg().m.p.ptr(); !runqempty(p) { ?? ??? ?return false ?? ?} ?? ?return true }
自旋究竟在做什么呢?
自旋是由方法runtime_doSpin()執(zhí)行的,實際調(diào)用了procyield()
# 定義了一個runtime.procyield的文本段,通過NOSPLIT不使用棧分裂,$0-0 表示該函數(shù)不需要任何輸入?yún)?shù)和返回值 TEXT runtime·procyield(SB),NOSPLIT,$0-0 # 從棧幀中讀取cycles參賽值,并儲存在寄存器R0中 MOVWU cycles+0(FP), R0 # 組成無限循環(huán)。在每次循環(huán)中,通過YIELD告訴CPU將當前線程置于休眠狀態(tài) # YIELD: x86上,實現(xiàn)為PAUSE指令,會暫停處理器執(zhí)行,切換CPU到低功耗模式并等待更多數(shù)據(jù)到達。通常用于忙等待機制,避免無謂CPU占用 # ARM上,實現(xiàn)為WFE(Wait For Event),用于等待中斷或者其他事件發(fā)生。在某些情況下可能會導(dǎo)致CPU陷入死循環(huán),因此需要特殊處理邏輯解決 again: YIELD # 將R0值減1 SUBW $1, R0 # CBNZ(Compare and Branch on Non-Zero)檢查剩余的時鐘周期數(shù)是否為0。不為0就跳轉(zhuǎn)到標簽again并再次調(diào)用YIELD,否則就退出函數(shù) CBNZ R0, again RET
以上匯編代碼分析過程感謝chatgpt的大力支持
從代碼中可以看到自旋次數(shù)是30次
const active_spin_cnt = 30 func sync_runtime_doSpin() { ?? ?procyield(active_spin_cnt) }
計算期望狀態(tài)
1.原狀態(tài)不處于饑餓狀態(tài),新狀態(tài)設(shè)置已鎖狀態(tài)位
原狀態(tài)處于已鎖狀態(tài)或饑餓模式,新狀態(tài)設(shè)置等待數(shù)量遞增
當前goroutine是最新獲取鎖的goroutine,在正常模式下期望就是要獲取鎖,那么就應(yīng)該設(shè)置新狀態(tài)已鎖狀態(tài)位
如果鎖已經(jīng)被搶占了,或者處于饑餓模式,那么就應(yīng)該去排隊
2.若之前嘗試獲取時已經(jīng)超過饑餓閾值時間,且原狀態(tài)已鎖,那么新狀態(tài)設(shè)置饑餓狀態(tài)位
3.若goroutine處于喚醒,則新狀態(tài)清除正在喚醒狀態(tài)位
期望是已經(jīng)獲取到鎖了,那么自然要清除正在獲取的狀態(tài)位
new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. // 若原狀態(tài)不處于饑餓狀態(tài),就給新狀態(tài)設(shè)置已加鎖 if old&mutexStarving == 0 { new |= mutexLocked } // 只要原狀態(tài)處于已鎖或者饑餓模式,就將新狀態(tài)等待數(shù)量遞增 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 若已經(jīng)等待超過饑餓閾值時間且原狀態(tài)已鎖,就設(shè)置新狀態(tài)為饑餓 // 這也意味著如果已經(jīng)不處于已鎖狀態(tài),就可以切換回正常模式了 if starving && old&mutexLocked != 0 { new |= mutexStarving } // 如果已經(jīng)喚醒了(也就是沒有其他正在搶占的goroutine),則在新狀態(tài)中清除正在喚醒狀態(tài)位 if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
嘗試達成獲取鎖期望
cas嘗試從原狀態(tài)更新為新的期望狀態(tài)
如果失敗,則更新最新狀態(tài),繼續(xù)嘗試獲取鎖
說明這期間鎖已經(jīng)被搶占了
若原來既沒有被鎖住,也沒有處于饑餓模式,那么就獲取到鎖,直接返回
排隊。若之前已經(jīng)在等待了就排到隊列頭
獲取信號量。此處會堵塞等待
被喚醒,認定已經(jīng)持有鎖。并做以下饑餓相關(guān)處理
- 計算等待時長,若超出饑餓閾值時間,就標記當前goroutine處于饑餓
- 若鎖處于饑餓模式,遞減等待數(shù)量,并且在只有一個等待的時候,切換鎖回正常模式
if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果原狀態(tài)既沒有處于已鎖狀態(tài),也沒有處于饑餓模式 // 那么就表示已經(jīng)獲取到鎖,直接退出 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 若已經(jīng)在等待了,就排到隊列頭 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 嘗試獲取信號量。此處獲取一個信號量以實現(xiàn)互斥 // 此處會進行堵塞 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 被信號量喚醒之后,發(fā)現(xiàn)若等待時間超過饑餓閾值,就切換到饑餓模式 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 處于饑餓模式下 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. // 若既沒有已鎖且正在嘗試喚醒,或者等待隊列為空,就代表產(chǎn)生了不一致的狀態(tài) if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 當前goroutine已經(jīng)獲取鎖,等待隊列減1;若等待者就一個,就切換正常模式。退出 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } // 不處于饑餓模式下,設(shè)置當前goroutine為喚醒狀態(tài),重置自璇次數(shù),繼續(xù)嘗試獲取鎖 awoke = true iter = 0 } else { // 若鎖被其他goroutine占用了,就更新原狀態(tài),繼續(xù)嘗試獲取鎖 old = m.state }
考慮幾種場景
- 如果lock當前只有一個goroutine g1去獲取鎖,那么會直接快路徑,cas更新已鎖狀態(tài)位,獲取到鎖
- 如果鎖已經(jīng)被g1持有,
- 此時g2會先自旋4次,
- 然后計算期望狀態(tài)為已鎖、等待數(shù)量為1、喚醒狀態(tài)位被清除
- 在cas更新的時候嘗試更新鎖狀態(tài)成功,接著因為原狀態(tài)本身處于已鎖,所以就不能獲取到鎖,只能排隊,信號量堵塞
- g1釋放鎖后,g2被喚醒,接著再次計算期望狀態(tài),并cas更新狀態(tài)成功,直接獲取到鎖
- 如果鎖已經(jīng)被g1持有,且g2在第一次嘗試獲取時超過了1ms(也就是饑餓閾值),那么
- 計算期望狀態(tài)為已鎖、饑餓、清除喚醒狀態(tài)位
- cas更新狀態(tài)成功,排在隊列頭,并被信號量堵塞
- g1釋放鎖后,g2被喚醒就直接獲取到鎖,并減去排隊數(shù)量以及清空饑餓位
釋放鎖
只有已鎖——直接釋放
如果沒有排隊的goroutine,沒有處于饑餓狀態(tài),也沒有真正嘗試獲取鎖的goroutine,那么就可以直接cas更新狀態(tài)為0
func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } }
慢釋放
- 如果原鎖沒有被鎖住,就報錯
- 若原狀態(tài)不處于饑餓狀態(tài),嘗試喚醒等待者
- 若現(xiàn)在鎖已經(jīng)被獲取、正在獲取、饑餓或者沒有等待者,直接返回
- 期望狀態(tài)等待數(shù)量減1,并設(shè)置正在喚醒狀態(tài)位
- cas嘗試更新期望狀態(tài),若成功,釋放
- 失敗說明在這過程中又有g(shù)oroutine在嘗試獲取,那么繼續(xù)下一輪釋放
- 處于饑餓狀態(tài),直接釋放信號量,移交鎖所有權(quán)
func (m *Mutex) unlockSlow(new int32) { // 若原狀態(tài)根本沒有已鎖狀態(tài)位 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 若原狀態(tài)不處于饑餓狀態(tài) if new&mutexStarving == 0 { old := new for { // 若沒有等待,或者存在goroutine已經(jīng)被喚醒,或者已經(jīng)被鎖住了,就不需要喚醒任何人,返回 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. // 設(shè)置期望狀態(tài)為正在獲取狀態(tài)位,并減去一個等待者 new = (old - 1<<mutexWaiterShift) | mutexWoken // 嘗試cas更新為期望新狀態(tài),若成功就釋放信號量,失敗就更新原狀態(tài),進行下一輪釋放 // 失敗說明在這過程中又有g(shù)oroutine在嘗試獲取,比如已經(jīng)獲取到了、變成饑餓了、自旋等 if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 饑餓模式下就移交鎖所有權(quán)給下一個等待者,并放棄時間片,以便該等待者可以快速開始 runtime_Semrelease(&m.sema, true, 1) } }
到此這篇關(guān)于Golang Mutex實現(xiàn)互斥的具體方法的文章就介紹到這了,更多相關(guān)Golang Mutex互斥內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang中空的切片轉(zhuǎn)化成 JSON 后變?yōu)?nbsp;null 問題的解決方案
在 Golang 中,經(jīng)常需要將其他類型(例如 slice、map、struct 等類型)的數(shù)據(jù)轉(zhuǎn)化為 JSON 格式,有時候轉(zhuǎn)化的結(jié)果并不是預(yù)期中的,例如將一個空的切片轉(zhuǎn)化為 JSON 時,會變成"null",所以本文將給大家介紹一下解決方法,需要的朋友可以參考下2023-09-09基于go interface{}==nil 的幾種坑及原理分析
這篇文章主要介紹了基于go interface{}==nil 的幾種坑及原理分析,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04