Go語(yǔ)言學(xué)習(xí)之WaitGroup用法詳解
前言
在前面的文章中,我們使用過 WaitGroup 進(jìn)行任務(wù)編排,Go語(yǔ)言中的 WaitGroup 和 Java 中的 CyclicBarrier、CountDownLatch 非常類似。比如我們有一個(gè)主任務(wù)在執(zhí)行,執(zhí)行到某一點(diǎn)時(shí)需要并行執(zhí)行三個(gè)子任務(wù),并且需要等到三個(gè)子任務(wù)都執(zhí)行完后,再繼續(xù)執(zhí)行主任務(wù)。那我們就需要設(shè)置一個(gè)檢查點(diǎn),使主任務(wù)一直阻塞在這,等三個(gè)子任務(wù)執(zhí)行完后再放行。
說明:本文中的示例,均是基于Go1.17 64位機(jī)器
小試牛刀
我們先來個(gè)簡(jiǎn)單的例子,看下 WaitGroup 是怎么使用的。示例中使用 Add(5) 表示我們有 5個(gè) 子任務(wù),然后起了 5個(gè) 協(xié)程去完成任務(wù),主協(xié)程使用 Wait() 方法等待 子協(xié)程執(zhí)行完畢,輸出一共等待的時(shí)間。
func main() {
var waitGroup sync.WaitGroup
start := time.Now()
waitGroup.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer waitGroup.Done()
time.Sleep(time.Second)
fmt.Println("done")
}()
}
waitGroup.Wait()
fmt.Println(time.Now().Sub(start).Seconds())
}
/*
done
done
done
done
done
1.000306089
*/總覽
WaitGroup 一共有三個(gè)方法:
(wg *WaitGroup) Add(delta int) (wg *WaitGroup) Done() (wg *WaitGroup) Wait()
Add方法用于設(shè)置 WaitGroup 的計(jì)數(shù)值,可以理解為子任務(wù)的數(shù)量Done方法用于將 WaitGroup 的計(jì)數(shù)值減一,可以理解為完成一個(gè)子任務(wù)Wait方法用于阻塞調(diào)用者,直到 WaitGroup 的計(jì)數(shù)值為0,即所有子任務(wù)都完成
正常來說,我們使用的時(shí)候,需要先確定子任務(wù)的數(shù)量,然后調(diào)用 Add() 方法傳入相應(yīng)的數(shù)量,在每個(gè)子任務(wù)的協(xié)程中,調(diào)用 Done(),需要等待的協(xié)程調(diào)用 Wait() 方法,狀態(tài)流轉(zhuǎn)如下圖:

底層實(shí)現(xiàn)
結(jié)構(gòu)體
type WaitGroup struct {
noCopy noCopy // noCopy 字段標(biāo)識(shí),由于 WaitGroup 不能復(fù)制,方便工具檢測(cè)
state1 [3]uint32 // 12個(gè)字節(jié),8個(gè)字節(jié)標(biāo)識(shí) 計(jì)數(shù)值和等待數(shù)量,4個(gè)字節(jié)用于標(biāo)識(shí)信號(hào)量
}state1 是個(gè)復(fù)合字段,會(huì)拆分為兩部分: 64位(8個(gè)字節(jié))的 statep 作為一個(gè)整體用于原子操作, 其中前面4個(gè)字節(jié)表示計(jì)數(shù)值,后面四個(gè)字節(jié)表示等待數(shù)量;剩余 32位(4個(gè)字節(jié))semap 用于標(biāo)識(shí)信號(hào)量。
Go語(yǔ)言中對(duì)于64位的變量進(jìn)行原子操作,需要保證該變量是64位對(duì)齊的,也就是要保證這 8個(gè)字節(jié) 的首地址是 8 的整數(shù)倍。因此當(dāng) state1 的首地址是 8 的整數(shù)倍時(shí),取前8個(gè)字節(jié)作為 statep ,后4個(gè)字節(jié)作為 semap;當(dāng) state1 的首地址不是 8 的整數(shù)倍時(shí),取后8個(gè)字節(jié)作為 statep ,前4個(gè)字節(jié)作為 semap。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
// 首地址是8的倍數(shù)時(shí),前8個(gè)字節(jié)為 statep, 后四個(gè)字節(jié)為 semap
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 后8個(gè)字節(jié)為 statep, 前四個(gè)字節(jié)為 semap
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}Add
Add方法用于添加一個(gè)計(jì)數(shù)值(負(fù)數(shù)相當(dāng)于減),當(dāng)計(jì)數(shù)值變?yōu)?后,Wait方法阻塞的所有等待者都會(huì)被釋放- 計(jì)數(shù)值變?yōu)樨?fù)數(shù)是非法操作,產(chǎn)生
panic - 當(dāng)計(jì)數(shù)值為0時(shí)(初始狀態(tài)),
Add方法不能和Wait方法并發(fā)調(diào)用,需要保證Add方法在Wait方法之前調(diào)用,否則會(huì)panic
func (wg *WaitGroup) Add(delta int) {
// 拿到計(jì)數(shù)值等待者變量 statep 和 信號(hào)量 semap
statep, semap := wg.state()
// 計(jì)數(shù)值加上 delta: statep 的前四個(gè)字節(jié)是計(jì)數(shù)值,因此將 delta 前移 32位
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 計(jì)數(shù)值
v := int32(state >> 32)
// 等待者數(shù)量
w := uint32(state)
// 如果加上 delta 之后,計(jì)數(shù)值變?yōu)樨?fù)數(shù),不合法,panic
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// delta > 0 && v == int32(delta) : 表示從 0 開始添加計(jì)數(shù)值
// w!=0 :表示已經(jīng)有了等待者
// 說明在添加計(jì)數(shù)值的時(shí)候,同時(shí)添加了等待者,非法操作。添加等待者需要在添加計(jì)數(shù)值之后
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// v>0 : 計(jì)數(shù)值不等于0,不需要喚醒等待者,直接返回
// w==0: 沒有等待者,不需要喚醒,直接返回
if v > 0 || w == 0 {
return
}
// 再次檢查數(shù)據(jù)是否一致
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 到這里說明計(jì)數(shù)值為0,且等待者大于0,需要喚醒所有的等待者,并把系統(tǒng)置為初始狀態(tài)(0狀態(tài))
// 將計(jì)數(shù)值和等待者數(shù)量都置為0
*statep = 0
// 喚醒等待者
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}Done
// 完成一個(gè)任務(wù),將計(jì)數(shù)值減一,當(dāng)計(jì)數(shù)值減為0時(shí),需要喚醒所有的等待者
func (wg *WaitGroup) Done() {
wg.Add(-1)
}Wait
// 調(diào)用 Wait 方法會(huì)被阻塞,直到 計(jì)數(shù)值 變?yōu)?
func (wg *WaitGroup) Wait() {
// 獲取計(jì)數(shù)、等待數(shù)和信號(hào)量
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
// 計(jì)數(shù)值
v := int32(state >> 32)
// 等待者數(shù)量
w := uint32(state)
// 計(jì)數(shù)值數(shù)量為0,直接返回,無(wú)需等待
if v == 0 {
return
}
// 到這里說明計(jì)數(shù)值數(shù)量大于0
// 增加等待者數(shù)量:這里會(huì)有競(jìng)爭(zhēng),比如多個(gè) Wait 調(diào)用,或者在同時(shí)調(diào)用 Add 方法,增加不成功會(huì)繼續(xù) for 循環(huán)
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 增加成功后,阻塞在信號(hào)量這里,等待被喚醒
runtime_Semacquire(semap)
// 被喚醒的時(shí)候,應(yīng)該是0狀態(tài)。如果重用 WaitGroup,需要等 Wait 返回
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}易錯(cuò)點(diǎn)
上面分析源碼可以看到幾個(gè)會(huì)產(chǎn)生 panic 的點(diǎn),這也是我們使用 WaitGroup 需要注意的地方
1.計(jì)數(shù)值變?yōu)樨?fù)數(shù)
調(diào)用 Add 時(shí)參數(shù)值傳負(fù)數(shù)
func main() {
var wg sync.WaitGroup
wg.Add(1)
wg.Add(-1)
wg.Add(-1)
}多次調(diào)用 Done 方法
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("test")
wg.Done()
wg.Done()
}()
time.Sleep(time.Second)
wg.Wait()
}2.Add 和 Wait 并發(fā)調(diào)用
Add 和 Wait 并發(fā)調(diào)用,有可能達(dá)不到我們預(yù)期的效果,甚至 panic。如下示例中,我們想要等待 3 個(gè)子任務(wù)都執(zhí)行完后再執(zhí)行主任務(wù),但實(shí)際情況可能是子任務(wù)還沒起來,主任務(wù)就繼續(xù)往下執(zhí)行了。
func doSomething(wg *sync.WaitGroup) {
wg.Add(1)
fmt.Println("do something")
defer wg.Done()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
go doSomething(&wg)
}
wg.Wait()
fmt.Println("main")
}
//main
//do something
//do something正確的使用方式,應(yīng)該是在調(diào)用 Wait 前先調(diào)用 Add
func doSomething(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("do something")
}
func main() {
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go doSomething(&wg)
}
wg.Wait()
fmt.Println("main")
}
//do something
//do something
//do something
//main3.沒有等 Wait 返回,就重用 WaitGroup
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("do something")
wg.Done()
wg.Add(1)
}()
wg.Wait()
}4.復(fù)制使用
我們知道 Go 語(yǔ)言中的參數(shù)傳遞,都是值傳遞,就會(huì)產(chǎn)生復(fù)制操作。因此在向函數(shù)傳遞 WaitGroup 時(shí),使用指針進(jìn)行操作。
// 錯(cuò)誤使用方式,沒有使用指針
func doSomething(wg sync.WaitGroup) {
fmt.Println("do something")
defer wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
// 這里沒使用指針,wg狀態(tài)一直不會(huì)改變,導(dǎo)致 Wait 一直阻塞
go doSomething(wg)
}
wg.Wait()
fmt.Println("main")
}總結(jié)
我們通過源碼+示例的方式,一起學(xué)習(xí)了 sync.WaitGroup 實(shí)現(xiàn)邏輯,同時(shí)也給出了一些注意點(diǎn),只要做到如下操作,就不會(huì)出現(xiàn)問題:
- 保證 Add 在 Wait 前調(diào)用
- Add 中不傳遞負(fù)數(shù)
- 任務(wù)完成后不要忘記調(diào)用 Done 方法,建議使用 defer wg.Done()
- 不要復(fù)制使用 WaitGroup,函數(shù)傳遞時(shí)使用指針傳遞
- 盡量不復(fù)用 WaigGroup,減少出問題的風(fēng)險(xiǎn)
到此這篇關(guān)于Go語(yǔ)言學(xué)習(xí)之WaitGroup用法詳解的文章就介紹到這了,更多相關(guān)Go語(yǔ)言 WaitGroup內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何在golang中使用shopspring/decimal來處理精度問題
本文主要介紹了如何在golang中使用shopspring/decimal來處理精度問題,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
Go語(yǔ)言學(xué)習(xí)技巧之如何合理使用Pool
這篇文章主要給大家介紹了關(guān)于Go語(yǔ)言學(xué)習(xí)技巧之如何合理使用Pool的相關(guān)資料,Pool用于存儲(chǔ)那些被分配了但是沒有被使用,而未來可能會(huì)使用的值,以減小垃圾回收的壓力。文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。2017-12-12
golang實(shí)現(xiàn)微信小程序商城后臺(tái)系統(tǒng)(moshopserver)
這篇文章主要介紹了golang實(shí)現(xiàn)微信小程序商城后臺(tái)系統(tǒng)(moshopserver),本文通過截圖實(shí)例代碼的形式給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02
在Go中實(shí)現(xiàn)高效可靠的鏈路追蹤系統(tǒng)
在當(dāng)今互聯(lián)網(wǎng)應(yīng)用的架構(gòu)中,分布式系統(tǒng)已經(jīng)成為主流,分布式系統(tǒng)的優(yōu)勢(shì)在于能夠提供高可用性、高并發(fā)性和可擴(kuò)展性,本文將介紹鏈路追蹤的概念和原理,并重點(diǎn)介紹如何在Golang中實(shí)現(xiàn)高效可靠的鏈路追蹤系統(tǒng),需要的朋友可以參考下2023-10-10

