Golang并發(fā)控制之errgroup使用詳解
errgroup
是 Go 官方庫(kù) x 中提供的一個(gè)非常實(shí)用的工具,用于并發(fā)執(zhí)行多個(gè) goroutine,并且方便的處理錯(cuò)誤。
我們知道,Go 標(biāo)準(zhǔn)庫(kù)中有個(gè) sync.WaitGroup
可以用來并發(fā)執(zhí)行多個(gè) goroutine,errgroup
就是在其基礎(chǔ)上實(shí)現(xiàn)了 errgroup.Group
。不過,errgroup.Group
和 sync.WaitGroup
在功能上是有區(qū)別的,盡管它們都用于管理 goroutine 的同步。
errgroup 優(yōu)勢(shì)
與 sync.WaitGroup
相比,以下是設(shè)計(jì) errgroup.Group
的原因和優(yōu)勢(shì):
錯(cuò)誤處理:
sync.WaitGroup
只負(fù)責(zé)等待 goroutine 完成,不處理 goroutine 的返回值或錯(cuò)誤。errgroup.Group
雖然目前也不能直接處理 goroutine 的返回值,但在 goroutine 返回錯(cuò)誤時(shí),可以立即取消其他正在運(yùn)行的 goroutine,并在Wait
方法中返回第一個(gè)非nil
的錯(cuò)誤。
上下文取消:
errgroup
可以與 context.Context
配合使用,支持在某個(gè) goroutine 出現(xiàn)錯(cuò)誤時(shí)自動(dòng)取消其他 goroutine,這樣可以更好地控制資源,避免不必要的工作。
簡(jiǎn)化并發(fā)編程:
使用 errgroup
可以減少錯(cuò)誤處理的樣板代碼,開發(fā)者不需要手動(dòng)管理錯(cuò)誤狀態(tài)和同步邏輯,使得并發(fā)編程更簡(jiǎn)單、更易于維護(hù)。
限制并發(fā)數(shù)量:
errgroup
提供了便捷的接口來限制并發(fā) goroutine 的數(shù)量,避免過載,而 sync.WaitGroup
沒有這樣的功能。
以上,errgroup
為處理并發(fā)任務(wù)提供了更強(qiáng)大的錯(cuò)誤管理和控制機(jī)制,因此在許多并發(fā)場(chǎng)景下是更優(yōu)的選擇。
隨著本文接下來的深入講解,你就能深刻體會(huì)到上面所說的優(yōu)勢(shì)了。
sync.WaitGroup 使用示例
在介紹 errgroup.Group
前,我們還是先來一起回顧下 sync.WaitGroup
的用法。
示例如下:
package main import ( "fmt" "net/http" "sync" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗 } var err error var wg sync.WaitGroup // 零值可用,不必顯式初始化 for _, url := range urls { wg.Add(1) // 增加 WaitGroup 計(jì)數(shù)器 // 啟動(dòng)一個(gè) goroutine 來獲取 URL go func() { defer wg.Done() // 當(dāng) goroutine 完成時(shí)遞減 WaitGroup 計(jì)數(shù)器 resp, e := http.Get(url) if e != nil { // 發(fā)生錯(cuò)誤返回,并記錄該錯(cuò)誤 err = e return } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) }() } // 等待所有 goroutine 執(zhí)行完成 wg.Wait() if err != nil { // err 會(huì)記錄最后一個(gè)錯(cuò)誤 fmt.Printf("Error: %s\n", err) } }
示例中,我們使用 sync.WaitGroup
來啟動(dòng) 3 個(gè) goroutine 并發(fā)訪問 3 個(gè)不同的 URL
,并在成功時(shí)打印響應(yīng)狀態(tài)碼,或失敗時(shí)記錄錯(cuò)誤信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我們獲取了兩個(gè)成功的響應(yīng),并打印了一條錯(cuò)誤信息。
根據(jù)示例,我們可以抽象出 sync.WaitGroup
最典型的慣用法:
var wg sync.WaitGroup for ... { wg.Add(1) go func() { defer wg.Done() // do something }() } wg.Wait()
errgroup.Group 使用示例
其實(shí) errgroup.Group
的使用套路與 sync.WaitGroup
非常類似。
基本使用
errgroup
基本使用套路如下:
- 導(dǎo)入
errgroup
包。 - 創(chuàng)建一個(gè)
errgroup.Group
實(shí)例。 - 使用
Group.Go
方法啟動(dòng)多個(gè)并發(fā)任務(wù)。 - 使用
Group.Wait
方法等待所有 goroutine 完成或有一個(gè)返回錯(cuò)誤。
將前文中的 sync.WaitGroup
程序示例使用 errgroup.Group
重寫為如下示例:
package main import ( "fmt" "net/http" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗 } // 使用 errgroup 創(chuàng)建一個(gè)新的 goroutine 組 var g errgroup.Group // 零值可用,不必顯式初始化 for _, url := range urls { // 使用 errgroup 啟動(dòng)一個(gè) goroutine 來獲取 URL g.Go(func() error { resp, err := http.Get(url) if err != nil { return err // 發(fā)生錯(cuò)誤,返回該錯(cuò)誤 } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) return nil // 返回 nil 表示成功 }) } // 等待所有 goroutine 完成并返回第一個(gè)錯(cuò)誤(如果有) if err := g.Wait(); err != nil { fmt.Printf("Error: %s\n", err) } }
可以發(fā)現(xiàn),這段程序與 sync.WaitGroup
示例很像,根據(jù)代碼中的注釋,很容易看懂。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
輸出結(jié)果也沒什么變化。
上下文取消
errgroup
提供了 errgroup.WithContext
可以附加取消功能,在任意一個(gè) goroutine 返回錯(cuò)誤時(shí),可以立即取消其他正在運(yùn)行的 goroutine,并在 Wait
方法中返回第一個(gè)非 nil
的錯(cuò)誤。
示例如下:
package main import ( "context" "fmt" "net/http" "sync" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗 } // 創(chuàng)建一個(gè)帶有 context 的 errgroup // 任何一個(gè) goroutine 返回非 nil 的錯(cuò)誤,或 Wait() 等待所有 goroutine 完成后,context 都會(huì)被取消 g, ctx := errgroup.WithContext(context.Background()) // 創(chuàng)建一個(gè) map 來保存結(jié)果 var result sync.Map for _, url := range urls { // 使用 errgroup 啟動(dòng)一個(gè) goroutine 來獲取 URL g.Go(func() error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return err // 發(fā)生錯(cuò)誤,返回該錯(cuò)誤 } // 發(fā)起請(qǐng)求 resp, err := http.DefaultClient.Do(req) if err != nil { return err // 發(fā)生錯(cuò)誤,返回該錯(cuò)誤 } defer resp.Body.Close() // 保存每個(gè) URL 的響應(yīng)狀態(tài)碼 result.Store(url, resp.Status) return nil // 返回 nil 表示成功 }) } // 等待所有 goroutine 完成并返回第一個(gè)錯(cuò)誤(如果有) if err := g.Wait(); err != nil { fmt.Println("Error: ", err) } // 所有 goroutine 都執(zhí)行完成,遍歷并打印成功的結(jié)果 result.Range(func(key, value any) bool { fmt.Printf("fetch url %s status %s\n", key, value) return true }) }
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/withcontext/main.go
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK
由測(cè)試結(jié)果來看,對(duì)于 [http://www.google.com/](http://www.google.com/)
的請(qǐng)求可以接收到成功響應(yīng),由于對(duì) [http://www.somestupidname.com/](http://www.somestupidname.com/)
請(qǐng)求報(bào)錯(cuò),程序來不及等待 [http://www.golang.org/](http://www.golang.org/)
響應(yīng),就被取消了。
其實(shí)我們大致可以猜測(cè)到,取消功能應(yīng)該是通過 context.cancelCtx
來實(shí)現(xiàn)的,我們暫且不必深究,稍后探索源碼就能驗(yàn)證我們的猜想了。
限制并發(fā)數(shù)量
errgroup
提供了 errgroup.SetLimit
可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
示例如下:
package main import ( "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { // 創(chuàng)建一個(gè) errgroup.Group var g errgroup.Group // 設(shè)置最大并發(fā)限制為 3 g.SetLimit(3) // 啟動(dòng) 10 個(gè) goroutine for i := 1; i <= 10; i++ { g.Go(func() error { // 打印正在運(yùn)行的 goroutine fmt.Printf("Goroutine %d is starting\n", i) time.Sleep(2 * time.Second) // 模擬任務(wù)耗時(shí) fmt.Printf("Goroutine %d is done\n", i) return nil }) } // 等待所有 goroutine 完成 if err := g.Wait(); err != nil { fmt.Printf("Encountered an error: %v\n", err) } fmt.Println("All goroutines complete.") }
使用 g.SetLimit(3)
可以限制最大并發(fā)為 3 個(gè) goroutine。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/setlimit/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.
根據(jù)輸出可以發(fā)現(xiàn),雖然我們通過 for
循環(huán)啟動(dòng)了 10 個(gè) goroutine,但程序執(zhí)行時(shí)最多只允許同時(shí)啟動(dòng) 3 個(gè) goroutine,當(dāng)這 3 個(gè) goroutine 中有某個(gè)執(zhí)行完成并退出,才會(huì)有新的 goroutine 被啟動(dòng)。
嘗試啟動(dòng)
errgroup
還提供了 errgroup.TryGo
可以嘗試啟動(dòng)一個(gè)任務(wù),它返回一個(gè) bool
值,標(biāo)識(shí)任務(wù)是否啟動(dòng)成功,true
表示成功,false
表示失敗。
errgroup.TryGo
需要搭配 errgroup.SetLimit
一同使用,因?yàn)槿绻幌拗撇l(fā)數(shù)量,那么 errgroup.TryGo
始終返回 true
,當(dāng)達(dá)到最大并發(fā)數(shù)量限制時(shí),errgroup.TryGo
返回 false
。
示例如下:
package main import ( "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { // 創(chuàng)建一個(gè) errgroup.Group var g errgroup.Group // 設(shè)置最大并發(fā)限制為 3 g.SetLimit(3) // 啟動(dòng) 10 個(gè) goroutine for i := 1; i <= 10; i++ { if g.TryGo(func() error { // 打印正在運(yùn)行的 goroutine fmt.Printf("Goroutine %d is starting\n", i) time.Sleep(2 * time.Second) // 模擬工作 fmt.Printf("Goroutine %d is done\n", i) return nil }) { // 如果成功啟動(dòng),打印提示 fmt.Printf("Goroutine %d started successfully\n", i) } else { // 如果達(dá)到并發(fā)限制,打印提示 fmt.Printf("Goroutine %d could not start (limit reached)\n", i) } } // 等待所有 goroutine 完成 if err := g.Wait(); err != nil { fmt.Printf("Encountered an error: %v\n", err) } fmt.Println("All goroutines complete.") }
使用 g.SetLimit(3)
限制最大并發(fā)為 3 個(gè) goroutine,調(diào)用 g.TryGo
如果啟動(dòng)任務(wù)成功,打印 Goroutine {i} started successfully
提示信息;啟動(dòng)任務(wù)失敗,則打印 Goroutine {i} could not start (limit reached)
提示信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/trygo/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.
因?yàn)橄拗谱畲蟛l(fā)數(shù)量為 3,所以前面 3 個(gè) goroutine 啟動(dòng)成功,并且正常執(zhí)行完成,其他幾個(gè) goroutine 全部執(zhí)行失敗。
以上就是 errgroup
的全部用法了,更多使用場(chǎng)景你可以在實(shí)踐中去嘗試和感悟。
源碼解讀
接下來,我們一起閱讀下 errgroup
源碼,以此來加深對(duì) errgroup
的理解。
errgroup
源碼非常少,僅有 3 個(gè)文件。這 3 個(gè)文件源碼內(nèi)容分別如下:
主邏輯代碼:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
// Copyright 2016 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package errgroup provides synchronization, error propagation, and Context // cancelation for groups of goroutines working on subtasks of a common task. // // [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks // returning errors. package errgroup import ( "context" "fmt" "sync" ) type token struct{} // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. type Group struct { cancel func(error) wg sync.WaitGroup sem chan token errOnce sync.Once err error } func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() } // WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := withCancelCause(ctx) return &Group{cancel: cancel}, ctx } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } return g.err } // Go calls the given function in a new goroutine. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group's context, if the // group was created by calling WithContext. The error will be returned by Wait. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() } // TryGo calls the given function in a new goroutine only if the number of // active goroutines in the group is currently below the configured limit. // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { if g.sem != nil { select { case g.sem <- token{}: // Note: this allows barging iff channels in general allow barging. default: return false } } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() return true } // SetLimit limits the number of active goroutines in this group to at most n. // A negative value indicates no limit. // // Any subsequent call to the Go method will block until it can add an active // goroutine without exceeding the configured limit. // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
為 Go 1.20 及更高版本提供的 withCancelCause
函數(shù)實(shí)現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { return context.WithCancelCause(parent) }
為低于 Go 1.20 版本提供的 withCancelCause
函數(shù)實(shí)現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build !go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { ctx, cancel := context.WithCancel(parent) return ctx, func(error) { cancel() } }
可以看到,errgroup
全部源碼加起來也不到 100 行,可謂短小精悍。
現(xiàn)在我們來分析下 errgroup
源碼。
根據(jù)包注釋我們可以知道,errgroup
包提供了同步、錯(cuò)誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務(wù)的子任務(wù)。errgroup.Group
與 sync.WaitGroup
相關(guān),增加了處理任務(wù)返回錯(cuò)誤的能力。
為了提供以上功能,首先 errgroup
定義了 token
和 Group
兩個(gè)結(jié)構(gòu)體:
// 定義一個(gè)空結(jié)構(gòu)體類型 token,會(huì)作為信號(hào)進(jìn)行傳遞,用于控制并發(fā)數(shù) type token struct{} // Group 是一組協(xié)程的集合,這些協(xié)程處理同一整體任務(wù)的子任務(wù) // // 零值 Group 是有效的,對(duì)活動(dòng)協(xié)程的數(shù)量沒有限制,并且不會(huì)在出錯(cuò)時(shí)取消 type Group struct { cancel func(error) // 取消函數(shù),就是 context.CancelCauseFunc 類型 wg sync.WaitGroup // 內(nèi)部使用了 sync.WaitGroup sem chan token // 信號(hào) channel,可以控制協(xié)程并發(fā)數(shù)量 errOnce sync.Once // 確保錯(cuò)誤僅處理一次 err error // 記錄子協(xié)程集中返回的第一個(gè)錯(cuò)誤 }
token
被定義為空結(jié)構(gòu)體,用來傳遞信號(hào),這也是 Go 中空結(jié)構(gòu)體的慣用法。
NOTE:
你可以在我的另一篇文章《Go 中空結(jié)構(gòu)體慣用法,我?guī)湍憧偨Y(jié)全了!》中查看空結(jié)構(gòu)體的更多用法。
Group
是 errgroup
包提供的唯一公開結(jié)構(gòu)體,其關(guān)聯(lián)的方法承載了所有功能。
cancel
屬性為一個(gè)函數(shù),上下文取消時(shí)會(huì)被調(diào)用,其實(shí)就是 context.CancelCauseFunc
類型,調(diào)用 errgroup.WithContext
時(shí)被賦值。
wg
屬性即為 sync.WaitGroup
,承擔(dān)并發(fā)控制的主邏輯,errgroup.Go
和 errgroup.TryGo
內(nèi)部并發(fā)控制邏輯都會(huì)代理給 sync.WaitGroup
。
sem
屬性是 token
類型的 channel
,用于限制并發(fā)數(shù)量,調(diào)用 errgroup.SetLimit
是被賦值。
err
會(huì)記錄所有 goroutine 中出現(xiàn)的第一個(gè)錯(cuò)誤,由errOnce
確保錯(cuò)誤錯(cuò)誤僅處理一次,所以后面再出現(xiàn)更多的錯(cuò)誤都會(huì)被忽略。
接下來我們先看 errgroup.SetLimit
方法定義:
// SetLimit 限制該 Group 中活動(dòng)的協(xié)程數(shù)量最多為 n,負(fù)值表示沒有限制 // // 任何后續(xù)對(duì) Go 方法的調(diào)用都將阻塞,直到可以在不超過限額的情況下添加活動(dòng)協(xié)程 // // 在 Group 中存在任何活動(dòng)的協(xié)程時(shí),限制不得修改 func (g *Group) SetLimit(n int) { // 傳進(jìn)來的 n 就是 channel 長(zhǎng)度,以此來限制協(xié)程的并發(fā)數(shù)量 if n < 0 { // 這里檢查如果小于 0 則不限制協(xié)程并發(fā)數(shù)量。此外,也不要將其設(shè)置為 0,會(huì)產(chǎn)生死鎖 g.sem = nil return } if len(g.sem) != 0 { // 如果存在活動(dòng)的協(xié)程,調(diào)用此方法將產(chǎn)生 panic panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
errgroup.SetLimit
方法可以限制并發(fā)屬性,其內(nèi)部邏輯很簡(jiǎn)單,不過要注意在調(diào)用 errgroup.Go
或 errgroup.TryGo
方法前調(diào)用 errgroup.SetLimit
,以防程序出現(xiàn) panic
。
然后看下主邏輯 errgroup.Go
方法實(shí)現(xiàn):
// Go 會(huì)在新的協(xié)程中調(diào)用給定的函數(shù) // 它會(huì)阻塞,直到可以在不超過配置的活躍協(xié)程數(shù)量限制的情況下添加新的協(xié)程 // // 首次返回非 nil 錯(cuò)誤的調(diào)用會(huì)取消該 Group 的上下文(context),如果該 context 是通過調(diào)用 WithContext 創(chuàng)建的,該錯(cuò)誤將由 Wait 返回 func (g *Group) Go(f func() error) { if g.sem != nil { // 這個(gè)是限制并發(fā)數(shù)的信號(hào)通道 g.sem <- token{} // 如果超過了配置的活躍協(xié)程數(shù)量限制,向 channel 發(fā)送 token 會(huì)阻塞 } g.wg.Add(1) // 轉(zhuǎn)發(fā)給 sync.WaitGroup.Add(1),將活動(dòng)協(xié)程數(shù)加一 go func() { defer g.done() // 當(dāng)一個(gè)協(xié)程完成時(shí),調(diào)用此方法,內(nèi)部會(huì)將調(diào)用轉(zhuǎn)發(fā)給 sync.WaitGroup.Done() if err := f(); err != nil { // f() 就是我們要執(zhí)行的任務(wù) g.errOnce.Do(func() { // 僅執(zhí)行一次,即只處理一次錯(cuò)誤,所以會(huì)記錄第一個(gè)非 nil 的錯(cuò)誤,與協(xié)程啟動(dòng)順序無關(guān) g.err = err // 記錄錯(cuò)誤 if g.cancel != nil { // 如果 cancel 不為 nil,則調(diào)用取消函數(shù),并設(shè)置 cause g.cancel(g.err) } }) } }() }
首先會(huì)檢測(cè)是否使用 errgroup.SetLimit
方法設(shè)置了并發(fā)限制,如果有限制,則使用 channel
來控制并發(fā)數(shù)量。
否則執(zhí)行主邏輯,其實(shí)就是 sync.WaitGroup
的套路代碼。
在 defer
中調(diào)用了 g.done()
,done
方法定義如下:
// 當(dāng)一個(gè)協(xié)程完成時(shí),調(diào)用此方法 func (g *Group) done() { // 如果設(shè)置了最大并發(fā)數(shù),則 sem 不為 nil,從 channel 中消費(fèi)一個(gè) token,表示一個(gè)協(xié)程已完成 if g.sem != nil { <-g.sem } g.wg.Done() // 轉(zhuǎn)發(fā)給 sync.WaitGroup.Done(),將活動(dòng)協(xié)程數(shù)減一 }
另外,如果某個(gè)任務(wù)返回了錯(cuò)誤,則通過 errOnce
確保錯(cuò)誤只被處理一次,處理方式就是先記錄錯(cuò)誤,然后調(diào)用 cancel
方法。
cancel
實(shí)際上是在 errgroup.WithContext
方法中賦值的:
// WithContext 返回一個(gè)新的 Group 和一個(gè)從 ctx 派生的關(guān)聯(lián) Context // // 派生的 Context 會(huì)在傳遞給 Go 的函數(shù)首次返回非 nil 錯(cuò)誤或 Wait 首次返回時(shí)被取消,以先發(fā)生者為準(zhǔn)。 func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := withCancelCause(ctx) return &Group{cancel: cancel}, ctx }
這里的 withCancelCause
有兩種實(shí)現(xiàn)。
如果 Go 版本大于等于 1.20,提供的 withCancelCause
函數(shù)實(shí)現(xiàn)如下:
// 構(gòu)建約束標(biāo)識(shí)了這個(gè)文件是 Go 1.20 版本被加入的 //go:build go1.20 package errgroup import "context" // 代理到 context.WithCancelCause func withCancelCause(parent context.Context) (context.Context, func(error)) { return context.WithCancelCause(parent) }
如果 Go 版本小于 1.20,提供的 withCancelCause
函數(shù)實(shí)現(xiàn)如下:
//go:build !go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { ctx, cancel := context.WithCancel(parent) return ctx, func(error) { cancel() } }
因?yàn)?nbsp;context.WithCancelCause
方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在這個(gè) Commit: 93782cc 中看到 withCancelCause
函數(shù)變更記錄。
調(diào)用 errgroup.Go
方法啟動(dòng)任務(wù)后,我們會(huì)調(diào)用 errgroup.Wait
等待所有任務(wù)完成,其實(shí)現(xiàn)如下:
// Wait 會(huì)阻塞,直到來自 Go 方法的所有函數(shù)調(diào)用返回,然后返回它們中的第一個(gè)非 nil 錯(cuò)誤(如果有的話) func (g *Group) Wait() error { g.wg.Wait() // 轉(zhuǎn)發(fā)給 sync.WaitGroup.Wait(),等待所有協(xié)程執(zhí)行完成 if g.cancel != nil { // 如果 cancel 不為 nil,則調(diào)用取消函數(shù),并設(shè)置 cause g.cancel(g.err) } return g.err // 返回錯(cuò)誤 }
所以,最終 errgroup.Wait
返回的錯(cuò)誤其實(shí)就是 errgroup.Go
方法中記錄的第一個(gè)錯(cuò)誤。
現(xiàn)在,我們還剩下最后一個(gè)方法 errgroup.TryGo
的源碼沒有分析,我把源碼貼在下面,并寫上了詳細(xì)的注釋:
// TryGo 僅在 Group 中活動(dòng)的協(xié)程數(shù)量低于限額時(shí),才在新的協(xié)程中調(diào)用給定的函數(shù) // // 返回值標(biāo)識(shí)協(xié)程是否啟動(dòng) func (g *Group) TryGo(f func() error) bool { if g.sem != nil { // 如果設(shè)置了最大并發(fā)數(shù) select { case g.sem <- token{}: // 可以向 channel 寫入 token,說明沒有達(dá)到限額,可以啟動(dòng)協(xié)程 // Note: this allows barging iff channels in general allow barging. default: // 如果超過了配置的活躍協(xié)程數(shù)量限制,會(huì)走到這個(gè) case return false } } // 接下來的代碼與 Go 中的邏輯相同 g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() return true }
主邏輯與 errgroup.Go
方法一樣,不同的是 errgroup.Go
方法如果達(dá)到并發(fā)限額會(huì)阻塞,而 errgroup.TryGo
方法在達(dá)到并發(fā)限額時(shí)直接返回 false
。
其實(shí) <font style="color:rgb(31, 35, 40);">errgroup.TryGo</font>
和 <font style="color:rgb(31, 35, 40);">errgroup.SetLimit</font>
兩個(gè)方法是后添加的功能,你可以在 issues/27837 中看到討論記錄。
至此,errgroup
源碼就都解讀完成了。
總結(jié)
errgroup
是官方為我們提供的擴(kuò)展庫(kù),在 sync.WaitGroup
基礎(chǔ)上,增加了處理任務(wù)返回錯(cuò)誤的能力。提供了同步、錯(cuò)誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務(wù)的子任務(wù)。
errgroup.WithContext
方法可以附加取消功能,在任意一個(gè) goroutine 返回錯(cuò)誤時(shí),立即取消其他正在運(yùn)行的 goroutine,并在 Wait
方法中返回第一個(gè)非 nil
的錯(cuò)誤。
errgroup.SetLimit
方法可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
errgroup.TryGo
可以嘗試啟動(dòng)一個(gè)任務(wù),返回值標(biāo)識(shí)啟動(dòng)成功或失敗。
errgroup
源碼設(shè)計(jì)精妙,值得借鑒。
以上就是Golang并發(fā)控制之errgroup使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang errgroup的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用Go語言連接和操作數(shù)據(jù)庫(kù)的基本步驟
在Go語言中,連接和操作數(shù)據(jù)庫(kù)通常使用database/sql包,它提供了一個(gè)數(shù)據(jù)庫(kù)抽象層,支持多種數(shù)據(jù)庫(kù)引擎,如MySQL、PostgreSQL、SQLite等,下面我將以MySQL為例,詳細(xì)講解如何使用Go語言連接和操作數(shù)據(jù)庫(kù),需要的朋友可以參考下2024-06-06golang利用函數(shù)閉包實(shí)現(xiàn)簡(jiǎn)單的中間件
中間件設(shè)計(jì)模式是一種常見的軟件設(shè)計(jì)模式,它在許多編程語言和框架中被廣泛應(yīng)用,這篇文章主要為大家介紹一下golang利用函數(shù)閉包實(shí)現(xiàn)一個(gè)簡(jiǎn)單的中間件,感興趣的可以了解下2023-10-10golang?select?機(jī)制和超時(shí)問題
golang 中的協(xié)程使用非常方便,但是協(xié)程什么時(shí)候結(jié)束是一個(gè)控制問題,可以用 select 配合使用,這篇文章主要介紹了golang?select?機(jī)制和超時(shí)問題,需要的朋友可以參考下2022-06-06如何讓shell終端和goland控制臺(tái)輸出彩色的文字
這篇文章主要介紹了如何讓shell終端和goland控制臺(tái)輸出彩色的文字的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-05-05