golang?waitgroup的具體使用
案例
WaitGroup 可以解決一個(gè) goroutine 等待多個(gè) goroutine 同時(shí)結(jié)束的場(chǎng)景,這個(gè)比較常見的場(chǎng)景就是例如 后端 worker 啟動(dòng)了多個(gè)消費(fèi)者干活,還有爬蟲并發(fā)爬取數(shù)據(jù),多線程下載等等。
我們這里模擬一個(gè) worker 的例子
package main import ( ?? ?"fmt" ?? ?"sync" ) func worker(i int) { ?? ?fmt.Println("worker: ", i) } func main() { ?? ?var wg sync.WaitGroup ?? ?for i := 0; i < 10; i++ { ?? ??? ?wg.Add(1) ?? ??? ?go func(i int) { ?? ??? ??? ?defer wg.Done() ?? ??? ??? ?worker(i) ?? ??? ?}(i) ?? ?} ?? ?wg.Wait() }
問題: 反過來(lái)支持多個(gè) goroutine 等待一個(gè) goroutine 完成后再干活嗎? 看我們接下來(lái)的源碼分析你就知道了
源碼分析
type WaitGroup struct { ?? ?noCopy noCopy ?? ?// 64-bit value: high 32 bits are counter, low 32 bits are waiter count. ?? ?// 64-bit atomic operations require 64-bit alignment, but 32-bit ?? ?// compilers do not ensure it. So we allocate 12 bytes and then use ?? ?// the aligned 8 bytes in them as state, and the other 4 as storage ?? ?// for the sema. ?? ?state1 [3]uint32 }
WaitGroup 結(jié)構(gòu)十分簡(jiǎn)單,由 nocopy 和 state1 兩個(gè)字段組成,其中 nocopy 是用來(lái)防止復(fù)制的
type noCopy struct{} // Lock is a no-op used by -copylocks checker from `go vet`. func (*noCopy) Lock() ? {} func (*noCopy) Unlock() {}
由于嵌入了 nocopy 所以在執(zhí)行 go vet 時(shí)如果檢查到 WaitGroup 被復(fù)制了就會(huì)報(bào)錯(cuò)。這樣可以一定程度上保證 WaitGroup 不被復(fù)制,對(duì)了直接 go run 是不會(huì)有錯(cuò)誤的,所以我們代碼 push 之前都會(huì)強(qiáng)制要求進(jìn)行 lint 檢查,在 ci/cd 階段也需要先進(jìn)行 lint 檢查,避免出現(xiàn)這種類似的錯(cuò)誤。
~/project/Go-000/Week03/blog/06_waitgroup/02 main* ? go run ./main.go ~/project/Go-000/Week03/blog/06_waitgroup/02 main* ? go vet . # github.com/mohuishou/go-training/Week03/blog/06_waitgroup/02 ./main.go:7:9: assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy
state1 的設(shè)計(jì)非常巧妙,這是一個(gè)是十二字節(jié)的數(shù)據(jù),這里面主要包含兩大塊,counter 占用了 8 字節(jié)用于計(jì)數(shù),sema 占用 4 字節(jié)用做信號(hào)量
可以看出 state1 是一個(gè)元素個(gè)數(shù)為 3 個(gè)數(shù)組,且每個(gè)元素都是 占 32 bits
在 64 位系統(tǒng)里面,64位原子操作需要64位對(duì)齊
那么高位的 32 bits 對(duì)應(yīng)的是 counter 計(jì)數(shù)器,用來(lái)表示目前還沒有完成任務(wù)的協(xié)程個(gè)數(shù)
低 32 bits 對(duì)應(yīng)的是 waiter 的數(shù)量,表示目前已經(jīng)調(diào)用了 WaitGroup.Wait 的協(xié)程個(gè)數(shù)
那么剩下的一個(gè) 32 bits 就是 sema 信號(hào)量的了(后面的源碼中會(huì)有體現(xiàn))
為什么要這么搞呢?直接用兩個(gè)字段一個(gè)表示 counter,一個(gè)表示 sema 不行么?
不行,我們看看注釋里面怎么寫的。
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count. > // 64-bit atomic operations require 64-bit alignment, but 32-bit > // compilers do not ensure it. So we allocate 12 bytes and then use > // the aligned 8 bytes in them as state, and the other 4 as storage > // for the sema.
這段話的關(guān)鍵點(diǎn)在于,在做 64 位的原子操作的時(shí)候必須要保證 64 位(8 字節(jié))對(duì)齊,如果沒有對(duì)齊的就會(huì)有問題,但是 32 位的編譯器并不能保證 64 位對(duì)齊所以這里用一個(gè) 12 字節(jié)的 state1 字段來(lái)存儲(chǔ)這兩個(gè)狀態(tài),然后根據(jù)是否 8 字節(jié)對(duì)齊選擇不同的保存方式。
此處我們可以看到 , state 函數(shù)是 返回存儲(chǔ)在 wg.state1 中的狀態(tài)和 sema字段 的指針
這里需要重點(diǎn)注意 state() 函數(shù)的實(shí)現(xiàn),有 2 種情況
第 1 種 情況是,在 64 位系統(tǒng)下面,返回 sema字段 的指針取的是 &wg.state1[2] ,說(shuō)明 64 位系統(tǒng)時(shí),state1 數(shù)據(jù)排布是 : counter , waiter,sema
第 2 種情況是,32 位系統(tǒng)下面,返回 sema字段 的指針取的是 &wg.state1[0] ,說(shuō)明 64 位系統(tǒng)時(shí),state1 數(shù)據(jù)排布是 : sema ,counter , waiter
在 32 位機(jī)器上,uint64 類型的變量通常會(huì)被編譯器按照 4 字節(jié)對(duì)齊,而不是 8 字節(jié)對(duì)齊。因此,如果 uint64
類型的變量沒有按照 4 字節(jié)對(duì)齊,就可能會(huì)導(dǎo)致原子操作失敗。
在 32 位機(jī)器上,64 位原子操作需要使用兩個(gè) 32 位的寄存器來(lái)完成,如果 uint64 類型的變量沒有按照 4字節(jié)對(duì)齊,那么在讀取或者寫入 uint64 類型變量時(shí),就可能會(huì)跨越兩個(gè) 32位寄存器,從而導(dǎo)致原子操作失敗。這種情況下,編譯器可能會(huì)將多個(gè) 32 位讀寫操作組合成一個(gè) 64 位操作,或者使用特殊的匯編指令來(lái)實(shí)現(xiàn)原子性,但這樣會(huì)增加代碼的復(fù)雜度和性能開銷。
為了避免這種問題,sync.WaitGroup 在 32 位機(jī)器上使用了一個(gè)包含 3 個(gè) uint32
元素的數(shù)組來(lái)表示狀態(tài),其中前兩個(gè)元素占用了 8 字節(jié),可以按照 uint64 對(duì)齊,從而可以使用 64
位原子操作來(lái)保證狀態(tài)的原子性。這種設(shè)計(jì)方式既可以在 32 位機(jī)器上保證狀態(tài)的原子性,也可以在 64 位機(jī)器上提高程序的性能。
這個(gè)操作巧妙在哪里呢?
- 如果是 64 位的機(jī)器那肯定是 8 字節(jié)對(duì)齊了的,所以是上面第一種方式
- 如果在 32 位的機(jī)器上
- 如果恰好 8 字節(jié)對(duì)齊了,那么也是第一種方式取前面的 8 字節(jié)數(shù)據(jù)
- 如果是沒有對(duì)齊,但是 32 位 4 字節(jié)是對(duì)齊了的,所以我們只需要后移四個(gè)字節(jié),那么就 8 字節(jié)對(duì)齊了,所以是第二種方式
所以通過 sema 信號(hào)量這四個(gè)字節(jié)的位置不同,保證了 counter 這個(gè)字段無(wú)論在 32 位還是 64 為機(jī)器上都是 8 字節(jié)對(duì)齊的,后續(xù)做 64 位原子操作的時(shí)候就沒問題了。
這個(gè)實(shí)現(xiàn)是在 state 方法實(shí)現(xiàn)的
golang 這樣用,主要原因是 golang 把 counter 和 waiter 合并到一起統(tǒng)一看成是 1 個(gè) 64位的數(shù)據(jù)了,因此在不同的操作系統(tǒng)中
由于字節(jié)對(duì)齊的原因,64位系統(tǒng)時(shí),前面 2 個(gè) 32 位數(shù)據(jù)加起來(lái),正好是 64 位,正好對(duì)齊
對(duì)于 32 位系統(tǒng),則是 第 1 個(gè) 32 位數(shù)據(jù)放 sema 更加合適,后面的 2 個(gè) 32 位數(shù)據(jù)就可以統(tǒng)一取出,作為一個(gè) 64 位變量
為什么要counter和waiter合一起?不能用三個(gè)變量嗎
- 在并發(fā)編程中,多個(gè) goroutine可能會(huì)同時(shí)訪問共享的變量,這種并發(fā)訪問可能會(huì)導(dǎo)致競(jìng)態(tài)條件,從而導(dǎo)致程序出現(xiàn)意料之外的結(jié)果。為了保證并發(fā)程序的正確性,需要使用同步原語(yǔ)來(lái)協(xié)調(diào)不同
- 首先,sync.WaitGroup 的狀態(tài)包含兩個(gè)值:計(jì)數(shù)器和等待的 goroutine 數(shù)量。在并發(fā)程序中,對(duì)于這兩個(gè)值的修改必須是原子的,否則會(huì)導(dǎo)致競(jìng)態(tài)條件。如果使用兩個(gè)單獨(dú)的 uint32 變量來(lái)表示這兩個(gè)值,那么在對(duì)它們進(jìn)行增減操作時(shí),必須使用互斥鎖或原子操作來(lái)保證它們的原子性。而使用一個(gè) uint32 數(shù)組,則可以使用原子操作來(lái)同時(shí)修改這兩個(gè)值,從而避免了互斥鎖的開銷。
- goroutine 的訪問,其中原子操作是一種常用的同步原語(yǔ)。
原子操作是一種基本的操作,它可以在一個(gè)步驟內(nèi)完成讀取和修改操作,從而保證了操作的原子性。在 Go 中,原子操作主要通過sync/atomic 包提供。
sync/atomic 包提供了一系列原子操作,包括原子讀寫、原子增減、原子比較交換等等。這些原子操作可以被多個(gè) goroutine并發(fā)調(diào)用,而不會(huì)導(dǎo)致競(jìng)態(tài)條件。在底層實(shí)現(xiàn)上,sync/atomic 包使用了 CPU 提供的原子指令,通過鎖總線或者其他硬件機(jī)制來(lái)保證多個(gè)CPU 同時(shí)訪問一個(gè)共享變量時(shí)的原子性。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } }
state 方法返回 counter 和信號(hào)量,通過 uintptr(unsafe.Pointer(&wg.state1))%8 == 0 來(lái)判斷是否 8 字節(jié)對(duì)齊
Add
func (wg *WaitGroup) Add(delta int) { ? ? // 先從 state 當(dāng)中把數(shù)據(jù)和信號(hào)量取出來(lái) ?? ?statep, semap := wg.state() ? ? // 在 waiter 上加上 delta 值 ?? ?state := atomic.AddUint64(statep, uint64(delta)<<32) ? ? // 取出當(dāng)前的 counter ?? ?v := int32(state >> 32) ? ? // 取出當(dāng)前的 waiter,正在等待 goroutine 數(shù)量 ?? ?w := uint32(state) ? ? // counter 不能為負(fù)數(shù) ?? ?if v < 0 { ?? ??? ?panic("sync: negative WaitGroup counter") ?? ?} ? ? // 這里屬于防御性編程 ? ? // w != 0 說(shuō)明現(xiàn)在已經(jīng)有 goroutine 在等待中,說(shuō)明已經(jīng)調(diào)用了 Wait() 方法 ? ? // 這時(shí)候 delta > 0 && v == int32(delta) 說(shuō)明在調(diào)用了 Wait() 方法之后又想加入新的等待者 ? ? // 這種操作是不允許的 ?? ?if w != 0 && delta > 0 && v == int32(delta) { ?? ??? ?panic("sync: WaitGroup misuse: Add called concurrently with Wait") ?? ?} ? ? // 如果當(dāng)前沒有人在等待就直接返回,并且 counter > 0 ?? ?if v > 0 || w == 0 { ?? ??? ?return ?? ?} ? ? // 這里也是防御 主要避免并發(fā)調(diào)用 add 和 wait ?? ?if *statep != state { ?? ??? ?panic("sync: WaitGroup misuse: Add called concurrently with Wait") ?? ?} ?? ?// 喚醒所有 waiter,看到這里就回答了上面的問題了 ?? ?*statep = 0 ?? ?for ; w != 0; w-- { ?? ??? ?runtime_Semrelease(semap, false, 0) ?? ?} }
Add 函數(shù)主要功能是將 counter +delta ,增加等待協(xié)程的個(gè)數(shù):
我們可以看到 Add 函數(shù),通過 state 函數(shù)獲取到 上述 64位的變量(counter 和 waiter) 和 sema 信號(hào)量后,通過 atomic.AddUint64 函數(shù) 將 delta 數(shù)據(jù) 加到 counter 上面
這里為什么是 delta 要左移 32 位呢?
上面我們有說(shuō)到嘛, state 函數(shù)拿出的 64 位變量,高 32 bits 是 counter,低 32 bits 是waiter,此處的 delta 是要加到 counter 上,因此才需要 delta 左移 32 位
Wait
wait 主要就是等待其他的 goroutine 完事之后喚醒
func (wg *WaitGroup) Wait() { ?? ?// 先從 state 當(dāng)中把數(shù)據(jù)和信號(hào)量的地址取出來(lái) ? ? statep, semap := wg.state() ?? ?for { ? ? ??? ?// 這里去除 counter 和 waiter 的數(shù)據(jù) ?? ??? ?state := atomic.LoadUint64(statep) ?? ??? ?v := int32(state >> 32) ?? ??? ?w := uint32(state) ? ? ? ? // counter = 0 說(shuō)明沒有在等的,直接返回就行 ? ? ? ? if v == 0 { ?? ??? ??? ?// Counter is 0, no need to wait. ?? ??? ??? ?return ?? ??? ?} ?? ??? ?// waiter + 1,調(diào)用一次就多一個(gè)等待者,然后休眠當(dāng)前 goroutine 等待被喚醒 ?? ??? ?if atomic.CompareAndSwapUint64(statep, state, state+1) { ?? ??? ??? ?runtime_Semacquire(semap) ?? ??? ??? ?if *statep != 0 { ?? ??? ??? ??? ?panic("sync: WaitGroup is reused before previous Wait has returned") ?? ??? ??? ?} ?? ??? ??? ?return ?? ??? ?} ?? ?} }
Done
這個(gè)只是 add 的簡(jiǎn)單封裝
func (wg *WaitGroup) Done() { wg.Add(-1) }
總結(jié)
- WaitGroup 可以用于一個(gè) goroutine 等待多個(gè) goroutine 干活完成,也可以多個(gè) goroutine 等待一個(gè) goroutine 干活完成,是一個(gè)多對(duì)多的關(guān)系
- 多個(gè)等待一個(gè)的典型案例是 singleflight,這個(gè)在后面將微服務(wù)可用性的時(shí)候還會(huì)再講到,感興趣可以看看源碼
- Add(n>0) 方法應(yīng)該在啟動(dòng) goroutine 之前調(diào)用,然后在 goroution 內(nèi)部調(diào)用 Done 方法
- WaitGroup 必須在 Wait 方法返回之后才能再次使用
- Done 只是 Add 的簡(jiǎn)單封裝,所以實(shí)際上是可以通過一次加一個(gè)比較大的值減少調(diào)用,或者達(dá)到快速喚醒的目的。
到此這篇關(guān)于golang waitgroup的具體使用的文章就介紹到這了,更多相關(guān)golang waitgroup內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Golang中的同步工具sync.WaitGroup詳解
- Golang?WaitGroup?底層原理及源碼解析
- 一文帶你了解Golang中的WaitGroups
- Golang WaitGroup實(shí)現(xiàn)原理解析
- golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)
- Golang 標(biāo)準(zhǔn)庫(kù) tips之waitgroup詳解
- 解決Golang 中使用WaitGroup的那點(diǎn)坑
- 在golang中使用Sync.WaitGroup解決等待的問題
- Golang中的sync包的WaitGroup操作
- Golang中的sync.WaitGroup用法實(shí)例
相關(guān)文章
golang的time包:秒、毫秒、納秒時(shí)間戳輸出方式
這篇文章主要介紹了golang的time包:秒、毫秒、納秒時(shí)間戳輸出方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2020-12-12golang提示dial?tcp?172?.217.163.49:443:?connectex:?A?con
這篇文章主要為大家介紹了golang提示dial?tcp?172?.217.163.49:443:?connectex:?A?connection?attempt?failed解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07Go語(yǔ)言實(shí)現(xiàn)對(duì)XML的讀取和修改
這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言實(shí)現(xiàn)對(duì)XML的讀取和修改的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-12-12用Go+Vue.js快速搭建一個(gè)Web應(yīng)用(初級(jí)demo)
這篇文章主要介紹了用Go+Vue.js快速搭建一個(gè)Web應(yīng)用(初級(jí)demo),本文給大家介紹的非常詳細(xì),具有參考借鑒價(jià)值,需要的朋友參考下吧2017-11-11