go?sync?Waitgroup數(shù)據(jù)結構實現(xiàn)基本操作詳解
WaitGroup 示例
本文基于 Go 1.19。
go 里面的 WaitGroup
是非常常見的一種并發(fā)控制方式,它可以讓我們的代碼等待一組 goroutine 的結束。 比如在主協(xié)程中等待幾個子協(xié)程去做一些耗時的操作,如發(fā)起幾個 HTTP 請求,然后等待它們的結果。
下面的代碼展示了一個 goroutine 等待另外 2 個 goroutine 結束的例子:
func TestWaitgroup(t *testing.T) { var wg sync.WaitGroup // 計數(shù)器 +2 wg.Add(2) go func() { sendHttpRequest("https://baidu.com") // 計數(shù)器 -1 wg.Done() }() go func() { sendHttpRequest("https://baidu.com") // 計數(shù)器 -1 wg.Done() }() // 阻塞。計數(shù)器為 0 的時候,Wait 返回 wg.Wait() } // 發(fā)起 HTTP GET 請求 func sendHttpRequest(url string) (string, error) { method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return "", err } res, err := client.Do(req) if err != nil { return "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { return "", err } return string(body), err }
在這個例子中,我們做了如下事情:
- 定義了一個
WaitGroup
對象wg
,調(diào)用wg.Add(2)
將其計數(shù)器+2
。 - 啟動兩個新的 goroutine,在這兩個 goroutine 中,使用
sendHttpRequest
函數(shù)發(fā)起了一個 HTTP 請求。 - 在 HTTP 請求返回之后,調(diào)用
wg.Done
將計數(shù)器-1
。 - 在函數(shù)的最后,我們調(diào)用了
wg.Wait
,這個方法會阻塞,直到WaitGroup
的計數(shù)器的值為 0 才會解除阻塞狀態(tài)。
WaitGroup 基本原理
WaitGroup
內(nèi)部通過一個計數(shù)器來統(tǒng)計有多少協(xié)程被等待。這個計數(shù)器的值在我們啟動 goroutine 之前先寫入(使用 Add
方法), 然后在 goroutine 結束的時候,將這個計數(shù)器減 1(使用 Done
方法)。除此之外,在啟動這些 goroutine 的協(xié)程中, 會調(diào)用 Wait
來進行等待,在 Wait
調(diào)用的地方會阻塞,直到 WaitGroup
內(nèi)部的計數(shù)器減到 0。 也就實現(xiàn)了等待一組 goroutine 的目的
背景知識
在操作系統(tǒng)中,有多種實現(xiàn)進程/線程間同步的方式,如:test_and_set
、compare_and_swap
、互斥鎖等。 除此之外,還有一種是信號量,它的功能類似于互斥鎖,但是它能提供更為高級的方法,以便進程能夠同步活動。
信號量
一個信號量(semaphore)S是一個整型變量,它除了初始化外只能通過兩個標準的原子操作:wait()
和 signal()
來訪問。 操作 wait()
最初稱為 P
(荷蘭語 proberen
,測試);操作 signal()
最初稱為 V
(荷蘭語 verhogen
,增加),可按如下來定義 wait()
:
PV 原語。
wait(S) { while (S <= 0) ; // 忙等待 S--; }
可按如下來定義 signal()
:
signal(S) { S++; }
在 wait()
和 signal()
操作中,信號量整數(shù)值的修改應不可分割地執(zhí)行。也就是說,當一個進程修改信號量值時,沒有其他進程能夠同時修改同一信號量的值。
簡單來說,信號量實現(xiàn)的功能是:
- 當信號量>0 時,表示資源可用,則
wait
會對信號量執(zhí)行減 1 操作。 - 當信號量<=0 時,表示資源暫時不可用,獲取信號量時,當前的進程/線程會阻塞,直到信號量為正時被喚醒。
WaitGroup 中的信號量
在 WaitGroup
中,使用了信號量來實現(xiàn) goroutine 的阻塞以及喚醒:
- 在調(diào)用
Wait
的地方,goroutine 會陷入阻塞,直到信號量大于等于 0 的時候解除阻塞狀態(tài),得以繼續(xù)執(zhí)行。 - 在調(diào)用
Done
的時候,如果WaitGroup
內(nèi)的等待協(xié)程的計數(shù)器減到 0 的時候,信號量會進行遞增,這樣那些阻塞的協(xié)程會進行執(zhí)行下去。
WaitGroup 數(shù)據(jù)結構
type WaitGroup struct { noCopy noCopy // 高 32 位為計數(shù)器,低 32 位為等待者數(shù)量 state atomic.Uint64 sema uint32 }
noCopy
我們發(fā)現(xiàn),WaitGroup
中有一個字段 noCopy
,顧名思義,它的目的是防止復制。 這個字段在運行時是沒有什么影響的,但是我們通過 go vet
可以發(fā)現(xiàn)我們對 WaitGroup
的復制。 為什么不能復制呢?因為一旦復制,WaitGroup
內(nèi)的計數(shù)器就不再準確了,比如下面這個例子:
func test(wg sync.WaitGroup) { wg.Done() } func TestWaitGroup(t *testing.T) { var wg sync.WaitGroup wg.Add(1) test(wg) wg.Wait() }
go 里面的函數(shù)參數(shù)傳遞是值傳遞。調(diào)用 test(wg) 的時候將 WaitGroup
復制了一份。
在這個例子中,程序會永遠阻塞下去,因為 test
中調(diào)用 wg.Done()
的時候,只是將 WaitGroup
副本的計數(shù)器減去了 1, 而 TestWaitGroup
里面的 WaitGroup
的計數(shù)器并沒有發(fā)生改變,因此 Wait
會永遠阻塞。
我們?nèi)绻枰獙?WaitGroup
作為參數(shù),請傳遞指針:
func test(wg *sync.WaitGroup) { wg.Done() }
傳遞指針之后,我們在 test
中調(diào)用 wg.Done()
修改的就是 TestWaitGroup
里面同一個 WaitGroup
。 從而,Wait
方法可以正常返回。
state
WaitGroup
里面的 state
是一個 64 位的 atomic.Uint64
類型,它的高 32 位用來保存 counter
(也就是上面說的計數(shù)器),低 32 位用來保存 waiter
(也就是阻塞在 Wait
上的 goroutine 數(shù)量。)
sema
WaitGroup
通過 sema
來記錄信號量:
runtime_Semrelease
表示將信號量遞增(對應信號量中的signal
操作)runtime_Semacquire
表示將信號量遞減(對應信號量中的wait
操作)
簡單來說,在調(diào)用 runtime_Semacquire
的時候 goroutine 會阻塞,而調(diào)用 runtime_Semrelease
會喚醒阻塞在同一個信號量上的 goroutine。
WaitGroup 的三個基本操作
Add
: 這會將WaitGroup
里面的counter
加上一個整數(shù)(也就是傳遞給Add
的函數(shù)參數(shù))。Done
: 這會將WaitGroup
里面的counter
減去 1。Wait
: 這會將WaitGroup
里面的waiter
加上 1,并且調(diào)用Wait
的地方會阻塞。(有可能會有多個 goroutine 等待一個WaitGroup
)
WaitGroup 的實現(xiàn)
Add 的實現(xiàn)
Add
做了下面兩件事:
- 將
delta
加到state
的高 32 位上 - 如果
counter
為0
了,并且waiter
大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會喚醒那些阻塞在Wait
上的 goroutine。
源碼實現(xiàn):
func (wg *WaitGroup) Add(delta int) { // wg.state 的計數(shù)器加上 delta //(加到 state 的高 32 上) state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta v := int32(state >> 32) // 高 32 位(counter) w := uint32(state) // 低 32 位(waiter) // 計數(shù)器不能為負數(shù)(加上 delta 之后不能為負數(shù),最小只能到 0) if v < 0 { panic("sync: negative WaitGroup counter") } // 正常使用情況下,是先調(diào)用 Add 再調(diào)用 Wait 的,這種情況下,w 是 0,v > 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v > 0,計數(shù)器大于 0 // w == 0,沒有在 Wait 的協(xié)程 // 說明還沒有到喚醒 waiter 的時候 if v > 0 || w == 0 { return } // Add 負數(shù)的時候,v 會減去對應的數(shù)值,減到最后 v 是 0。 // 計數(shù)器是 0,并且有等待的協(xié)程,現(xiàn)在要喚醒這些協(xié)程。 // 存在等待的協(xié)程時,goroutine 已將計數(shù)器設置為0。 // 現(xiàn)在不可能同時出現(xiàn)狀態(tài)突變: // - Add 不能與 Wait 同時發(fā)生, // - 如果看到計數(shù)器==0,則 Wait 不會增加等待的協(xié)程。 // 仍然要做一個廉價的健康檢查,以檢測 WaitGroup 的誤用。 if wg.state.Load() != state { // 不能在 Add 的同時調(diào)用 Wait panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將等待的協(xié)程數(shù)量設置為 0。 wg.state.Store(0) for ; w != 0; w-- { // signal,調(diào)用 Wait 的地方會解除阻塞 runtime_Semrelease(&wg.sema, false, 0) // goyield } }
Done 的實現(xiàn)
WaitGroup
里的 Done
其實只是對 Add
的調(diào)用,但是它的效果是,將計數(shù)器的值減去 1。 背后的含義是:一個被等待的協(xié)程執(zhí)行完畢了。
Wait 的實現(xiàn)
Wait
主要功能是阻塞當前的協(xié)程:
Wait
會先判斷計數(shù)器是否為0
,為0
說明沒有任何需要等待的協(xié)程,那么就可以直接返回了。- 如果計數(shù)器還不是
0
,說明有協(xié)程還沒執(zhí)行完,那么調(diào)用Wait
的地方就需要被阻塞起來,等待所有的協(xié)程完成。
源碼實現(xiàn):
func (wg *WaitGroup) Wait() { for { // 獲取當前計數(shù)器 state := wg.state.Load() // 計數(shù)器 v := int32(state >> 32) // waiter 數(shù)量 w := uint32(state) // v 為 0,不需要等待,直接返回 if v == 0 { // 計數(shù)器是 0,不需要等待 return } // 增加 waiter 數(shù)量。 // 調(diào)用一次 Wait,waiter 數(shù)量會加 1。 if wg.state.CompareAndSwap(state, state+1) { // 這會阻塞,直到 sema (信號量)大于 0 runtime_Semacquire(&wg.sema) // goparkunlock // state 不等 0 // wait 還沒有返回又繼續(xù)使用了 WaitGroup if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 解除阻塞狀態(tài)了,可以返回了 return } // 狀態(tài)沒有修改成功(state 沒有成功 +1),開始下一次嘗試。 } }
總結
WaitGroup
使用了信號量來實現(xiàn)了并發(fā)資源控制,sema
字段表示信號量。- 使用
runtime_Semacquire
會使得 goroutine 阻塞直到計數(shù)器減少至0
,而使用runtime_Semrelease
會使得信號量遞增,這等于是通知之前阻塞在信號量上的協(xié)程,告訴它們可以繼續(xù)執(zhí)行了。 WaitGroup
作為參數(shù)傳遞的時候,需要傳遞指針作為參數(shù),否則在被調(diào)用函數(shù)內(nèi)對Add
或者Done
的調(diào)用,在caller
里面調(diào)用的Wait
會觀測不到。WaitGroup
使用一個 64 位的數(shù)來保存計數(shù)器(高 32 位)和waiter
(低 32 位,正在等待的協(xié)程的數(shù)量)。WaitGroup
使用Add
增加計數(shù)器,使用Done
來將計數(shù)器減1
,使用Wait
來等待 goroutine。Wait
會阻塞直到計數(shù)器減少到0
。
以上就是go sync Waitgroup數(shù)據(jù)結構實現(xiàn)基本操作詳解的詳細內(nèi)容,更多關于go sync Waitgroup數(shù)據(jù)結構的資料請關注腳本之家其它相關文章!
相關文章
Golang哈希算法實現(xiàn)配置文件的監(jiān)控功能詳解
這篇文章主要介紹了Golang哈希算法實現(xiàn)配置文件的監(jiān)控功能,哈希和加密類似,唯一區(qū)別是哈希是單項的,即哈希后的數(shù)據(jù)無法解密,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2023-03-03