How sync.Mutex is implemented in Go
The idea behind a mutex
A mutex has two main methods: Lock() and Unlock().
A naive Lock() can be implemented with a CAS (compare-and-swap) loop, and Unlock() with an atomic store:
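```go
import "sync/atomic"

type Mutex struct{ locked uint32 } // 0 = free, 1 = held

func (m *Mutex) Lock() {
	for !atomic.CompareAndSwapUint32(&m.locked, 0, 1) { // busy-wait
	}
}

func (m *Mutex) Unlock() { atomic.StoreUint32(&m.locked, 0) }
```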
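Lock() keeps retrying the CAS, which burns CPU. That leads to an optimization: if a goroutine fails to grab the lock for a while, it is parked on a wait queue, and the Unlock() side, besides updating the lock state, also wakes one goroutine from the wait queue.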
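To make the spin-then-park idea concrete, here is a minimal sketch of a toy lock, assuming a buffered channel is an acceptable stand-in for a real wait queue. ToyMutex, NewToyMutex, spinLimit and runCounter are hypothetical names for illustration only; this is not how sync.Mutex is written, and a woken goroutine here simply re-competes with newcomers.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// spinLimit is an arbitrary, hypothetical bound on CAS attempts before parking.
const spinLimit = 4

// ToyMutex spins briefly and then parks the goroutine. It is NOT sync.Mutex.
type ToyMutex struct {
	locked  int32         // 0 = unlocked, 1 = locked
	waiters int32         // goroutines that announced they are about to park
	wake    chan struct{} // wake-up tokens sent by Unlock (capacity 1)
}

func NewToyMutex() *ToyMutex {
	return &ToyMutex{wake: make(chan struct{}, 1)}
}

func (m *ToyMutex) Lock() {
	for {
		// Phase 1: spin a bounded number of times.
		for i := 0; i < spinLimit; i++ {
			if atomic.CompareAndSwapInt32(&m.locked, 0, 1) {
				return
			}
			runtime.Gosched()
		}
		// Phase 2: announce ourselves as a waiter, re-check, then park.
		atomic.AddInt32(&m.waiters, 1)
		if atomic.CompareAndSwapInt32(&m.locked, 0, 1) {
			atomic.AddInt32(&m.waiters, -1)
			return
		}
		<-m.wake // sleep until Unlock hands out a token
		atomic.AddInt32(&m.waiters, -1)
		// Woken up: loop and compete again; a newcomer may still win.
	}
}

func (m *ToyMutex) Unlock() {
	atomic.StoreInt32(&m.locked, 0)
	if atomic.LoadInt32(&m.waiters) > 0 {
		select {
		case m.wake <- struct{}{}: // wake one parked goroutine
		default: // a token is already pending, so a waiter will wake anyway
		}
	}
}

// runCounter has each of `goroutines` goroutines increment a shared counter
// `iters` times while holding the toy lock.
func runCounter(goroutines, iters int) int {
	m := NewToyMutex()
	counter := 0
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				m.Lock()
				counter++
				m.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(runCounter(8, 1000)) // 8000 if the lock provides mutual exclusion
}
```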
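But this optimization has a problem: when a goroutine woken from the wait queue tries to grab the lock again, a newly arriving goroutine may already have taken it. The woken goroutine then goes back onto the wait queue, is woken again, and may fail again... This unlucky goroutine might never get the lock, which is called starvation.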
Starvation causes very high tail latency. What is tail latency? In one sentence: the slowest cases are extremely slow!
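Say there are 1000 goroutines in total. If 999 of them get the lock within 1ms but the slowest one needs 1s, the average is still only about 2ms, yet that one goroutine waited a full second. That is tail latency.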
How Go's mutex works
```
$ go version
go version go1.16.5 darwin/arm64
```
The Go source read in this article is go1.16.5.
The mutex in Go's standard library avoids starvation. Let me first outline Go's lock and unlock flow, which will help when reading the source later.
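The lock has two modes: normal mode and starvation mode. It starts in normal mode. When a goroutine comes to grab the lock, it still does a CAS first; if that succeeds, it returns immediately. If not, it may spin a limited number of times, waiting for the lock to be released; if the lock is still held after spinning, the goroutine is put on the wait queue. If a goroutine in the wait queue keeps failing to get the lock (for more than 1ms in total), the mutex switches from normal mode to starvation mode. In starvation mode, when a goroutine releases the lock, ownership is handed directly to the goroutine at the front of the wait queue, which prevents goroutines from starving.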
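Go also has one more small optimization: while some goroutine is spinning for the lock, the Unlock() side does not wake a goroutine from the wait queue, because even if it did, the woken goroutine could not beat the one that is already spinning.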
Now let's actually read the source.
The Mutex struct and its constants
```go
type Mutex struct {
	state int32
	sema  uint32
}
```
```go
const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota // 3

	// Mutex fairness.
	//
	// Mutex can be in 2 modes of operations: normal and starvation.
	// In normal mode waiters are queued in FIFO order, but a woken up waiter
	// does not own the mutex and competes with new arriving goroutines over
	// the ownership. New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. In such case it is queued at front
	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
	// it switches mutex to the starvation mode.
	//
	// In starvation mode ownership of the mutex is directly handed off from
	// the unlocking goroutine to the waiter at the front of the queue.
	// New arriving goroutines don't try to acquire the mutex even if it appears
	// to be unlocked, and don't try to spin. Instead they queue themselves at
	// the tail of the wait queue.
	//
	// If a waiter receives ownership of the mutex and sees that either
	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
	// it switches mutex back to normal operation mode.
	//
	// Normal mode has considerably better performance as a goroutine can acquire
	// a mutex several times in a row even if there are blocked waiters.
	// Starvation mode is important to prevent pathological cases of tail latency.
	starvationThresholdNs = 1e6
)
```
The mutex's status is kept in state, which has 32 bits.
Counting from the most significant bit, the first 29 bits (bits 3 to 31) record how many goroutines are waiting in the wait queue; call this count waiterCount.
state >> mutexWaiterShift // mutexWaiterShift is 3
The 30th bit (bit 2) records whether the mutex is currently in starvation mode; call this bit starvationFlag.
state & mutexStarving
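The 31st bit (bit 1) records whether a goroutine is currently awake and competing for the lock, i.e. spinning (for the first time) or just woken by Unlock; call this bit wokenFlag. "Woken" means awake: that goroutine is not asleep on the wait queue.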
state & mutexWoken
The 32nd bit (bit 0) records whether the lock is currently locked (a bit of a tongue twister, haha); call this bit lockFlag.
state & mutexLocked
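These bits in a diagram:
```
 bit 31 ........... bit 3   bit 2            bit 1       bit 0
+-----------------------+----------------+-----------+----------+
|      waiterCount      | starvationFlag | wokenFlag | lockFlag |
+-----------------------+----------------+-----------+----------+
```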
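A minimal sketch of decoding a state value into these four fields, assuming the Go 1.16 constant values shown above; decodeState and stateView are hypothetical helpers, not part of the sync package.

```go
package main

import "fmt"

// Same values as the constants in sync/mutex.go (Go 1.16).
const (
	mutexLocked      = 1 << iota // 1
	mutexWoken                   // 2
	mutexStarving                // 4
	mutexWaiterShift = iota      // 3
)

// stateView is a hypothetical helper type used only for illustration.
type stateView struct {
	WaiterCount int32
	Starving    bool
	Woken       bool
	Locked      bool
}

// decodeState splits a mutex state word into waiterCount and the three flags.
func decodeState(state int32) stateView {
	return stateView{
		WaiterCount: state >> mutexWaiterShift,
		Starving:    state&mutexStarving != 0,
		Woken:       state&mutexWoken != 0,
		Locked:      state&mutexLocked != 0,
	}
}

func main() {
	// 0b1001: one waiter, lock held.
	fmt.Printf("%+v\n", decodeState(0b1001))
	// {WaiterCount:1 Starving:false Woken:false Locked:true}
}
```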
sema is a semaphore; it is used to associate the mutex with a wait queue.
Let's walk through how the code executes in several cases.
Case: the Mutex is unlocked and the first goroutine comes to take the lock
```go
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
```
When the Mutex is unlocked, state is 0. When the first goroutine comes to take the lock, the CAS succeeds because state is 0; afterwards state is 1 (lockFlag = 1), and Lock returns without ever entering m.lockSlow().
Case: the Mutex is held only by goroutine A, no other goroutine competes, and A releases the lock
```go
func (m *Mutex) Unlock() {
	// ...

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
```
Continuing from above, state is 1. After AddInt32(&m.state, -mutexLocked), state becomes 0 (lockFlag = 0), so new is 0 and Unlock returns without calling unlockSlow.
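The two fast paths can be replayed on a bare int32 to watch the state values. fastPathTrace is a hypothetical helper that uses only sync/atomic, not the real Mutex:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const mutexLocked = 1

// fastPathTrace replays the uncontended Lock/Unlock fast paths on a bare int32.
func fastPathTrace() (acquired bool, afterLock, afterUnlock int32) {
	var state int32
	acquired = atomic.CompareAndSwapInt32(&state, 0, mutexLocked) // Lock fast path
	afterLock = atomic.LoadInt32(&state)
	afterUnlock = atomic.AddInt32(&state, -mutexLocked) // Unlock fast path: 0 means no unlockSlow
	return
}

func main() {
	fmt.Println(fastPathTrace()) // true 1 0
}
```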
Case: the Mutex is already held by goroutine A, and goroutine B comes to take the lock
```go
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
```
Because state is not 0, CompareAndSwapInt32 returns false, so B enters lockSlow().
lockSlow()
First, here is lockSlow() in full:
```go
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}
```
Step 1: doSpin (spinning)
Once inside the for loop, it first runs this check:
```go
for {
	// Don't spin in starvation mode, ownership is handed off to waiters
	// so we won't be able to acquire the mutex anyway.
	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
		// Active spinning makes sense.
		// Try to set mutexWoken flag to inform Unlock
		// to not wake other blocked goroutines.
		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
			awoke = true
		}
		runtime_doSpin()
		iter++
		old = m.state
		continue
	}
	// ...
}
```
runtime_canSpin(iter) decides, mainly from the value of iter, whether the goroutine should keep spinning. (Its implementation is shown later.)
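iter starts at 0, so during the first few checks runtime_canSpin(iter) returns true (given a multicore machine and an otherwise quiet scheduler). Therefore, as long as the lock is still held, the check
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter)
keeps passing for those first iterations. Because old>>mutexWaiterShift = 0 (waiterCount = 0), the condition of the inner if is not met, so neither the CAS nor awoke = true is executed.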
Next comes runtime_doSpin(), which runs an empty loop that burns a little CPU time; then continue moves on to the next iteration. (The implementation of runtime_doSpin is also shown later.)
As you can see, this code does not grab the lock; it waits for the lock to become unlocked. It spins a bounded number of times, hoping that another goroutine releases the lock in the meantime.
runtime_doSpin()
```go
// src/runtime/lock_sema.go
const active_spin_cnt = 30

// src/runtime/proc.go
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
```
```asm
// src/runtime/asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET
```
procyield() executes the PAUSE instruction in a loop, cycles times (30 here, i.e. active_spin_cnt).
runtime_canSpin()
runtime_canSpin() is implemented in src/runtime/proc.go. It has quite a few checks, but here we only need to care about i >= active_spin.
```go
const active_spin = 4

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}
```
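Since the runtime globals are hard to inspect, one way to reason about runtime_canSpin is to model its checks as a pure function. This is a sketch: canSpin and its parameters are hypothetical stand-ins for ncpu, gomaxprocs, sched.npidle, sched.nmspinning and the local run-queue check.

```go
package main

import "fmt"

const activeSpin = 4 // mirrors active_spin in the runtime

// canSpin models the checks in sync_runtime_canSpin as a pure function.
func canSpin(iter, ncpu, gomaxprocs, npidle, nmspinning int, localRunqEmpty bool) bool {
	if iter >= activeSpin || ncpu <= 1 || gomaxprocs <= npidle+nmspinning+1 {
		return false
	}
	return localRunqEmpty
}

func main() {
	// 8 CPUs, GOMAXPROCS=8, 2 idle Ps, no spinning Ms, empty local run queue:
	// spinning is allowed for iter = 0..3 and refused from iter = 4 on.
	for iter := 0; iter <= 4; iter++ {
		fmt.Println(iter, canSpin(iter, 8, 8, 2, 0, true))
	}
}
```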
一個小插曲
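While debugging with breakpoints, I found I couldn't watch some of the global variables referenced inside sync_runtime_canSpin(), such as active_spin, ncpu and sched.npidle. So I went the brute-force route and modified the source to declare a few local variables; watching those locals reveals the values of the globals (pretty clever of me, haha).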
```go
func sync_runtime_canSpin(i int) bool {
	local_active_spin := active_spin
	local_ncpu := ncpu
	local_gomaxprocs := gomaxprocs
	npidle := sched.npidle
	nmspinning := sched.nmspinning
	if i >= local_active_spin || local_ncpu <= 1 || local_gomaxprocs <= int32(npidle+nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}
```
Step 2: compute the new state from the old state
```go
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
	new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
	new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
if awoke {
	// The goroutine has been woken from sleep,
	// so we need to reset the flag in either case.
	// ...
	new &^= mutexWoken
}
```
This code computes the new state from the old state using four operations:
- set lockFlag:
new |= mutexLocked
- increment waiterCount:
new += 1 << mutexWaiterShift
- set starvationFlag:
new |= mutexStarving
- clear wokenFlag:
new &^= mutexWoken
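Since here we only discuss the case "the Mutex is already held by goroutine A and goroutine B comes to take it", there are two sub-cases:
- case1: the lock was released while B was spinning in step 1, so old state = 000000...000 (all bits 0). After these four operations, lockFlag is set to 1.
- case2: the lock is still held after B finishes spinning in step 1, so old state = 00000000...001 (only lockFlag is 1). After these four operations, waiterCount = 1 and lockFlag is still 1, i.e. new state = 0000...1001.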
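The four operations can be pulled out into a pure function and checked against both cases. nextState is a hypothetical helper that copies the Go 1.16 logic but leaves out the throw for an inconsistent wokenFlag:

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// nextState reproduces the "compute new from old" part of lockSlow.
func nextState(old int32, starving, awoke bool) int32 {
	next := old
	if old&mutexStarving == 0 {
		next |= mutexLocked // set lockFlag
	}
	if old&(mutexLocked|mutexStarving) != 0 {
		next += 1 << mutexWaiterShift // waiterCount++
	}
	if starving && old&mutexLocked != 0 {
		next |= mutexStarving // set starvationFlag
	}
	if awoke {
		next &^= mutexWoken // clear wokenFlag
	}
	return next
}

func main() {
	fmt.Printf("case1: %04b\n", nextState(0b0000, false, false)) // case1: 0001
	fmt.Printf("case2: %04b\n", nextState(0b0001, false, false)) // case2: 1001
}
```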
Step 3: update state (grab the lock)
```go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
	if old&(mutexLocked|mutexStarving) == 0 {
		break // locked the mutex with CAS
	}
	// ...
} else {
	old = m.state
}
```
This step uses a CAS to update mutex.state to the new state we just computed. If the CAS succeeds and old was unlocked, the goroutine breaks out of the loop and returns (case1 above). If the CAS fails, old is reloaded and the loop runs again, repeating steps 1 to 3.
In case2, things are a bit more involved:
```go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// ...
	// If we were already waiting before, queue at the front of the queue.
	queueLifo := waitStartTime != 0
	if waitStartTime == 0 {
		waitStartTime = runtime_nanotime()
	}
	runtime_SemacquireMutex(&m.sema, queueLifo, 1)
	// ...
}
```
Essentially, the goroutine puts itself on the wait queue via runtime_SemacquireMutex(); the runtime will not schedule it again until it is woken.
I won't dig into the implementation of runtime_SemacquireMutex() for now; otherwise this would never end.
Case: the Mutex is held by goroutine A, goroutine B failed to get it and was put on the wait queue, and now A releases the lock
```go
func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
```
Continuing from the previous case, state starts as 00000000000...0001001 (waiterCount = 1, lockFlag = 1). After AddInt32(&m.state, -mutexLocked) it becomes 0000...001000 (waiterCount = 1), new is also 0000...001000, and so Unlock enters unlockSlow.
unlockSlow()
Here is unlockSlow() in full:
```go
func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}
```
At this point old >> mutexWaiterShift = 0000...0001 ≠ 0, and none of lockFlag, wokenFlag or starvationFlag is set, so the loop does not return early.
```go
old := new
for {
	// If there are no waiters or a goroutine has already
	// been woken or grabbed the lock, no need to wake anyone.
	// In starvation mode ownership is directly handed off from unlocking
	// goroutine to the next waiter. We are not part of this chain,
	// since we did not observe mutexStarving when we unlocked the mutex above.
	// So get off the way.
	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
		return
	}
	// Grab the right to wake someone.
	new = (old - 1<<mutexWaiterShift) | mutexWoken
	if atomic.CompareAndSwapInt32(&m.state, old, new) {
		runtime_Semrelease(&m.sema, false, 1)
		return
	}
	old = m.state
}
```
Next it computes new = (0000...1000 - 0000...1000) | mutexWoken = 0000...0010: waiterCount drops from 1 to 0 and wokenFlag is set. It then performs a CAS; if the CAS succeeds, it wakes one goroutine from the wait queue.
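A sketch of one pass through unlockSlow's normal-mode loop as a pure function, showing waiterCount dropping to 0 while wokenFlag is set. unlockDecision is a hypothetical helper; the starvation branch and the CAS retry are left out:

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// unlockDecision takes the state after the lock bit was dropped and reports
// the state the CAS would install and whether a waiter would be woken.
func unlockDecision(old int32) (next int32, wake bool) {
	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
		return old, false // nobody to wake, or someone is already awake / holds the lock
	}
	return (old - 1<<mutexWaiterShift) | mutexWoken, true
}

func main() {
	next, wake := unlockDecision(0b1000) // one waiter, no flags set
	fmt.Printf("%04b %v\n", next, wake)  // 0010 true
}
```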
Case: the Mutex is held by goroutine A, goroutine B failed to get it and was put on the wait queue, then A releases the lock and B is woken
Let's switch our view to the second half of lockSlow.
```go
const starvationThresholdNs = 1e6

if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// ...
	runtime_SemacquireMutex(&m.sema, queueLifo, 1)
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
	old = m.state
	// ...
	iter = 0
}
```
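When goroutine B wakes up from runtime_SemacquireMutex, it decides from how long it has been waiting whether it is starving. Let's assume for now that it is not; the starving case is discussed in detail later. B then sets awoke = true and resets iter to 0 before the next iteration, so in that iteration runtime_canSpin(iter) would return true again (B can spin again if the lock is held).
Since state is now 0 apart from the wokenFlag that unlockSlow set on B's behalf (0000...0010), B gets the lock unimpeded in the next iteration: it sets lockFlag, clears wokenFlag (because awoke is true), and its CAS installs 0000...0001.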
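The whole handoff can be traced as a sequence of state values. handoffTrace is a hypothetical helper that applies the same bit operations by hand, without atomics or real goroutines:

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// handoffTrace replays: A holds the lock with B queued, A unlocks,
// B wakes up and takes the lock (normal mode, no competitors).
func handoffTrace() []int32 {
	state := int32(mutexLocked | 1<<mutexWaiterShift) // 1001: A holds, B waits
	trace := []int32{state}
	state -= mutexLocked // Unlock fast path drops lockFlag
	trace = append(trace, state)
	state = (state - 1<<mutexWaiterShift) | mutexWoken // unlockSlow: waiterCount--, set wokenFlag
	trace = append(trace, state)
	// B wakes with awoke=true: set lockFlag, no waiter added (lock was free), clear wokenFlag.
	state = (state | mutexLocked) &^ mutexWoken
	return append(trace, state)
}

func main() {
	for _, s := range handoffTrace() {
		fmt.Printf("%04b\n", s) // 1001, 1000, 0010, 0001
	}
}
```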
Unlocking under starvation: the role of starvationFlag
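Imagine this: goroutine A holds the lock, and goroutine B fails to get it and is put on the wait queue. A releases the lock and B is woken, but just as B tries to grab the lock, a newly arriving goroutine C takes it first... This happens several times in a row: every time B tries, some other goroutine gets there first. Until, one time after being woken again, B executes
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
and finds it has waited more than 1ms, so it is now starving and goes on to switch the mutex into starvation mode!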
```go
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
```
It then writes the starvation flag into mutex.state with a CAS, and is put back on the wait queue.
atomic.CompareAndSwapInt32(&m.state, old, new)
Unlock()
Switching to the Unlock() side:
```go
func (m *Mutex) Unlock() {
	// ...

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	// ...
	if new&mutexStarving == 0 {
		// ...
		for {
			// ...
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			// ...
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}
```
In unlockSlow(), new&mutexStarving != 0 at this point, so execution goes straight to the else branch and calls runtime_Semrelease(). Note that the else branch passes different arguments to runtime_Semrelease() than the if branch does: here runtime_Semrelease(&m.sema, true, 1) wakes the first goroutine in the wait queue and schedules it immediately (runtime_Semrelease() is explained below).
Also, as the comment says, because Unlock() performed atomic.AddInt32(&m.state, -mutexLocked), the lockFlag in mutex.state is 0. That is fine: starvationFlag is 1, so the mutex is still treated as locked and newly arriving goroutines will not take it.
Lock()
```go
func (m *Mutex) Lock() {
	// ...
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	// ...
	for {
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				// ...
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
	// ...
}
```
Back on the Lock() side: after the starving goroutine is woken and scheduled, it first executes old = m.state; at this point old has starvationFlag = 1.
Then, as the comment says, it fixes up the "inconsistent" state of mutex.state.
The fix-up does three things:
- Unlock() in starvation mode did not decrement waiterCount, so waiterCount is decremented by 1 here.
- lockFlag in state is set to 1.
- If this goroutine is not starving, or it is the last goroutine in the wait queue, starvationFlag is cleared.
All three are done in a single step by atomic.AddInt32(&m.state, delta).
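The delta arithmetic can be checked on concrete states. starvationHandoffFix is a hypothetical helper that mirrors the fix-up code above without its throw checks:

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// starvationHandoffFix returns the state after a waiter that received the lock
// in starvation mode applies its delta. old is the state it observes on waking;
// starving says whether it waited longer than 1ms itself.
func starvationHandoffFix(old int32, starving bool) int32 {
	delta := int32(mutexLocked - 1<<mutexWaiterShift) // lockFlag = 1, waiterCount--
	if !starving || old>>mutexWaiterShift == 1 {
		delta -= mutexStarving // leave starvation mode
	}
	return old + delta
}

func main() {
	fmt.Printf("%04b\n", starvationHandoffFix(mutexStarving|2<<mutexWaiterShift, true))  // 1101: still starving
	fmt.Printf("%04b\n", starvationHandoffFix(mutexStarving|1<<mutexWaiterShift, true))  // 0001: last waiter
	fmt.Printf("%04b\n", starvationHandoffFix(mutexStarving|2<<mutexWaiterShift, false)) // 1001: not starving
}
```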
runtime_Semrelease()
```go
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
```
handoff means passing the ball. When handoff is false, runtime_Semrelease only wakes the first goroutine in the wait queue and does not schedule it immediately. When handoff is true, the woken goroutine is scheduled immediately, and it also inherits the current goroutine's time slice. For example, assuming each goroutine gets a 2ms time slice, if goroutine A has already run for 1ms and wakes goroutine B via runtime_Semrelease(handoff = true), B has 2 - 1 = 1ms of time slice left.
Locking by newly arriving goroutines in starvation mode: the role of starvationFlag
If a new goroutine requests the lock while the mutex is in starvation mode, it goes through these steps:
```go
func (m *Mutex) lockSlow() {
	// ...
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// ...
			runtime_doSpin()
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
		} else {
			// ...
		}
	}
	// ...
}
```
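Since old&(mutexLocked|mutexStarving) != mutexLocked, it does not spin.
Since old&mutexStarving != 0, it does not set lockFlag.
Since old&(mutexLocked|mutexStarving) != 0, it increments waiterCount.
So in effect it performs only one operation, incrementing waiterCount; it updates state with a CAS and then goes to sleep on the wait queue.
Therefore, in starvation mode, newly arriving goroutines do not try to grab the lock (set lockFlag). They just register themselves (waiterCount + 1) and obediently join the wait queue.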
Unlocking while a goroutine is spinning: the role of wokenFlag
wokenFlag is set in lockSlow(); wokenFlag = 1 means that some goroutine is currently spinning.
```go
func (m *Mutex) lockSlow() {
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
			awoke = true
			iter = 0
		}
		// ...
	}
	// ...
}
```
When a newly arriving goroutine (one that has never been on the wait queue) spins for the first time, wokenFlag is set by this logic:
```go
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
	atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
	awoke = true
}
```
But when a goroutine spins after being woken from the wait queue, there is no logic in lockSlow() that sets wokenFlag. Why? Because that logic lives in unlockSlow.
Switching our view to unlockSlow():
```go
func (m *Mutex) unlockSlow(new int32) {
	// ...
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				// When the mutexWoken flag is set, return right away
				// without waking a goroutine from the wait queue.
				return
			}
			// Grab the right to wake someone.
			// wokenFlag gets set right here.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// ...
	}
}
```
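As you can see, while a goroutine is spinning (wokenFlag = 1), Unlock does not wake a goroutine from the wait queue, which keeps the queued goroutines out of the competition (spinning goroutines still compete with each other, of course). If wokenFlag = 0, one goroutine is woken from the wait queue, and wokenFlag is set to 1 before it is woken, so the woken goroutine does not need to set wokenFlag itself. Neat!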
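Why not wake a goroutine from the wait queue while another one is spinning? It takes time for a goroutine to go from being woken to actually being scheduled (running on a CPU); by the time it could start spinning, the mutex would long since have been taken.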
If a goroutine woken from the wait queue still fails to get the lock, is it put at the front or the back of the queue?
The answer is the front. Here is the code:
```go
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
	waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
```
Analysis of a complex scenario
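Let's use the logic above to walk through a more complex scenario!
Suppose goroutines g1, g2, g3, g4, g5, g6 (and later g7, g8) all compete for one lock m.
At first, g1 takes the lock
owner: g1 waitqueue: null
g2 tries to take the lock, fails, and is put on the wait queue
owner: g1 waitqueue: g2
g1 releases the lock, and g2 is woken from the wait queue
owner: null waitqueue: null
Meanwhile g3 also tries to take the lock; g2 loses and goes back onto the wait queue
owner: g3 waitqueue: g2
g4 tries to take the lock, fails, and is put on the wait queue
owner: g3 waitqueue: g2, g4
g3 releases the lock, and g2 is woken
owner: null waitqueue: g4
Meanwhile g5 tries to take the lock; g2 loses and goes back to the front of the wait queue
owner: g5 waitqueue: g2, g4
g6 tries to take the lock and is spinning
owner: g5 waitqueue: g2, g4 wokenFlag: 1 spinning: g6
g5 releases the lock; because a goroutine is spinning, no goroutine is woken from the wait queue, and g6 easily gets the lock
owner: g6 waitqueue: g2, g4 wokenFlag: 0 spinning: null
g6 releases the lock and g2 is woken; meanwhile g7 tries to take the lock, g2 loses and goes back to the front of the wait queue, but because g2 has gone too long (over 1ms) without the lock, it switches the mutex to starvation mode
owner: g7 waitqueue: g2(starving), g4 starvationFlag: 1
g8 comes for the lock; since the mutex is in starvation mode, g8 is put directly at the tail of the wait queue
owner: g7 waitqueue: g2(starving), g4, g8 starvationFlag: 1
g7 releases the lock; since the mutex is in starvation mode, g2 is woken and scheduled directly
owner: g2 waitqueue: g4, g8 starvationFlag: 1
g2 finishes and releases the lock. Note the mutex is still in starvation mode, so g4 is scheduled directly. When g4 wakes up, it finds that it is not starving itself (it waited no more than 1ms), so it clears starvationFlag
owner: g4 waitqueue: g8 starvationFlag: 0
From here on, newly arriving goroutines can compete for the lock normally again, while g8 waits in the queue to be woken the normal way; everything is back to the normal lock/unlock logic.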
A small flaw: a very marginal starvation case
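A goroutine in the wait queue only checks its waiting time (to decide whether to enter starvation mode) after it has been woken. So a goroutine may wait in the queue for a very long time, effectively starving, yet never be woken and therefore never get the chance to set starvationFlag. That still leads to starvation.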
Can a goroutine in the wait queue really never be woken?
Yes! If wokenFlag = 1 during unlockSlow(), no goroutine in the wait queue is woken. So if, every time Unlock() runs, some newly arriving goroutine happens to be spinning, the goroutines in the wait queue will starve forever!
reference
Tail Latency Might Matter More Than You Think