go?sync?Waitgroup數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)基本操作詳解
WaitGroup 示例
本文基于 Go 1.19。
go 里面的 WaitGroup
是非常常見(jiàn)的一種并發(fā)控制方式,它可以讓我們的代碼等待一組 goroutine 的結(jié)束。 比如在主協(xié)程中等待幾個(gè)子協(xié)程去做一些耗時(shí)的操作,如發(fā)起幾個(gè) HTTP 請(qǐng)求,然后等待它們的結(jié)果。
下面的代碼展示了一個(gè) goroutine 等待另外 2 個(gè) goroutine 結(jié)束的例子:
func TestWaitgroup(t *testing.T) { var wg sync.WaitGroup // 計(jì)數(shù)器 +2 wg.Add(2) go func() { sendHttpRequest("https://baidu.com") // 計(jì)數(shù)器 -1 wg.Done() }() go func() { sendHttpRequest("https://baidu.com") // 計(jì)數(shù)器 -1 wg.Done() }() // 阻塞。計(jì)數(shù)器為 0 的時(shí)候,Wait 返回 wg.Wait() } // 發(fā)起 HTTP GET 請(qǐng)求 func sendHttpRequest(url string) (string, error) { method := "GET" client := &http.Client{} req, err := http.NewRequest(method, url, nil) if err != nil { return "", err } res, err := client.Do(req) if err != nil { return "", err } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { return "", err } return string(body), err }
在這個(gè)例子中,我們做了如下事情:
- 定義了一個(gè)
WaitGroup
對(duì)象wg
,調(diào)用wg.Add(2)
將其計(jì)數(shù)器+2
。 - 啟動(dòng)兩個(gè)新的 goroutine,在這兩個(gè) goroutine 中,使用
sendHttpRequest
函數(shù)發(fā)起了一個(gè) HTTP 請(qǐng)求。 - 在 HTTP 請(qǐng)求返回之后,調(diào)用
wg.Done
將計(jì)數(shù)器-1
。 - 在函數(shù)的最后,我們調(diào)用了
wg.Wait
,這個(gè)方法會(huì)阻塞,直到WaitGroup
的計(jì)數(shù)器的值為 0 才會(huì)解除阻塞狀態(tài)。
WaitGroup 基本原理
WaitGroup
內(nèi)部通過(guò)一個(gè)計(jì)數(shù)器來(lái)統(tǒng)計(jì)有多少協(xié)程被等待。這個(gè)計(jì)數(shù)器的值在我們啟動(dòng) goroutine 之前先寫(xiě)入(使用 Add
方法), 然后在 goroutine 結(jié)束的時(shí)候,將這個(gè)計(jì)數(shù)器減 1(使用 Done
方法)。除此之外,在啟動(dòng)這些 goroutine 的協(xié)程中, 會(huì)調(diào)用 Wait
來(lái)進(jìn)行等待,在 Wait
調(diào)用的地方會(huì)阻塞,直到 WaitGroup
內(nèi)部的計(jì)數(shù)器減到 0。 也就實(shí)現(xiàn)了等待一組 goroutine 的目的
背景知識(shí)
在操作系統(tǒng)中,有多種實(shí)現(xiàn)進(jìn)程/線程間同步的方式,如:test_and_set
、compare_and_swap
、互斥鎖等。 除此之外,還有一種是信號(hào)量,它的功能類似于互斥鎖,但是它能提供更為高級(jí)的方法,以便進(jìn)程能夠同步活動(dòng)。
信號(hào)量
一個(gè)信號(hào)量(semaphore)S是一個(gè)整型變量,它除了初始化外只能通過(guò)兩個(gè)標(biāo)準(zhǔn)的原子操作:wait()
和 signal()
來(lái)訪問(wèn)。 操作 wait()
最初稱為 P
(荷蘭語(yǔ) proberen
,測(cè)試);操作 signal()
最初稱為 V
(荷蘭語(yǔ) verhogen
,增加),可按如下來(lái)定義 wait()
:
PV 原語(yǔ)。
wait(S) { while (S <= 0) ; // 忙等待 S--; }
可按如下來(lái)定義 signal()
:
signal(S) { S++; }
在 wait()
和 signal()
操作中,信號(hào)量整數(shù)值的修改應(yīng)不可分割地執(zhí)行。也就是說(shuō),當(dāng)一個(gè)進(jìn)程修改信號(hào)量值時(shí),沒(méi)有其他進(jìn)程能夠同時(shí)修改同一信號(hào)量的值。
簡(jiǎn)單來(lái)說(shuō),信號(hào)量實(shí)現(xiàn)的功能是:
- 當(dāng)信號(hào)量>0 時(shí),表示資源可用,則
wait
會(huì)對(duì)信號(hào)量執(zhí)行減 1 操作。 - 當(dāng)信號(hào)量<=0 時(shí),表示資源暫時(shí)不可用,獲取信號(hào)量時(shí),當(dāng)前的進(jìn)程/線程會(huì)阻塞,直到信號(hào)量為正時(shí)被喚醒。
WaitGroup 中的信號(hào)量
在 WaitGroup
中,使用了信號(hào)量來(lái)實(shí)現(xiàn) goroutine 的阻塞以及喚醒:
- 在調(diào)用
Wait
的地方,goroutine 會(huì)陷入阻塞,直到信號(hào)量大于等于 0 的時(shí)候解除阻塞狀態(tài),得以繼續(xù)執(zhí)行。 - 在調(diào)用
Done
的時(shí)候,如果WaitGroup
內(nèi)的等待協(xié)程的計(jì)數(shù)器減到 0 的時(shí)候,信號(hào)量會(huì)進(jìn)行遞增,這樣那些阻塞的協(xié)程會(huì)進(jìn)行執(zhí)行下去。
WaitGroup 數(shù)據(jù)結(jié)構(gòu)
type WaitGroup struct { noCopy noCopy // 高 32 位為計(jì)數(shù)器,低 32 位為等待者數(shù)量 state atomic.Uint64 sema uint32 }
noCopy
我們發(fā)現(xiàn),WaitGroup
中有一個(gè)字段 noCopy
,顧名思義,它的目的是防止復(fù)制。 這個(gè)字段在運(yùn)行時(shí)是沒(méi)有什么影響的,但是我們通過(guò) go vet
可以發(fā)現(xiàn)我們對(duì) WaitGroup
的復(fù)制。 為什么不能復(fù)制呢?因?yàn)橐坏?fù)制,WaitGroup
內(nèi)的計(jì)數(shù)器就不再準(zhǔn)確了,比如下面這個(gè)例子:
func test(wg sync.WaitGroup) { wg.Done() } func TestWaitGroup(t *testing.T) { var wg sync.WaitGroup wg.Add(1) test(wg) wg.Wait() }
go 里面的函數(shù)參數(shù)傳遞是值傳遞。調(diào)用 test(wg) 的時(shí)候?qū)?WaitGroup
復(fù)制了一份。
在這個(gè)例子中,程序會(huì)永遠(yuǎn)阻塞下去,因?yàn)?test
中調(diào)用 wg.Done()
的時(shí)候,只是將 WaitGroup
副本的計(jì)數(shù)器減去了 1, 而 TestWaitGroup
里面的 WaitGroup
的計(jì)數(shù)器并沒(méi)有發(fā)生改變,因此 Wait
會(huì)永遠(yuǎn)阻塞。
我們?nèi)绻枰獙?WaitGroup
作為參數(shù),請(qǐng)傳遞指針:
func test(wg *sync.WaitGroup) { wg.Done() }
傳遞指針之后,我們?cè)?test
中調(diào)用 wg.Done()
修改的就是 TestWaitGroup
里面同一個(gè) WaitGroup
。 從而,Wait
方法可以正常返回。
state
WaitGroup
里面的 state
是一個(gè) 64 位的 atomic.Uint64
類型,它的高 32 位用來(lái)保存 counter
(也就是上面說(shuō)的計(jì)數(shù)器),低 32 位用來(lái)保存 waiter
(也就是阻塞在 Wait
上的 goroutine 數(shù)量。)
sema
WaitGroup
通過(guò) sema
來(lái)記錄信號(hào)量:
runtime_Semrelease
表示將信號(hào)量遞增(對(duì)應(yīng)信號(hào)量中的signal
操作)runtime_Semacquire
表示將信號(hào)量遞減(對(duì)應(yīng)信號(hào)量中的wait
操作)
簡(jiǎn)單來(lái)說(shuō),在調(diào)用 runtime_Semacquire
的時(shí)候 goroutine 會(huì)阻塞,而調(diào)用 runtime_Semrelease
會(huì)喚醒阻塞在同一個(gè)信號(hào)量上的 goroutine。
WaitGroup 的三個(gè)基本操作
Add
: 這會(huì)將WaitGroup
里面的counter
加上一個(gè)整數(shù)(也就是傳遞給Add
的函數(shù)參數(shù))。Done
: 這會(huì)將WaitGroup
里面的counter
減去 1。Wait
: 這會(huì)將WaitGroup
里面的waiter
加上 1,并且調(diào)用Wait
的地方會(huì)阻塞。(有可能會(huì)有多個(gè) goroutine 等待一個(gè)WaitGroup
)
WaitGroup 的實(shí)現(xiàn)
Add 的實(shí)現(xiàn)
Add
做了下面兩件事:
- 將
delta
加到state
的高 32 位上 - 如果
counter
為0
了,并且waiter
大于 0,表示所有被等待的 goroutine 都完成了,而還有在等待的 goroutine,這會(huì)喚醒那些阻塞在Wait
上的 goroutine。
源碼實(shí)現(xiàn):
func (wg *WaitGroup) Add(delta int) { // wg.state 的計(jì)數(shù)器加上 delta //(加到 state 的高 32 上) state := wg.state.Add(uint64(delta) << 32) // 高 32 位加上 delta v := int32(state >> 32) // 高 32 位(counter) w := uint32(state) // 低 32 位(waiter) // 計(jì)數(shù)器不能為負(fù)數(shù)(加上 delta 之后不能為負(fù)數(shù),最小只能到 0) if v < 0 { panic("sync: negative WaitGroup counter") } // 正常使用情況下,是先調(diào)用 Add 再調(diào)用 Wait 的,這種情況下,w 是 0,v > 0 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // v > 0,計(jì)數(shù)器大于 0 // w == 0,沒(méi)有在 Wait 的協(xié)程 // 說(shuō)明還沒(méi)有到喚醒 waiter 的時(shí)候 if v > 0 || w == 0 { return } // Add 負(fù)數(shù)的時(shí)候,v 會(huì)減去對(duì)應(yīng)的數(shù)值,減到最后 v 是 0。 // 計(jì)數(shù)器是 0,并且有等待的協(xié)程,現(xiàn)在要喚醒這些協(xié)程。 // 存在等待的協(xié)程時(shí),goroutine 已將計(jì)數(shù)器設(shè)置為0。 // 現(xiàn)在不可能同時(shí)出現(xiàn)狀態(tài)突變: // - Add 不能與 Wait 同時(shí)發(fā)生, // - 如果看到計(jì)數(shù)器==0,則 Wait 不會(huì)增加等待的協(xié)程。 // 仍然要做一個(gè)廉價(jià)的健康檢查,以檢測(cè) WaitGroup 的誤用。 if wg.state.Load() != state { // 不能在 Add 的同時(shí)調(diào)用 Wait panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 將等待的協(xié)程數(shù)量設(shè)置為 0。 wg.state.Store(0) for ; w != 0; w-- { // signal,調(diào)用 Wait 的地方會(huì)解除阻塞 runtime_Semrelease(&wg.sema, false, 0) // goyield } }
Done 的實(shí)現(xiàn)
WaitGroup
里的 Done
其實(shí)只是對(duì) Add
的調(diào)用,但是它的效果是,將計(jì)數(shù)器的值減去 1。 背后的含義是:一個(gè)被等待的協(xié)程執(zhí)行完畢了。
Wait 的實(shí)現(xiàn)
Wait
主要功能是阻塞當(dāng)前的協(xié)程:
Wait
會(huì)先判斷計(jì)數(shù)器是否為0
,為0
說(shuō)明沒(méi)有任何需要等待的協(xié)程,那么就可以直接返回了。- 如果計(jì)數(shù)器還不是
0
,說(shuō)明有協(xié)程還沒(méi)執(zhí)行完,那么調(diào)用Wait
的地方就需要被阻塞起來(lái),等待所有的協(xié)程完成。
源碼實(shí)現(xiàn):
func (wg *WaitGroup) Wait() { for { // 獲取當(dāng)前計(jì)數(shù)器 state := wg.state.Load() // 計(jì)數(shù)器 v := int32(state >> 32) // waiter 數(shù)量 w := uint32(state) // v 為 0,不需要等待,直接返回 if v == 0 { // 計(jì)數(shù)器是 0,不需要等待 return } // 增加 waiter 數(shù)量。 // 調(diào)用一次 Wait,waiter 數(shù)量會(huì)加 1。 if wg.state.CompareAndSwap(state, state+1) { // 這會(huì)阻塞,直到 sema (信號(hào)量)大于 0 runtime_Semacquire(&wg.sema) // goparkunlock // state 不等 0 // wait 還沒(méi)有返回又繼續(xù)使用了 WaitGroup if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } // 解除阻塞狀態(tài)了,可以返回了 return } // 狀態(tài)沒(méi)有修改成功(state 沒(méi)有成功 +1),開(kāi)始下一次嘗試。 } }
總結(jié)
WaitGroup
使用了信號(hào)量來(lái)實(shí)現(xiàn)了并發(fā)資源控制,sema
字段表示信號(hào)量。- 使用
runtime_Semacquire
會(huì)使得 goroutine 阻塞直到計(jì)數(shù)器減少至0
,而使用runtime_Semrelease
會(huì)使得信號(hào)量遞增,這等于是通知之前阻塞在信號(hào)量上的協(xié)程,告訴它們可以繼續(xù)執(zhí)行了。 WaitGroup
作為參數(shù)傳遞的時(shí)候,需要傳遞指針作為參數(shù),否則在被調(diào)用函數(shù)內(nèi)對(duì)Add
或者Done
的調(diào)用,在caller
里面調(diào)用的Wait
會(huì)觀測(cè)不到。WaitGroup
使用一個(gè) 64 位的數(shù)來(lái)保存計(jì)數(shù)器(高 32 位)和waiter
(低 32 位,正在等待的協(xié)程的數(shù)量)。WaitGroup
使用Add
增加計(jì)數(shù)器,使用Done
來(lái)將計(jì)數(shù)器減1
,使用Wait
來(lái)等待 goroutine。Wait
會(huì)阻塞直到計(jì)數(shù)器減少到0
。
以上就是go sync Waitgroup數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)基本操作詳解的詳細(xì)內(nèi)容,更多關(guān)于go sync Waitgroup數(shù)據(jù)結(jié)構(gòu)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解 Go 語(yǔ)言中 Map 類型和 Slice 類型的傳遞
這篇文章主要介紹了詳解 Go 語(yǔ)言中 Map 類型和 Slice 類型的傳遞的相關(guān)資料,需要的朋友可以參考下2017-09-09Golang哈希算法實(shí)現(xiàn)配置文件的監(jiān)控功能詳解
這篇文章主要介紹了Golang哈希算法實(shí)現(xiàn)配置文件的監(jiān)控功能,哈希和加密類似,唯一區(qū)別是哈希是單項(xiàng)的,即哈希后的數(shù)據(jù)無(wú)法解密,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2023-03-03golang 切片的三種使用方式及區(qū)別的說(shuō)明
這篇文章主要介紹了golang 切片的三種使用方式及區(qū)別的說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04go語(yǔ)言題解LeetCode1160拼寫(xiě)單詞示例詳解
這篇文章主要為大家介紹了go語(yǔ)言題解LeetCode1160拼寫(xiě)單詞示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12go語(yǔ)言基礎(chǔ) seek光標(biāo)位置os包的使用
這篇文章主要介紹了go語(yǔ)言基礎(chǔ) seek光標(biāo)位置os包的使用詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-05-05golang微服務(wù)框架基礎(chǔ)Gin基本路由使用詳解
這篇文章主要為大家介紹了golang微服務(wù)框架Gin基本路由的使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-11-11