深入理解go sync.Waitgroup的使用
本文基于 Go 1.19。
go 里面的 WaitGroup
是非常常見的一種并發(fā)控制方式,它可以讓我們的代碼等待一組 goroutine 的結(jié)束。
比如在主協(xié)程中等待幾個(gè)子協(xié)程去做一些耗時(shí)的操作,如發(fā)起幾個(gè) HTTP 請求,然后等待它們的結(jié)果。
WaitGroup 示例
下面的代碼展示了一個(gè) goroutine 等待另外 2 個(gè) goroutine 結(jié)束的例子:
func TestWaitgroup(t *testing.T) { var wg sync.WaitGroup // 計(jì)數(shù)器 +2 wg.Add(2) go func() { sendHttpRequest("https://baidu.com") // 計(jì)數(shù)器 -1 wg.Done() }() go func() { sendHttpRequest("https://baidu.com") // 計(jì)數(shù)器 -1 wg.Done() }() // 阻塞。計(jì)數(shù)器為 0 的時(shí)候,Wait 返回 wg.Wait() } // 發(fā)起 HTTP GET 請求 func sendHttpRequest(url string) (string, error) { method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return "", err } res, err := client.Do(req) if err != nil { return "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { return "", err } return string(body), err }
在這個(gè)例子中,我們做了如下事情:
- 定義了一個(gè)
WaitGroup
對象wg
,調(diào)用wg.Add(2)
將其計(jì)數(shù)器+2
。 - 啟動兩個(gè)新的 goroutine,在這兩個(gè) goroutine 中,使用
sendHttpRequest
函數(shù)發(fā)起了一個(gè) HTTP 請求。 - 在 HTTP 請求返回之后,調(diào)用
wg.Done
將計(jì)數(shù)器-1
。 - 在函數(shù)的最后,我們調(diào)用了
wg.Wait
,這個(gè)方法會阻塞,直到WaitGroup
的計(jì)數(shù)器的值為 0 才會解除阻塞狀態(tài)。
WaitGroup 基本原理
WaitGroup
內(nèi)部通過一個(gè)計(jì)數(shù)器來統(tǒng)計(jì)有多少協(xié)程被等待。這個(gè)計(jì)數(shù)器的值在我們啟動 goroutine 之前先寫入(使用 Add
方法),
然后在 goroutine 結(jié)束的時(shí)候,將這個(gè)計(jì)數(shù)器減 1(使用 Done
方法)。除此之外,在啟動這些 goroutine 的協(xié)程中,
會調(diào)用 Wait
來進(jìn)行等待,在 Wait
調(diào)用的地方會阻塞,直到 WaitGroup
內(nèi)部的計(jì)數(shù)器減到 0。
也就實(shí)現(xiàn)了等待一組 goroutine 的目的
背景知識
在操作系統(tǒng)中,有多種實(shí)現(xiàn)進(jìn)程/線程間同步的方式,如:test_and_set
、compare_and_swap
、互斥鎖等。
除此之外,還有一種是信號量,它的功能類似于互斥鎖,但是它能提供更為高級的方法,以便進(jìn)程能夠同步活動。
信號量
一個(gè)信號量(semaphore)S是一個(gè)整型變量,它除了初始化外只能通過兩個(gè)標(biāo)準(zhǔn)的原子操作:wait()
和 signal()
來訪問。
操作 wait()
最初稱為 P
(荷蘭語 proberen
,測試);操作 signal()
最初稱為 V
(荷蘭語 verhogen
,增加),可按如下來定義 wait()
:
PV 原語。
wait(S) { while (S <= 0) ; // 忙等待 S--; }
可按如下來定義 signal()
:
signal(S) { S++; }
在 wait()
和 signal()
操作中,信號量整數(shù)值的修改應(yīng)不可分割地執(zhí)行。也就是說,當(dāng)一個(gè)進(jìn)程修改信號量值時(shí),沒有其他進(jìn)程能夠同時(shí)修改同一信號量的值。
簡單來說,信號量實(shí)現(xiàn)的功能是:
- 當(dāng)信號量>0 時(shí),表示資源可用,則
wait
會對信號量執(zhí)行減 1 操作。 - 當(dāng)信號量<=0 時(shí),表示資源暫時(shí)不可用,獲取信號量時(shí),當(dāng)前的進(jìn)程/線程會阻塞,直到信號量為正時(shí)被喚醒。
WaitGroup 中的信號量
在 WaitGroup
中,使用了信號量來實(shí)現(xiàn) goroutine 的阻塞以及喚醒:
- 在調(diào)用
Wait
的地方,goroutine 會陷入阻塞,直到信號量大于等于 0 的時(shí)候解除阻塞狀態(tài),得以繼續(xù)執(zhí)行。 - 在調(diào)用
Done
的時(shí)候,如果WaitGroup
內(nèi)的等待協(xié)程的計(jì)數(shù)器減到 0 的時(shí)候,信號量會進(jìn)行遞增,這樣那些阻塞的協(xié)程會進(jìn)行執(zhí)行下去。
WaitGroup 數(shù)據(jù)結(jié)構(gòu)
type WaitGroup struct { noCopy noCopy // 高 32 位為計(jì)數(shù)器,低 32 位為等待者數(shù)量 state atomic.Uint64 sema uint32 }
noCopy
我們發(fā)現(xiàn),WaitGroup
中有一個(gè)字段 noCopy
,顧名思義,它的目的是防止復(fù)制。
這個(gè)字段在運(yùn)行時(shí)是沒有什么影響的,但是我們通過 go vet
可以發(fā)現(xiàn)我們對 WaitGroup
的復(fù)制。
為什么不能復(fù)制呢?因?yàn)橐坏?fù)制,WaitGroup
內(nèi)的計(jì)數(shù)器就不再準(zhǔn)確了,比如下面這個(gè)例子:
func test(wg sync.WaitGroup) { wg.Done() } func TestWaitGroup(t *testing.T) { var wg sync.WaitGroup wg.Add(1) test(wg) wg.Wait() }
go 里面的函數(shù)參數(shù)傳遞是值傳遞。調(diào)用 test(wg) 的時(shí)候?qū)?nbsp;
WaitGroup
復(fù)制了一份。
在這個(gè)例子中,程序會永遠(yuǎn)阻塞下去,因?yàn)?nbsp;test
中調(diào)用 wg.Done()
的時(shí)候,只是將 WaitGroup
副本的計(jì)數(shù)器減去了 1,
而 TestWaitGroup
里面的 WaitGroup
的計(jì)數(shù)器并沒有發(fā)生改變,因此 Wait
會永遠(yuǎn)阻塞。
我們?nèi)绻枰獙?nbsp;WaitGroup
作為參數(shù),請傳遞指針:
func test(wg *sync.WaitGroup) { wg.Done() }
傳遞指針之后,我們在 test
中調(diào)用 wg.Done()
修改的就是 TestWaitGroup
里面同一個(gè) WaitGroup
。
從而,Wait
方法可以正常返回。
state
WaitGroup
里面的 state
是一個(gè) 64 位的 atomic.Uint64
類型,它的高 32 位用來保存 counter
(也就是上面說的計(jì)數(shù)器),低 32 位用來保存 waiter
(也就是阻塞在 Wait
上的 goroutine 數(shù)量。)
sema
WaitGroup
通過 sema
來記錄信號量:
runtime_Semrelease
表示將信號量遞增(對應(yīng)信號量中的signal
操作)runtime_Semacquire
表示將信號量遞減(對應(yīng)信號量中的wait
操作)
簡單來說,在調(diào)用 runtime_Semacquire
的時(shí)候 goroutine 會阻塞,而調(diào)用 runtime_Semrelease
會喚醒阻塞在同一個(gè)信號量上的 goroutine。
WaitGroup 的三個(gè)基本操作
Add
: 這會將WaitGroup
里面的counter
加上一個(gè)整數(shù)(也就是傳遞給Add
的函數(shù)參數(shù))。Done
: 這會將WaitGroup
里面的counter
減去 1。Wait
: 這會將WaitGroup
里面的waiter
加上 1,并且調(diào)用Wait
的地方會阻塞。(有可能會有多個(gè) goroutine 等待一個(gè)WaitGroup
)
WaitGroup 的實(shí)現(xiàn)
Add 的實(shí)現(xiàn)
Add
做了下面兩件事:
- 將
delta
加到state
的高 32 位上 - 如果
counter
為0
了,并且waiter
大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會喚醒那些阻塞在Wait
上的 goroutine。
源碼實(shí)現(xiàn):
func (wg *WaitGroup) Add(delta int) { // wg.state 的計(jì)數(shù)器加上 delta //(加到 state 的高 32 上) state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta v := int32(state >> 32) // 高 32 位(counter) w := uint32(state) // 低 32 位(waiter) // 計(jì)數(shù)器不能為負(fù)數(shù)(加上 delta 之后不能為負(fù)數(shù),最小只能到 0) if v < 0 { panic("sync: negative WaitGroup counter") } // 正常使用情況下,是先調(diào)用 Add 再調(diào)用 Wait 的,這種情況下,w 是 0,v > 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v > 0,計(jì)數(shù)器大于 0 // w == 0,沒有在 Wait 的協(xié)程 // 說明還沒有到喚醒 waiter 的時(shí)候 if v > 0 || w == 0 { return } // Add 負(fù)數(shù)的時(shí)候,v 會減去對應(yīng)的數(shù)值,減到最后 v 是 0。 // 計(jì)數(shù)器是 0,并且有等待的協(xié)程,現(xiàn)在要喚醒這些協(xié)程。 // 存在等待的協(xié)程時(shí),goroutine 已將計(jì)數(shù)器設(shè)置為0。 // 現(xiàn)在不可能同時(shí)出現(xiàn)狀態(tài)突變: // - Add 不能與 Wait 同時(shí)發(fā)生, // - 如果看到計(jì)數(shù)器==0,則 Wait 不會增加等待的協(xié)程。 // 仍然要做一個(gè)廉價(jià)的健康檢查,以檢測 WaitGroup 的誤用。 if wg.state.Load() != state { // 不能在 Add 的同時(shí)調(diào)用 Wait panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將等待的協(xié)程數(shù)量設(shè)置為 0。 wg.state.Store(0) for ; w != 0; w-- { // signal,調(diào)用 Wait 的地方會解除阻塞 runtime_Semrelease(&wg.sema, false, 0) // goyield } }
Done 的實(shí)現(xiàn)
WaitGroup
里的 Done
其實(shí)只是對 Add
的調(diào)用,但是它的效果是,將計(jì)數(shù)器的值減去 1
。
背后的含義是:一個(gè)被等待的協(xié)程執(zhí)行完畢了。
Wait 的實(shí)現(xiàn)
Wait
主要功能是阻塞當(dāng)前的協(xié)程:
Wait
會先判斷計(jì)數(shù)器是否為0
,為0
說明沒有任何需要等待的協(xié)程,那么就可以直接返回了。- 如果計(jì)數(shù)器還不是
0
,說明有協(xié)程還沒執(zhí)行完,那么調(diào)用Wait
的地方就需要被阻塞起來,等待所有的協(xié)程完成。
源碼實(shí)現(xiàn):
func (wg *WaitGroup) Wait() { for { // 獲取當(dāng)前計(jì)數(shù)器 state := wg.state.Load() // 計(jì)數(shù)器 v := int32(state >> 32) // waiter 數(shù)量 w := uint32(state) // v 為 0,不需要等待,直接返回 if v == 0 { // 計(jì)數(shù)器是 0,不需要等待 return } // 增加 waiter 數(shù)量。 // 調(diào)用一次 Wait,waiter 數(shù)量會加 1。 if wg.state.CompareAndSwap(state, state+1) { // 這會阻塞,直到 sema (信號量)大于 0 runtime_Semacquire(&wg.sema) // goparkunlock // state 不等 0 // wait 還沒有返回又繼續(xù)使用了 WaitGroup if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 解除阻塞狀態(tài)了,可以返回了 return } // 狀態(tài)沒有修改成功(state 沒有成功 +1),開始下一次嘗試。 } }
總結(jié)
WaitGroup
使用了信號量來實(shí)現(xiàn)了并發(fā)資源控制,sema
字段表示信號量。- 使用
runtime_Semacquire
會使得 goroutine 阻塞直到計(jì)數(shù)器減少至0
,而使用runtime_Semrelease
會使得信號量遞增,這等于是通知之前阻塞在信號量上的協(xié)程,告訴它們可以繼續(xù)執(zhí)行了。 WaitGroup
作為參數(shù)傳遞的時(shí)候,需要傳遞指針作為參數(shù),否則在被調(diào)用函數(shù)內(nèi)對Add
或者Done
的調(diào)用,在caller
里面調(diào)用的Wait
會觀測不到。WaitGroup
使用一個(gè) 64 位的數(shù)來保存計(jì)數(shù)器(高 32 位)和waiter
(低 32 位,正在等待的協(xié)程的數(shù)量)。WaitGroup
使用Add
增加計(jì)數(shù)器,使用Done
來將計(jì)數(shù)器減1
,使用Wait
來等待 goroutine。Wait
會阻塞直到計(jì)數(shù)器減少到0
。
到此這篇關(guān)于深入理解go sync.Waitgroup的使用的文章就介紹到這了,更多相關(guān)go sync.Waitgroup內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言大揭秘:適用于哪些類型的項(xiàng)目開發(fā)?
想知道Go編程語言適合開發(fā)哪些類型的項(xiàng)目嗎?無論是網(wǎng)絡(luò)服務(wù)、分布式系統(tǒng)還是嵌入式設(shè)備,Go都能輕松應(yīng)對,本文將帶你了解Go在各種場景下的應(yīng)用,讓你更好地選擇和使用Go進(jìn)行開發(fā),需要的朋友可以參考下2024-01-01詳解Go語言如何使用標(biāo)準(zhǔn)庫sort對切片進(jìn)行排序
Sort?標(biāo)準(zhǔn)庫提供了對基本數(shù)據(jù)類型的切片和自定義類型的切片進(jìn)行排序的函數(shù)。今天主要分享的內(nèi)容是使用?Go?標(biāo)準(zhǔn)庫?sort?對切片進(jìn)行排序,感興趣的可以了解一下2022-12-12Go實(shí)現(xiàn)將任何網(wǎng)頁轉(zhuǎn)化為PDF
在許多應(yīng)用場景中,可能需要將網(wǎng)頁內(nèi)容轉(zhuǎn)化為?PDF?格式,使用Go編程語言,結(jié)合一些現(xiàn)有的庫,可以非常方便地實(shí)現(xiàn)這一功能,下面我們就來看看具體實(shí)現(xiàn)方法吧2024-11-11創(chuàng)建第一個(gè)Go語言程序Hello,Go!
這篇文章主要介紹了創(chuàng)建第一個(gè)Go語言程序Hello,Go!本文詳細(xì)的給出項(xiàng)目創(chuàng)建、代碼編寫的過程,同時(shí)講解了GOPATH、Go install等內(nèi)容,需要的朋友可以參考下2014-10-10go語言發(fā)送smtp郵件的實(shí)現(xiàn)示例
這篇文章主要介紹了go發(fā)送smtp郵件的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09