欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang?errgroup?設(shè)計(jì)及實(shí)現(xiàn)原理解析

 更新時(shí)間:2022年08月29日 14:28:55   作者:ag9920  
這篇文章主要為大家介紹了Golang?errgroup?設(shè)計(jì)及實(shí)現(xiàn)原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

開篇

繼上次學(xué)習(xí)了信號量 semaphore 擴(kuò)展庫的設(shè)計(jì)思路和實(shí)現(xiàn)之后,今天我們繼續(xù)來看 golang.org/x/sync 包下的另一個(gè)經(jīng)常被 Golang 開發(fā)者使用的大殺器:errgroup。

業(yè)務(wù)研發(fā)中我們經(jīng)常會遇到需要調(diào)用多個(gè)下游的場景,比如加載一個(gè)商品的詳情頁,你可能需要訪問商品服務(wù),庫存服務(wù),券服務(wù),用戶服務(wù)等,才能從各個(gè)數(shù)據(jù)源獲取到所需要的信息,經(jīng)過一些鑒權(quán)邏輯后,組裝成前端需要的數(shù)據(jù)格式下發(fā)。

串行調(diào)用當(dāng)然可以,但這樣就潛在的給各個(gè)調(diào)用預(yù)設(shè)了【順序】,必須執(zhí)行完 A,B,C 之后才能執(zhí)行 D 操作。但如果我們對于順序并沒有強(qiáng)需求,從語義上看兩個(gè)調(diào)用是完全獨(dú)立可并發(fā)的,那么我們就可以讓他們并發(fā)執(zhí)行。

這個(gè)時(shí)候就可以使用 errgroup 來解決問題。一定意義上講,errgroup 是基于 WaitGroup 在錯誤傳遞上進(jìn)行一些優(yōu)化而提供出來的能力。它不僅可以支持 context.Context 的相關(guān)控制能力,還可以將子任務(wù)的 error 向上傳遞。

errgroup 源碼拆解

errgroup 定義在 golang.org/x/sync/errgroup,承載核心能力的結(jié)構(gòu)體是 Group。

Group

type token struct{}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func()
	wg sync.WaitGroup
	sem chan token
	errOnce sync.Once
	err     error
}

Group 就是對我們上面提到的一堆子任務(wù)執(zhí)行計(jì)劃的抽象。每一個(gè)子任務(wù)都會有自己對應(yīng)的 goroutine 來執(zhí)行。

通過這個(gè)結(jié)構(gòu)我們也可以看出來,errgroup 底層實(shí)現(xiàn)多個(gè) goroutine 調(diào)度,等待的能力還是基于 sync.WaitGroup。

WithContext

我們可以調(diào)用 errgroup.WithContext 函數(shù)來創(chuàng)建一個(gè) Group。

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

這里可以看到,Group 的 cancel 函數(shù)本質(zhì)就是為了支持 context 的 cancel 能力,初始化的 Group 只有一個(gè) cancel 屬性,其他都是默認(rèn)的。一旦有一個(gè)子任務(wù)返回錯誤,或者是 Wait 調(diào)用返回,這個(gè)新 Context 就會被 cancel。

Wait

本質(zhì)上和 WaitGroup 的 Wait 方法語義一樣,既然我們是個(gè) group task,就需要等待所有任務(wù)都執(zhí)行完,這個(gè)語義就由 Wait 方法提供。如果有多個(gè)子任務(wù)返回錯誤,它只會返回第一個(gè)出現(xiàn)的錯誤,如果所有的子任務(wù)都執(zhí)行成功,就返回 nil。

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

Wait 的實(shí)現(xiàn)非常簡單。一個(gè)前置的 WaitGroup Wait,結(jié)束后只做了兩件事:

  • 如果對于公共的 Context 有 cancel 函數(shù),就將其 cancel,因?yàn)槭虑檗k完了;
  • 返回公共的 Group 結(jié)構(gòu)中的 err 給調(diào)用方。

Go

Group 的核心能力就在于能夠并發(fā)執(zhí)行多個(gè)子任務(wù),從調(diào)用者的角度,我們只需要傳入要執(zhí)行的函數(shù),簽名為:func() error即可,非常通用。如果任務(wù)執(zhí)行成功,就返回 nil,否則就返回 error,并且會 cancel 那個(gè)新的 Context。底層的調(diào)度邏輯由 Group 的 Go 方法實(shí)現(xiàn):

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}
	g.wg.Add(1)
	go func() {
		defer g.done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}
func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

我們重點(diǎn)來分析下 Go 這里發(fā)生了什么。

WaitGroup 加 1 用作計(jì)數(shù);

啟動一個(gè)新的 goroutine 執(zhí)行調(diào)用方傳入的 f() 函數(shù);

  • 若 err 為 nil 說明執(zhí)行正常;
  • 若 err 不為 nil,說明執(zhí)行出錯,此時(shí)將這個(gè)返回的 err 賦值給全局 Group 的變量 err,并將 context cancel 掉。注意,這里的處理在 once 分支中,也就是只有第一個(gè)來的錯誤會被處理。

在 defer 語句中調(diào)用 Group 的 done 方法,底層依賴 WaitGroup 的 Done,說明這一個(gè)子任務(wù)結(jié)束。

這里也可以看到,其實(shí)所謂 errgroup,我們并不是將所有子任務(wù)的 error 拼成一個(gè)字符串返回,而是直接在 Go 方法那里將第一個(gè)錯誤返回,底層依賴了 once 的能力。

SetLimit

其實(shí)看到這里,你有沒有覺得 errgroup 的功能有點(diǎn)雞肋?底層核心技術(shù)都是靠 WaitGroup 完成的,自己只不過是起了個(gè) goroutine 執(zhí)行方法,err 還只能保留一個(gè)。而且 Group 里面的 sem 那個(gè) chan 是用來干什么呢?

這一節(jié)我們就來看看,Golang 對 errgroup 能力的一次擴(kuò)充。

到目前為止,errgroup 是可以做到一開始人們對它的期望的,即并發(fā)執(zhí)行子任務(wù)。但問題在于,這里是每一個(gè)子任務(wù)都開了個(gè)goroutine,如果是在高并發(fā)的環(huán)境里,頻繁創(chuàng)建大量goroutine 這樣的用法很容易對資源負(fù)載產(chǎn)生影響。開發(fā)者們提出,希望有辦法限制 errgroup 創(chuàng)建的 goroutine 數(shù)量,參照這個(gè) proposal: #27837

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

SetLimit 的參數(shù) n 就是我們對這個(gè) Group 期望的最大 goroutine 數(shù)量,這里其實(shí)除去校驗(yàn)邏輯,只干了一件事:g.sem = make(chan token, n),即創(chuàng)建一個(gè)長度為 n 的 channel,賦值給 sem。

回憶一下我們在 Go 方法 defer 調(diào)用 done 中的表現(xiàn),是不是清晰了很多?我們來理一下:

首先,Group 結(jié)構(gòu)體就包含了 sem 變量,只作為通信,元素是空結(jié)構(gòu)體,不包含實(shí)際意義:

type Group struct {
	cancel func()
	wg sync.WaitGroup
	sem chan token
	errOnce sync.Once
	err     error
}

如果你對整個(gè) Group 的 Limit 沒有要求,which is fine,你就直接忽略這個(gè) SetLimit,errgroup 的原有能力就可以支持你的訴求。

但是,如果你希望保持 errgroup 的 goroutine 在一個(gè)上限之內(nèi),請?jiān)谡{(diào)用 Go 方法前,先 SetLimit,這樣 Group 的 sem 就賦值為一個(gè)長度為 n 的 channel。

那么,當(dāng)你調(diào)用 Go 方法時(shí),注意下面的框架代碼,此時(shí) g.sem 不為 nil,所以我們會塞一個(gè) token 進(jìn)去,作為占坑,語義上表示我申請一個(gè) goroutine 用。

func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}
	g.wg.Add(1)
	go func() {
		defer g.done()
                ...
	}()
}

當(dāng)然,如果此時(shí) goroutine 數(shù)量已經(jīng)達(dá)到上限,這里就會 block 住,直到別的 goroutine 干完活,sem 輸出了一個(gè) token之后,才能繼續(xù)往里面塞。

在每個(gè) goroutine 執(zhí)行完畢后 defer 的 g.done 方法,則是完成了這一點(diǎn):

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

這樣就把 sem 的用法串起來了。我們通過創(chuàng)建一個(gè)定長的channel來實(shí)現(xiàn)對于 goroutine 數(shù)量的管控,對于channel實(shí)際包含的元素并不關(guān)心,所以用一個(gè)空結(jié)構(gòu)體省一省空間,這是非常優(yōu)秀的設(shè)計(jì),大家平常也可以參考。

TryGo

TryGo 和 SetLimit 這套體系其實(shí)都是不久前歐長坤大佬提交到 errgroup 的能力。

一如既往,所有帶 TryXXX的函數(shù),都不會阻塞。 其實(shí)辦的事情非常簡單,和 Go 方法一樣,傳進(jìn)來一個(gè) func() error來執(zhí)行。

Go 方法的區(qū)別在于,如果判斷 limit 已經(jīng)不夠了,此時(shí)不再阻塞,而是直接 return false,代表執(zhí)行失敗。其他的部分完全一樣。

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}
	g.wg.Add(1)
	go func() {
		defer g.done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}

使用方法

這里我們先看一個(gè)最常見的用法,針對一組任務(wù),每一個(gè)都單獨(dú)起 goroutine 執(zhí)行,最后獲取 Wait 返回的 err 作為整個(gè) Group 的 err。

package main
import (
    "errors"
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)
func main() {
    var g errgroup.Group
    // 啟動第一個(gè)子任務(wù),它執(zhí)行成功
    g.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("exec #1")
        return nil
    })
    // 啟動第二個(gè)子任務(wù),它執(zhí)行失敗
    g.Go(func() error {
        time.Sleep(10 * time.Second)
        fmt.Println("exec #2")
        return errors.New("failed to exec #2")
    })
    // 啟動第三個(gè)子任務(wù),它執(zhí)行成功
    g.Go(func() error {
        time.Sleep(15 * time.Second)
        fmt.Println("exec #3")
        return nil
    })
    // 等待三個(gè)任務(wù)都完成
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully exec all")
    } else {
        fmt.Println("failed:", err)
    }
}

你會發(fā)現(xiàn),最后 err 打印出來就是第二個(gè)子任務(wù)的 err。

當(dāng)然,上面這個(gè) case 是我們正好只有一個(gè)報(bào)錯,但是,如果實(shí)際有多個(gè)任務(wù)都掛了呢?

從完備性來考慮,有沒有什么辦法能夠?qū)⒍鄠€(gè)任務(wù)的錯誤都返回呢?

這一點(diǎn)其實(shí) errgroup 庫并沒有提供非常好的支持,需要開發(fā)者自行做一些改造。因?yàn)?Group 中只有一個(gè) err 變量,我們不可能基于 Group 來實(shí)現(xiàn)這一點(diǎn)。

通常情況下,我們會創(chuàng)建一個(gè) slice 來存儲 f() 執(zhí)行的 err。

package main
import (
    "errors"
    "fmt"
    "time"
    "golang.org/x/sync/errgroup"
)
func main() {
    var g errgroup.Group
    var result = make([]error, 3)
    // 啟動第一個(gè)子任務(wù),它執(zhí)行成功
    g.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("exec #1")
        result[0] = nil // 保存成功或者失敗的結(jié)果
        return nil
    })
    // 啟動第二個(gè)子任務(wù),它執(zhí)行失敗
    g.Go(func() error {
        time.Sleep(10 * time.Second)
        fmt.Println("exec #2")
        result[1] = errors.New("failed to exec #2") // 保存成功或者失敗的結(jié)果
        return result[1]
    })
    // 啟動第三個(gè)子任務(wù),它執(zhí)行成功
    g.Go(func() error {
        time.Sleep(15 * time.Second)
        fmt.Println("exec #3")
        result[2] = nil // 保存成功或者失敗的結(jié)果
        return nil
    })
    if err := g.Wait(); err == nil {
        fmt.Printf("Successfully exec all. result: %v\n", result)
    } else {
        fmt.Printf("failed: %v\n", result)
    }
}

可以看到,我們聲明了一個(gè) result slice,長度為 3。這里不用擔(dān)心并發(fā)問題,因?yàn)槊總€(gè) goroutine 讀寫的位置是確定唯一的。

本質(zhì)上可以理解為,我們把 f() 返回的 err 不僅僅給了 Group 一份,還自己存了一份,當(dāng)出錯的時(shí)候,Wait 返回的錯誤我們不一定真的用,而是直接用自己錯的這一個(gè) error slice。

Go 官方文檔中的利用 errgroup 實(shí)現(xiàn) pipeline 的示例也很有參考意義:由一個(gè)子任務(wù)遍歷文件夾下的文件,然后把遍歷出的文件交給 20 個(gè) goroutine,讓這些 goroutine 并行計(jì)算文件的 md5。

這里貼出來簡化代碼學(xué)習(xí)一下.

package main
import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"golang.org/x/sync/errgroup"
)
// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}
	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}
type result struct {
	path string
	sum  [md5.Size]byte
}
// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)
	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})
	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()
	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

其實(shí)本質(zhì)上還是 channel發(fā)揮了至關(guān)重要的作用,這里建議大家有時(shí)間盡量看看源文章:pkg.go.dev/golang.org/…

對于用 errgroup 實(shí)現(xiàn) pipeline 模式有很大幫助。

結(jié)束語

今天我們學(xué)習(xí)了 errgroup 的源碼已經(jīng)新增的 SetLimit 能力,其實(shí)看過了 sync 相關(guān)的這些庫,整體用到的能力其實(shí)大差不差,很多設(shè)計(jì)思想都是非常精巧的。比如 errgroup 中利用定長 channel 實(shí)現(xiàn)控制 goroutine 數(shù)量,空結(jié)構(gòu)體節(jié)省內(nèi)存空間。

并且 sync 包的實(shí)現(xiàn)一般都還是非常簡潔的,比如 once,singleflight,semaphore 等。建議大家有空的話自己過一遍,對并發(fā)和設(shè)計(jì)模式的理解會更上一個(gè)臺階。

errgroup 本身并不復(fù)雜,業(yè)界也有很多封裝實(shí)現(xiàn),大家可以對照源碼再思考一下還有什么可以改進(jìn)的地方。

以上就是Golang errgroup 設(shè)計(jì)及實(shí)現(xiàn)原理解析的詳細(xì)內(nèi)容,更多關(guān)于Golang errgroup 設(shè)計(jì)原理的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang配置管理庫?Viper的教程詳解

    Golang配置管理庫?Viper的教程詳解

    這篇文章主要介紹了Golang?配置管理庫?Viper,使用?viper?能夠很好的去管理你的配置文件信息,比如數(shù)據(jù)庫的賬號密碼,服務(wù)器監(jiān)聽的端口,你可以通過更改配置文件去更改這些內(nèi)容,而不用定位到那一段代碼上去,提高了開發(fā)效率,需要的朋友可以參考下
    2022-05-05
  • 詳解go 動態(tài)數(shù)組 二維動態(tài)數(shù)組

    詳解go 動態(tài)數(shù)組 二維動態(tài)數(shù)組

    這篇文章主要介紹了go 動態(tài)數(shù)組 二維動態(tài)數(shù)組,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-07-07
  • 使用Go語言實(shí)現(xiàn)常見hash算法

    使用Go語言實(shí)現(xiàn)常見hash算法

    這篇文章主要為大家詳細(xì)介紹了使語言實(shí)現(xiàn)各種常見hash算法的相關(guān)知識,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的小伙伴可以參考下
    2024-01-01
  • Go 驗(yàn)證字符串中是否包含中文(推薦)

    Go 驗(yàn)證字符串中是否包含中文(推薦)

    這篇文章主要介紹了Go 驗(yàn)證字符串中是否包含中文,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-01-01
  • Go時(shí)間操作常用方法(推薦!)

    Go時(shí)間操作常用方法(推薦!)

    平時(shí)開發(fā)過程中,時(shí)間相關(guān)的操作用的還是很多的,下面這篇文章主要給大家介紹了關(guān)于Go時(shí)間操作常用方法的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-06-06
  • 淺析Go中關(guān)于零值和空值判斷的問題

    淺析Go中關(guān)于零值和空值判斷的問題

    這篇文章主要是對零值和空值判斷現(xiàn)狀進(jìn)行簡單的梳理和分享,文中的示例代碼講解詳細(xì),對我們深入了解go語言有一定的幫助,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-08-08
  • Go語言生成素?cái)?shù)的方法

    Go語言生成素?cái)?shù)的方法

    這篇文章主要介紹了Go語言生成素?cái)?shù)的方法,實(shí)例分析了Go語言生成素?cái)?shù)的技巧,需要的朋友可以參考下
    2015-03-03
  • Go語音開發(fā)中常見Error類型處理示例詳解

    Go語音開發(fā)中常見Error類型處理示例詳解

    這篇文章主要為大家介紹了Go語音開發(fā)中常見Error類型處理示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • 關(guān)于go-zero單體服務(wù)使用泛型簡化注冊Handler路由的問題

    關(guān)于go-zero單體服務(wù)使用泛型簡化注冊Handler路由的問題

    這篇文章主要介紹了go-zero單體服務(wù)使用泛型簡化注冊Handler路由,涉及到Golang環(huán)境安裝及配置Go Module的相關(guān)知識,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-07-07
  • Golang獲取本地IP地址方法分享

    Golang獲取本地IP地址方法分享

    這篇文章主要給大家介紹了Golang 獲取本地 IP 地址方法,文中有詳細(xì)的代碼示例,對我們的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2023-07-07

最新評論