Go語言并發(fā)控制之sync.WaitGroup使用詳解
前段時間我在《Go 并發(fā)控制:errgroup 詳解》一文中講解了 errgroup
的用法和源碼,通過源碼我們知道 errgroup
內(nèi)部是使用 sync.WaitGroup
實現(xiàn)的,那么本文就更進(jìn)一步,來探索下 sync.WaitGroup
源碼是如何實現(xiàn)的。
使用示例
sync.WaitGroup
可以用來阻塞等待一組并發(fā)任務(wù)(goroutine)的完成,使用示例如下:
package main import ( "fmt" "net/http" "sync" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導(dǎo)致任務(wù)失敗 } var wg sync.WaitGroup errs := make([]error, len(urls)) // 使用 slice 收集錯誤 for i, url := range urls { wg.Add(1) go func() { defer wg.Done() resp, err := http.Get(url) if err != nil { errs[i] = fmt.Errorf("failed to fetch %s: %v", url, err) return } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) }() } wg.Wait() // 處理所有錯誤 for i, err := range errs { if err != nil { fmt.Printf("fetch url %s error: %s\n", urls[i], err) } } }
示例中,我們使用 sync.WaitGroup
來啟動 3 個 goroutine 并發(fā)訪問 3 個不同的 URL
,并在成功時打印響應(yīng)狀態(tài)碼,或失敗時記錄錯誤信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
fetch url http://www.somestupidname.com/ error: failed to fetch http://www.somestupidname.com/: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我們得到了兩個成功的響應(yīng),并記錄了一條錯誤信息。
根據(jù)示例,我們可以抽象出 sync.WaitGroup
最典型的慣用法:
var wg sync.WaitGroup for ... { wg.Add(1) go func() { defer wg.Done() // do something }() } wg.Wait()
sync.WaitGroup
零值可用,它會在內(nèi)部維護(hù)一個計數(shù)器,wg.Add(1)
會將 sync.WaitGroup
計數(shù)器的值加 1,表示增加一個 goroutine 計數(shù);wg.Done()
則將計數(shù)器的值減 1,表示一個 goroutine 任務(wù)已經(jīng)完成;wg.Wait()
會阻塞調(diào)用者所在的 goroutine,直到計數(shù)器的值為 0。
源碼解讀
本文以 Go 1.23.0 版本源碼為基礎(chǔ)進(jìn)行講解。
WaitGroup 結(jié)構(gòu)體
首先 sync.WaitGroup
定義如下:
github.com/golang/go/blob/go1.23.0/src/sync/waitgroup.go
// WaitGroup 結(jié)構(gòu)體 type WaitGroup struct { noCopy noCopy // 避免復(fù)制 state atomic.Uint64 // 高 32 位是計數(shù)器(counter)的值,低 32 位是等待者(waiter)的數(shù)量 sema uint32 // 信號量,用于 阻塞/喚醒 waiter }
sync.WaitGroup
是一個結(jié)構(gòu)體,所以這也是其零值可用的原因。
這個結(jié)構(gòu)體包含 3 個字段:
noCopy
字段的類型也叫noCopy
,這個字段用于標(biāo)識sync.WaitGroup
結(jié)構(gòu)體不可被復(fù)制,vet 工具能夠識別它。這個字段的具體細(xì)節(jié)我們暫且不必深究,它不是sync.WaitGroup
的核心功能,在文章最后再來解釋它。state
字段是一個原子類型atomic.Uint64
,所以對state
字段的修改能夠保證原子性。它比較有意思,sync.WaitGroup
結(jié)構(gòu)體使用這一個字段來表示兩個“變量”值,高 32 位是計數(shù)器(counter
)的值,低 32 位是等待者(waiter
)的數(shù)量。我們調(diào)用wg.Add(1)
或wg.Done()
時操作的就是計數(shù)器counter
;調(diào)用wg.Wait()
時等待者waiter
數(shù)量就會加 1。sema
是一個信號量,用于阻塞/喚醒waiter
,即調(diào)用wg.Wait()
時的阻塞和喚醒都依賴這個信號量。
sync.WaitGroup
結(jié)構(gòu)體只有 3 個方法:Add
、Done
和 Wait
。
Done 方法
我們先來看 Done
方法的源碼實現(xiàn):
// Done 將計數(shù)器(counter)值減 1 func (wg *WaitGroup) Done() { wg.Add(-1) }
調(diào)用 Done
方法可以讓 counter
值減 1??梢园l(fā)現(xiàn),調(diào)用 wg.Done()
方法實際上等價于調(diào)用 wg.Add(-1)
,所以我們的重點還是要關(guān)注 Add
方法。
Add 方法
Add
方法源碼實現(xiàn)如下:
// Add 為計數(shù)器(counter)的值增加 delta(delta 可能為負(fù)數(shù)) func (wg *WaitGroup) Add(delta int) { state := wg.state.Add(uint64(delta) << 32) // delta 左移 32 位后與 state 相加,即為 counter 值加上 delta v := int32(state >> 32) // state 右移 32 位得到 counter 的值 w := uint32(state) // state 轉(zhuǎn)成 uint32 拿到低 32 位的值,得到 waiter 的值 // 如果 counter 值為負(fù)數(shù),直接 panic if v < 0 { panic("sync: negative WaitGroup counter") } // 并發(fā)調(diào)用 Wait 和 Add 會觸發(fā) panic if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 條件成立說明 counter 值加上 delta 操作成功,返回 if v > 0 || w == 0 { return } // 如果 counter 值為 0,并且還有被阻塞的 waiter,程序繼續(xù)向下執(zhí)行 // 并發(fā)調(diào)用 Wait 和 Add 會觸發(fā) panic if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 目前 counter 值已經(jīng)為 0,這里重置 waiter 數(shù)量為 0 wg.state.Store(0) for ; w != 0; w-- { // 喚醒所有 waiter runtime_Semrelease(&wg.sema, false, 0) } }
NOTE:
為了方便你理解,我將 Add
方法源碼中使用 race
包做競態(tài)檢查部分的代碼去掉了,并適當(dāng)?shù)脑黾恿丝招小?/p>
Add
方法接收一個 int
類型的 delta
值,在方法第一行,先將這個值轉(zhuǎn)換成 uint64
類型,然后通過移位操作,將其左移 32 位后與 wg.state
值相加,得到新的 state
。我在前文說過,wg.state
字段的高 32 位表示 counter
值,所以這行代碼的作用就是為 counter
值加上 delta
。當(dāng) delta
值為正數(shù),counter
值增加,當(dāng) delta
值為負(fù)數(shù),counter
值減少。
接著,將 state
值右移 32 位,得到高 32 位的 counter
值 v
;使用 uint32(state)
操作將 uint64
類型強(qiáng)轉(zhuǎn)成 uint32
,舍棄高 32 位,得到低 32 位的 waiter
值 w
。注意,這里拿到的 v
和 w
是與 delta
計算后的最新值。
接下來會做兩個校驗,先對 counter
進(jìn)行判斷,如果 v
的值為負(fù)數(shù),會觸發(fā) panic
,所以我們在使用時要小心 counter
不能為負(fù);然后又對并發(fā)調(diào)用 Wait
和 Add
方法的場景做了校驗,如果并發(fā)調(diào)用二者,同樣會觸發(fā) panic
。
關(guān)于判斷是否并發(fā)調(diào)用 Wait
和 Add
方法的場景,我在詳細(xì)解釋下:
w != 0
表明有等待者waiter
存在,即已經(jīng)有 goroutine 調(diào)用了wg.Wait()
方法,正在阻塞等待,還未返回。delta > 0
表明這次調(diào)用Add
方法是要增加計數(shù)器counter
的值,這也說明肯定不是通過調(diào)用wg.Done()
方法觸發(fā)的。v == int32(delta)
表明在調(diào)用Add
方法之前,counter
的值為 0。因為v
是計算后的counter
值,它等于delta
,就說明在計算之前counter
的值為 0。
如果這三個條件同時滿足,即 w != 0 && delta > 0 && v == int32(delta)
為 true
,就說明我們在調(diào)用 wg.Wait()
方法以后,還未等到喚醒它,就馬上又調(diào)用了 wg.Add(delta)
方法,此時就會觸發(fā) panic
。所以,我們在使用時要記住,一定要在調(diào)用 wg.Wait()
之前調(diào)用 wg.Add(delta)
。
做完了校驗以后,就到了 Add
方法的第一個出口,如果 v > 0
說明我們正常調(diào)用了 Add
方法或 Done
方法,計數(shù)器 counter
此時還未清零,那么無需喚醒 wg.Wait()
的阻塞等待,直接返回即可;或者如果 w == 0
說明當(dāng)前沒有正在等待的 waiter
,即還未調(diào)用 wg.Wait()
,那么也可以直接返回。
那么現(xiàn)在,Add
方法還能繼續(xù)往下執(zhí)行的條件是:v == 0 && w > 0
,即 counter
值為 0,并且還有被阻塞的 waiter
。
既然計數(shù)器 counter
值已經(jīng)為 0,那么就可以喚醒所有被阻塞的 wg.Wait()
調(diào)用了,這也是接下來的程序邏輯。
不過,這里會再次對并發(fā)調(diào)用 Wait
和 Add
方法的場景進(jìn)行校驗。如果此時從 wg.state
字段獲取到的最新值與變量 state
值不一致,即 wg.state.Load() != state
為 true
,則會觸發(fā) panic
。所以,當(dāng) counter
值變?yōu)?0,程序即將喚醒被阻塞的 waiter
之前這一小段時間,不要并發(fā)的調(diào)用 wg.Add(delta)
來改變計數(shù)器的值。
最后,通過 wg.state.Store(0)
將 waiter
的值置為 0(因為此時 counter
值已經(jīng)是 0 了,所以這個操作的目的是將 waiter
值置 0),并使用 runtime_Semrelease
來喚醒所有被阻塞的 waiter
。
NOTE:
關(guān)于 runtime_Semrelease
以及下文將要介紹的 runtime_Semacquire
方法則不必深究,這是 Go 語言底層 runtime
為我們實現(xiàn)的用于喚醒或阻塞當(dāng)前 goroutine 的函數(shù)。
至此,Add
方法就分析完成了。
我們現(xiàn)在可以總結(jié)下 Add
方法的作用:Add
為計數(shù)器 counter
的值增加 delta
(delta
可能為負(fù)數(shù)),如果計算結(jié)果 counter
為負(fù)數(shù),則觸發(fā) panic
;如果 counter
為正數(shù),則正常返回;如果 counter
為 0,則喚醒所有被阻塞的 waiter
。
所以 Add
方法主要用來管理計數(shù)器 counter
,并在 counter
為 0 時,喚醒 waiter
。
Wait 方法
現(xiàn)在,我們再來看下 sync.WaitGroup
結(jié)構(gòu)體最后一個方法 Wait
的源碼實現(xiàn):
// Wait 阻塞調(diào)用者當(dāng)前的 goroutine(waiter),直到計數(shù)器(counter)值為 0 func (wg *WaitGroup) Wait() { for { // 開啟無限循環(huán)保證 CAS 操作成功 state := wg.state.Load() v := int32(state >> 32) // 拿到 counter 值 // w := uint32(state) // 拿到 waiter 值 if v == 0 { // 如果 counter 值已經(jīng)為 0,直接返回 return } // 使用 CAS 操作增加 waiter 的數(shù)量 if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) // 阻塞當(dāng)前 waiter 所在的 goroutine,等待被喚醒 // 并發(fā)調(diào)用 Wait 和 Add 會觸發(fā) panic if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return // 如果 state 值為 0,說明 waiter 所等待的任務(wù)全部完成,成功返回 } } }
NOTE:
為了方便你理解,我同樣將 Wait
方法源碼中使用 race
包做競態(tài)檢查部分的代碼去掉了,并適當(dāng)?shù)脑黾恿丝招小?/p>
首先,這里開啟了一個無限 for
循環(huán),這是為了重試下面的 CAS 操作,保證其執(zhí)行成功。
Wait
方法同樣使用移位操作拿到到高 32 位的 counter
值 v
和低 32 位的 waiter
值 w
。因為 w
只會在競態(tài)檢查的代碼中被用到,所以被我手動注釋掉了。
接下來判斷計數(shù)器 counter
的值是否為 0,如果 v
已經(jīng)為 0,那么無需阻塞 waiter
,直接返回即可。否則,需要對 waiter
的值進(jìn)行加 1 操作,這里使用 CAS 操作(即 Compare And Swap)來完成。
所謂的 CAS 操作,就是先 Compare 再 Swap。當(dāng)我們調(diào)用 wg.state.CompareAndSwap(state, state+1)
時,CompareAndSwap
方法會先判斷 wg.state
值是否等于傳進(jìn)來的第一個參數(shù) state
,如果相等,則將其替換為第二個參數(shù) state+1
的值,并返回 true
;如果 wg.state
值與 state
不相等,則不會修改 wg.state
,并返回 false
。這樣,就保證了對 wg.state
的修改是原子性的。
在并發(fā)場景中,CAS 操作可能失敗,返回 false
,所以需要結(jié)合最外層的 for
無限循環(huán),來保證 CAS 操作成功。
一旦 CAS 操作成功,即 waiter
的數(shù)量加 1,就會使用 runtime_Semacquire
來阻塞當(dāng)前 waiter
所在的 goroutine
,等待被喚醒。而喚醒時機(jī),就是在 Add
方法的最后對 runtime_Semrelease(&wg.sema, false, 0)
的調(diào)用。
當(dāng) waiter
被喚醒后,會對并發(fā)調(diào)用 Wait
和 Add
方法的場景進(jìn)行校驗。如果 wg.state.Load() != 0
為 true
,則會觸發(fā) panic
。因為 Add
方法在調(diào)用 runtime_Semrelease
喚醒所有 waiter
之前,已經(jīng)通過 wg.state.Store(0)
將 waiter
的值置為 0 了,所以在不出現(xiàn)并發(fā)調(diào)用的情況下,wg.state.Load()
的值必然為 0。
而如果沒有出現(xiàn)并發(fā)調(diào)用 Wait
和 Add
方法,則說明 waiter
所等待的任務(wù)全部完成,正常返回即可。
至此,sync.WaitGroup
結(jié)構(gòu)體最后一個方法Wait
就分析完成了。
根據(jù)源碼,我們能夠分析出:Wait
方法主要用來管理 waiter
,它會阻塞所有 waiter
,并等待被 Add
喚醒。
noCopy 結(jié)構(gòu)體
現(xiàn)在 sync.WaitGroup
結(jié)構(gòu)體的核心功能就全部講解完成了,是時候介紹下 noCopy
了。
noCopy
實際上也是一個結(jié)構(gòu)體,其定義如下:
// noCopy may be added to structs which must not be copied // after the first use. // // See https://golang.org/issues/8005#issuecomment-190753527 // for details. // // Note that it must not be embedded, due to the Lock and Unlock methods. type noCopy struct{} // Lock is a no-op used by -copylocks checker from `go vet`. func (*noCopy) Lock() {} func (*noCopy) Unlock() {}
noCopy
非常簡單,就是一個空結(jié)構(gòu)體實現(xiàn)了 Locker
接口。noCopy
結(jié)構(gòu)體的唯一功能,就是用于輔助 vet 工具檢查用的,vet 工具遇到它就會知道,這個結(jié)構(gòu)體是不能被復(fù)制的,僅此而已。
此外,我在《Go 中空結(jié)構(gòu)體慣用法,我?guī)湍憧偨Y(jié)全了!》一文中 標(biāo)識符 小節(jié)介紹了如何將它用在我們自定義的結(jié)構(gòu)體中,感興趣的讀者可以點擊鏈接跳轉(zhuǎn)過去學(xué)習(xí)。
好了,sync.WaitGroup
源碼解析就講解到這里。
總結(jié)
你一定要記住 sync.WaitGroup
的慣用法,首先,它無需初始化,零值可用;其次,它會在內(nèi)部維護(hù)一個計數(shù)器 counter
,通過 wg.Add(delta)
或wg.Done()
來操作計數(shù)器的值;它還會維護(hù)一個等待者數(shù)量 waiter
,調(diào)用 wg.Wait()
會阻塞 waiter
所在的 goroutine;當(dāng)計數(shù)器 counter
的值為 0,所有 waiter
都會被喚醒。
還要注意不要并發(fā)調(diào)用 Add
和 Wait
方法,也不要讓計數(shù)器 counter
的值為負(fù)數(shù),不然會觸發(fā) panic
。
雖然 sync.WaitGroup
的源碼很少,可卻因為里面使用了移位操作和一些邊界條件的檢查,使其不太容易理解。為此,我專門畫了一副 sync.WaitGroup
三大方法的執(zhí)行流程圖,來助你分析 sync.WaitGroup
各個方法的執(zhí)行流程和關(guān)聯(lián)關(guān)系。
流程圖如下:
Done
方法沒什么好解釋的,等價于 Add(-1)
。
Add
方法在第一次出現(xiàn) return
之前的代碼(即 檢查 counter 大于 0,或 waiter 等于 0
),其實可以看作是增加計數(shù)器的功能,即 delta
值大于 0 的情況;而接下來的代碼,則可以看作是調(diào)用 Done
方法,減少計數(shù)器的功能,即 delta
值小于 0 的情況。當(dāng)計數(shù)器的值為 0,就會喚醒所有 waiter
。
Wait
方法則用來管理 waiter
,并阻塞 waiter
,等待被 Add
方法喚醒。
如果你對上面的源碼分析理解還覺得有點不夠透徹,可以對照這幅圖,多梳理幾遍??炊诉@幅圖,那么你就完全掌握了 sync.WaitGroup
。
以上就是Go語言并發(fā)控制之sync.WaitGroup使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Go sync.WaitGroup的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言基于viper實現(xiàn)apollo多實例快速
viper是適用于go應(yīng)用程序的配置解決方案,這款配置管理神器,支持多種類型、開箱即用、極易上手。本文主要介紹了如何基于viper實現(xiàn)apollo多實例快速接入,感興趣的可以了解一下2023-01-01Go 使用Unmarshal將json賦給struct出錯的原因及解決
這篇文章主要介紹了Go 使用Unmarshal將json賦給struct出錯的原因及解決方案,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03Golang自定義結(jié)構(gòu)體轉(zhuǎn)map的操作
這篇文章主要介紹了Golang自定義結(jié)構(gòu)體轉(zhuǎn)map的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12go中的參數(shù)傳遞是值傳遞還是引用傳遞的實現(xiàn)
參數(shù)傳遞機(jī)制是一個重要的概念,它決定了函數(shù)內(nèi)部對參數(shù)的修改是否會影響到原始數(shù)據(jù),本文主要介紹了go中的參數(shù)傳遞是值傳遞還是引用傳遞的實現(xiàn),感興趣的可以了解一下2024-12-12Go 語言json.Unmarshal 遇到的小問題(推薦)
這篇文章主要介紹了 Go 語言json.Unmarshal 遇到的小問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07輕松入門:使用Golang開發(fā)跨平臺GUI應(yīng)用
Golang是一種強(qiáng)大的編程語言,它的并發(fā)性和高性能使其成為開發(fā)GUI桌面應(yīng)用的理想選擇,Golang提供了豐富的標(biāo)準(zhǔn)庫和第三方庫,可以輕松地創(chuàng)建跨平臺的GUI應(yīng)用程序,通過使用Golang的GUI庫,開發(fā)人員可以快速構(gòu)建具有豐富用戶界面和交互功能的應(yīng)用程序,需要的朋友可以參考下2023-10-10