Golang?WaitGroup?底層原理及源碼解析
0 知識背景
在進入正文前,先對 WaitGroup
及其相關背景知識做個簡單的介紹,這里主要是 WaitGroup
的基本使用,以及系統(tǒng)信號量的基礎知識。對這些比較熟悉的小伙伴可以直接跳過這一節(jié)。
0.1 WaitGroup
WaitGroup
是 Golang 中最常見的并發(fā)控制技術之一,它的作用我們可以簡單類比為其他語言中多線程并發(fā)控制中的 join()
,實例代碼如下:
package main import ( "fmt" "sync" "time" ) func main() { fmt.Println("Main starts...") var wg sync.WaitGroup // 2 指的是下面有兩個協(xié)程需要等待 wg.Add(2) go waitFunc(&wg, 3) go waitFunc(&wg, 1) // 阻塞等待 wg.Wait() fmt.Println("Main ends...") } func waitFunc(wg *sync.WaitGroup, num int) { // 函數(shù)結(jié)束時告知 WaitGroup 自己已經(jīng)結(jié)束 defer wg.Done() time.Sleep(time.Duration(num) * time.Second) fmt.Printf("Hello World from %v\n", num) } // 結(jié)果輸出: Main starts... Hello World from 1 Hello World from 3 Main ends...
如果這里沒有 WaitGroup
,主協(xié)程(main 函數(shù))會直接跑到最后的 Main ends...
,而沒有中間兩個 goroutine 的輸出,加了 WaitGroup
后,main 就會在 wg.Wait()
處阻塞等待兩個協(xié)程都結(jié)束后才繼續(xù)執(zhí)行。
上面我們看到的 WaitGroup
的三個方法:Wait()
、Add(int)
和 Done()
也是 WaitGroup
對象僅有的三個方法。
0.2 信號量(Semaphore)
信號量(Semaphore)是一種用于實現(xiàn)多進程或多線程之間同步和互斥的機制,也是 WaitGroup
中所采用的技術。并且 WaitGroup
自身的同步原理,也與信號量很相似。
由于翻譯問題,不熟悉的小伙伴經(jīng)常將信號量(Semaphore)和信號(Signal)搞混,這倆實際上是兩個完全不同的東西。Semaphore 在英文中的本意是旗語,也就是航海領域的那個旗語,利用手旗或旗幟傳遞信號的溝通方式。在計算機領域,Semaphore,即信號量,在廣義上也可以理解為一種進程、線程間的通信方式,但它的主要作用,正如前面所說,是用于實現(xiàn)進程、線程間的同步和互斥。
信號量本質(zhì)上可以簡單理解為一個整型數(shù),主要包含兩種操作:P(Proberen,測試)操作和 V(Verhogen,增加)操作。其中,P 操作會嘗試獲取一個信號量,如果信號量的值大于 0,則將信號量的值減 1 并繼續(xù)執(zhí)行;否則,當前進程或線程就會被阻塞,直到有其他進程或線程釋放這個信號量為止。V 操作則是釋放一個信號量,將信號量的值加 1。
可以把信號量看作是一種類似鎖的東西,P 操作相當于獲取鎖,而 V 操作相當于釋放鎖。由于信號量是一種操作系統(tǒng)級別的機制,通常由內(nèi)核提供支持,因此我們不用擔心上述對信號量的操作本身會產(chǎn)生競態(tài)條件,相信內(nèi)核能搞定這種東西。
本文的重點不是信號量,因此不會過多展開關于信號量的技術細節(jié),有興趣的小伙伴可以查閱相關資料。
最后提一嘴技術之外的東西,Proberen 和 Verhogen 這倆單詞眼生吧?因為它們是荷蘭語,不是英語。為啥是荷蘭語嘞?因為發(fā)明信號量的人,是上古計算機大神,來自荷蘭的計算機先驅(qū) Edsger W. Dijkstra 先生。嗯,對,就是那個 Dijkstra。
1 WaitGroup 底層原理
聲明:本文所用源碼均基于 Go 1.20.3 版本,不同版本 Go 的 WaitGroup
源碼可能略有不同,但設計思想基本是一致的。
WaitGroup
相關源碼非常短,加上注釋和空行也只有 120 多行,它們?nèi)荚?src/sync/waitgroup.go
中。
1.1 定義
先來看 WaitGroup
的定義,這里我把源文件中的注釋都簡單翻譯了一下:
// WaitGroup 等待一組 Goroutine 完成。 // 主 Goroutine 調(diào)用 Add 方法設置要等待的 Goroutine 數(shù)量, // 然后每個 Goroutine 運行并在完成后調(diào)用 Done 方法。 // 同時,可以使用 Wait 方法阻塞,直到所有 Goroutine 完成。 // // WaitGroup 在第一次使用后不能被復制。 // // 根據(jù) Go 內(nèi)存模型的術語,Done 調(diào)用“同步于”任何它解除阻塞的 Wait 調(diào)用的返回。 type WaitGroup struct { noCopy noCopy state atomic.Uint64 // 高 32 位是計數(shù)器, 低 32 位是等待者數(shù)量(后文解釋)。 sema uint32 }
WaitGroup
類型是一個結(jié)構(gòu)體,它有三個私有成員,我們一個一個來看。
1.1.1 noCopy
首先是 noCopy
,這個東西是為了告訴編譯器,WaitGroup
結(jié)構(gòu)體對象不可復制,即 wg2 := wg
是非法的。之所以禁止復制,是為了防止可能發(fā)生的死鎖。但實際上如果我們對 WaitGroup
對象進行復制后,至少在 1.20 版本下,Go 的編譯器只是發(fā)出警告,沒有阻止編譯過程,我們依然可以編譯成功。警告的內(nèi)容如下:
assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy
為什么編譯器沒有編譯失敗,我猜應該是 Go 官方想盡量減少編譯器對程序的干預,而更多地交給程序員自己去處理(此時 Rust 發(fā)出了一陣笑聲)??傊?,我們在使用 WaitGroup
的過程中,不要去復制它就對了,不然非常容易產(chǎn)生死鎖(其實結(jié)構(gòu)體注釋上也說了,WaitGroup 在第一次使用后不能被復制)。譬如我將文章開頭代碼中的 main 函數(shù)稍微改了改:
func main() { fmt.Println("Main starts...") var wg sync.WaitGroup // 2 指的是下面有兩個協(xié)程需要等待 wg.Add(1) wg2 := wg wg2.Add(1) go waitFunc(&wg, 3) go waitFunc(&wg2, 1) // 阻塞等待 wg.Wait() wg2.Wait() fmt.Println("Main ends...") } // 輸出結(jié)果 Main starts... Hello World from 1 Hello World from 3 fatal error: all goroutines are asleep - deadlock! goroutine 1 [semacquire]: sync.runtime_Semacquire(0xc000042060?) C:/Program Files/Go/src/runtime/sema.go:62 +0x27 sync.(*WaitGroup).Wait(0xe76b28?) C:/Program Files/Go/src/sync/waitgroup.go:116 +0x4b main.main() D:/Codes/Golang/waitgroup/main.go:23 +0x139 exit status 2
為什么會這樣?因為 wg 已經(jīng) Add(1)
了,這時我們復制了 wg 給 wg2,并且是個淺拷貝,意味著 wg2 內(nèi)實際上已經(jīng)是 Add(1)
后的狀態(tài)了(state 成員保存的狀態(tài),即它的值),此時我們再執(zhí)行 wg2.Add(1)
,其實相當于執(zhí)行了兩次 wg2.Add(1)
。而后面 waitFunc()
中對 wg2 只進行了一次 Done()
釋放操作,main 函數(shù)在 wg2.Wait()
時就陷入了無限等待,即 all goroutines are asleep
。等看了后面 Add()
和 Done()
的原理后,再回頭來看這段死鎖的代碼,會更加清晰。
那么這段代碼能既復制,又不死鎖嗎?當然可以,只需要把 wg2 := wg
提到 wg.Add(1)
前面即可。
1.1.2 state atomic.Uint64
state
是 WaitGroup
的核心,它是一個無符號的 64 位整型,并且用的是 atomic
包中的 Uint64
,所以 state
本身是線程安全的。至于 atomic.Uint64
為什么能保證線程安全,因為它使用了 CompareAndSwap(CAS)
操作,而這個操作依賴于 CPU 提供的原子性指令,是 CPU 級的原子操作。
state
的高 32 位是計數(shù)器(counter),低 32 位是等待者數(shù)量(waiters)。其中計數(shù)器其實就是 Add(int)
數(shù)量的總和,譬如 Add(1)
后再 Add(2)
,那么這個計數(shù)器就是 1 + 2 = 3;而等待數(shù)量就是現(xiàn)在有多少 goroutine 在執(zhí)行 Wait()
等待 WaitGroup
被釋放。
1.1.3 sema uint32
這玩意兒就是信號量,它的用法我們到后文結(jié)合代碼再講。
1.2 Add(delta int)
首先是 Add(delta int)
方法。WaitGroup
所有三個方法都沒有返回值,并且只有 Add
擁有參數(shù),整個設計可謂簡潔到了極點。
Add
方法的第一句代碼是:
if race.Enabled { if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() }
race.Enabled
是判斷當前程序是否開啟了競態(tài)條件檢查,這個檢查是在編譯時需要我們手動指定的:go build -race main.go
,默認情況下并不開啟,即 race.Enabled
在默認情況下就是 false
。這段代碼里如果程序開啟了競態(tài)條件檢查,會將其關閉,最后再重新打開。其他有關 race
的細節(jié)本文不再討論,這對我們理解 WaitGroup
也沒有太大影響,將其考慮進去反而會增加我們理解 WaitGroup
核心機制的復雜度,因此后續(xù)代碼中也會忽略所有與 race
相關的部分。
Add
方法整理后的代碼如下:
// Add 方法將 delta 值加上計數(shù)器,delta 可以為負數(shù)。如果計數(shù)器變?yōu)?0, // 則所有在 Wait 上阻塞的 Goroutine 都會被釋放。 // 如果計數(shù)器變?yōu)樨摂?shù),則 Add 方法會 panic。 // // 注意:當計數(shù)器為 0 時調(diào)用 delta 值為正數(shù)的 Add 方法必須在 Wait 方法之前執(zhí)行。 // 而 delta 值為負數(shù)或者 delta 值為正數(shù)但計數(shù)器大于 0 時,則可以在任何時間點執(zhí)行。 // 通常情況下,這意味著應該在創(chuàng)建 Goroutine 或其他等待事件的語句之前執(zhí)行 Add 方法。 // 如果一個 WaitGroup 用于等待多組獨立的事件, // 那么必須在所有先前的 Wait 調(diào)用返回之后再進行新的 Add 調(diào)用。 // 詳見 WaitGroup 示例代碼。 func (wg *WaitGroup) Add(delta int) { // 將 int32 的 delta 變成 unint64 后左移 32 位再與 state 累加。 // 相當于將 delta 與 state 的高 32 位累加。 state := wg.state.Add(uint64(delta) << 32) // 高 32 位,就是 counter,計數(shù)器 v := int32(state >> 32) // 低 32 位,就是 waiters,等待者數(shù)量 w := uint32(state) // 計數(shù)器為負數(shù)時直接 panic if v < 0 { panic("sync: negative WaitGroup counter") } // 當 Wait 和 Add 并發(fā)執(zhí)行時,會有概率觸發(fā)下面的 panic if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 如果計數(shù)器大于 0,或者沒有任何等待者,即沒有任何 goroutine 在 Wait(),那么就直接返回 if v > 0 || w == 0 { return } // 當 waiters > 0 時,這個 Goroutine 將計數(shù)器設置為 0。 // 現(xiàn)在不可能有對狀態(tài)的并發(fā)修改: // - Add 方法不能與 Wait 方法同時執(zhí)行, // - Wait 不會在看到計數(shù)器為 0 時增加等待者。 // 仍然需要進行簡單的健全性檢查來檢測 WaitGroup 的誤用情況。 if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 重置 state 為 0 wg.state.Store(0) // 喚醒所有等待者 for ; w != 0; w-- { // 使用信號量控制喚醒等待者 runtime_Semrelease(&wg.sema, false, 0) } }
這里我將原代碼中的注釋翻譯成了中文,并且自己在每句代碼前也都加了注釋。
一開始,方法將參數(shù) delta
變成 uint64 后左移 32 位,和 state
相加。因為 state
的高 32 位是這個 WaitGroup
的計數(shù)器,所以這里其實就是把計數(shù)器進行了累加操作:
state := wg.state.Add(uint64(delta) << 32)
接著,程序會分別取出已經(jīng)累加后的計數(shù)器 v
,和當前的等待者數(shù)量 w
:
v := int32(state >> 32) w := uint32(state)
然后是幾個判斷:
// 計數(shù)器為負數(shù)時直接 panic if v < 0 { panic("sync: negative WaitGroup counter") } // 當 Wait 和 Add 并發(fā)執(zhí)行時,會有概率觸發(fā)下面的 panic if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 如果計數(shù)器大于 0,或者沒有任何等待者, // 即沒有任何 goroutine 在 Wait(),那么就直接返回 if v > 0 || w == 0 { return }
注釋已經(jīng)比較清晰了,這里主要展開解釋一下第二個 if
:if w != 0 && delta > 0 && v == int32(delta)
。
w != 0
意味著當前有 goroutine 在Wait()
;delta > 0
意味著Add()
傳入的是正整數(shù),也就是正常調(diào)用;v == int32(delta)
意味著累加后的計數(shù)器等于傳入的delta
,這里最容易想到的符合這個等式的場景是:原計數(shù)器等于 0 時,也就是 wg 第一次使用,或前面的Wait()
已經(jīng)全部結(jié)束時。
上述三個條件看上去有些沖突:w != 0
表示存在 Wait()
,而 v == int32(delta)
按照分析應該不存在 Wait()
。再往下分析,其實應該是 v
在獲取的時候不存在 Wait()
,而 w
在獲取的時候存在 Wait()
。會有這種可能嗎?會!就是并發(fā)的時候:當前 goroutine 獲取了 v
,然后另一個 goroutine 立刻進行了 Wait()
,接著本 goroutine 又獲取了 w
,過程如下:
我們可以用下面這段代碼來復現(xiàn)這個 panic
:
func main() { var wg sync.WaitGroup // 并發(fā)問題不易復現(xiàn),所以循環(huán)多次 for i := 0; i < 100000; i++ { go addDoneFunc(&wg) go waitFunc(&wg) } wg.Wait() } func addDoneFunc(wg *sync.WaitGroup) { wg.Add(1) wg.Done() } func waitFunc(wg *sync.WaitGroup) { wg.Wait() } // 輸出結(jié)果 panic: sync: WaitGroup misuse: Add called concurrently with Wait goroutine 71350 [running]: sync.(*WaitGroup).Add(0x0?, 0xbf8aa5?) C:/Program Files/Go/src/sync/waitgroup.go:65 +0xce main.addDoneFunc(0xc1cf66?, 0x0?) D:/Codes/Golang/waitgroup/main.go:19 +0x1e created by main.main D:/Codes/Golang/waitgroup/main.go:11 +0x8f exit status 2
這段代碼可能要多運行幾次才會看到上述效果,因為這種并發(fā)操作在整個 WaitGroup
的生命周期中會造成好幾種 panic
,包括 Wait()
方法中的。
因此,我們在使用 WaitGroup
的時候應當注意一點:不要在被調(diào)用的 goroutine 內(nèi)部使用 Add
,而應當在外面使用,也就是:
// 正確 wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() }(&wg) wg.Wait() // 錯誤 go func(wg *sync.WaitGroup) { wg.Add(1) defer wg.Done() }(&wg) wg.Wait()
從而避免并發(fā)導致的異常。
上面三個 if
都結(jié)束后,會再次對 state
的一致性進行判斷,防止并發(fā)異常:
if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }
這里 state.Load()
包括后面會出現(xiàn)的 Store()
都是 atomic.Uint64
的原子操作。
根據(jù)前面代碼的邏輯,當程序運行到這里時,計數(shù)器一定為 0,而等待者則可能 >= 0,于是代碼會執(zhí)行一次 wg.state.Store(0)
將 state
設為 0,接著執(zhí)行通知等待者結(jié)束等待的操作:
wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
好了,這里又是讓人迷惑的地方,我第一次看到這段代碼時產(chǎn)生了下面幾個疑問:
- 為什么
Add
方法會有計數(shù)器為 0 的分支邏輯?計數(shù)器不是累加的嗎? - 為什么要在
Add
中通知等待者結(jié)束,不應該是Done
方法嗎? - 那個
runtime_Semrelease(&wg.sema, false, 0)
為什么需要循環(huán)w
次?
一個一個來看。
- 為什么
Add
方法會有計數(shù)器為 0 的分支邏輯?
首先,按照前面代碼的邏輯,只有計數(shù)器 v
為 0 的時候,代碼才會走到最后兩句,而之所以為 0,是因為 Add(delta int)
的參數(shù) delta
是一個 int
,也就是說,delta
可以為負數(shù)!那什么時候會傳入負數(shù)進來呢?Done
的時候。我們?nèi)タ?Done()
的代碼,會發(fā)現(xiàn)它非常簡單:
// Done 給 WaitGroup 的計數(shù)器減 1。 func (wg *WaitGroup) Done() { wg.Add(-1) }
所以,Done
操作或是我們手動給 Add
傳入負數(shù)時,就會進入到 Add
最后幾行邏輯,而 Done
本身也意味著當前 goroutine 的 WaitGroup
結(jié)束,需要同步給外部的 Wait
讓它不再阻塞。
- 為什么要在
Add
中通知等待者結(jié)束,不應該是Done
方法嗎?
嗯,這個問題其實在上一個問題已經(jīng)一起解決了,因為 Done()
實際上調(diào)用了 Add(-1)
。
- 那個
runtime_Semrelease(&wg.sema, false, 0)
為什么需要循環(huán)w
次?
這個函數(shù)按照字面意思,就是釋放信號量。源碼在 src/sync/runtime.go
中,函數(shù)聲明如下:
// Semrelease 函數(shù)用于原子地增加 *s 的值, // 并在有等待 Semacquire 函數(shù)被阻塞的協(xié)程時通知它們繼續(xù)執(zhí)行。 // 它旨在作為同步庫使用的簡單喚醒基元,不應直接使用。 // 如果 handoff 參數(shù)為 true,則將 count 直接傳遞給第一個等待者。 // skipframes 參數(shù)表示在跟蹤時要忽略的幀數(shù),從 runtime_Semrelease 的調(diào)用者開始計數(shù)。 func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
第一個參數(shù)就是信號量的值本身,釋放時會 +1。
第二個參數(shù) handoff
在我查閱了資料后,根據(jù)我的理解,應該是:當 handoff
為 false
時,僅正常喚醒其他等待的協(xié)程,但是不會立即調(diào)度被喚醒的協(xié)程;而當 handoff
為 true
時,會立刻調(diào)度被喚醒的協(xié)程。
第三個參數(shù) skipframes
,看上去應當也和調(diào)度有關,但具體含義我不太確定,這里就不猜了(水平有限,見諒哈)。
按照信號量本身的機制,這里釋放時會 +1,同理還存在一個信號量獲取函數(shù) runtime_Semacquire(s *uint32)
會在信號量 > 0 時將信號量 -1,否則等待,它會在 Wait()
中被調(diào)用。這也是 runtime_Semrelease
需要循環(huán) w
次的原因:因為那 w
個 Wait()
中會調(diào)用 runtime_Semacquire
并不斷將信號量 -1,也就是減了 w
次,所以兩個地方需要對沖一下嘛。
信號量和 WaitGroup
的機制很像,但計數(shù)器又是反的,所以這里再多嘴補充幾句:
信號量獲取時(runtime_Semacquire
),其實就是在阻塞等待,P(Proberen,測試)操作,如果此時信號量 > 0,則獲取成功,并將信號量 -1,否則繼續(xù)等待;
信號量釋放時(runtime_Semrelease
),會把信號量 +1,也就是 V(Verhogen,增加)操作。
1.2 Done()
Done()
方法我們在上面已經(jīng)看到過了:
// Done 給 WaitGroup 的計數(shù)器減 1。 func (wg *WaitGroup) Done() { wg.Add(-1) }
1.3 Wait()
同樣的,這里我會把與 race
相關的代碼都刪掉:
// Wait 會阻塞,直到計數(shù)器為 0。 func (wg *WaitGroup) Wait() { for { state := wg.state.Load() v := int32(state >> 32) // 計數(shù)器 w := uint32(state) // 等待者數(shù)量 if v == 0 { // 計數(shù)器為 0,直接返回。 return } // 增加等待者數(shù)量 if wg.state.CompareAndSwap(state, state+1) { // 獲取信號量 runtime_Semacquire(&wg.sema) // 這里依然是為了防止并發(fā)問題 if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
比 Add
簡單多了,而且有了前面 Add
的長篇大論為基礎,Wait
的代碼看上去一目了然。
當計數(shù)器為 0,即沒有任何 goroutine 調(diào)用 Add
時,直接調(diào)用 Wait
,沒有任何意義,因此直接返回,也不操作信號量。
最后 Wait
也有一個防止并發(fā)問題的判斷,而這個 panic 同樣可以用前面 Add
中的那段并發(fā)問題代碼復現(xiàn),大家可以試試。
Wait
中唯一不同的是,它用了一個無限循環(huán) for{}
,為什么?這是因為,wg.state.CompareAndSwap(state, state+1)
這個原子操作因為并發(fā)等原因有可能失敗,此時就需要重新獲取 state
,把整個過程再走一遍。而一旦操作成功,Wait
會在 runtime_Semacquire(&wg.sema)
處阻塞,直到 Done
操作將計數(shù)器減為 0,Add
中釋放了信號量。
2 結(jié)語
至此,WaitGroup
的源碼已全部解析完畢。作為 Golang 中最重要的并發(fā)組件之一,WaitGroup
的源碼居然只有這么寥寥百行代碼,倒是給我們理解它的原理降低了不少難度。
到此這篇關于Golang WaitGroup 底層原理及源碼詳解的文章就介紹到這了,更多相關Golang WaitGroup 原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Golang常見錯誤之值拷貝和for循環(huán)中的單一變量詳解
這篇文章主要給大家介紹了關于Golang常見錯誤之值拷貝和for循環(huán)中單一變量的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧。2017-11-11golang實現(xiàn)基于channel的通用連接池詳解
這篇文章主要給大家介紹了關于golang實現(xiàn)基于channel的通用連接池的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧。2018-02-02用go語言實現(xiàn)WebAssembly數(shù)據(jù)加密的示例講解
在Web開發(fā)中,有時候為了提升安全性需要對數(shù)據(jù)進行加密,由于js代碼相對比較易讀,直接在js中做加密安全性較低,而WebAssembly代碼不如js易讀,本文提供一個用go語言實現(xiàn)的WebAssembly數(shù)據(jù)加密示例,需要的朋友可以參考下2024-03-03