一文詳解go同步協(xié)程的必備工具WaitGroup
1. 簡(jiǎn)介
本文將介紹 Go 語言中的 WaitGroup 并發(fā)原語,包括 WaitGroup 的基本使用方法、實(shí)現(xiàn)原理、使用注意事項(xiàng)以及常見的使用方式。能夠更好地理解和應(yīng)用 WaitGroup 來協(xié)調(diào)多個(gè) Goroutine 的執(zhí)行,提高 Go 并發(fā)編程的效率和穩(wěn)定性。
2. 基本使用
2.1 定義
WaitGroup是Go語言標(biāo)準(zhǔn)庫中的一個(gè)結(jié)構(gòu)體,它提供了一種簡(jiǎn)單的機(jī)制,用于同步多個(gè)協(xié)程的執(zhí)行。適用于需要并發(fā)執(zhí)行多個(gè)任務(wù)并等待它們?nèi)客瓿珊蟛拍芾^續(xù)執(zhí)行后續(xù)操作的場(chǎng)景。
2.2 使用方式
首先主協(xié)程創(chuàng)建WaitGroup實(shí)例,然后在每個(gè)協(xié)程的開始處,調(diào)用Add(1)方法,表示需要等待一個(gè)任務(wù)執(zhí)行完成,然后協(xié)程在任務(wù)執(zhí)行完成之后,調(diào)用Done方法,表示任務(wù)已經(jīng)執(zhí)行完成了。
主協(xié)程中,需要調(diào)用Wait()方法,等待所有協(xié)程完成任務(wù),示例如下:
func main(){
//首先主協(xié)程創(chuàng)建WaitGroup實(shí)例
var wg sync.WaitGroup
// 開始時(shí)調(diào)用Add方法表示有個(gè)任務(wù)開始執(zhí)行
wg.Add(1)
go func() {
// 開始執(zhí)行...
//完成之后,調(diào)用Done方法
wg.Done()
}()
// 調(diào)用Wait()方法,等待所有協(xié)程完成任務(wù)
wg.Wait()
// 執(zhí)行后續(xù)邏輯
}
2.3 使用例子
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Printf("任務(wù)%d開始執(zhí)行\(zhòng)n", i)
// 模擬協(xié)程任務(wù)執(zhí)行一段時(shí)間
time.Sleep(time.Duration(rand.Int() % 100))
// 線程任務(wù)執(zhí)行完成
fmt.Printf("任務(wù)%d執(zhí)行完畢\n", i)
}(i)
}
fmt.Println("主協(xié)程開始等待所有任務(wù)執(zhí)行完成...")
wg.Wait()
fmt.Println("所有協(xié)程已經(jīng)執(zhí)行完畢...")
}
在這個(gè)例子中,我們使用了sync.WaitGroup來等待5個(gè)協(xié)程執(zhí)行完畢。在循環(huán)中,每創(chuàng)建一個(gè)任務(wù),我們調(diào)用一次wg.Add(1)方法,然后啟動(dòng)一個(gè)協(xié)程去執(zhí)行任務(wù),當(dāng)協(xié)程完成任務(wù)后,調(diào)用wg.Done方法,告知主協(xié)程任務(wù)已經(jīng)執(zhí)行完畢。然后主協(xié)程會(huì)在5個(gè)協(xié)程任務(wù)全部執(zhí)行完畢之后,才會(huì)繼續(xù)向下執(zhí)行。
3.實(shí)現(xiàn)原理
3.1 設(shè)計(jì)初衷
WaitGroup的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作,通常會(huì)在一組協(xié)程中使用。
3.2 基本原理
sync.WaitGroup 結(jié)構(gòu)體中的 state1 和 state2 字段是用于實(shí)現(xiàn) WaitGroup 功能的重要變量。
type WaitGroup struct {
noCopy noCopy
state1 uint64
state2 uint32
}
由于 WaitGroup 需要等待一組操作完成之后再執(zhí)行,因此需要等待所有操作完成之后才能繼續(xù)執(zhí)行。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)計(jì)數(shù)器 counter 來記錄還有多少個(gè)操作沒有完成,如果 counter 的值為 0,則表示所有操作已經(jīng)完成。
同時(shí),WaitGroup 在所有任務(wù)都完成之后,需要喚醒所有處于等待的協(xié)程,此時(shí)需要知道有多少個(gè)協(xié)程處于等待狀態(tài)。為了實(shí)現(xiàn)這個(gè)功能,WaitGroup 使用了一個(gè)等待計(jì)數(shù)器 waiter 來記錄當(dāng)前有多少個(gè)協(xié)程正在等待操作完成。
這里WaitGroup對(duì)于計(jì)數(shù)器和等待計(jì)數(shù)器的實(shí)現(xiàn),是通過一個(gè)64位無符號(hào)整數(shù)來實(shí)現(xiàn)的,也就是WaitGroup結(jié)構(gòu)體中的state1,其中高32位保存了任務(wù)計(jì)數(shù)器counter的值,低32位保存了等待計(jì)數(shù)器waiter的值。當(dāng)我們創(chuàng)建一個(gè) WaitGroup 實(shí)例時(shí),該實(shí)例的任務(wù)計(jì)數(shù)器和等待計(jì)數(shù)器都被初始化為 0。
而且,等待協(xié)程需要等待所有任務(wù)完成之后才能繼續(xù)執(zhí)行,所以等待協(xié)程在任務(wù)未完成時(shí)會(huì)被阻塞,當(dāng)任務(wù)全部完成后,自動(dòng)被喚醒。WaitGroup使用 state2 用于實(shí)現(xiàn)信號(hào)量機(jī)制。通過調(diào)用 runtime_Semacquire() 和 runtime_Semrelease() 函數(shù),可以在不阻塞線程的情況下進(jìn)行等待和通知操作。
3.3 代碼實(shí)現(xiàn)
3.3.1 Add方法
調(diào)用 Add() 方法增加/減小counter的值,delta的值可以是正數(shù),也可以是負(fù)數(shù),下面是Add方法的源碼實(shí)現(xiàn):
func (wg *WaitGroup) Add(delta int) {
// delta 的值可以為負(fù)數(shù),Done方法便是通過Add(-1)來實(shí)現(xiàn)的
// statep: 為state1的地址 semap: 為state2的地址
statep, semap := wg.state()
// 高32位的值 加上 delta,增加任務(wù)計(jì)數(shù)器的值
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù)
v := int32(state >> 32)
// 取低32位數(shù)據(jù),獲取到等待線程的值
w := uint32(state)
// v > 0: 說明還有待完成的任務(wù)數(shù),此時(shí)不應(yīng)該喚醒等待協(xié)程
// w = 0: 說明沒有協(xié)程在等待,此時(shí)可以直接退出
if v > 0 || w == 0 {
return
}
// 此時(shí)v = 0,所有任務(wù)都完成了,喚醒等待協(xié)程
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
3.3.2 Done方法實(shí)現(xiàn)
調(diào)用 Done() 方法表示完成了一個(gè)任務(wù),通過調(diào)用Add方法,delta值為-1,減少任務(wù)計(jì)數(shù)器counter的值,當(dāng)其歸為0時(shí),便自動(dòng)喚醒所有處于等待的協(xié)程。
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
3.3.3 Wait方法實(shí)現(xiàn)
調(diào)用Wait方法,等待任務(wù)執(zhí)行完成,增加等待計(jì)數(shù)器Waiter的值:
func (wg *WaitGroup) Wait() {
// statep: 為state1的地址 semap: 為state2的地址
statep, semap := wg.state()
for {
// 加載state1的值
state := atomic.LoadUint64(statep)
// v: 取高32位數(shù)據(jù),獲取到待完成任務(wù)數(shù)
v := int32(state >> 32)
// 沒有任務(wù)待執(zhí)行,全部都完成了
if v == 0 {
return
}
// 增加waiter計(jì)數(shù)器的值
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 等待被喚醒
runtime_Semacquire(semap)
return
}
}
}
3.4 實(shí)現(xiàn)補(bǔ)充
Add方法,Done方法以及Wait方法實(shí)現(xiàn)中,有一些異常場(chǎng)景的驗(yàn)證邏輯被我刪除掉了。當(dāng)出現(xiàn)異常場(chǎng)景時(shí),說明用戶使用方式和WaitGroup的設(shè)計(jì)初衷相違背了,此時(shí)WaitGroup就會(huì)直接panic。
下面通過說明使用的注意事項(xiàng),來間接介紹WaitGroup的異常驗(yàn)證邏輯。
4.使用注意事項(xiàng)
4.1 Add方法和Done方法需要成對(duì)出現(xiàn)
下面是一個(gè)Add方法和Done方法沒有成對(duì)出現(xiàn)的例子,此時(shí)Add方法調(diào)多了,此時(shí)計(jì)數(shù)器永遠(yuǎn)大于0,Wait 方法會(huì)一直阻塞等待。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("Goroutine 1")
}()
go func() {
fmt.Println("Goroutine 2")
}()
wg.Wait()
fmt.Println("All goroutines finished")
}
在上述代碼中,我們調(diào)用了wg.Add(2),但只調(diào)用了一次wg.Done()。這會(huì)導(dǎo)致counter的值大于0,因此調(diào)用wg.Wait()會(huì)被永久阻塞,不會(huì)繼續(xù)向下繼續(xù)執(zhí)行。
還有另外一種情況時(shí)Done方法調(diào)用多了,此時(shí)任務(wù)計(jì)數(shù)器counter的值為負(fù)數(shù),從WaitGroup設(shè)計(jì)的語意來看,就是需要等待完成的任務(wù)數(shù)為負(fù)數(shù),這個(gè)不符合預(yù)期,此時(shí)將會(huì)直接panic
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("Goroutine 1 started")
wg.Done() // 第一次調(diào)用Done方法
wg.Done() // 第二次調(diào)用Done方法
fmt.Println("Goroutine 1 completed")
}()
wg.Wait()
fmt.Println("All goroutines completed")
}
在上面的例子中,我們啟動(dòng)了一個(gè)goroutine,第一次調(diào)用Add方法,counter的值變?yōu)?,在第14行調(diào)用Done,此時(shí)計(jì)數(shù)器的值變?yōu)?,此時(shí)等待中的goroutine將會(huì)被喚醒。在第15行又調(diào)用了一次Done方法,當(dāng)counter減小為0時(shí),再次調(diào)用Done方法會(huì)導(dǎo)致panic,因?yàn)榇藭r(shí)waitGroup的計(jì)數(shù)器已經(jīng)為0,再次減少將導(dǎo)致負(fù)數(shù)計(jì)數(shù),這是不被允許的。
所以在調(diào)用Done方法時(shí),需要保證每次調(diào)用都與Add方法的調(diào)用一一對(duì)應(yīng),否則會(huì)導(dǎo)致程序出現(xiàn)錯(cuò)誤。
4.2 在所有任務(wù)都已經(jīng)添加之后,才調(diào)用Wait方法進(jìn)行等待
WaitGroup的設(shè)計(jì)初衷就是為了等待一組操作完成后再執(zhí)行下一步操作。所以,如果在所有任務(wù)添加之前,便調(diào)用Wait方法進(jìn)行等待,此時(shí)有可能會(huì)導(dǎo)致等待協(xié)程提前被喚醒,執(zhí)行下一步操作,而尚未添加的任務(wù)則不會(huì)被等待,這違反了WaitGroup的設(shè)計(jì)初衷,也不符合預(yù)期。下面是一個(gè)簡(jiǎn)單的例子:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
go func(id int) {
wg.Add(1)
defer wg.Done()
fmt.Printf("Goroutine %d started\n", id)
time.Sleep(time.Duration(id) * time.Second)
fmt.Printf("Goroutine %d finished\n", id)
}(i)
}
// 不等待所有任務(wù)添加,就開始等待
wg.Wait()
fmt.Println("All goroutines finished")
time.Sleep(10 * time.Second)
}
代碼執(zhí)行結(jié)果如下,等待協(xié)程被提前喚醒,執(zhí)行之后的操作,而子任務(wù)在等待協(xié)程喚醒后才開始執(zhí)行:
All goroutines finished
Goroutine 1 started
Goroutine 3 started
Goroutine 2 started
Goroutine 1 finished
Goroutine 2 finished
Goroutine 3 finished
在這個(gè)例子中,我們創(chuàng)建了三個(gè)協(xié)程并打印出它們開始和結(jié)束的消息。但是,我們沒有在任務(wù)開始前調(diào)用Add方法添加任務(wù),而是在任務(wù)開始之后再調(diào)用Add方法添加任務(wù)。
這可能會(huì)導(dǎo)致某些任務(wù)未被加入到WaitGroup中,等待協(xié)程就調(diào)用了wg.Wait方法,這樣就會(huì)導(dǎo)致一些任務(wù)未被加入WaitGrou,從而導(dǎo)致等待協(xié)程不會(huì)等待這些任務(wù)執(zhí)行完成。如果這種情況發(fā)生了,我們會(huì)看到"All goroutines finished"被輸出,但實(shí)際上有一些協(xié)程還沒有完成。
因此,我們應(yīng)該在所有任務(wù)添加完畢之后再調(diào)用Wait方法,以保證等待的正確性。
5. WaitGroup常見使用場(chǎng)景
在函數(shù)或方法中使用,如果一個(gè)大任務(wù)可以拆分為多個(gè)獨(dú)立的子任務(wù),此時(shí)會(huì)將其進(jìn)行拆分,并使用多個(gè)協(xié)程來并發(fā)執(zhí)行這些任務(wù),提高執(zhí)行效率,同時(shí)使用WaitGroup等待所有子任務(wù)執(zhí)行完成,完成協(xié)程間的同步。
下面來看go-redis中ClusterClient結(jié)構(gòu)體中ForEachMaster方法中對(duì)于WaitGroup的使用。ForEachMaster方法通常用于在 Redis 集群中執(zhí)行針對(duì)所有主節(jié)點(diǎn)的某種操作,例如在集群中添加或刪除鍵,或者執(zhí)行一些全局的診斷操作,具體執(zhí)行的操作由傳入?yún)?shù)fn指定。
這里ForEachMaster方法會(huì)對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作,這里的實(shí)現(xiàn)是對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作這個(gè)大任務(wù),拆分為多個(gè)獨(dú)立的子任務(wù),每個(gè)子任務(wù)完成對(duì)一個(gè)Master節(jié)點(diǎn)執(zhí)行指定操作,然后每個(gè)子任務(wù)啟動(dòng)一個(gè)協(xié)程去執(zhí)行,主協(xié)程使用WaitGroup等待所有協(xié)程完成指定子任務(wù),ForEachMaster也就完成了對(duì)所有主節(jié)點(diǎn)執(zhí)行某種操作的任務(wù)。具體實(shí)現(xiàn)如下:
func (c *ClusterClient) ForEachMaster(
ctx context.Context,
fn func(ctx context.Context, client *Client) error,
) error {
// 重新加載集群狀態(tài),以確保狀態(tài)信息是最新的
state, err := c.state.ReloadOrGet(ctx)
if err != nil {
return err
}
var wg sync.WaitGroup
// 用于協(xié)程間通信
errCh := make(chan error, 1)
// 獲取到redis集群中所有的master節(jié)點(diǎn)
for _, master := range state.Masters {
// 啟動(dòng)一個(gè)協(xié)程來執(zhí)行該任務(wù)
wg.Add(1)
go func(node *clusterNode) {
// 任務(wù)完成時(shí),調(diào)用Done告知WaitGroup任務(wù)已完成
defer wg.Done()
err := fn(ctx, node.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}(master)
}
// 主協(xié)程等待所有任務(wù)完成
wg.Wait()
return nil
}
總結(jié)
本文介紹了 Go 語言中的 WaitGroup 并發(fā)原語,它提供了一種簡(jiǎn)單且強(qiáng)大的機(jī)制來協(xié)調(diào)多個(gè) Goroutine 的執(zhí)行。我們首先學(xué)習(xí)了 WaitGroup 的基本使用方法,包括如何創(chuàng)建 WaitGroup、如何向計(jì)數(shù)器中添加值、如何等待所有 Goroutine 完成以及如何在 Goroutine 中通知 WaitGroup 完成。
接著,我們了解了 WaitGroup 的實(shí)現(xiàn)原理,包括計(jì)數(shù)器和等待計(jì)數(shù)器的實(shí)現(xiàn)。了解了實(shí)現(xiàn)原理之后,我們可以更好地理解 WaitGroup 的內(nèi)部機(jī)制以及如何更好地使用它來實(shí)現(xiàn)我們的需求。
在接下來的部分中,我們介紹了一些使用 WaitGroup 的注意事項(xiàng),以及常見的使用方式。基于此,我們完成了對(duì)WaitGroup的介紹,更多關(guān)于go同步協(xié)程WaitGroup的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang 操作 Kafka 如何設(shè)置消息的失效時(shí)間
在使用 Golang 操作 Kafka 時(shí),你可以使用 Sarama 庫來設(shè)置消息的失效時(shí)間,這篇文章主要介紹了Golang操作Kafka設(shè)置消息的失效時(shí)間,需要的朋友可以參考下2023-06-06
深入理解Go高級(jí)并發(fā)模式編寫更高效可擴(kuò)展的應(yīng)用程序
Go對(duì)并發(fā)提供了強(qiáng)大的原生支持,本文討論Go的高級(jí)并發(fā)模式,理解這些并發(fā)模式,可以幫助我們編寫高效的Go應(yīng)用程序,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-02-02
Go語言實(shí)戰(zhàn)之實(shí)現(xiàn)均衡器功能
這篇文章主要為大家詳細(xì)介紹了如何利用Golang?實(shí)現(xiàn)一個(gè)簡(jiǎn)單的流浪均衡器,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-04-04
golang?select?機(jī)制和超時(shí)問題
golang 中的協(xié)程使用非常方便,但是協(xié)程什么時(shí)候結(jié)束是一個(gè)控制問題,可以用 select 配合使用,這篇文章主要介紹了golang?select?機(jī)制和超時(shí)問題,需要的朋友可以參考下2022-06-06

