Golang并發(fā)控制之errgroup使用詳解
errgroup 是 Go 官方庫 x 中提供的一個(gè)非常實(shí)用的工具,用于并發(fā)執(zhí)行多個(gè) goroutine,并且方便的處理錯(cuò)誤。
我們知道,Go 標(biāo)準(zhǔn)庫中有個(gè) sync.WaitGroup 可以用來并發(fā)執(zhí)行多個(gè) goroutine,errgroup 就是在其基礎(chǔ)上實(shí)現(xiàn)了 errgroup.Group。不過,errgroup.Group 和 sync.WaitGroup 在功能上是有區(qū)別的,盡管它們都用于管理 goroutine 的同步。
errgroup 優(yōu)勢
與 sync.WaitGroup 相比,以下是設(shè)計(jì) errgroup.Group 的原因和優(yōu)勢:
錯(cuò)誤處理:
sync.WaitGroup只負(fù)責(zé)等待 goroutine 完成,不處理 goroutine 的返回值或錯(cuò)誤。errgroup.Group雖然目前也不能直接處理 goroutine 的返回值,但在 goroutine 返回錯(cuò)誤時(shí),可以立即取消其他正在運(yùn)行的 goroutine,并在Wait方法中返回第一個(gè)非nil的錯(cuò)誤。
上下文取消:
errgroup 可以與 context.Context 配合使用,支持在某個(gè) goroutine 出現(xiàn)錯(cuò)誤時(shí)自動(dòng)取消其他 goroutine,這樣可以更好地控制資源,避免不必要的工作。
簡化并發(fā)編程:
使用 errgroup 可以減少錯(cuò)誤處理的樣板代碼,開發(fā)者不需要手動(dòng)管理錯(cuò)誤狀態(tài)和同步邏輯,使得并發(fā)編程更簡單、更易于維護(hù)。
限制并發(fā)數(shù)量:
errgroup 提供了便捷的接口來限制并發(fā) goroutine 的數(shù)量,避免過載,而 sync.WaitGroup 沒有這樣的功能。
以上,errgroup 為處理并發(fā)任務(wù)提供了更強(qiáng)大的錯(cuò)誤管理和控制機(jī)制,因此在許多并發(fā)場景下是更優(yōu)的選擇。
隨著本文接下來的深入講解,你就能深刻體會(huì)到上面所說的優(yōu)勢了。
sync.WaitGroup 使用示例
在介紹 errgroup.Group 前,我們還是先來一起回顧下 sync.WaitGroup 的用法。
示例如下:
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗
}
var err error
var wg sync.WaitGroup // 零值可用,不必顯式初始化
for _, url := range urls {
wg.Add(1) // 增加 WaitGroup 計(jì)數(shù)器
// 啟動(dòng)一個(gè) goroutine 來獲取 URL
go func() {
defer wg.Done() // 當(dāng) goroutine 完成時(shí)遞減 WaitGroup 計(jì)數(shù)器
resp, e := http.Get(url)
if e != nil { // 發(fā)生錯(cuò)誤返回,并記錄該錯(cuò)誤
err = e
return
}
defer resp.Body.Close()
fmt.Printf("fetch url %s status %s\n", url, resp.Status)
}()
}
// 等待所有 goroutine 執(zhí)行完成
wg.Wait()
if err != nil { // err 會(huì)記錄最后一個(gè)錯(cuò)誤
fmt.Printf("Error: %s\n", err)
}
}
示例中,我們使用 sync.WaitGroup 來啟動(dòng) 3 個(gè) goroutine 并發(fā)訪問 3 個(gè)不同的 URL,并在成功時(shí)打印響應(yīng)狀態(tài)碼,或失敗時(shí)記錄錯(cuò)誤信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我們獲取了兩個(gè)成功的響應(yīng),并打印了一條錯(cuò)誤信息。
根據(jù)示例,我們可以抽象出 sync.WaitGroup 最典型的慣用法:
var wg sync.WaitGroup
for ... {
wg.Add(1)
go func() {
defer wg.Done()
// do something
}()
}
wg.Wait()
errgroup.Group 使用示例
其實(shí) errgroup.Group 的使用套路與 sync.WaitGroup 非常類似。
基本使用
errgroup 基本使用套路如下:
- 導(dǎo)入
errgroup包。 - 創(chuàng)建一個(gè)
errgroup.Group實(shí)例。 - 使用
Group.Go方法啟動(dòng)多個(gè)并發(fā)任務(wù)。 - 使用
Group.Wait方法等待所有 goroutine 完成或有一個(gè)返回錯(cuò)誤。
將前文中的 sync.WaitGroup 程序示例使用 errgroup.Group 重寫為如下示例:
package main
import (
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗
}
// 使用 errgroup 創(chuàng)建一個(gè)新的 goroutine 組
var g errgroup.Group // 零值可用,不必顯式初始化
for _, url := range urls {
// 使用 errgroup 啟動(dòng)一個(gè) goroutine 來獲取 URL
g.Go(func() error {
resp, err := http.Get(url)
if err != nil {
return err // 發(fā)生錯(cuò)誤,返回該錯(cuò)誤
}
defer resp.Body.Close()
fmt.Printf("fetch url %s status %s\n", url, resp.Status)
return nil // 返回 nil 表示成功
})
}
// 等待所有 goroutine 完成并返回第一個(gè)錯(cuò)誤(如果有)
if err := g.Wait(); err != nil {
fmt.Printf("Error: %s\n", err)
}
}
可以發(fā)現(xiàn),這段程序與 sync.WaitGroup 示例很像,根據(jù)代碼中的注釋,很容易看懂。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
輸出結(jié)果也沒什么變化。
上下文取消
errgroup 提供了 errgroup.WithContext 可以附加取消功能,在任意一個(gè) goroutine 返回錯(cuò)誤時(shí),可以立即取消其他正在運(yùn)行的 goroutine,并在 Wait 方法中返回第一個(gè)非 nil 的錯(cuò)誤。
示例如下:
package main
import (
"context"
"fmt"
"net/http"
"sync"
"golang.org/x/sync/errgroup"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/", // 這是一個(gè)錯(cuò)誤的 URL,會(huì)導(dǎo)致任務(wù)失敗
}
// 創(chuàng)建一個(gè)帶有 context 的 errgroup
// 任何一個(gè) goroutine 返回非 nil 的錯(cuò)誤,或 Wait() 等待所有 goroutine 完成后,context 都會(huì)被取消
g, ctx := errgroup.WithContext(context.Background())
// 創(chuàng)建一個(gè) map 來保存結(jié)果
var result sync.Map
for _, url := range urls {
// 使用 errgroup 啟動(dòng)一個(gè) goroutine 來獲取 URL
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return err // 發(fā)生錯(cuò)誤,返回該錯(cuò)誤
}
// 發(fā)起請求
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // 發(fā)生錯(cuò)誤,返回該錯(cuò)誤
}
defer resp.Body.Close()
// 保存每個(gè) URL 的響應(yīng)狀態(tài)碼
result.Store(url, resp.Status)
return nil // 返回 nil 表示成功
})
}
// 等待所有 goroutine 完成并返回第一個(gè)錯(cuò)誤(如果有)
if err := g.Wait(); err != nil {
fmt.Println("Error: ", err)
}
// 所有 goroutine 都執(zhí)行完成,遍歷并打印成功的結(jié)果
result.Range(func(key, value any) bool {
fmt.Printf("fetch url %s status %s\n", key, value)
return true
})
}
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/withcontext/main.go
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK
由測試結(jié)果來看,對于 [http://www.google.com/](http://www.google.com/) 的請求可以接收到成功響應(yīng),由于對 [http://www.somestupidname.com/](http://www.somestupidname.com/) 請求報(bào)錯(cuò),程序來不及等待 [http://www.golang.org/](http://www.golang.org/) 響應(yīng),就被取消了。
其實(shí)我們大致可以猜測到,取消功能應(yīng)該是通過 context.cancelCtx 來實(shí)現(xiàn)的,我們暫且不必深究,稍后探索源碼就能驗(yàn)證我們的猜想了。
限制并發(fā)數(shù)量
errgroup 提供了 errgroup.SetLimit 可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
示例如下:
package main
import (
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
// 創(chuàng)建一個(gè) errgroup.Group
var g errgroup.Group
// 設(shè)置最大并發(fā)限制為 3
g.SetLimit(3)
// 啟動(dòng) 10 個(gè) goroutine
for i := 1; i <= 10; i++ {
g.Go(func() error {
// 打印正在運(yùn)行的 goroutine
fmt.Printf("Goroutine %d is starting\n", i)
time.Sleep(2 * time.Second) // 模擬任務(wù)耗時(shí)
fmt.Printf("Goroutine %d is done\n", i)
return nil
})
}
// 等待所有 goroutine 完成
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error: %v\n", err)
}
fmt.Println("All goroutines complete.")
}
使用 g.SetLimit(3) 可以限制最大并發(fā)為 3 個(gè) goroutine。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/setlimit/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.
根據(jù)輸出可以發(fā)現(xiàn),雖然我們通過 for 循環(huán)啟動(dòng)了 10 個(gè) goroutine,但程序執(zhí)行時(shí)最多只允許同時(shí)啟動(dòng) 3 個(gè) goroutine,當(dāng)這 3 個(gè) goroutine 中有某個(gè)執(zhí)行完成并退出,才會(huì)有新的 goroutine 被啟動(dòng)。
嘗試啟動(dòng)
errgroup 還提供了 errgroup.TryGo 可以嘗試啟動(dòng)一個(gè)任務(wù),它返回一個(gè) bool 值,標(biāo)識(shí)任務(wù)是否啟動(dòng)成功,true 表示成功,false 表示失敗。
errgroup.TryGo 需要搭配 errgroup.SetLimit 一同使用,因?yàn)槿绻幌拗撇l(fā)數(shù)量,那么 errgroup.TryGo 始終返回 true,當(dāng)達(dá)到最大并發(fā)數(shù)量限制時(shí),errgroup.TryGo 返回 false。
示例如下:
package main
import (
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
// 創(chuàng)建一個(gè) errgroup.Group
var g errgroup.Group
// 設(shè)置最大并發(fā)限制為 3
g.SetLimit(3)
// 啟動(dòng) 10 個(gè) goroutine
for i := 1; i <= 10; i++ {
if g.TryGo(func() error {
// 打印正在運(yùn)行的 goroutine
fmt.Printf("Goroutine %d is starting\n", i)
time.Sleep(2 * time.Second) // 模擬工作
fmt.Printf("Goroutine %d is done\n", i)
return nil
}) {
// 如果成功啟動(dòng),打印提示
fmt.Printf("Goroutine %d started successfully\n", i)
} else {
// 如果達(dá)到并發(fā)限制,打印提示
fmt.Printf("Goroutine %d could not start (limit reached)\n", i)
}
}
// 等待所有 goroutine 完成
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error: %v\n", err)
}
fmt.Println("All goroutines complete.")
}
使用 g.SetLimit(3) 限制最大并發(fā)為 3 個(gè) goroutine,調(diào)用 g.TryGo 如果啟動(dòng)任務(wù)成功,打印 Goroutine {i} started successfully 提示信息;啟動(dòng)任務(wù)失敗,則打印 Goroutine {i} could not start (limit reached) 提示信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/trygo/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.
因?yàn)橄拗谱畲蟛l(fā)數(shù)量為 3,所以前面 3 個(gè) goroutine 啟動(dòng)成功,并且正常執(zhí)行完成,其他幾個(gè) goroutine 全部執(zhí)行失敗。
以上就是 errgroup 的全部用法了,更多使用場景你可以在實(shí)踐中去嘗試和感悟。
源碼解讀
接下來,我們一起閱讀下 errgroup 源碼,以此來加深對 errgroup 的理解。
errgroup 源碼非常少,僅有 3 個(gè)文件。這 3 個(gè)文件源碼內(nèi)容分別如下:
主邏輯代碼:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup
import (
"context"
"fmt"
"sync"
)
type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
cancel func(error)
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
return g.err
}
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
}
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
return true
}
// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
為 Go 1.20 及更高版本提供的 withCancelCause 函數(shù)實(shí)現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}
為低于 Go 1.20 版本提供的 withCancelCause 函數(shù)實(shí)現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
ctx, cancel := context.WithCancel(parent)
return ctx, func(error) { cancel() }
}
可以看到,errgroup 全部源碼加起來也不到 100 行,可謂短小精悍。
現(xiàn)在我們來分析下 errgroup 源碼。
根據(jù)包注釋我們可以知道,errgroup 包提供了同步、錯(cuò)誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務(wù)的子任務(wù)。errgroup.Group 與 sync.WaitGroup 相關(guān),增加了處理任務(wù)返回錯(cuò)誤的能力。
為了提供以上功能,首先 errgroup 定義了 token 和 Group 兩個(gè)結(jié)構(gòu)體:
// 定義一個(gè)空結(jié)構(gòu)體類型 token,會(huì)作為信號進(jìn)行傳遞,用于控制并發(fā)數(shù)
type token struct{}
// Group 是一組協(xié)程的集合,這些協(xié)程處理同一整體任務(wù)的子任務(wù)
//
// 零值 Group 是有效的,對活動(dòng)協(xié)程的數(shù)量沒有限制,并且不會(huì)在出錯(cuò)時(shí)取消
type Group struct {
cancel func(error) // 取消函數(shù),就是 context.CancelCauseFunc 類型
wg sync.WaitGroup // 內(nèi)部使用了 sync.WaitGroup
sem chan token // 信號 channel,可以控制協(xié)程并發(fā)數(shù)量
errOnce sync.Once // 確保錯(cuò)誤僅處理一次
err error // 記錄子協(xié)程集中返回的第一個(gè)錯(cuò)誤
}
token 被定義為空結(jié)構(gòu)體,用來傳遞信號,這也是 Go 中空結(jié)構(gòu)體的慣用法。
NOTE:
你可以在我的另一篇文章《Go 中空結(jié)構(gòu)體慣用法,我?guī)湍憧偨Y(jié)全了!》中查看空結(jié)構(gòu)體的更多用法。
Group 是 errgroup 包提供的唯一公開結(jié)構(gòu)體,其關(guān)聯(lián)的方法承載了所有功能。
cancel 屬性為一個(gè)函數(shù),上下文取消時(shí)會(huì)被調(diào)用,其實(shí)就是 context.CancelCauseFunc 類型,調(diào)用 errgroup.WithContext 時(shí)被賦值。
wg 屬性即為 sync.WaitGroup,承擔(dān)并發(fā)控制的主邏輯,errgroup.Go 和 errgroup.TryGo 內(nèi)部并發(fā)控制邏輯都會(huì)代理給 sync.WaitGroup。
sem屬性是 token 類型的 channel,用于限制并發(fā)數(shù)量,調(diào)用 errgroup.SetLimit 是被賦值。
err 會(huì)記錄所有 goroutine 中出現(xiàn)的第一個(gè)錯(cuò)誤,由errOnce 確保錯(cuò)誤錯(cuò)誤僅處理一次,所以后面再出現(xiàn)更多的錯(cuò)誤都會(huì)被忽略。
接下來我們先看 errgroup.SetLimit 方法定義:
// SetLimit 限制該 Group 中活動(dòng)的協(xié)程數(shù)量最多為 n,負(fù)值表示沒有限制
//
// 任何后續(xù)對 Go 方法的調(diào)用都將阻塞,直到可以在不超過限額的情況下添加活動(dòng)協(xié)程
//
// 在 Group 中存在任何活動(dòng)的協(xié)程時(shí),限制不得修改
func (g *Group) SetLimit(n int) { // 傳進(jìn)來的 n 就是 channel 長度,以此來限制協(xié)程的并發(fā)數(shù)量
if n < 0 { // 這里檢查如果小于 0 則不限制協(xié)程并發(fā)數(shù)量。此外,也不要將其設(shè)置為 0,會(huì)產(chǎn)生死鎖
g.sem = nil
return
}
if len(g.sem) != 0 { // 如果存在活動(dòng)的協(xié)程,調(diào)用此方法將產(chǎn)生 panic
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
errgroup.SetLimit 方法可以限制并發(fā)屬性,其內(nèi)部邏輯很簡單,不過要注意在調(diào)用 errgroup.Go 或 errgroup.TryGo 方法前調(diào)用 errgroup.SetLimit,以防程序出現(xiàn) panic。
然后看下主邏輯 errgroup.Go 方法實(shí)現(xiàn):
// Go 會(huì)在新的協(xié)程中調(diào)用給定的函數(shù)
// 它會(huì)阻塞,直到可以在不超過配置的活躍協(xié)程數(shù)量限制的情況下添加新的協(xié)程
//
// 首次返回非 nil 錯(cuò)誤的調(diào)用會(huì)取消該 Group 的上下文(context),如果該 context 是通過調(diào)用 WithContext 創(chuàng)建的,該錯(cuò)誤將由 Wait 返回
func (g *Group) Go(f func() error) {
if g.sem != nil { // 這個(gè)是限制并發(fā)數(shù)的信號通道
g.sem <- token{} // 如果超過了配置的活躍協(xié)程數(shù)量限制,向 channel 發(fā)送 token 會(huì)阻塞
}
g.wg.Add(1) // 轉(zhuǎn)發(fā)給 sync.WaitGroup.Add(1),將活動(dòng)協(xié)程數(shù)加一
go func() {
defer g.done() // 當(dāng)一個(gè)協(xié)程完成時(shí),調(diào)用此方法,內(nèi)部會(huì)將調(diào)用轉(zhuǎn)發(fā)給 sync.WaitGroup.Done()
if err := f(); err != nil { // f() 就是我們要執(zhí)行的任務(wù)
g.errOnce.Do(func() { // 僅執(zhí)行一次,即只處理一次錯(cuò)誤,所以會(huì)記錄第一個(gè)非 nil 的錯(cuò)誤,與協(xié)程啟動(dòng)順序無關(guān)
g.err = err // 記錄錯(cuò)誤
if g.cancel != nil { // 如果 cancel 不為 nil,則調(diào)用取消函數(shù),并設(shè)置 cause
g.cancel(g.err)
}
})
}
}()
}
首先會(huì)檢測是否使用 errgroup.SetLimit 方法設(shè)置了并發(fā)限制,如果有限制,則使用 channel 來控制并發(fā)數(shù)量。
否則執(zhí)行主邏輯,其實(shí)就是 sync.WaitGroup 的套路代碼。
在 defer 中調(diào)用了 g.done(),done 方法定義如下:
// 當(dāng)一個(gè)協(xié)程完成時(shí),調(diào)用此方法
func (g *Group) done() {
// 如果設(shè)置了最大并發(fā)數(shù),則 sem 不為 nil,從 channel 中消費(fèi)一個(gè) token,表示一個(gè)協(xié)程已完成
if g.sem != nil {
<-g.sem
}
g.wg.Done() // 轉(zhuǎn)發(fā)給 sync.WaitGroup.Done(),將活動(dòng)協(xié)程數(shù)減一
}
另外,如果某個(gè)任務(wù)返回了錯(cuò)誤,則通過 errOnce 確保錯(cuò)誤只被處理一次,處理方式就是先記錄錯(cuò)誤,然后調(diào)用 cancel 方法。
cancel 實(shí)際上是在 errgroup.WithContext 方法中賦值的:
// WithContext 返回一個(gè)新的 Group 和一個(gè)從 ctx 派生的關(guān)聯(lián) Context
//
// 派生的 Context 會(huì)在傳遞給 Go 的函數(shù)首次返回非 nil 錯(cuò)誤或 Wait 首次返回時(shí)被取消,以先發(fā)生者為準(zhǔn)。
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := withCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}
這里的 withCancelCause 有兩種實(shí)現(xiàn)。
如果 Go 版本大于等于 1.20,提供的 withCancelCause 函數(shù)實(shí)現(xiàn)如下:
// 構(gòu)建約束標(biāo)識(shí)了這個(gè)文件是 Go 1.20 版本被加入的
//go:build go1.20
package errgroup
import "context"
// 代理到 context.WithCancelCause
func withCancelCause(parent context.Context) (context.Context, func(error)) {
return context.WithCancelCause(parent)
}
如果 Go 版本小于 1.20,提供的 withCancelCause 函數(shù)實(shí)現(xiàn)如下:
//go:build !go1.20
package errgroup
import "context"
func withCancelCause(parent context.Context) (context.Context, func(error)) {
ctx, cancel := context.WithCancel(parent)
return ctx, func(error) { cancel() }
}
因?yàn)?nbsp;context.WithCancelCause 方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在這個(gè) Commit: 93782cc 中看到 withCancelCause 函數(shù)變更記錄。
調(diào)用 errgroup.Go 方法啟動(dòng)任務(wù)后,我們會(huì)調(diào)用 errgroup.Wait 等待所有任務(wù)完成,其實(shí)現(xiàn)如下:
// Wait 會(huì)阻塞,直到來自 Go 方法的所有函數(shù)調(diào)用返回,然后返回它們中的第一個(gè)非 nil 錯(cuò)誤(如果有的話)
func (g *Group) Wait() error {
g.wg.Wait() // 轉(zhuǎn)發(fā)給 sync.WaitGroup.Wait(),等待所有協(xié)程執(zhí)行完成
if g.cancel != nil { // 如果 cancel 不為 nil,則調(diào)用取消函數(shù),并設(shè)置 cause
g.cancel(g.err)
}
return g.err // 返回錯(cuò)誤
}
所以,最終 errgroup.Wait 返回的錯(cuò)誤其實(shí)就是 errgroup.Go 方法中記錄的第一個(gè)錯(cuò)誤。
現(xiàn)在,我們還剩下最后一個(gè)方法 errgroup.TryGo 的源碼沒有分析,我把源碼貼在下面,并寫上了詳細(xì)的注釋:
// TryGo 僅在 Group 中活動(dòng)的協(xié)程數(shù)量低于限額時(shí),才在新的協(xié)程中調(diào)用給定的函數(shù)
//
// 返回值標(biāo)識(shí)協(xié)程是否啟動(dòng)
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil { // 如果設(shè)置了最大并發(fā)數(shù)
select {
case g.sem <- token{}: // 可以向 channel 寫入 token,說明沒有達(dá)到限額,可以啟動(dòng)協(xié)程
// Note: this allows barging iff channels in general allow barging.
default: // 如果超過了配置的活躍協(xié)程數(shù)量限制,會(huì)走到這個(gè) case
return false
}
}
// 接下來的代碼與 Go 中的邏輯相同
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
return true
}
主邏輯與 errgroup.Go 方法一樣,不同的是 errgroup.Go 方法如果達(dá)到并發(fā)限額會(huì)阻塞,而 errgroup.TryGo 方法在達(dá)到并發(fā)限額時(shí)直接返回 false。
其實(shí) <font style="color:rgb(31, 35, 40);">errgroup.TryGo</font> 和 <font style="color:rgb(31, 35, 40);">errgroup.SetLimit</font> 兩個(gè)方法是后添加的功能,你可以在 issues/27837 中看到討論記錄。
至此,errgroup 源碼就都解讀完成了。
總結(jié)
errgroup 是官方為我們提供的擴(kuò)展庫,在 sync.WaitGroup 基礎(chǔ)上,增加了處理任務(wù)返回錯(cuò)誤的能力。提供了同步、錯(cuò)誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務(wù)的子任務(wù)。
errgroup.WithContext 方法可以附加取消功能,在任意一個(gè) goroutine 返回錯(cuò)誤時(shí),立即取消其他正在運(yùn)行的 goroutine,并在 Wait 方法中返回第一個(gè)非 nil 的錯(cuò)誤。
errgroup.SetLimit 方法可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
errgroup.TryGo 可以嘗試啟動(dòng)一個(gè)任務(wù),返回值標(biāo)識(shí)啟動(dòng)成功或失敗。
errgroup 源碼設(shè)計(jì)精妙,值得借鑒。
以上就是Golang并發(fā)控制之errgroup使用詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang errgroup的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang利用函數(shù)閉包實(shí)現(xiàn)簡單的中間件
中間件設(shè)計(jì)模式是一種常見的軟件設(shè)計(jì)模式,它在許多編程語言和框架中被廣泛應(yīng)用,這篇文章主要為大家介紹一下golang利用函數(shù)閉包實(shí)現(xiàn)一個(gè)簡單的中間件,感興趣的可以了解下2023-10-10
golang?select?機(jī)制和超時(shí)問題
golang 中的協(xié)程使用非常方便,但是協(xié)程什么時(shí)候結(jié)束是一個(gè)控制問題,可以用 select 配合使用,這篇文章主要介紹了golang?select?機(jī)制和超時(shí)問題,需要的朋友可以參考下2022-06-06
如何讓shell終端和goland控制臺(tái)輸出彩色的文字
這篇文章主要介紹了如何讓shell終端和goland控制臺(tái)輸出彩色的文字的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05

