Golang?errgroup?設(shè)計(jì)及實(shí)現(xiàn)原理解析
開篇
繼上次學(xué)習(xí)了信號量 semaphore 擴(kuò)展庫的設(shè)計(jì)思路和實(shí)現(xiàn)之后,今天我們繼續(xù)來看 golang.org/x/sync
包下的另一個(gè)經(jīng)常被 Golang 開發(fā)者使用的大殺器:errgroup。
業(yè)務(wù)研發(fā)中我們經(jīng)常會遇到需要調(diào)用多個(gè)下游的場景,比如加載一個(gè)商品的詳情頁,你可能需要訪問商品服務(wù),庫存服務(wù),券服務(wù),用戶服務(wù)等,才能從各個(gè)數(shù)據(jù)源獲取到所需要的信息,經(jīng)過一些鑒權(quán)邏輯后,組裝成前端需要的數(shù)據(jù)格式下發(fā)。
串行調(diào)用當(dāng)然可以,但這樣就潛在的給各個(gè)調(diào)用預(yù)設(shè)了【順序】,必須執(zhí)行完 A,B,C 之后才能執(zhí)行 D 操作。但如果我們對于順序并沒有強(qiáng)需求,從語義上看兩個(gè)調(diào)用是完全獨(dú)立可并發(fā)的,那么我們就可以讓他們并發(fā)執(zhí)行。
這個(gè)時(shí)候就可以使用 errgroup 來解決問題。一定意義上講,errgroup 是基于 WaitGroup 在錯誤傳遞上進(jìn)行一些優(yōu)化而提供出來的能力。它不僅可以支持 context.Context 的相關(guān)控制能力,還可以將子任務(wù)的 error 向上傳遞。
errgroup 源碼拆解
errgroup 定義在 golang.org/x/sync/errgroup
,承載核心能力的結(jié)構(gòu)體是 Group。
Group
type token struct{} // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. type Group struct { cancel func() wg sync.WaitGroup sem chan token errOnce sync.Once err error }
Group 就是對我們上面提到的一堆子任務(wù)執(zhí)行計(jì)劃的抽象。每一個(gè)子任務(wù)都會有自己對應(yīng)的 goroutine 來執(zhí)行。
通過這個(gè)結(jié)構(gòu)我們也可以看出來,errgroup 底層實(shí)現(xiàn)多個(gè) goroutine 調(diào)度,等待的能力還是基于 sync.WaitGroup。
WithContext
我們可以調(diào)用 errgroup.WithContext 函數(shù)來創(chuàng)建一個(gè) Group。
// WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel}, ctx }
這里可以看到,Group 的 cancel 函數(shù)本質(zhì)就是為了支持 context 的 cancel 能力,初始化的 Group 只有一個(gè) cancel 屬性,其他都是默認(rèn)的。一旦有一個(gè)子任務(wù)返回錯誤,或者是 Wait 調(diào)用返回,這個(gè)新 Context 就會被 cancel。
Wait
本質(zhì)上和 WaitGroup 的 Wait 方法語義一樣,既然我們是個(gè) group task,就需要等待所有任務(wù)都執(zhí)行完,這個(gè)語義就由 Wait 方法提供。如果有多個(gè)子任務(wù)返回錯誤,它只會返回第一個(gè)出現(xiàn)的錯誤,如果所有的子任務(wù)都執(zhí)行成功,就返回 nil。
// Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() } return g.err }
Wait 的實(shí)現(xiàn)非常簡單。一個(gè)前置的 WaitGroup Wait,結(jié)束后只做了兩件事:
- 如果對于公共的 Context 有 cancel 函數(shù),就將其 cancel,因?yàn)槭虑檗k完了;
- 返回公共的 Group 結(jié)構(gòu)中的 err 給調(diào)用方。
Go
Group 的核心能力就在于能夠并發(fā)執(zhí)行多個(gè)子任務(wù),從調(diào)用者的角度,我們只需要傳入要執(zhí)行的函數(shù),簽名為:func() error
即可,非常通用。如果任務(wù)執(zhí)行成功,就返回 nil,否則就返回 error,并且會 cancel 那個(gè)新的 Context。底層的調(diào)度邏輯由 Group 的 Go 方法實(shí)現(xiàn):
// Go calls the given function in a new goroutine. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group; its error will be // returned by Wait. func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() } func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() }
我們重點(diǎn)來分析下 Go 這里發(fā)生了什么。
WaitGroup 加 1 用作計(jì)數(shù);
啟動一個(gè)新的 goroutine 執(zhí)行調(diào)用方傳入的 f() 函數(shù);
- 若 err 為 nil 說明執(zhí)行正常;
- 若 err 不為 nil,說明執(zhí)行出錯,此時(shí)將這個(gè)返回的 err 賦值給全局 Group 的變量 err,并將 context cancel 掉。注意,這里的處理在 once 分支中,也就是只有第一個(gè)來的錯誤會被處理。
在 defer 語句中調(diào)用 Group 的 done 方法,底層依賴 WaitGroup 的 Done,說明這一個(gè)子任務(wù)結(jié)束。
這里也可以看到,其實(shí)所謂 errgroup,我們并不是將所有子任務(wù)的 error 拼成一個(gè)字符串返回,而是直接在 Go 方法那里將第一個(gè)錯誤返回,底層依賴了 once 的能力。
SetLimit
其實(shí)看到這里,你有沒有覺得 errgroup 的功能有點(diǎn)雞肋?底層核心技術(shù)都是靠 WaitGroup 完成的,自己只不過是起了個(gè) goroutine 執(zhí)行方法,err 還只能保留一個(gè)。而且 Group 里面的 sem 那個(gè) chan 是用來干什么呢?
這一節(jié)我們就來看看,Golang 對 errgroup 能力的一次擴(kuò)充。
到目前為止,errgroup 是可以做到一開始人們對它的期望的,即并發(fā)執(zhí)行子任務(wù)。但問題在于,這里是每一個(gè)子任務(wù)都開了個(gè)goroutine,如果是在高并發(fā)的環(huán)境里,頻繁創(chuàng)建大量goroutine 這樣的用法很容易對資源負(fù)載產(chǎn)生影響。開發(fā)者們提出,希望有辦法限制 errgroup 創(chuàng)建的 goroutine 數(shù)量,參照這個(gè) proposal: #27837
// SetLimit limits the number of active goroutines in this group to at most n. // A negative value indicates no limit. // // Any subsequent call to the Go method will block until it can add an active // goroutine without exceeding the configured limit. // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) }
SetLimit 的參數(shù) n 就是我們對這個(gè) Group 期望的最大 goroutine 數(shù)量,這里其實(shí)除去校驗(yàn)邏輯,只干了一件事:g.sem = make(chan token, n)
,即創(chuàng)建一個(gè)長度為 n 的 channel,賦值給 sem。
回憶一下我們在 Go 方法 defer 調(diào)用 done 中的表現(xiàn),是不是清晰了很多?我們來理一下:
首先,Group 結(jié)構(gòu)體就包含了 sem 變量,只作為通信,元素是空結(jié)構(gòu)體,不包含實(shí)際意義:
type Group struct { cancel func() wg sync.WaitGroup sem chan token errOnce sync.Once err error }
如果你對整個(gè) Group 的 Limit 沒有要求,which is fine,你就直接忽略這個(gè) SetLimit,errgroup 的原有能力就可以支持你的訴求。
但是,如果你希望保持 errgroup 的 goroutine 在一個(gè)上限之內(nèi),請?jiān)谡{(diào)用 Go 方法前,先 SetLimit,這樣 Group 的 sem 就賦值為一個(gè)長度為 n 的 channel。
那么,當(dāng)你調(diào)用 Go 方法時(shí),注意下面的框架代碼,此時(shí) g.sem 不為 nil,所以我們會塞一個(gè) token 進(jìn)去,作為占坑,語義上表示我申請一個(gè) goroutine 用。
func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() ... }() }
當(dāng)然,如果此時(shí) goroutine 數(shù)量已經(jīng)達(dá)到上限,這里就會 block 住,直到別的 goroutine 干完活,sem 輸出了一個(gè) token之后,才能繼續(xù)往里面塞。
在每個(gè) goroutine 執(zhí)行完畢后 defer 的 g.done
方法,則是完成了這一點(diǎn):
func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() }
這樣就把 sem 的用法串起來了。我們通過創(chuàng)建一個(gè)定長的channel來實(shí)現(xiàn)對于 goroutine 數(shù)量的管控,對于channel實(shí)際包含的元素并不關(guān)心,所以用一個(gè)空結(jié)構(gòu)體省一省空間,這是非常優(yōu)秀的設(shè)計(jì),大家平常也可以參考。
TryGo
TryGo 和 SetLimit 這套體系其實(shí)都是不久前歐長坤大佬提交到 errgroup 的能力。
一如既往,所有帶 TryXXX的函數(shù),都不會阻塞。 其實(shí)辦的事情非常簡單,和 Go 方法一樣,傳進(jìn)來一個(gè) func() error
來執(zhí)行。
和 Go
方法的區(qū)別在于,如果判斷 limit 已經(jīng)不夠了,此時(shí)不再阻塞,而是直接 return false,代表執(zhí)行失敗。其他的部分完全一樣。
// TryGo calls the given function in a new goroutine only if the number of // active goroutines in the group is currently below the configured limit. // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { if g.sem != nil { select { case g.sem <- token{}: // Note: this allows barging iff channels in general allow barging. default: return false } } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() return true }
使用方法
這里我們先看一個(gè)最常見的用法,針對一組任務(wù),每一個(gè)都單獨(dú)起 goroutine 執(zhí)行,最后獲取 Wait 返回的 err 作為整個(gè) Group 的 err。
package main import ( "errors" "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { var g errgroup.Group // 啟動第一個(gè)子任務(wù),它執(zhí)行成功 g.Go(func() error { time.Sleep(5 * time.Second) fmt.Println("exec #1") return nil }) // 啟動第二個(gè)子任務(wù),它執(zhí)行失敗 g.Go(func() error { time.Sleep(10 * time.Second) fmt.Println("exec #2") return errors.New("failed to exec #2") }) // 啟動第三個(gè)子任務(wù),它執(zhí)行成功 g.Go(func() error { time.Sleep(15 * time.Second) fmt.Println("exec #3") return nil }) // 等待三個(gè)任務(wù)都完成 if err := g.Wait(); err == nil { fmt.Println("Successfully exec all") } else { fmt.Println("failed:", err) } }
你會發(fā)現(xiàn),最后 err 打印出來就是第二個(gè)子任務(wù)的 err。
當(dāng)然,上面這個(gè) case 是我們正好只有一個(gè)報(bào)錯,但是,如果實(shí)際有多個(gè)任務(wù)都掛了呢?
從完備性來考慮,有沒有什么辦法能夠?qū)⒍鄠€(gè)任務(wù)的錯誤都返回呢?
這一點(diǎn)其實(shí) errgroup 庫并沒有提供非常好的支持,需要開發(fā)者自行做一些改造。因?yàn)?Group 中只有一個(gè) err 變量,我們不可能基于 Group 來實(shí)現(xiàn)這一點(diǎn)。
通常情況下,我們會創(chuàng)建一個(gè) slice 來存儲 f()
執(zhí)行的 err。
package main import ( "errors" "fmt" "time" "golang.org/x/sync/errgroup" ) func main() { var g errgroup.Group var result = make([]error, 3) // 啟動第一個(gè)子任務(wù),它執(zhí)行成功 g.Go(func() error { time.Sleep(5 * time.Second) fmt.Println("exec #1") result[0] = nil // 保存成功或者失敗的結(jié)果 return nil }) // 啟動第二個(gè)子任務(wù),它執(zhí)行失敗 g.Go(func() error { time.Sleep(10 * time.Second) fmt.Println("exec #2") result[1] = errors.New("failed to exec #2") // 保存成功或者失敗的結(jié)果 return result[1] }) // 啟動第三個(gè)子任務(wù),它執(zhí)行成功 g.Go(func() error { time.Sleep(15 * time.Second) fmt.Println("exec #3") result[2] = nil // 保存成功或者失敗的結(jié)果 return nil }) if err := g.Wait(); err == nil { fmt.Printf("Successfully exec all. result: %v\n", result) } else { fmt.Printf("failed: %v\n", result) } }
可以看到,我們聲明了一個(gè) result slice,長度為 3。這里不用擔(dān)心并發(fā)問題,因?yàn)槊總€(gè) goroutine 讀寫的位置是確定唯一的。
本質(zhì)上可以理解為,我們把 f()
返回的 err 不僅僅給了 Group 一份,還自己存了一份,當(dāng)出錯的時(shí)候,Wait 返回的錯誤我們不一定真的用,而是直接用自己錯的這一個(gè) error slice。
Go 官方文檔中的利用 errgroup 實(shí)現(xiàn) pipeline 的示例也很有參考意義:由一個(gè)子任務(wù)遍歷文件夾下的文件,然后把遍歷出的文件交給 20 個(gè) goroutine,讓這些 goroutine 并行計(jì)算文件的 md5。
這里貼出來簡化代碼學(xué)習(xí)一下.
package main import ( "context" "crypto/md5" "fmt" "io/ioutil" "log" "os" "path/filepath" "golang.org/x/sync/errgroup" ) // Pipeline demonstrates the use of a Group to implement a multi-stage // pipeline: a version of the MD5All function with bounded parallelism from // https://blog.golang.org/pipelines. func main() { m, err := MD5All(context.Background(), ".") if err != nil { log.Fatal(err) } for k, sum := range m { fmt.Printf("%s:\t%x\n", k, sum) } } type result struct { path string sum [md5.Size]byte } // MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) { // ctx is canceled when g.Wait() returns. When this version of MD5All returns // - even in case of error! - we know that all of the goroutines have finished // and the memory they were using can be garbage-collected. g, ctx := errgroup.WithContext(ctx) paths := make(chan string) g.Go(func() error { defer close(paths) return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-ctx.Done(): return ctx.Err() } return nil }) }) // Start a fixed number of goroutines to read and digest files. c := make(chan result) const numDigesters = 20 for i := 0; i < numDigesters; i++ { g.Go(func() error { for path := range paths { data, err := ioutil.ReadFile(path) if err != nil { return err } select { case c <- result{path, md5.Sum(data)}: case <-ctx.Done(): return ctx.Err() } } return nil }) } go func() { g.Wait() close(c) }() m := make(map[string][md5.Size]byte) for r := range c { m[r.path] = r.sum } // Check whether any of the goroutines failed. Since g is accumulating the // errors, we don't need to send them (or check for them) in the individual // results sent on the channel. if err := g.Wait(); err != nil { return nil, err } return m, nil }
其實(shí)本質(zhì)上還是 channel發(fā)揮了至關(guān)重要的作用,這里建議大家有時(shí)間盡量看看源文章:pkg.go.dev/golang.org/…
對于用 errgroup 實(shí)現(xiàn) pipeline 模式有很大幫助。
結(jié)束語
今天我們學(xué)習(xí)了 errgroup 的源碼已經(jīng)新增的 SetLimit 能力,其實(shí)看過了 sync 相關(guān)的這些庫,整體用到的能力其實(shí)大差不差,很多設(shè)計(jì)思想都是非常精巧的。比如 errgroup 中利用定長 channel 實(shí)現(xiàn)控制 goroutine 數(shù)量,空結(jié)構(gòu)體節(jié)省內(nèi)存空間。
并且 sync 包的實(shí)現(xiàn)一般都還是非常簡潔的,比如 once,singleflight,semaphore 等。建議大家有空的話自己過一遍,對并發(fā)和設(shè)計(jì)模式的理解會更上一個(gè)臺階。
errgroup 本身并不復(fù)雜,業(yè)界也有很多封裝實(shí)現(xiàn),大家可以對照源碼再思考一下還有什么可以改進(jìn)的地方。
以上就是Golang errgroup 設(shè)計(jì)及實(shí)現(xiàn)原理解析的詳細(xì)內(nèi)容,更多關(guān)于Golang errgroup 設(shè)計(jì)原理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解go 動態(tài)數(shù)組 二維動態(tài)數(shù)組
這篇文章主要介紹了go 動態(tài)數(shù)組 二維動態(tài)數(shù)組,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07關(guān)于go-zero單體服務(wù)使用泛型簡化注冊Handler路由的問題
這篇文章主要介紹了go-zero單體服務(wù)使用泛型簡化注冊Handler路由,涉及到Golang環(huán)境安裝及配置Go Module的相關(guān)知識,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07