Golang Mutex互斥鎖深入理解
引言
Golang的并發(fā)編程令人著迷,使用輕量的協(xié)程、基于CSP的channel、簡單的go func()就可以開始并發(fā)編程,在并發(fā)編程中,往往離不開鎖的概念。
本文介紹了常用的同步原語 sync.Mutex
,同時從源碼剖析它的結構與實現(xiàn)原理,最后簡單介紹了mutex在日常使用中可能遇到的問題,希望大家讀有所獲。
Mutex結構
Mutex運行時數(shù)據(jù)結構位于sync/mutex.go
包
type Mutex struct { state int32 sema uint32 }
其中state
表示當前互斥鎖的狀態(tài),sema
表示 控制鎖狀態(tài)的信號量.
互斥鎖的狀態(tài)定義在常量中:
const ( mutexLocked = 1 << iota // 1 ,處于鎖定狀態(tài); 2^0 mutexWoken // 2 ;從正常模式被從喚醒; 2^1 mutexStarving // 4 ;處于饑餓狀態(tài); 2^2 mutexWaiterShift = iota // 3 ;獲得互斥鎖上等待的Goroutine個數(shù)需要左移的位數(shù): 1 << mutexWaiterShift starvationThresholdNs = 1e6 // 鎖進入饑餓狀態(tài)的等待時間 )
0即其他狀態(tài)。
sema
是一個組合,低三位分別表示鎖的三種狀態(tài),高29位表示正在等待互斥鎖釋放的gorountine個數(shù),和Java表示線程池狀態(tài)那部分有點類似
一個mutex對象僅占用8個字節(jié),讓人不禁感嘆其設計的巧妙
饑餓模式和正常模式
正常模式
在正常模式下,等待的協(xié)程會按照先進先出的順序得到鎖 在正常模式下,剛被喚醒的goroutine與新創(chuàng)建的goroutine競爭時,大概率無法獲得鎖。
饑餓模式
為了避免正常模式下,goroutine被“餓死”的情況,go在1.19版本引入了饑餓模式,保證了Mutex的公平性
在饑餓模式中,互斥鎖會直接交給等待隊列最前面的goroutine。新的goroutine 在該狀態(tài)下不能獲取鎖、也不會進入自旋狀態(tài),它們只會在隊列的末尾等待。
狀態(tài)的切換
在正常模式下,一旦Goroutine超過1ms沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式
如果一個goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會切換回正常模式。
加鎖和解鎖
加鎖
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // 原注釋: Slow path (outlined so that the fast path can be inlined) // 將 m.lockSlow() }
可以看到,當前互斥鎖的狀態(tài)為0時,嘗試將當前鎖狀態(tài)設置為更新鎖定狀態(tài),且這些操作是原子的。
若當前狀態(tài)不為0,則進入lockSlow
方法
先定義了幾個參數(shù)
var waitStartTime int64 starving := false // awoke := false iter := 0 old := m.state
隨后進入一個很大的for循環(huán),讓我們來逐步分析
自旋
for { // 1 && 2 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 3. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
old&(mutexLocked|mutexStarving) == mutexLocked
當且僅當當前鎖狀態(tài)為mutexLocked時,表達式為true
runtime_canSpin(iter)
是否滿足自旋條件
- 運行在擁有多個CPU的機器上;
- 當前Goroutine為了獲取該鎖進入自旋的次數(shù)小于四次;
- 當前機器上至少存在一個正在運行的處理器 P,并且處理的運行隊列為空;
如果當前狀態(tài)下自旋是合理的,將awoke
置為true,同時設置鎖狀態(tài)為mutexWoken
,進入自旋邏輯
runtime_doSpin()
會執(zhí)行30次PAUSE
指令,并且僅占用CPU資源 代碼位于:runtime\asm_amd64.s +567
//go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
計算鎖的新狀態(tài)
停止了自旋后,
new := old // 1. if old&mutexStarving == 0 { new |= mutexLocked } // 2. if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 3 && 4. if starving && old&mutexLocked != 0 { new |= mutexStarving } // 5. if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
old&mutexStarving == 0
表明原來不是饑餓模式。如果是饑餓模式的話,其他goroutine不會執(zhí)行接下來的代碼,直接進入等待隊列隊尾- 如果原來是
mutexLocked
或者mutexStarving
模式,waiterCounts數(shù)加一 - 如果被標記為饑餓狀態(tài),且鎖狀態(tài)為
mutexLocked
的話,設置鎖的新狀態(tài)為饑餓狀態(tài)。 - 被標記為饑餓狀態(tài)的前提是
被喚醒過且搶鎖失敗
- 計算新狀態(tài)
更新鎖狀態(tài)
// 1. if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 2. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 3. runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 4. starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 5. if old&mutexStarving != 0 { / if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } }
- 嘗試將鎖狀態(tài)設置為new 。這里設置成功不代表上鎖成功,有可能new不為
mutexLocked
或者是waiterCount數(shù)量的改變 waitStartTime
不為0 說明當前goroutine已經(jīng)等待過了,將當前goroutine放到等待隊列的隊頭- 走到這里,會調(diào)用
runtime_SemacquireMutex
方法使當前協(xié)程阻塞,runtime_SemacquireMutex
方法中會不斷嘗試獲得鎖,并會陷入休眠 等待信號量釋放。 - 當前協(xié)程可以獲得信號量,從
runtime_SemacquireMutex
方法中返回。此時協(xié)程會去更新starving
標志位:如果當前starving
標志位為true或者等待時間超過starvationThresholdNs
,將starving
置為true
之后會按照饑餓模式與正常模式,走不同的邏輯
- - 在正常模式下,這段代碼會設置喚醒和饑餓標記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán);
- - 在饑餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當前 Goroutine,互斥鎖還會從饑餓模式中退出;
解鎖
func (m *Mutex) Unlock() { // 1. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 2. m.unlockSlow(new) } }
- 將鎖狀態(tài)的值增加 -mutexLocked 。如果新狀態(tài)不等于0,進入
unlockSlow
方法
func (m *Mutex) unlockSlow(new int32) { // 1. if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // 2. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 2.1. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 2.2. runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 3. runtime_Semrelease(&m.sema, true, 1) } }
1.new+mutexLocked
代表將鎖置為1,如果兩個狀態(tài)& 不為0,則說明重復解鎖.如果重復解鎖則拋出panic
2. 如果等待者數(shù)量等于0,或者鎖的狀態(tài)已經(jīng)變?yōu)閙utexWoken、mutexStarving、mutexStarving,則直接返回
- 將waiterCount數(shù)量-1,嘗試選擇一個goroutine喚醒
- 嘗試更新鎖狀態(tài),如果更新鎖狀態(tài)成功,則喚醒隊尾的一個gorountine
3. 如果不滿足 2
的判斷條件,則進入饑餓模式,同時交出鎖的使用權
可能遇到的問題
鎖拷貝
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock()
此時mu2
能夠正常解鎖,那么我們再試試解鎖mu1
呢
mu1 := &sync.Mutex{} mu1.Lock() mu2 := mu1 mu2.Unlock() mu1.Unlock()
可以看到發(fā)生了error
panic導致沒有unlock
當lock()之后,可能由于代碼問題導致程序發(fā)生了panic,那么mutex無法被及時unlock(),由于其他協(xié)程還在等待鎖,此時可能觸發(fā)死鎖
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums個協(xié)程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { log.Println("recover") } wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ safeSlice.lock.Unlock() }() } wg.Wait() log.Println(len(safeSlice.s)) }
修改:
func TestWithLock() { nums := 100 wg := &sync.WaitGroup{} safeSlice := SafeSlice{ s: []int{}, lock: new(sync.RWMutex), } i := 0 for idx := 0; idx < nums; idx++ { // 并行nums個協(xié)程做append wg.Add(1) go func() { defer func() { if r := recover(); r != nil { } safeSlice.lock.Unlock() wg.Done() }() safeSlice.lock.Lock() safeSlice.s = append(safeSlice.s, i) if i == 98{ panic("123") } i++ }() } wg.Wait() log.Println(len(safeSlice.s)) }
以上就是Golang Mutex互斥鎖深入理解的詳細內(nèi)容,更多關于Golang Mutex互斥鎖的資料請關注腳本之家其它相關文章!
相關文章
golang將切片或數(shù)組根據(jù)某個字段進行分組操作
這篇文章主要介紹了golang將切片或數(shù)組根據(jù)某個字段進行分組操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12