Golang并發(fā)控制之errgroup使用詳解
errgroup
是 Go 官方庫 x 中提供的一個非常實用的工具,用于并發(fā)執(zhí)行多個 goroutine,并且方便的處理錯誤。
我們知道,Go 標準庫中有個 sync.WaitGroup
可以用來并發(fā)執(zhí)行多個 goroutine,errgroup
就是在其基礎上實現(xiàn)了 errgroup.Group
。不過,errgroup.Group
和 sync.WaitGroup
在功能上是有區(qū)別的,盡管它們都用于管理 goroutine 的同步。
errgroup 優(yōu)勢
與 sync.WaitGroup
相比,以下是設計 errgroup.Group
的原因和優(yōu)勢:
錯誤處理:
sync.WaitGroup
只負責等待 goroutine 完成,不處理 goroutine 的返回值或錯誤。errgroup.Group
雖然目前也不能直接處理 goroutine 的返回值,但在 goroutine 返回錯誤時,可以立即取消其他正在運行的 goroutine,并在Wait
方法中返回第一個非nil
的錯誤。
上下文取消:
errgroup
可以與 context.Context
配合使用,支持在某個 goroutine 出現(xiàn)錯誤時自動取消其他 goroutine,這樣可以更好地控制資源,避免不必要的工作。
簡化并發(fā)編程:
使用 errgroup
可以減少錯誤處理的樣板代碼,開發(fā)者不需要手動管理錯誤狀態(tài)和同步邏輯,使得并發(fā)編程更簡單、更易于維護。
限制并發(fā)數(shù)量:
errgroup
提供了便捷的接口來限制并發(fā) goroutine 的數(shù)量,避免過載,而 sync.WaitGroup
沒有這樣的功能。
以上,errgroup
為處理并發(fā)任務提供了更強大的錯誤管理和控制機制,因此在許多并發(fā)場景下是更優(yōu)的選擇。
隨著本文接下來的深入講解,你就能深刻體會到上面所說的優(yōu)勢了。
sync.WaitGroup 使用示例
在介紹 errgroup.Group
前,我們還是先來一起回顧下 sync.WaitGroup
的用法。
示例如下:
package main import ( "fmt" "net/http" "sync" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗 } var err error var wg sync.WaitGroup // 零值可用,不必顯式初始化 for _, url := range urls { wg.Add(1) // 增加 WaitGroup 計數(shù)器 // 啟動一個 goroutine 來獲取 URL go func() { defer wg.Done() // 當 goroutine 完成時遞減 WaitGroup 計數(shù)器 resp, e := http.Get(url) if e != nil { // 發(fā)生錯誤返回,并記錄該錯誤 err = e return } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) }() } // 等待所有 goroutine 執(zhí)行完成 wg.Wait() if err != nil { // err 會記錄最后一個錯誤 fmt.Printf("Error: %s\n", err) } }
示例中,我們使用 sync.WaitGroup
來啟動 3 個 goroutine 并發(fā)訪問 3 個不同的 URL
,并在成功時打印響應狀態(tài)碼,或失敗時記錄錯誤信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
我們獲取了兩個成功的響應,并打印了一條錯誤信息。
根據(jù)示例,我們可以抽象出 sync.WaitGroup
最典型的慣用法:
var wg sync.WaitGroup for ... { wg.Add(1) go func() { defer wg.Done() // do something }() } wg.Wait()
errgroup.Group 使用示例
其實 errgroup.Group
的使用套路與 sync.WaitGroup
非常類似。
基本使用
errgroup
基本使用套路如下:
- 導入
errgroup
包。 - 創(chuàng)建一個
errgroup.Group
實例。 - 使用
Group.Go
方法啟動多個并發(fā)任務。 - 使用
Group.Wait
方法等待所有 goroutine 完成或有一個返回錯誤。
將前文中的 sync.WaitGroup
程序示例使用 errgroup.Group
重寫為如下示例:
package main import ( "fmt" "net/http" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗 } // 使用 errgroup 創(chuàng)建一個新的 goroutine 組 var g errgroup.Group // 零值可用,不必顯式初始化 for _, url := range urls { // 使用 errgroup 啟動一個 goroutine 來獲取 URL g.Go(func() error { resp, err := http.Get(url) if err != nil { return err // 發(fā)生錯誤,返回該錯誤 } defer resp.Body.Close() fmt.Printf("fetch url %s status %s\n", url, resp.Status) return nil // 返回 nil 表示成功 }) } // 等待所有 goroutine 完成并返回第一個錯誤(如果有) if err := g.Wait(); err != nil { fmt.Printf("Error: %s\n", err) } }
可以發(fā)現(xiàn),這段程序與 sync.WaitGroup
示例很像,根據(jù)代碼中的注釋,很容易看懂。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
輸出結果也沒什么變化。
上下文取消
errgroup
提供了 errgroup.WithContext
可以附加取消功能,在任意一個 goroutine 返回錯誤時,可以立即取消其他正在運行的 goroutine,并在 Wait
方法中返回第一個非 nil
的錯誤。
示例如下:
package main import ( "context" "fmt" "net/http" "sync" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗 } // 創(chuàng)建一個帶有 context 的 errgroup // 任何一個 goroutine 返回非 nil 的錯誤,或 Wait() 等待所有 goroutine 完成后,context 都會被取消 g, ctx := errgroup.WithContext(context.Background()) // 創(chuàng)建一個 map 來保存結果 var result sync.Map for _, url := range urls { // 使用 errgroup 啟動一個 goroutine 來獲取 URL g.Go(func() error { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return err // 發(fā)生錯誤,返回該錯誤 } // 發(fā)起請求 resp, err := http.DefaultClient.Do(req) if err != nil { return err // 發(fā)生錯誤,返回該錯誤 } defer resp.Body.Close() // 保存每個 URL 的響應狀態(tài)碼 result.Store(url, resp.Status) return nil // 返回 nil 表示成功 }) } // 等待所有 goroutine 完成并返回第一個錯誤(如果有) if err := g.Wait(); err != nil { fmt.Println("Error: ", err) } // 所有 goroutine 都執(zhí)行完成,遍歷并打印成功的結果 result.Range(func(key, value any) bool { fmt.Printf("fetch url %s status %s\n", key, value) return true }) }
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/withcontext/main.go
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK
由測試結果來看,對于 [http://www.google.com/](http://www.google.com/)
的請求可以接收到成功響應,由于對 [http://www.somestupidname.com/](http://www.somestupidname.com/)
請求報錯,程序來不及等待 [http://www.golang.org/](http://www.golang.org/)
響應,就被取消了。
其實我們大致可以猜測到,取消功能應該是通過 context.cancelCtx
來實現(xiàn)的,我們暫且不必深究,稍后探索源碼就能驗證我們的猜想了。
限制并發(fā)數(shù)量
errgroup
提供了 errgroup.SetLimit
可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
示例如下:
package main import ( "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { // 創(chuàng)建一個 errgroup.Group var g errgroup.Group // 設置最大并發(fā)限制為 3 g.SetLimit(3) // 啟動 10 個 goroutine for i := 1; i <= 10; i++ { g.Go(func() error { // 打印正在運行的 goroutine fmt.Printf("Goroutine %d is starting\n", i) time.Sleep(2 * time.Second) // 模擬任務耗時 fmt.Printf("Goroutine %d is done\n", i) return nil }) } // 等待所有 goroutine 完成 if err := g.Wait(); err != nil { fmt.Printf("Encountered an error: %v\n", err) } fmt.Println("All goroutines complete.") }
使用 g.SetLimit(3)
可以限制最大并發(fā)為 3 個 goroutine。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/setlimit/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.
根據(jù)輸出可以發(fā)現(xiàn),雖然我們通過 for
循環(huán)啟動了 10 個 goroutine,但程序執(zhí)行時最多只允許同時啟動 3 個 goroutine,當這 3 個 goroutine 中有某個執(zhí)行完成并退出,才會有新的 goroutine 被啟動。
嘗試啟動
errgroup
還提供了 errgroup.TryGo
可以嘗試啟動一個任務,它返回一個 bool
值,標識任務是否啟動成功,true
表示成功,false
表示失敗。
errgroup.TryGo
需要搭配 errgroup.SetLimit
一同使用,因為如果不限制并發(fā)數(shù)量,那么 errgroup.TryGo
始終返回 true
,當達到最大并發(fā)數(shù)量限制時,errgroup.TryGo
返回 false
。
示例如下:
package main import ( "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { // 創(chuàng)建一個 errgroup.Group var g errgroup.Group // 設置最大并發(fā)限制為 3 g.SetLimit(3) // 啟動 10 個 goroutine for i := 1; i <= 10; i++ { if g.TryGo(func() error { // 打印正在運行的 goroutine fmt.Printf("Goroutine %d is starting\n", i) time.Sleep(2 * time.Second) // 模擬工作 fmt.Printf("Goroutine %d is done\n", i) return nil }) { // 如果成功啟動,打印提示 fmt.Printf("Goroutine %d started successfully\n", i) } else { // 如果達到并發(fā)限制,打印提示 fmt.Printf("Goroutine %d could not start (limit reached)\n", i) } } // 等待所有 goroutine 完成 if err := g.Wait(); err != nil { fmt.Printf("Encountered an error: %v\n", err) } fmt.Println("All goroutines complete.") }
使用 g.SetLimit(3)
限制最大并發(fā)為 3 個 goroutine,調用 g.TryGo
如果啟動任務成功,打印 Goroutine {i} started successfully
提示信息;啟動任務失敗,則打印 Goroutine {i} could not start (limit reached)
提示信息。
執(zhí)行示例代碼,得到如下輸出:
$ go run examples/trygo/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.
因為限制最大并發(fā)數(shù)量為 3,所以前面 3 個 goroutine 啟動成功,并且正常執(zhí)行完成,其他幾個 goroutine 全部執(zhí)行失敗。
以上就是 errgroup
的全部用法了,更多使用場景你可以在實踐中去嘗試和感悟。
源碼解讀
接下來,我們一起閱讀下 errgroup
源碼,以此來加深對 errgroup
的理解。
errgroup
源碼非常少,僅有 3 個文件。這 3 個文件源碼內(nèi)容分別如下:
主邏輯代碼:
https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go
// Copyright 2016 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package errgroup provides synchronization, error propagation, and Context // cancelation for groups of goroutines working on subtasks of a common task. // // [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks // returning errors. package errgroup import ( "context" "fmt" "sync" ) type token struct{} // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. type Group struct { cancel func(error) wg sync.WaitGroup sem chan token errOnce sync.Once err error } func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() } // WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := withCancelCause(ctx) return &Group{cancel: cancel}, ctx } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } return g.err } // Go calls the given function in a new goroutine. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group's context, if the // group was created by calling WithContext. The error will be returned by Wait. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() } // TryGo calls the given function in a new goroutine only if the number of // active goroutines in the group is currently below the configured limit. // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { if g.sem != nil { select { case g.sem <- token{}: // Note: this allows barging iff channels in general allow barging. default: return false } } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() return true } // SetLimit limits the number of active goroutines in this group to at most n. // A negative value indicates no limit. // // Any subsequent call to the Go method will block until it can add an active // goroutine without exceeding the configured limit. // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
為 Go 1.20 及更高版本提供的 withCancelCause
函數(shù)實現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go
// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { return context.WithCancelCause(parent) }
為低于 Go 1.20 版本提供的 withCancelCause
函數(shù)實現(xiàn):
https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go
// Copyright 2023 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. //go:build !go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { ctx, cancel := context.WithCancel(parent) return ctx, func(error) { cancel() } }
可以看到,errgroup
全部源碼加起來也不到 100 行,可謂短小精悍。
現(xiàn)在我們來分析下 errgroup
源碼。
根據(jù)包注釋我們可以知道,errgroup
包提供了同步、錯誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務的子任務。errgroup.Group
與 sync.WaitGroup
相關,增加了處理任務返回錯誤的能力。
為了提供以上功能,首先 errgroup
定義了 token
和 Group
兩個結構體:
// 定義一個空結構體類型 token,會作為信號進行傳遞,用于控制并發(fā)數(shù) type token struct{} // Group 是一組協(xié)程的集合,這些協(xié)程處理同一整體任務的子任務 // // 零值 Group 是有效的,對活動協(xié)程的數(shù)量沒有限制,并且不會在出錯時取消 type Group struct { cancel func(error) // 取消函數(shù),就是 context.CancelCauseFunc 類型 wg sync.WaitGroup // 內(nèi)部使用了 sync.WaitGroup sem chan token // 信號 channel,可以控制協(xié)程并發(fā)數(shù)量 errOnce sync.Once // 確保錯誤僅處理一次 err error // 記錄子協(xié)程集中返回的第一個錯誤 }
token
被定義為空結構體,用來傳遞信號,這也是 Go 中空結構體的慣用法。
NOTE:
你可以在我的另一篇文章《Go 中空結構體慣用法,我?guī)湍憧偨Y全了!》中查看空結構體的更多用法。
Group
是 errgroup
包提供的唯一公開結構體,其關聯(lián)的方法承載了所有功能。
cancel
屬性為一個函數(shù),上下文取消時會被調用,其實就是 context.CancelCauseFunc
類型,調用 errgroup.WithContext
時被賦值。
wg
屬性即為 sync.WaitGroup
,承擔并發(fā)控制的主邏輯,errgroup.Go
和 errgroup.TryGo
內(nèi)部并發(fā)控制邏輯都會代理給 sync.WaitGroup
。
sem
屬性是 token
類型的 channel
,用于限制并發(fā)數(shù)量,調用 errgroup.SetLimit
是被賦值。
err
會記錄所有 goroutine 中出現(xiàn)的第一個錯誤,由errOnce
確保錯誤錯誤僅處理一次,所以后面再出現(xiàn)更多的錯誤都會被忽略。
接下來我們先看 errgroup.SetLimit
方法定義:
// SetLimit 限制該 Group 中活動的協(xié)程數(shù)量最多為 n,負值表示沒有限制 // // 任何后續(xù)對 Go 方法的調用都將阻塞,直到可以在不超過限額的情況下添加活動協(xié)程 // // 在 Group 中存在任何活動的協(xié)程時,限制不得修改 func (g *Group) SetLimit(n int) { // 傳進來的 n 就是 channel 長度,以此來限制協(xié)程的并發(fā)數(shù)量 if n < 0 { // 這里檢查如果小于 0 則不限制協(xié)程并發(fā)數(shù)量。此外,也不要將其設置為 0,會產(chǎn)生死鎖 g.sem = nil return } if len(g.sem) != 0 { // 如果存在活動的協(xié)程,調用此方法將產(chǎn)生 panic panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
errgroup.SetLimit
方法可以限制并發(fā)屬性,其內(nèi)部邏輯很簡單,不過要注意在調用 errgroup.Go
或 errgroup.TryGo
方法前調用 errgroup.SetLimit
,以防程序出現(xiàn) panic
。
然后看下主邏輯 errgroup.Go
方法實現(xiàn):
// Go 會在新的協(xié)程中調用給定的函數(shù) // 它會阻塞,直到可以在不超過配置的活躍協(xié)程數(shù)量限制的情況下添加新的協(xié)程 // // 首次返回非 nil 錯誤的調用會取消該 Group 的上下文(context),如果該 context 是通過調用 WithContext 創(chuàng)建的,該錯誤將由 Wait 返回 func (g *Group) Go(f func() error) { if g.sem != nil { // 這個是限制并發(fā)數(shù)的信號通道 g.sem <- token{} // 如果超過了配置的活躍協(xié)程數(shù)量限制,向 channel 發(fā)送 token 會阻塞 } g.wg.Add(1) // 轉發(fā)給 sync.WaitGroup.Add(1),將活動協(xié)程數(shù)加一 go func() { defer g.done() // 當一個協(xié)程完成時,調用此方法,內(nèi)部會將調用轉發(fā)給 sync.WaitGroup.Done() if err := f(); err != nil { // f() 就是我們要執(zhí)行的任務 g.errOnce.Do(func() { // 僅執(zhí)行一次,即只處理一次錯誤,所以會記錄第一個非 nil 的錯誤,與協(xié)程啟動順序無關 g.err = err // 記錄錯誤 if g.cancel != nil { // 如果 cancel 不為 nil,則調用取消函數(shù),并設置 cause g.cancel(g.err) } }) } }() }
首先會檢測是否使用 errgroup.SetLimit
方法設置了并發(fā)限制,如果有限制,則使用 channel
來控制并發(fā)數(shù)量。
否則執(zhí)行主邏輯,其實就是 sync.WaitGroup
的套路代碼。
在 defer
中調用了 g.done()
,done
方法定義如下:
// 當一個協(xié)程完成時,調用此方法 func (g *Group) done() { // 如果設置了最大并發(fā)數(shù),則 sem 不為 nil,從 channel 中消費一個 token,表示一個協(xié)程已完成 if g.sem != nil { <-g.sem } g.wg.Done() // 轉發(fā)給 sync.WaitGroup.Done(),將活動協(xié)程數(shù)減一 }
另外,如果某個任務返回了錯誤,則通過 errOnce
確保錯誤只被處理一次,處理方式就是先記錄錯誤,然后調用 cancel
方法。
cancel
實際上是在 errgroup.WithContext
方法中賦值的:
// WithContext 返回一個新的 Group 和一個從 ctx 派生的關聯(lián) Context // // 派生的 Context 會在傳遞給 Go 的函數(shù)首次返回非 nil 錯誤或 Wait 首次返回時被取消,以先發(fā)生者為準。 func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := withCancelCause(ctx) return &Group{cancel: cancel}, ctx }
這里的 withCancelCause
有兩種實現(xiàn)。
如果 Go 版本大于等于 1.20,提供的 withCancelCause
函數(shù)實現(xiàn)如下:
// 構建約束標識了這個文件是 Go 1.20 版本被加入的 //go:build go1.20 package errgroup import "context" // 代理到 context.WithCancelCause func withCancelCause(parent context.Context) (context.Context, func(error)) { return context.WithCancelCause(parent) }
如果 Go 版本小于 1.20,提供的 withCancelCause
函數(shù)實現(xiàn)如下:
//go:build !go1.20 package errgroup import "context" func withCancelCause(parent context.Context) (context.Context, func(error)) { ctx, cancel := context.WithCancel(parent) return ctx, func(error) { cancel() } }
因為 context.WithCancelCause
方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在這個 Commit: 93782cc 中看到 withCancelCause
函數(shù)變更記錄。
調用 errgroup.Go
方法啟動任務后,我們會調用 errgroup.Wait
等待所有任務完成,其實現(xiàn)如下:
// Wait 會阻塞,直到來自 Go 方法的所有函數(shù)調用返回,然后返回它們中的第一個非 nil 錯誤(如果有的話) func (g *Group) Wait() error { g.wg.Wait() // 轉發(fā)給 sync.WaitGroup.Wait(),等待所有協(xié)程執(zhí)行完成 if g.cancel != nil { // 如果 cancel 不為 nil,則調用取消函數(shù),并設置 cause g.cancel(g.err) } return g.err // 返回錯誤 }
所以,最終 errgroup.Wait
返回的錯誤其實就是 errgroup.Go
方法中記錄的第一個錯誤。
現(xiàn)在,我們還剩下最后一個方法 errgroup.TryGo
的源碼沒有分析,我把源碼貼在下面,并寫上了詳細的注釋:
// TryGo 僅在 Group 中活動的協(xié)程數(shù)量低于限額時,才在新的協(xié)程中調用給定的函數(shù) // // 返回值標識協(xié)程是否啟動 func (g *Group) TryGo(f func() error) bool { if g.sem != nil { // 如果設置了最大并發(fā)數(shù) select { case g.sem <- token{}: // 可以向 channel 寫入 token,說明沒有達到限額,可以啟動協(xié)程 // Note: this allows barging iff channels in general allow barging. default: // 如果超過了配置的活躍協(xié)程數(shù)量限制,會走到這個 case return false } } // 接下來的代碼與 Go 中的邏輯相同 g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() return true }
主邏輯與 errgroup.Go
方法一樣,不同的是 errgroup.Go
方法如果達到并發(fā)限額會阻塞,而 errgroup.TryGo
方法在達到并發(fā)限額時直接返回 false
。
其實 <font style="color:rgb(31, 35, 40);">errgroup.TryGo</font>
和 <font style="color:rgb(31, 35, 40);">errgroup.SetLimit</font>
兩個方法是后添加的功能,你可以在 issues/27837 中看到討論記錄。
至此,errgroup
源碼就都解讀完成了。
總結
errgroup
是官方為我們提供的擴展庫,在 sync.WaitGroup
基礎上,增加了處理任務返回錯誤的能力。提供了同步、錯誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務的子任務。
errgroup.WithContext
方法可以附加取消功能,在任意一個 goroutine 返回錯誤時,立即取消其他正在運行的 goroutine,并在 Wait
方法中返回第一個非 nil
的錯誤。
errgroup.SetLimit
方法可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。
errgroup.TryGo
可以嘗試啟動一個任務,返回值標識啟動成功或失敗。
errgroup
源碼設計精妙,值得借鑒。
以上就是Golang并發(fā)控制之errgroup使用詳解的詳細內(nèi)容,更多關于Golang errgroup的資料請關注腳本之家其它相關文章!
相關文章
golang利用函數(shù)閉包實現(xiàn)簡單的中間件
中間件設計模式是一種常見的軟件設計模式,它在許多編程語言和框架中被廣泛應用,這篇文章主要為大家介紹一下golang利用函數(shù)閉包實現(xiàn)一個簡單的中間件,感興趣的可以了解下2023-10-10