Golang Mutex互斥鎖深入理解
引言
Golang的并發(fā)編程令人著迷,使用輕量的協(xié)程、基于CSP的channel、簡單的go func()就可以開始并發(fā)編程,在并發(fā)編程中,往往離不開鎖的概念。
本文介紹了常用的同步原語 sync.Mutex,同時從源碼剖析它的結(jié)構(gòu)與實現(xiàn)原理,最后簡單介紹了mutex在日常使用中可能遇到的問題,希望大家讀有所獲。
Mutex結(jié)構(gòu)
Mutex運行時數(shù)據(jù)結(jié)構(gòu)位于sync/mutex.go包
type Mutex struct {
state int32
sema uint32
}
其中state表示當前互斥鎖的狀態(tài),sema表示 控制鎖狀態(tài)的信號量.
互斥鎖的狀態(tài)定義在常量中:
const ( mutexLocked = 1 << iota // 1 ,處于鎖定狀態(tài); 2^0 mutexWoken // 2 ;從正常模式被從喚醒; 2^1 mutexStarving // 4 ;處于饑餓狀態(tài); 2^2 mutexWaiterShift = iota // 3 ;獲得互斥鎖上等待的Goroutine個數(shù)需要左移的位數(shù): 1 << mutexWaiterShift starvationThresholdNs = 1e6 // 鎖進入饑餓狀態(tài)的等待時間 )
0即其他狀態(tài)。
sema是一個組合,低三位分別表示鎖的三種狀態(tài),高29位表示正在等待互斥鎖釋放的gorountine個數(shù),和Java表示線程池狀態(tài)那部分有點類似

一個mutex對象僅占用8個字節(jié),讓人不禁感嘆其設計的巧妙

饑餓模式和正常模式
正常模式
在正常模式下,等待的協(xié)程會按照先進先出的順序得到鎖 在正常模式下,剛被喚醒的goroutine與新創(chuàng)建的goroutine競爭時,大概率無法獲得鎖。
饑餓模式
為了避免正常模式下,goroutine被“餓死”的情況,go在1.19版本引入了饑餓模式,保證了Mutex的公平性
在饑餓模式中,互斥鎖會直接交給等待隊列最前面的goroutine。新的goroutine 在該狀態(tài)下不能獲取鎖、也不會進入自旋狀態(tài),它們只會在隊列的末尾等待。
狀態(tài)的切換
在正常模式下,一旦Goroutine超過1ms沒有獲取到鎖,它就會將當前互斥鎖切換饑餓模式
如果一個goroutine 獲得了互斥鎖并且它在隊列的末尾或者它等待的時間少于 1ms,那么當前的互斥鎖就會切換回正常模式。
加鎖和解鎖
加鎖
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 原注釋: Slow path (outlined so that the fast path can be inlined)
// 將
m.lockSlow()
}
可以看到,當前互斥鎖的狀態(tài)為0時,嘗試將當前鎖狀態(tài)設置為更新鎖定狀態(tài),且這些操作是原子的。
若當前狀態(tài)不為0,則進入lockSlow方法
先定義了幾個參數(shù)
var waitStartTime int64 starving := false // awoke := false iter := 0 old := m.state
隨后進入一個很大的for循環(huán),讓我們來逐步分析
自旋
for {
// 1 && 2
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 3.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
old&(mutexLocked|mutexStarving) == mutexLocked
當且僅當當前鎖狀態(tài)為mutexLocked時,表達式為true
runtime_canSpin(iter) 是否滿足自旋條件
- 運行在擁有多個CPU的機器上;
- 當前Goroutine為了獲取該鎖進入自旋的次數(shù)小于四次;
- 當前機器上至少存在一個正在運行的處理器 P,并且處理的運行隊列為空;
如果當前狀態(tài)下自旋是合理的,將awoke置為true,同時設置鎖狀態(tài)為mutexWoken,進入自旋邏輯
runtime_doSpin()會執(zhí)行30次PAUSE指令,并且僅占用CPU資源 代碼位于:runtime\asm_amd64.s +567
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
計算鎖的新狀態(tài)
停止了自旋后,
new := old
// 1.
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 2.
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 3 && 4.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 5.
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
old&mutexStarving == 0表明原來不是饑餓模式。如果是饑餓模式的話,其他goroutine不會執(zhí)行接下來的代碼,直接進入等待隊列隊尾- 如果原來是
mutexLocked或者mutexStarving模式,waiterCounts數(shù)加一 - 如果被標記為饑餓狀態(tài),且鎖狀態(tài)為
mutexLocked的話,設置鎖的新狀態(tài)為饑餓狀態(tài)。 - 被標記為饑餓狀態(tài)的前提是
被喚醒過且搶鎖失敗 - 計算新狀態(tài)
更新鎖狀態(tài)
// 1.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 3.
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 4.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 5.
if old&mutexStarving != 0 {
/
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
- 嘗試將鎖狀態(tài)設置為new 。這里設置成功不代表上鎖成功,有可能new不為
mutexLocked或者是waiterCount數(shù)量的改變 waitStartTime不為0 說明當前goroutine已經(jīng)等待過了,將當前goroutine放到等待隊列的隊頭- 走到這里,會調(diào)用
runtime_SemacquireMutex方法使當前協(xié)程阻塞,runtime_SemacquireMutex方法中會不斷嘗試獲得鎖,并會陷入休眠 等待信號量釋放。 - 當前協(xié)程可以獲得信號量,從
runtime_SemacquireMutex方法中返回。此時協(xié)程會去更新starving標志位:如果當前starving標志位為true或者等待時間超過starvationThresholdNs,將starving置為true
之后會按照饑餓模式與正常模式,走不同的邏輯
- - 在正常模式下,這段代碼會設置喚醒和饑餓標記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán);
- - 在饑餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待隊列中只存在當前 Goroutine,互斥鎖還會從饑餓模式中退出;
解鎖

func (m *Mutex) Unlock() {
// 1.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 2.
m.unlockSlow(new)
}
}
- 將鎖狀態(tài)的值增加 -mutexLocked 。如果新狀態(tài)不等于0,進入
unlockSlow方法
func (m *Mutex) unlockSlow(new int32) {
// 1.
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 2.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 2.1.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 2.2.
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 3.
runtime_Semrelease(&m.sema, true, 1)
}
}
1.new+mutexLocked代表將鎖置為1,如果兩個狀態(tài)& 不為0,則說明重復解鎖.如果重復解鎖則拋出panic
2. 如果等待者數(shù)量等于0,或者鎖的狀態(tài)已經(jīng)變?yōu)閙utexWoken、mutexStarving、mutexStarving,則直接返回
- 將waiterCount數(shù)量-1,嘗試選擇一個goroutine喚醒
- 嘗試更新鎖狀態(tài),如果更新鎖狀態(tài)成功,則喚醒隊尾的一個gorountine
3. 如果不滿足 2的判斷條件,則進入饑餓模式,同時交出鎖的使用權(quán)
可能遇到的問題
鎖拷貝
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
此時mu2能夠正常解鎖,那么我們再試試解鎖mu1呢
mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu1.Unlock()

可以看到發(fā)生了error
panic導致沒有unlock
當lock()之后,可能由于代碼問題導致程序發(fā)生了panic,那么mutex無法被及時unlock(),由于其他協(xié)程還在等待鎖,此時可能觸發(fā)死鎖
func TestWithLock() {
nums := 100
wg := &sync.WaitGroup{}
safeSlice := SafeSlice{
s: []int{},
lock: new(sync.RWMutex),
}
i := 0
for idx := 0; idx < nums; idx++ { // 并行nums個協(xié)程做append
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
log.Println("recover")
}
wg.Done()
}()
safeSlice.lock.Lock()
safeSlice.s = append(safeSlice.s, i)
if i == 98{
panic("123")
}
i++
safeSlice.lock.Unlock()
}()
}
wg.Wait()
log.Println(len(safeSlice.s))
}

修改:
func TestWithLock() {
nums := 100
wg := &sync.WaitGroup{}
safeSlice := SafeSlice{
s: []int{},
lock: new(sync.RWMutex),
}
i := 0
for idx := 0; idx < nums; idx++ { // 并行nums個協(xié)程做append
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
}
safeSlice.lock.Unlock()
wg.Done()
}()
safeSlice.lock.Lock()
safeSlice.s = append(safeSlice.s, i)
if i == 98{
panic("123")
}
i++
}()
}
wg.Wait()
log.Println(len(safeSlice.s))
}以上就是Golang Mutex互斥鎖深入理解的詳細內(nèi)容,更多關(guān)于Golang Mutex互斥鎖的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang將切片或數(shù)組根據(jù)某個字段進行分組操作
這篇文章主要介紹了golang將切片或數(shù)組根據(jù)某個字段進行分組操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
使用Go語言構(gòu)建高效的二叉搜索樹聯(lián)系簿
樹是一種重要的數(shù)據(jù)結(jié)構(gòu),而二叉搜索樹(BST)則是樹的一種常見形式,在本文中,我們將學習如何構(gòu)建一個高效的二叉搜索樹聯(lián)系簿,感興趣的可以了解下2024-01-01
Golang中結(jié)構(gòu)體映射mapstructure庫深入詳解
mapstructure用于將通用的map[string]interface{}解碼到對應的 Go 結(jié)構(gòu)體中,或者執(zhí)行相反的操作。很多時候,解析來自多種源頭的數(shù)據(jù)流時,我們一般事先并不知道他們對應的具體類型。只有讀取到一些字段之后才能做出判斷2023-01-01

