Go語言并發(fā)控制之sync.WaitGroup使用詳解
前段時(shí)間我在《Go 并發(fā)控制:errgroup 詳解》一文中講解了 errgroup
的用法和源碼,通過源碼我們知道 errgroup
內(nèi)部是使用 sync.WaitGroup
實(shí)現(xiàn)的,那么本文就更進(jìn)一步,來探索下 sync.WaitGroup
源碼是如何實(shí)現(xiàn)的。
使用示例
sync.WaitGroup
可以用來阻塞等待一組并發(fā)任務(wù)(goroutine)的完成,使用示例如下:
package main import ( "fmt" "net/http" "sync" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗 } var wg sync.WaitGroup errs := make([]error, len(urls)) // 使用 slice 收集錯(cuò)誤 for i, url := range urls { wg.Add(1) go func() { defer wg.Done() resp, err := http.Get(url) if err != nil { errs[i] = fmt.Errorf("failed to fetch %s: %v", url, err) return } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) }() } wg.Wait() // 處理所有錯(cuò)誤 for i, err := range errs { if err != nil { fmt.Printf("fetch url %s error: %s\n", urls[i], err) } } }
示例中,我們使用 sync.WaitGroup
來啟動(dòng) 3 個(gè) goroutine 并發(fā)訪問 3 個(gè)不同的 URL
,并在成功時(shí)打印響應(yīng)狀態(tài)碼,或失敗時(shí)記錄錯(cuò)誤信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
fetch url http://www.somestupidname.com/ error: failed to fetch http://www.somestupidname.com/: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我們得到了兩個(gè)成功的響應(yīng),并記錄了一條錯(cuò)誤信息。
根據(jù)示例,我們可以抽象出 sync.WaitGroup
最典型的慣用法:
var wg sync.WaitGroup for ... { wg.Add(1) go func() { defer wg.Done() // do something }() } wg.Wait()
sync.WaitGroup
零值可用,它會(huì)在內(nèi)部維護(hù)一個(gè)計(jì)數(shù)器,wg.Add(1)
會(huì)將 sync.WaitGroup
計(jì)數(shù)器的值加 1,表示增加一個(gè) goroutine 計(jì)數(shù);wg.Done()
則將計(jì)數(shù)器的值減 1,表示一個(gè) goroutine 任務(wù)已經(jīng)完成;wg.Wait()
會(huì)阻塞調(diào)用者所在的 goroutine,直到計(jì)數(shù)器的值為 0。
源碼解讀
本文以 Go 1.23.0 版本源碼為基礎(chǔ)進(jìn)行講解。
WaitGroup 結(jié)構(gòu)體
首先 sync.WaitGroup
定義如下:
github.com/golang/go/blob/go1.23.0/src/sync/waitgroup.go
// WaitGroup 結(jié)構(gòu)體 type WaitGroup struct { noCopy noCopy // 避免復(fù)制 state atomic.Uint64 // 高 32 位是計(jì)數(shù)器(counter)的值,低 32 位是等待者(waiter)的數(shù)量 sema uint32 // 信號(hào)量,用于 阻塞/喚醒 waiter }
sync.WaitGroup
是一個(gè)結(jié)構(gòu)體,所以這也是其零值可用的原因。
這個(gè)結(jié)構(gòu)體包含 3 個(gè)字段:
noCopy
字段的類型也叫noCopy
,這個(gè)字段用于標(biāo)識(shí)sync.WaitGroup
結(jié)構(gòu)體不可被復(fù)制,vet 工具能夠識(shí)別它。這個(gè)字段的具體細(xì)節(jié)我們暫且不必深究,它不是sync.WaitGroup
的核心功能,在文章最后再來解釋它。state
字段是一個(gè)原子類型atomic.Uint64
,所以對state
字段的修改能夠保證原子性。它比較有意思,sync.WaitGroup
結(jié)構(gòu)體使用這一個(gè)字段來表示兩個(gè)“變量”值,高 32 位是計(jì)數(shù)器(counter
)的值,低 32 位是等待者(waiter
)的數(shù)量。我們調(diào)用wg.Add(1)
或wg.Done()
時(shí)操作的就是計(jì)數(shù)器counter
;調(diào)用wg.Wait()
時(shí)等待者waiter
數(shù)量就會(huì)加 1。sema
是一個(gè)信號(hào)量,用于阻塞/喚醒waiter
,即調(diào)用wg.Wait()
時(shí)的阻塞和喚醒都依賴這個(gè)信號(hào)量。
sync.WaitGroup
結(jié)構(gòu)體只有 3 個(gè)方法:Add
、Done
和 Wait
。
Done 方法
我們先來看 Done
方法的源碼實(shí)現(xiàn):
// Done 將計(jì)數(shù)器(counter)值減 1 func (wg *WaitGroup) Done() { wg.Add(-1) }
調(diào)用 Done
方法可以讓 counter
值減 1。可以發(fā)現(xiàn),調(diào)用 wg.Done()
方法實(shí)際上等價(jià)于調(diào)用 wg.Add(-1)
,所以我們的重點(diǎn)還是要關(guān)注 Add
方法。
Add 方法
Add
方法源碼實(shí)現(xiàn)如下:
// Add 為計(jì)數(shù)器(counter)的值增加 delta(delta 可能為負(fù)數(shù)) func (wg *WaitGroup) Add(delta int) { state := wg.state.Add(uint64(delta) << 32) // delta 左移 32 位后與 state 相加,即為 counter 值加上 delta v := int32(state >> 32) // state 右移 32 位得到 counter 的值 w := uint32(state) // state 轉(zhuǎn)成 uint32 拿到低 32 位的值,得到 waiter 的值 // 如果 counter 值為負(fù)數(shù),直接 panic if v < 0 { panic("sync: negative WaitGroup counter") } // 并發(fā)調(diào)用 Wait 和 Add 會(huì)觸發(fā) panic if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 條件成立說明 counter 值加上 delta 操作成功,返回 if v > 0 || w == 0 { return } // 如果 counter 值為 0,并且還有被阻塞的 waiter,程序繼續(xù)向下執(zhí)行 // 并發(fā)調(diào)用 Wait 和 Add 會(huì)觸發(fā) panic if wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 目前 counter 值已經(jīng)為 0,這里重置 waiter 數(shù)量為 0 wg.state.Store(0) for ; w != 0; w-- { // 喚醒所有 waiter runtime_Semrelease(&wg.sema, false, 0) } }
NOTE:
為了方便你理解,我將 Add
方法源碼中使用 race
包做競態(tài)檢查部分的代碼去掉了,并適當(dāng)?shù)脑黾恿丝招小?/p>
Add
方法接收一個(gè) int
類型的 delta
值,在方法第一行,先將這個(gè)值轉(zhuǎn)換成 uint64
類型,然后通過移位操作,將其左移 32 位后與 wg.state
值相加,得到新的 state
。我在前文說過,wg.state
字段的高 32 位表示 counter
值,所以這行代碼的作用就是為 counter
值加上 delta
。當(dāng) delta
值為正數(shù),counter
值增加,當(dāng) delta
值為負(fù)數(shù),counter
值減少。
接著,將 state
值右移 32 位,得到高 32 位的 counter
值 v
;使用 uint32(state)
操作將 uint64
類型強(qiáng)轉(zhuǎn)成 uint32
,舍棄高 32 位,得到低 32 位的 waiter
值 w
。注意,這里拿到的 v
和 w
是與 delta
計(jì)算后的最新值。
接下來會(huì)做兩個(gè)校驗(yàn),先對 counter
進(jìn)行判斷,如果 v
的值為負(fù)數(shù),會(huì)觸發(fā) panic
,所以我們在使用時(shí)要小心 counter
不能為負(fù);然后又對并發(fā)調(diào)用 Wait
和 Add
方法的場景做了校驗(yàn),如果并發(fā)調(diào)用二者,同樣會(huì)觸發(fā) panic
。
關(guān)于判斷是否并發(fā)調(diào)用 Wait
和 Add
方法的場景,我在詳細(xì)解釋下:
w != 0
表明有等待者waiter
存在,即已經(jīng)有 goroutine 調(diào)用了wg.Wait()
方法,正在阻塞等待,還未返回。delta > 0
表明這次調(diào)用Add
方法是要增加計(jì)數(shù)器counter
的值,這也說明肯定不是通過調(diào)用wg.Done()
方法觸發(fā)的。v == int32(delta)
表明在調(diào)用Add
方法之前,counter
的值為 0。因?yàn)?v
是計(jì)算后的counter
值,它等于delta
,就說明在計(jì)算之前counter
的值為 0。
如果這三個(gè)條件同時(shí)滿足,即 w != 0 && delta > 0 && v == int32(delta)
為 true
,就說明我們在調(diào)用 wg.Wait()
方法以后,還未等到喚醒它,就馬上又調(diào)用了 wg.Add(delta)
方法,此時(shí)就會(huì)觸發(fā) panic
。所以,我們在使用時(shí)要記住,一定要在調(diào)用 wg.Wait()
之前調(diào)用 wg.Add(delta)
。
做完了校驗(yàn)以后,就到了 Add
方法的第一個(gè)出口,如果 v > 0
說明我們正常調(diào)用了 Add
方法或 Done
方法,計(jì)數(shù)器 counter
此時(shí)還未清零,那么無需喚醒 wg.Wait()
的阻塞等待,直接返回即可;或者如果 w == 0
說明當(dāng)前沒有正在等待的 waiter
,即還未調(diào)用 wg.Wait()
,那么也可以直接返回。
那么現(xiàn)在,Add
方法還能繼續(xù)往下執(zhí)行的條件是:v == 0 && w > 0
,即 counter
值為 0,并且還有被阻塞的 waiter
。
既然計(jì)數(shù)器 counter
值已經(jīng)為 0,那么就可以喚醒所有被阻塞的 wg.Wait()
調(diào)用了,這也是接下來的程序邏輯。
不過,這里會(huì)再次對并發(fā)調(diào)用 Wait
和 Add
方法的場景進(jìn)行校驗(yàn)。如果此時(shí)從 wg.state
字段獲取到的最新值與變量 state
值不一致,即 wg.state.Load() != state
為 true
,則會(huì)觸發(fā) panic
。所以,當(dāng) counter
值變?yōu)?0,程序即將喚醒被阻塞的 waiter
之前這一小段時(shí)間,不要并發(fā)的調(diào)用 wg.Add(delta)
來改變計(jì)數(shù)器的值。
最后,通過 wg.state.Store(0)
將 waiter
的值置為 0(因?yàn)榇藭r(shí) counter
值已經(jīng)是 0 了,所以這個(gè)操作的目的是將 waiter
值置 0),并使用 runtime_Semrelease
來喚醒所有被阻塞的 waiter
。
NOTE:
關(guān)于 runtime_Semrelease
以及下文將要介紹的 runtime_Semacquire
方法則不必深究,這是 Go 語言底層 runtime
為我們實(shí)現(xiàn)的用于喚醒或阻塞當(dāng)前 goroutine 的函數(shù)。
至此,Add
方法就分析完成了。
我們現(xiàn)在可以總結(jié)下 Add
方法的作用:Add
為計(jì)數(shù)器 counter
的值增加 delta
(delta
可能為負(fù)數(shù)),如果計(jì)算結(jié)果 counter
為負(fù)數(shù),則觸發(fā) panic
;如果 counter
為正數(shù),則正常返回;如果 counter
為 0,則喚醒所有被阻塞的 waiter
。
所以 Add
方法主要用來管理計(jì)數(shù)器 counter
,并在 counter
為 0 時(shí),喚醒 waiter
。
Wait 方法
現(xiàn)在,我們再來看下 sync.WaitGroup
結(jié)構(gòu)體最后一個(gè)方法 Wait
的源碼實(shí)現(xiàn):
// Wait 阻塞調(diào)用者當(dāng)前的 goroutine(waiter),直到計(jì)數(shù)器(counter)值為 0 func (wg *WaitGroup) Wait() { for { // 開啟無限循環(huán)保證 CAS 操作成功 state := wg.state.Load() v := int32(state >> 32) // 拿到 counter 值 // w := uint32(state) // 拿到 waiter 值 if v == 0 { // 如果 counter 值已經(jīng)為 0,直接返回 return } // 使用 CAS 操作增加 waiter 的數(shù)量 if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) // 阻塞當(dāng)前 waiter 所在的 goroutine,等待被喚醒 // 并發(fā)調(diào)用 Wait 和 Add 會(huì)觸發(fā) panic if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return // 如果 state 值為 0,說明 waiter 所等待的任務(wù)全部完成,成功返回 } } }
NOTE:
為了方便你理解,我同樣將 Wait
方法源碼中使用 race
包做競態(tài)檢查部分的代碼去掉了,并適當(dāng)?shù)脑黾恿丝招小?/p>
首先,這里開啟了一個(gè)無限 for
循環(huán),這是為了重試下面的 CAS 操作,保證其執(zhí)行成功。
Wait
方法同樣使用移位操作拿到到高 32 位的 counter
值 v
和低 32 位的 waiter
值 w
。因?yàn)?w
只會(huì)在競態(tài)檢查的代碼中被用到,所以被我手動(dòng)注釋掉了。
接下來判斷計(jì)數(shù)器 counter
的值是否為 0,如果 v
已經(jīng)為 0,那么無需阻塞 waiter
,直接返回即可。否則,需要對 waiter
的值進(jìn)行加 1 操作,這里使用 CAS 操作(即 Compare And Swap)來完成。
所謂的 CAS 操作,就是先 Compare 再 Swap。當(dāng)我們調(diào)用 wg.state.CompareAndSwap(state, state+1)
時(shí),CompareAndSwap
方法會(huì)先判斷 wg.state
值是否等于傳進(jìn)來的第一個(gè)參數(shù) state
,如果相等,則將其替換為第二個(gè)參數(shù) state+1
的值,并返回 true
;如果 wg.state
值與 state
不相等,則不會(huì)修改 wg.state
,并返回 false
。這樣,就保證了對 wg.state
的修改是原子性的。
在并發(fā)場景中,CAS 操作可能失敗,返回 false
,所以需要結(jié)合最外層的 for
無限循環(huán),來保證 CAS 操作成功。
一旦 CAS 操作成功,即 waiter
的數(shù)量加 1,就會(huì)使用 runtime_Semacquire
來阻塞當(dāng)前 waiter
所在的 goroutine
,等待被喚醒。而喚醒時(shí)機(jī),就是在 Add
方法的最后對 runtime_Semrelease(&wg.sema, false, 0)
的調(diào)用。
當(dāng) waiter
被喚醒后,會(huì)對并發(fā)調(diào)用 Wait
和 Add
方法的場景進(jìn)行校驗(yàn)。如果 wg.state.Load() != 0
為 true
,則會(huì)觸發(fā) panic
。因?yàn)?Add
方法在調(diào)用 runtime_Semrelease
喚醒所有 waiter
之前,已經(jīng)通過 wg.state.Store(0)
將 waiter
的值置為 0 了,所以在不出現(xiàn)并發(fā)調(diào)用的情況下,wg.state.Load()
的值必然為 0。
而如果沒有出現(xiàn)并發(fā)調(diào)用 Wait
和 Add
方法,則說明 waiter
所等待的任務(wù)全部完成,正常返回即可。
至此,sync.WaitGroup
結(jié)構(gòu)體最后一個(gè)方法Wait
就分析完成了。
根據(jù)源碼,我們能夠分析出:Wait
方法主要用來管理 waiter
,它會(huì)阻塞所有 waiter
,并等待被 Add
喚醒。
noCopy 結(jié)構(gòu)體
現(xiàn)在 sync.WaitGroup
結(jié)構(gòu)體的核心功能就全部講解完成了,是時(shí)候介紹下 noCopy
了。
noCopy
實(shí)際上也是一個(gè)結(jié)構(gòu)體,其定義如下:
// noCopy may be added to structs which must not be copied // after the first use. // // See https://golang.org/issues/8005#issuecomment-190753527 // for details. // // Note that it must not be embedded, due to the Lock and Unlock methods. type noCopy struct{} // Lock is a no-op used by -copylocks checker from `go vet`. func (*noCopy) Lock() {} func (*noCopy) Unlock() {}
noCopy
非常簡單,就是一個(gè)空結(jié)構(gòu)體實(shí)現(xiàn)了 Locker
接口。noCopy
結(jié)構(gòu)體的唯一功能,就是用于輔助 vet 工具檢查用的,vet 工具遇到它就會(huì)知道,這個(gè)結(jié)構(gòu)體是不能被復(fù)制的,僅此而已。
此外,我在《Go 中空結(jié)構(gòu)體慣用法,我?guī)湍憧偨Y(jié)全了!》一文中 標(biāo)識(shí)符 小節(jié)介紹了如何將它用在我們自定義的結(jié)構(gòu)體中,感興趣的讀者可以點(diǎn)擊鏈接跳轉(zhuǎn)過去學(xué)習(xí)。
好了,sync.WaitGroup
源碼解析就講解到這里。
總結(jié)
你一定要記住 sync.WaitGroup
的慣用法,首先,它無需初始化,零值可用;其次,它會(huì)在內(nèi)部維護(hù)一個(gè)計(jì)數(shù)器 counter
,通過 wg.Add(delta)
或wg.Done()
來操作計(jì)數(shù)器的值;它還會(huì)維護(hù)一個(gè)等待者數(shù)量 waiter
,調(diào)用 wg.Wait()
會(huì)阻塞 waiter
所在的 goroutine;當(dāng)計(jì)數(shù)器 counter
的值為 0,所有 waiter
都會(huì)被喚醒。
還要注意不要并發(fā)調(diào)用 Add
和 Wait
方法,也不要讓計(jì)數(shù)器 counter
的值為負(fù)數(shù),不然會(huì)觸發(fā) panic
。
雖然 sync.WaitGroup
的源碼很少,可卻因?yàn)槔锩媸褂昧艘莆徊僮骱鸵恍┻吔鐥l件的檢查,使其不太容易理解。為此,我專門畫了一副 sync.WaitGroup
三大方法的執(zhí)行流程圖,來助你分析 sync.WaitGroup
各個(gè)方法的執(zhí)行流程和關(guān)聯(lián)關(guān)系。
流程圖如下:
Done
方法沒什么好解釋的,等價(jià)于 Add(-1)
。
Add
方法在第一次出現(xiàn) return
之前的代碼(即 檢查 counter 大于 0,或 waiter 等于 0
),其實(shí)可以看作是增加計(jì)數(shù)器的功能,即 delta
值大于 0 的情況;而接下來的代碼,則可以看作是調(diào)用 Done
方法,減少計(jì)數(shù)器的功能,即 delta
值小于 0 的情況。當(dāng)計(jì)數(shù)器的值為 0,就會(huì)喚醒所有 waiter
。
Wait
方法則用來管理 waiter
,并阻塞 waiter
,等待被 Add
方法喚醒。
如果你對上面的源碼分析理解還覺得有點(diǎn)不夠透徹,可以對照這幅圖,多梳理幾遍??炊诉@幅圖,那么你就完全掌握了 sync.WaitGroup
。
以上就是Go語言并發(fā)控制之sync.WaitGroup使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Go sync.WaitGroup的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言基于viper實(shí)現(xiàn)apollo多實(shí)例快速
viper是適用于go應(yīng)用程序的配置解決方案,這款配置管理神器,支持多種類型、開箱即用、極易上手。本文主要介紹了如何基于viper實(shí)現(xiàn)apollo多實(shí)例快速接入,感興趣的可以了解一下2023-01-01Go 使用Unmarshal將json賦給struct出錯(cuò)的原因及解決
這篇文章主要介紹了Go 使用Unmarshal將json賦給struct出錯(cuò)的原因及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03Golang自定義結(jié)構(gòu)體轉(zhuǎn)map的操作
這篇文章主要介紹了Golang自定義結(jié)構(gòu)體轉(zhuǎn)map的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12go中的參數(shù)傳遞是值傳遞還是引用傳遞的實(shí)現(xiàn)
參數(shù)傳遞機(jī)制是一個(gè)重要的概念,它決定了函數(shù)內(nèi)部對參數(shù)的修改是否會(huì)影響到原始數(shù)據(jù),本文主要介紹了go中的參數(shù)傳遞是值傳遞還是引用傳遞的實(shí)現(xiàn),感興趣的可以了解一下2024-12-12Go 語言json.Unmarshal 遇到的小問題(推薦)
這篇文章主要介紹了 Go 語言json.Unmarshal 遇到的小問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07輕松入門:使用Golang開發(fā)跨平臺(tái)GUI應(yīng)用
Golang是一種強(qiáng)大的編程語言,它的并發(fā)性和高性能使其成為開發(fā)GUI桌面應(yīng)用的理想選擇,Golang提供了豐富的標(biāo)準(zhǔn)庫和第三方庫,可以輕松地創(chuàng)建跨平臺(tái)的GUI應(yīng)用程序,通過使用Golang的GUI庫,開發(fā)人員可以快速構(gòu)建具有豐富用戶界面和交互功能的應(yīng)用程序,需要的朋友可以參考下2023-10-10