詳解Go語(yǔ)言中ErrGroup的使用
在并發(fā)編程里,sync.WaitGroup 并發(fā)原語(yǔ)的使用頻率非常高,它經(jīng)常用于協(xié)同等待的場(chǎng)景:goroutine A 在檢查點(diǎn)等待一組執(zhí)行任務(wù)的 worker goroutine 全部完成,如果在執(zhí)行任務(wù)的這些 goroutine 還沒(méi)全部完成,goroutine A 就會(huì)阻塞在檢查點(diǎn),直到所有 woker goroutine 都完成后才能繼續(xù)執(zhí)行。
如果在 woker goroutine 的執(zhí)行過(guò)程中遇到錯(cuò)誤并想要處理該怎么辦? WaitGroup 并沒(méi)有提供傳播錯(cuò)誤的功能,遇到這種場(chǎng)景我們?cè)撛趺崔k? Go 語(yǔ)言在擴(kuò)展庫(kù)提供了 ErrorGroup 并發(fā)原語(yǔ)正好適合在這種場(chǎng)景下使用,它在WaitGroup 的基礎(chǔ)上還提供了,錯(cuò)誤傳播以及上下文取消的功能。
Go 擴(kuò)展庫(kù)通過(guò) errorgroup.Group 提供 ErrorGroup 原語(yǔ)的功能,它有三個(gè)方法可調(diào)用:
func WithContext(ctx context.Context) (*Group, context.Context) func (g *Group) Go(f func() error) func (g *Group) Wait() error
接下來(lái)我們讓主 goroutine 使用 ErrorGroup 代替 WaitGroup 等待所以子任務(wù)的完成,ErrorGroup 有一個(gè)特點(diǎn)是
會(huì)返回所以執(zhí)行任務(wù)的 goroutine 遇到的第一個(gè)錯(cuò)誤。試著執(zhí)行一下下面的程序,觀察程序的輸出。
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
"net/http"
)
func main() {
var urls = []string{
"http://www.golang.org/",
"http://www.baidu.com/",
"http://www.noexist11111111.com/",
}
g := new(errgroup.Group)
for _, url := range urls {
url := url
g.Go(func() error {
resp, err := http.Get(url)
if err != nil {
fmt.Println(err)
return err
}
fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
return resp.Body.Close()
})
}
if err := g.Wait(); err != nil {
fmt.Println(err)
} else {
fmt.Println("All success!")
}
}輸出:
Get "http://www.noexist11111111.com/": dial tcp: lookup www.noexist11111111.com: no such host
get [http://www.baidu.com/] success: [200]
Get "http://www.golang.org/": dial tcp 172.217.24.113:80: connectex: A connection attempt failed because the connected party did not properly respond after a period o
f time, or established connection failed because connected host has failed to respond.
Get "http://www.noexist11111111.com/": dial tcp: lookup www.noexist11111111.com: no such host
ErrorGroup 有一個(gè)特點(diǎn)是會(huì)返回所以執(zhí)行任務(wù)的 goroutine 遇到的第一個(gè)錯(cuò)誤:
package main
import (
"fmt"
"golang.org/x/sync/errgroup"
"log"
"time"
)
func main() {
var eg errgroup.Group
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
time.Sleep(2 * time.Second)
if i > 90 {
fmt.Println("Error:", i)
return fmt.Errorf("Error occurred: %d", i)
}
fmt.Println("End:", i)
return nil
})
}
if err := eg.Wait(); err != nil {
log.Fatal(err)
}
}上面程序,遇到 i 大于 90 的都會(huì)產(chǎn)生錯(cuò)誤結(jié)束執(zhí)行,但是只有第一個(gè)執(zhí)行時(shí)產(chǎn)生的錯(cuò)誤被 ErrorGroup 返回,程序的輸出大概如下:
輸出:
......
End: 35
End: 38
End: 28
End: 37
End:38;2;127;0;0m2023/06/29 14:18:03 Error occurred: 98
32
Error: 92
End: 23
End: 30
Error: 95
Error: 94
End: 74
End: 25
......
最早執(zhí)行遇到錯(cuò)誤的 goroutine 輸出了Error: 98 但是所有未執(zhí)行完的其他任務(wù)并沒(méi)有停止執(zhí)行,那么想讓程序遇到錯(cuò)誤就終止其他子任務(wù)該怎么辦呢?我們可以用 errgroup.Group 提供的 WithContext 方法創(chuàng)建一個(gè)帶可取消上下文功能的 ErrorGroup。
使用 errorgroup.Group 時(shí)注意它的兩個(gè)特點(diǎn):
- errgroup.Group 在出現(xiàn)錯(cuò)誤或者等待結(jié)束后都會(huì)調(diào)用 Context 對(duì)象的 cancel 方法同步取消信號(hào)。
- 只有第一個(gè)出現(xiàn)的錯(cuò)誤才會(huì)被返回,剩余的錯(cuò)誤都會(huì)被直接拋棄。
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"log"
"time"
)
func main() {
eg, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 100; i++ {
i := i
eg.Go(func() error {
time.Sleep(2 * time.Second)
select {
case <-ctx.Done():
fmt.Println("Canceled:", i)
return nil
default:
if i > 90 {
fmt.Println("Error:", i)
return fmt.Errorf("Error: %d", i)
}
fmt.Println("End:", i)
return nil
}
})
}
if err := eg.Wait(); err != nil {
log.Fatal(err)
}
}Go 方法單獨(dú)開啟的 gouroutine 在執(zhí)行參數(shù)傳遞進(jìn)來(lái)的函數(shù)時(shí),如果函數(shù)返回了錯(cuò)誤,會(huì)對(duì) ErrorGroup 持有的err 字段進(jìn)行賦值并及時(shí)調(diào)用 cancel 函數(shù),通過(guò)上下文通知其他子任務(wù)取消執(zhí)行任務(wù)。所以上面更新后的程序運(yùn)行后有如下類似的輸出。
......
Canceled: 87
Canceled: 34
Canceled: 92
Canceled: 86
Cancled: 78
Canceled: 46
Cancel[38;2;127;0;0m2023/06/29 14:22:07 Error: 99
ed: 45
Canceled: 44
Canceled: 77
Canceled: 43
Canceled: 50
Canceled: 42
Canceled: 25
Canceled: 76
Canceled: 24
Canceled: 75
Canceled: 40
......
errorgroup源碼:
在上面的例子中,子 goroutine 出現(xiàn)錯(cuò)誤后,會(huì) cancle 到其他的子任務(wù),但是我們并沒(méi)有看到調(diào)用 ctx 的 cancel方法,下面我們看下源碼,看看內(nèi)部是怎么處理的。errgroup 的設(shè)計(jì)非常精練,全部代碼如下:
package errgroup
import (
"context"
"sync"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}可以看到,errgroup 的實(shí)現(xiàn)依靠于結(jié)構(gòu)體 Group,它通過(guò)封裝 sync.WaitGroup,繼承了 WaitGroup 的特性,在Go() 方法中新起一個(gè)子任務(wù) goroutine,并在 Wait() 方法中通過(guò) sync.WaitGroup 的 Wait 進(jìn)行阻塞等待。
同時(shí) Group 利用 sync.Once 保證了它有且僅會(huì)保留第一個(gè)子 goroutine 錯(cuò)誤。
Group 通過(guò)嵌入 context.WithCancel 方法產(chǎn)生的 cancel 函數(shù),能夠在子 goroutine 發(fā)生錯(cuò)誤時(shí),及時(shí)通過(guò)調(diào)用cancle 函數(shù),將 Context 的取消信號(hào)及時(shí)傳播出去。
再看一個(gè)實(shí)際應(yīng)用的例子:
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
dataChan := make(chan int, 20)
// 數(shù)據(jù)生產(chǎn)端任務(wù)子goroutine
g.Go(func() error {
defer close(dataChan)
for i := 1; ; i++ {
if i == 10 {
return fmt.Errorf("data 10 is wrong")
}
dataChan <- i
fmt.Println(fmt.Sprintf("sending %d", i))
}
})
// 數(shù)據(jù)消費(fèi)端任務(wù)子goroutine
for i := 0; i < 3; i++ {
g.Go(func() error {
for j := 1; ; j++ {
select {
case <-ctx.Done():
return ctx.Err()
case number := <-dataChan:
fmt.Println(fmt.Sprintf("receiving %d", number))
}
}
})
}
// 主任務(wù)goroutine等待pipeline結(jié)束數(shù)據(jù)流
err := g.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("main goroutine done!")
}輸出
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 2
receiving 1
receiving 3
data 10 is wrong
main goroutine done!
自己實(shí)現(xiàn)一個(gè) ErrGroup:
package main
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
)
const (
M = 2
N = 8
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
defer cancel()
result := make([]int, N+1)
errCh := make(chan error, 1)
var firstSendErr int32
wg := new(sync.WaitGroup)
done := make(chan struct{}, 1)
limit := make(chan struct{}, M)
for i := 1; i <= N; i++ {
limit <- struct{}{}
var quit bool
select {
// context已經(jīng)被cancel,不需要起新的goroutine了
case <-ctx.Done():
quit = true
default:
}
if quit {
break
}
wg.Add(1)
go func(x int) {
defer func() {
wg.Done()
<-limit
}()
if ret, err := doTask(ctx, x); err != nil {
if atomic.CompareAndSwapInt32(&firstSendErr, 0, 1) {
errCh <- err
// cancel其他的請(qǐng)求
cancel()
}
} else {
result[x] = ret
}
}(i)
}
go func() {
wg.Wait()
close(done)
}()
select {
case err := <-errCh:
handleErr(err, result[1:])
<-done
case <-done:
if len(errCh) > 0 {
err := <-errCh
handleErr(err, result[1:])
return
}
fmt.Println("success handle all task:", result[1:])
}
}
func handleErr(err error, result []int) {
fmt.Println("task err occurs: ", err, "result", result)
}
func doTask(ctx context.Context, i int) (ret int, err error) {
fmt.Println("task start", i)
defer func() {
fmt.Println("task done", i, "err", err)
}()
select {
// 模擬處理任務(wù)時(shí)間
case <-time.After(time.Second * time.Duration(i)):
// 處理任務(wù)要支持被context cancel,不然就一直等到處理完再返回了
case <-ctx.Done():
fmt.Println("task canceled", i)
return -1, ctx.Err()
}
// 模擬出現(xiàn)錯(cuò)誤
if i == 6 {
return -1, errors.New("err test")
}
return i, nil
}輸出
task start 2
task start 1
task done 1 err <nil>
task start 3
task done 2 err <nil>
task start 4
task done 3 err <nil>
task start 5
task done 4 err <nil>
task start 6
task done 5 err <nil>
task start 7
task done 6 err err test
task canceled 7
task done 7 err context canceled
task err occurs: err test result [1 2 3 4 5 0 0 0]
總結(jié):
使用 errorgroup.Group 時(shí)注意它的特點(diǎn):
繼承了 WaitGroup 的功能
errgroup.Group 在出現(xiàn)錯(cuò)誤或者等待結(jié)束后都會(huì)調(diào)用 Context 對(duì)象 的 cancel 方法同步取消信號(hào)。
只有第一個(gè)出現(xiàn)的錯(cuò)誤才會(huì)被返回,剩余的錯(cuò)誤都會(huì)被直接拋棄。
context 信號(hào)傳播:如果子任務(wù) goroutine 中有循環(huán)邏輯,則可以添加 ctx.Done 邏輯,此時(shí)通過(guò) context 的取消信號(hào),提前結(jié)束子任務(wù)執(zhí)行。
以上就是詳解Go語(yǔ)言中ErrGroup的使用的詳細(xì)內(nèi)容,更多關(guān)于Go ErrGroup的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于Go語(yǔ)言實(shí)現(xiàn)應(yīng)用IP防火墻
在公司里面經(jīng)常會(huì)聽(tīng)到某應(yīng)用有安全漏洞問(wèn)題,沒(méi)有做安全加固,IP防火墻就是一個(gè)典型的安全加固解決方案,下面我們就來(lái)學(xué)習(xí)一下如何使用go語(yǔ)言實(shí)現(xiàn)IP防火墻吧2023-11-11
Go?Gin框架優(yōu)雅重啟和停止實(shí)現(xiàn)方法示例
Web應(yīng)用程序中,有時(shí)需要重啟或停止服務(wù)器,無(wú)論是因?yàn)楦麓a還是進(jìn)行例行維護(hù),這時(shí)需要保證應(yīng)用程序的可用性和數(shù)據(jù)的一致性,就需要優(yōu)雅地關(guān)閉和重啟應(yīng)用程序,即不丟失正在處理的請(qǐng)求和不拒絕新的請(qǐng)求,本文將詳解如何在Go語(yǔ)言中使用Gin這個(gè)框架實(shí)現(xiàn)優(yōu)雅的重啟停止2024-01-01
淺談Golang 嵌套 interface 的賦值問(wèn)題
這篇文章主要介紹了淺談Golang 嵌套 interface 的賦值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04
Golang?sync.Once實(shí)現(xiàn)單例模式的方法詳解
Go?語(yǔ)言的?sync?包提供了一系列同步原語(yǔ),其中?sync.Once?就是其中之一。本文將深入探討?sync.Once?的實(shí)現(xiàn)原理和使用方法,幫助大家更好地理解和應(yīng)用?sync.Once,需要的可以參考一下2023-05-05

