深入理解go sync.Waitgroup的使用
本文基于 Go 1.19。
go 里面的 WaitGroup
是非常常見的一種并發(fā)控制方式,它可以讓我們的代碼等待一組 goroutine 的結束。
比如在主協程中等待幾個子協程去做一些耗時的操作,如發(fā)起幾個 HTTP 請求,然后等待它們的結果。
WaitGroup 示例
下面的代碼展示了一個 goroutine 等待另外 2 個 goroutine 結束的例子:
func TestWaitgroup(t *testing.T) { var wg sync.WaitGroup // 計數器 +2 wg.Add(2) go func() { sendHttpRequest("https://baidu.com") // 計數器 -1 wg.Done() }() go func() { sendHttpRequest("https://baidu.com") // 計數器 -1 wg.Done() }() // 阻塞。計數器為 0 的時候,Wait 返回 wg.Wait() } // 發(fā)起 HTTP GET 請求 func sendHttpRequest(url string) (string, error) { method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return "", err } res, err := client.Do(req) if err != nil { return "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { return "", err } return string(body), err }
在這個例子中,我們做了如下事情:
- 定義了一個
WaitGroup
對象wg
,調用wg.Add(2)
將其計數器+2
。 - 啟動兩個新的 goroutine,在這兩個 goroutine 中,使用
sendHttpRequest
函數發(fā)起了一個 HTTP 請求。 - 在 HTTP 請求返回之后,調用
wg.Done
將計數器-1
。 - 在函數的最后,我們調用了
wg.Wait
,這個方法會阻塞,直到WaitGroup
的計數器的值為 0 才會解除阻塞狀態(tài)。
WaitGroup 基本原理
WaitGroup
內部通過一個計數器來統(tǒng)計有多少協程被等待。這個計數器的值在我們啟動 goroutine 之前先寫入(使用 Add
方法),
然后在 goroutine 結束的時候,將這個計數器減 1(使用 Done
方法)。除此之外,在啟動這些 goroutine 的協程中,
會調用 Wait
來進行等待,在 Wait
調用的地方會阻塞,直到 WaitGroup
內部的計數器減到 0。
也就實現了等待一組 goroutine 的目的
背景知識
在操作系統(tǒng)中,有多種實現進程/線程間同步的方式,如:test_and_set
、compare_and_swap
、互斥鎖等。
除此之外,還有一種是信號量,它的功能類似于互斥鎖,但是它能提供更為高級的方法,以便進程能夠同步活動。
信號量
一個信號量(semaphore)S是一個整型變量,它除了初始化外只能通過兩個標準的原子操作:wait()
和 signal()
來訪問。
操作 wait()
最初稱為 P
(荷蘭語 proberen
,測試);操作 signal()
最初稱為 V
(荷蘭語 verhogen
,增加),可按如下來定義 wait()
:
PV 原語。
wait(S) { while (S <= 0) ; // 忙等待 S--; }
可按如下來定義 signal()
:
signal(S) { S++; }
在 wait()
和 signal()
操作中,信號量整數值的修改應不可分割地執(zhí)行。也就是說,當一個進程修改信號量值時,沒有其他進程能夠同時修改同一信號量的值。
簡單來說,信號量實現的功能是:
- 當信號量>0 時,表示資源可用,則
wait
會對信號量執(zhí)行減 1 操作。 - 當信號量<=0 時,表示資源暫時不可用,獲取信號量時,當前的進程/線程會阻塞,直到信號量為正時被喚醒。
WaitGroup 中的信號量
在 WaitGroup
中,使用了信號量來實現 goroutine 的阻塞以及喚醒:
- 在調用
Wait
的地方,goroutine 會陷入阻塞,直到信號量大于等于 0 的時候解除阻塞狀態(tài),得以繼續(xù)執(zhí)行。 - 在調用
Done
的時候,如果WaitGroup
內的等待協程的計數器減到 0 的時候,信號量會進行遞增,這樣那些阻塞的協程會進行執(zhí)行下去。
WaitGroup 數據結構
type WaitGroup struct { noCopy noCopy // 高 32 位為計數器,低 32 位為等待者數量 state atomic.Uint64 sema uint32 }
noCopy
我們發(fā)現,WaitGroup
中有一個字段 noCopy
,顧名思義,它的目的是防止復制。
這個字段在運行時是沒有什么影響的,但是我們通過 go vet
可以發(fā)現我們對 WaitGroup
的復制。
為什么不能復制呢?因為一旦復制,WaitGroup
內的計數器就不再準確了,比如下面這個例子:
func test(wg sync.WaitGroup) { wg.Done() } func TestWaitGroup(t *testing.T) { var wg sync.WaitGroup wg.Add(1) test(wg) wg.Wait() }
go 里面的函數參數傳遞是值傳遞。調用 test(wg) 的時候將
WaitGroup
復制了一份。
在這個例子中,程序會永遠阻塞下去,因為 test
中調用 wg.Done()
的時候,只是將 WaitGroup
副本的計數器減去了 1,
而 TestWaitGroup
里面的 WaitGroup
的計數器并沒有發(fā)生改變,因此 Wait
會永遠阻塞。
我們如果需要將 WaitGroup
作為參數,請傳遞指針:
func test(wg *sync.WaitGroup) { wg.Done() }
傳遞指針之后,我們在 test
中調用 wg.Done()
修改的就是 TestWaitGroup
里面同一個 WaitGroup
。
從而,Wait
方法可以正常返回。
state
WaitGroup
里面的 state
是一個 64 位的 atomic.Uint64
類型,它的高 32 位用來保存 counter
(也就是上面說的計數器),低 32 位用來保存 waiter
(也就是阻塞在 Wait
上的 goroutine 數量。)
sema
WaitGroup
通過 sema
來記錄信號量:
runtime_Semrelease
表示將信號量遞增(對應信號量中的signal
操作)runtime_Semacquire
表示將信號量遞減(對應信號量中的wait
操作)
簡單來說,在調用 runtime_Semacquire
的時候 goroutine 會阻塞,而調用 runtime_Semrelease
會喚醒阻塞在同一個信號量上的 goroutine。
WaitGroup 的三個基本操作
Add
: 這會將WaitGroup
里面的counter
加上一個整數(也就是傳遞給Add
的函數參數)。Done
: 這會將WaitGroup
里面的counter
減去 1。Wait
: 這會將WaitGroup
里面的waiter
加上 1,并且調用Wait
的地方會阻塞。(有可能會有多個 goroutine 等待一個WaitGroup
)
WaitGroup 的實現
Add 的實現
Add
做了下面兩件事:
- 將
delta
加到state
的高 32 位上 - 如果
counter
為0
了,并且waiter
大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會喚醒那些阻塞在Wait
上的 goroutine。
源碼實現:
func (wg *WaitGroup) Add(delta int) { // wg.state 的計數器加上 delta //(加到 state 的高 32 上) state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta v := int32(state >> 32) // 高 32 位(counter) w := uint32(state) // 低 32 位(waiter) // 計數器不能為負數(加上 delta 之后不能為負數,最小只能到 0) if v < 0 { panic("sync: negative WaitGroup counter") } // 正常使用情況下,是先調用 Add 再調用 Wait 的,這種情況下,w 是 0,v > 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v > 0,計數器大于 0 // w == 0,沒有在 Wait 的協程 // 說明還沒有到喚醒 waiter 的時候 if v > 0 || w == 0 { return } // Add 負數的時候,v 會減去對應的數值,減到最后 v 是 0。 // 計數器是 0,并且有等待的協程,現在要喚醒這些協程。 // 存在等待的協程時,goroutine 已將計數器設置為0。 // 現在不可能同時出現狀態(tài)突變: // - Add 不能與 Wait 同時發(fā)生, // - 如果看到計數器==0,則 Wait 不會增加等待的協程。 // 仍然要做一個廉價的健康檢查,以檢測 WaitGroup 的誤用。 if wg.state.Load() != state { // 不能在 Add 的同時調用 Wait panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將等待的協程數量設置為 0。 wg.state.Store(0) for ; w != 0; w-- { // signal,調用 Wait 的地方會解除阻塞 runtime_Semrelease(&wg.sema, false, 0) // goyield } }
Done 的實現
WaitGroup
里的 Done
其實只是對 Add
的調用,但是它的效果是,將計數器的值減去 1
。
背后的含義是:一個被等待的協程執(zhí)行完畢了。
Wait 的實現
Wait
主要功能是阻塞當前的協程:
Wait
會先判斷計數器是否為0
,為0
說明沒有任何需要等待的協程,那么就可以直接返回了。- 如果計數器還不是
0
,說明有協程還沒執(zhí)行完,那么調用Wait
的地方就需要被阻塞起來,等待所有的協程完成。
源碼實現:
func (wg *WaitGroup) Wait() { for { // 獲取當前計數器 state := wg.state.Load() // 計數器 v := int32(state >> 32) // waiter 數量 w := uint32(state) // v 為 0,不需要等待,直接返回 if v == 0 { // 計數器是 0,不需要等待 return } // 增加 waiter 數量。 // 調用一次 Wait,waiter 數量會加 1。 if wg.state.CompareAndSwap(state, state+1) { // 這會阻塞,直到 sema (信號量)大于 0 runtime_Semacquire(&wg.sema) // goparkunlock // state 不等 0 // wait 還沒有返回又繼續(xù)使用了 WaitGroup if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 解除阻塞狀態(tài)了,可以返回了 return } // 狀態(tài)沒有修改成功(state 沒有成功 +1),開始下一次嘗試。 } }
總結
WaitGroup
使用了信號量來實現了并發(fā)資源控制,sema
字段表示信號量。- 使用
runtime_Semacquire
會使得 goroutine 阻塞直到計數器減少至0
,而使用runtime_Semrelease
會使得信號量遞增,這等于是通知之前阻塞在信號量上的協程,告訴它們可以繼續(xù)執(zhí)行了。 WaitGroup
作為參數傳遞的時候,需要傳遞指針作為參數,否則在被調用函數內對Add
或者Done
的調用,在caller
里面調用的Wait
會觀測不到。WaitGroup
使用一個 64 位的數來保存計數器(高 32 位)和waiter
(低 32 位,正在等待的協程的數量)。WaitGroup
使用Add
增加計數器,使用Done
來將計數器減1
,使用Wait
來等待 goroutine。Wait
會阻塞直到計數器減少到0
。
到此這篇關于深入理解go sync.Waitgroup的使用的文章就介紹到這了,更多相關go sync.Waitgroup內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!