欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang并發(fā)控制之errgroup使用詳解

 更新時間:2024年11月05日 12:00:25   作者:江湖十年  
errgroup?是?Go?官方庫?x?中提供的一個非常實用的工具,用于并發(fā)執(zhí)行多個?goroutine,并且方便的處理錯誤,下面就跟隨小編一起來了解下的它的具體使用吧

errgroup 是 Go 官方庫 x 中提供的一個非常實用的工具,用于并發(fā)執(zhí)行多個 goroutine,并且方便的處理錯誤。

我們知道,Go 標準庫中有個 sync.WaitGroup 可以用來并發(fā)執(zhí)行多個 goroutine,errgroup 就是在其基礎上實現(xiàn)了 errgroup.Group。不過,errgroup.Group 和 sync.WaitGroup 在功能上是有區(qū)別的,盡管它們都用于管理 goroutine 的同步。

errgroup 優(yōu)勢

與 sync.WaitGroup 相比,以下是設計 errgroup.Group 的原因和優(yōu)勢:

錯誤處理

  • sync.WaitGroup 只負責等待 goroutine 完成,不處理 goroutine 的返回值或錯誤。
  • errgroup.Group 雖然目前也不能直接處理 goroutine 的返回值,但在 goroutine 返回錯誤時,可以立即取消其他正在運行的 goroutine,并在 Wait 方法中返回第一個非 nil 的錯誤。

上下文取消

errgroup 可以與 context.Context 配合使用,支持在某個 goroutine 出現(xiàn)錯誤時自動取消其他 goroutine,這樣可以更好地控制資源,避免不必要的工作。

簡化并發(fā)編程

使用 errgroup 可以減少錯誤處理的樣板代碼,開發(fā)者不需要手動管理錯誤狀態(tài)和同步邏輯,使得并發(fā)編程更簡單、更易于維護。

限制并發(fā)數(shù)量

errgroup 提供了便捷的接口來限制并發(fā) goroutine 的數(shù)量,避免過載,而 sync.WaitGroup 沒有這樣的功能。

以上,errgroup 為處理并發(fā)任務提供了更強大的錯誤管理和控制機制,因此在許多并發(fā)場景下是更優(yōu)的選擇。

隨著本文接下來的深入講解,你就能深刻體會到上面所說的優(yōu)勢了。

sync.WaitGroup 使用示例

在介紹 errgroup.Group 前,我們還是先來一起回顧下 sync.WaitGroup 的用法。

示例如下:

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗
    }
    var err error

    var wg sync.WaitGroup // 零值可用,不必顯式初始化

    for _, url := range urls {
        wg.Add(1) // 增加 WaitGroup 計數(shù)器

        // 啟動一個 goroutine 來獲取 URL
        go func() {
            defer wg.Done() // 當 goroutine 完成時遞減 WaitGroup 計數(shù)器

            resp, e := http.Get(url)
            if e != nil { // 發(fā)生錯誤返回,并記錄該錯誤
                err = e
                return
            }
            defer resp.Body.Close()
            fmt.Printf("fetch url %s status %s\n", url, resp.Status)
        }()
    }

    // 等待所有 goroutine 執(zhí)行完成
    wg.Wait()
    if err != nil { // err 會記錄最后一個錯誤
        fmt.Printf("Error: %s\n", err)
    }
}

示例中,我們使用 sync.WaitGroup 來啟動 3 個 goroutine 并發(fā)訪問 3 個不同的 URL,并在成功時打印響應狀態(tài)碼,或失敗時記錄錯誤信息。

執(zhí)行示例代碼,得到如下輸出:

$ go run waitgroup/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host

我們獲取了兩個成功的響應,并打印了一條錯誤信息。

根據(jù)示例,我們可以抽象出 sync.WaitGroup 最典型的慣用法:

var wg sync.WaitGroup

for ... {
    wg.Add(1)

    go func() {
        defer wg.Done()
        // do something
    }()
}

wg.Wait()

errgroup.Group 使用示例

其實 errgroup.Group 的使用套路與 sync.WaitGroup 非常類似。

基本使用

errgroup 基本使用套路如下:

  • 導入 errgroup 包。
  • 創(chuàng)建一個 errgroup.Group 實例。
  • 使用 Group.Go 方法啟動多個并發(fā)任務。
  • 使用 Group.Wait 方法等待所有 goroutine 完成或有一個返回錯誤。

將前文中的 sync.WaitGroup 程序示例使用 errgroup.Group 重寫為如下示例:

package main

import (
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗
    }

    // 使用 errgroup 創(chuàng)建一個新的 goroutine 組
    var g errgroup.Group // 零值可用,不必顯式初始化

    for _, url := range urls {
        // 使用 errgroup 啟動一個 goroutine 來獲取 URL
        g.Go(func() error {
            resp, err := http.Get(url)
            if err != nil {
                return err // 發(fā)生錯誤,返回該錯誤
            }
            defer resp.Body.Close()
            fmt.Printf("fetch url %s status %s\n", url, resp.Status)
            return nil // 返回 nil 表示成功
        })
    }

    // 等待所有 goroutine 完成并返回第一個錯誤(如果有)
    if err := g.Wait(); err != nil {
        fmt.Printf("Error: %s\n", err)
    }
}

可以發(fā)現(xiàn),這段程序與 sync.WaitGroup 示例很像,根據(jù)代碼中的注釋,很容易看懂。

執(zhí)行示例代碼,得到如下輸出:

$ go run examples/main.go
fetch url http://www.google.com/ status 200 OK
fetch url http://www.golang.org/ status 200 OK
Error: Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host

輸出結果也沒什么變化。

上下文取消

errgroup 提供了 errgroup.WithContext 可以附加取消功能,在任意一個 goroutine 返回錯誤時,可以立即取消其他正在運行的 goroutine,并在 Wait 方法中返回第一個非 nil 的錯誤。

示例如下:

package main

import (
    "context"
    "fmt"
    "net/http"
    "sync"

    "golang.org/x/sync/errgroup"
)

func main() {
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/", // 這是一個錯誤的 URL,會導致任務失敗
    }

    // 創(chuàng)建一個帶有 context 的 errgroup
    // 任何一個 goroutine 返回非 nil 的錯誤,或 Wait() 等待所有 goroutine 完成后,context 都會被取消
    g, ctx := errgroup.WithContext(context.Background())

    // 創(chuàng)建一個 map 來保存結果
    var result sync.Map

    for _, url := range urls {
        // 使用 errgroup 啟動一個 goroutine 來獲取 URL
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err // 發(fā)生錯誤,返回該錯誤
            }

            // 發(fā)起請求
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err // 發(fā)生錯誤,返回該錯誤
            }
            defer resp.Body.Close()

            // 保存每個 URL 的響應狀態(tài)碼
            result.Store(url, resp.Status)
            return nil // 返回 nil 表示成功
        })
    }

    // 等待所有 goroutine 完成并返回第一個錯誤(如果有)
    if err := g.Wait(); err != nil {
        fmt.Println("Error: ", err)
    }

    // 所有 goroutine 都執(zhí)行完成,遍歷并打印成功的結果
    result.Range(func(key, value any) bool {
        fmt.Printf("fetch url %s status %s\n", key, value)
        return true
    })
}

執(zhí)行示例代碼,得到如下輸出:

$ go run examples/withcontext/main.go
Error:  Get "http://www.somestupidname.com/": dial tcp: lookup www.somestupidname.com: no such host
fetch url http://www.google.com/ status 200 OK

由測試結果來看,對于 [http://www.google.com/](http://www.google.com/) 的請求可以接收到成功響應,由于對 [http://www.somestupidname.com/](http://www.somestupidname.com/) 請求報錯,程序來不及等待 [http://www.golang.org/](http://www.golang.org/) 響應,就被取消了。

其實我們大致可以猜測到,取消功能應該是通過 context.cancelCtx 來實現(xiàn)的,我們暫且不必深究,稍后探索源碼就能驗證我們的猜想了。

限制并發(fā)數(shù)量

errgroup 提供了 errgroup.SetLimit 可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。

示例如下:

package main

import (
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    // 創(chuàng)建一個 errgroup.Group
    var g errgroup.Group
    // 設置最大并發(fā)限制為 3
    g.SetLimit(3)

    // 啟動 10 個 goroutine
    for i := 1; i <= 10; i++ {
        g.Go(func() error {
            // 打印正在運行的 goroutine
            fmt.Printf("Goroutine %d is starting\n", i)
            time.Sleep(2 * time.Second) // 模擬任務耗時
            fmt.Printf("Goroutine %d is done\n", i)
            return nil
        })
    }

    // 等待所有 goroutine 完成
    if err := g.Wait(); err != nil {
        fmt.Printf("Encountered an error: %v\n", err)
    }

    fmt.Println("All goroutines complete.")
}

使用 g.SetLimit(3) 可以限制最大并發(fā)為 3 個 goroutine。

執(zhí)行示例代碼,得到如下輸出:

$  go run examples/setlimit/main.go
Goroutine 3 is starting
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 is done
Goroutine 1 is done
Goroutine 5 is starting
Goroutine 3 is done
Goroutine 6 is starting
Goroutine 4 is starting
Goroutine 6 is done
Goroutine 5 is done
Goroutine 8 is starting
Goroutine 4 is done
Goroutine 7 is starting
Goroutine 9 is starting
Goroutine 9 is done
Goroutine 8 is done
Goroutine 10 is starting
Goroutine 7 is done
Goroutine 10 is done
All goroutines complete.

根據(jù)輸出可以發(fā)現(xiàn),雖然我們通過 for 循環(huán)啟動了 10 個 goroutine,但程序執(zhí)行時最多只允許同時啟動 3 個 goroutine,當這 3 個 goroutine 中有某個執(zhí)行完成并退出,才會有新的 goroutine 被啟動。

嘗試啟動

errgroup 還提供了 errgroup.TryGo 可以嘗試啟動一個任務,它返回一個 bool 值,標識任務是否啟動成功,true 表示成功,false 表示失敗。

errgroup.TryGo 需要搭配 errgroup.SetLimit 一同使用,因為如果不限制并發(fā)數(shù)量,那么 errgroup.TryGo 始終返回 true,當達到最大并發(fā)數(shù)量限制時,errgroup.TryGo 返回 false。

示例如下:

package main

import (
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    // 創(chuàng)建一個 errgroup.Group
    var g errgroup.Group
    // 設置最大并發(fā)限制為 3
    g.SetLimit(3)

    // 啟動 10 個 goroutine
    for i := 1; i <= 10; i++ {
        if g.TryGo(func() error {
            // 打印正在運行的 goroutine
            fmt.Printf("Goroutine %d is starting\n", i)
            time.Sleep(2 * time.Second) // 模擬工作
            fmt.Printf("Goroutine %d is done\n", i)
            return nil
        }) {
            // 如果成功啟動,打印提示
            fmt.Printf("Goroutine %d started successfully\n", i)
        } else {
            // 如果達到并發(fā)限制,打印提示
            fmt.Printf("Goroutine %d could not start (limit reached)\n", i)
        }
    }

    // 等待所有 goroutine 完成
    if err := g.Wait(); err != nil {
        fmt.Printf("Encountered an error: %v\n", err)
    }

    fmt.Println("All goroutines complete.")
}

使用 g.SetLimit(3) 限制最大并發(fā)為 3 個 goroutine,調用 g.TryGo 如果啟動任務成功,打印 Goroutine {i} started successfully 提示信息;啟動任務失敗,則打印 Goroutine {i} could not start (limit reached) 提示信息。

執(zhí)行示例代碼,得到如下輸出:

$ go run examples/trygo/main.go
Goroutine 1 started successfully
Goroutine 1 is starting
Goroutine 2 is starting
Goroutine 2 started successfully
Goroutine 3 started successfully
Goroutine 4 could not start (limit reached)
Goroutine 5 could not start (limit reached)
Goroutine 6 could not start (limit reached)
Goroutine 7 could not start (limit reached)
Goroutine 8 could not start (limit reached)
Goroutine 9 could not start (limit reached)
Goroutine 10 could not start (limit reached)
Goroutine 3 is starting
Goroutine 2 is done
Goroutine 3 is done
Goroutine 1 is done
All goroutines complete.

因為限制最大并發(fā)數(shù)量為 3,所以前面 3 個 goroutine 啟動成功,并且正常執(zhí)行完成,其他幾個 goroutine 全部執(zhí)行失敗。

以上就是 errgroup 的全部用法了,更多使用場景你可以在實踐中去嘗試和感悟。

源碼解讀

接下來,我們一起閱讀下 errgroup 源碼,以此來加深對 errgroup 的理解。

errgroup 源碼非常少,僅有 3 個文件。這 3 個文件源碼內(nèi)容分別如下:

主邏輯代碼:

https://github.com/golang/sync/blob/v0.8.0/errgroup/errgroup.go

// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup

import (
    "context"
    "fmt"
    "sync"
)

type token struct{}

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
    cancel func(error)

    wg sync.WaitGroup

    sem chan token

    errOnce sync.Once
    err     error
}

func (g *Group) done() {
    if g.sem != nil {
        <-g.sem
    }
    g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := withCancelCause(ctx)
    return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel(g.err)
    }
    return g.err
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
    if g.sem != nil {
        g.sem <- token{}
    }

    g.wg.Add(1)
    go func() {
        defer g.done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(g.err)
                }
            })
        }
    }()
}

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
    if g.sem != nil {
        select {
        case g.sem <- token{}:
            // Note: this allows barging iff channels in general allow barging.
        default:
            return false
        }
    }

    g.wg.Add(1)
    go func() {
        defer g.done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(g.err)
                }
            })
        }
    }()
    return true
}

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
    if n < 0 {
        g.sem = nil
        return
    }
    if len(g.sem) != 0 {
        panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
    }
    g.sem = make(chan token, n)
}

為 Go 1.20 及更高版本提供的 withCancelCause 函數(shù)實現(xiàn):

https://github.com/golang/sync/blob/v0.8.0/errgroup/go120.go

// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.20

package errgroup

import "context"

func withCancelCause(parent context.Context) (context.Context, func(error)) {
    return context.WithCancelCause(parent)
}

為低于 Go 1.20 版本提供的 withCancelCause 函數(shù)實現(xiàn):

https://github.com/golang/sync/blob/v0.8.0/errgroup/pre_go120.go

// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build !go1.20

package errgroup

import "context"

func withCancelCause(parent context.Context) (context.Context, func(error)) {
    ctx, cancel := context.WithCancel(parent)
    return ctx, func(error) { cancel() }
}

可以看到,errgroup 全部源碼加起來也不到 100 行,可謂短小精悍。

現(xiàn)在我們來分析下 errgroup 源碼。

根據(jù)包注釋我們可以知道,errgroup 包提供了同步、錯誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務的子任務。errgroup.Group 與 sync.WaitGroup 相關,增加了處理任務返回錯誤的能力。

為了提供以上功能,首先 errgroup 定義了 token 和 Group 兩個結構體:

// 定義一個空結構體類型 token,會作為信號進行傳遞,用于控制并發(fā)數(shù)
type token struct{}

// Group 是一組協(xié)程的集合,這些協(xié)程處理同一整體任務的子任務
//
// 零值 Group 是有效的,對活動協(xié)程的數(shù)量沒有限制,并且不會在出錯時取消
type Group struct {
    cancel func(error) // 取消函數(shù),就是 context.CancelCauseFunc 類型

    wg sync.WaitGroup // 內(nèi)部使用了 sync.WaitGroup

    sem chan token // 信號 channel,可以控制協(xié)程并發(fā)數(shù)量

    errOnce sync.Once // 確保錯誤僅處理一次
    err     error     // 記錄子協(xié)程集中返回的第一個錯誤
}

token 被定義為空結構體,用來傳遞信號,這也是 Go 中空結構體的慣用法。

NOTE:

你可以在我的另一篇文章《Go 中空結構體慣用法,我?guī)湍憧偨Y全了!》中查看空結構體的更多用法。

Group 是 errgroup 包提供的唯一公開結構體,其關聯(lián)的方法承載了所有功能。

cancel 屬性為一個函數(shù),上下文取消時會被調用,其實就是 context.CancelCauseFunc 類型,調用 errgroup.WithContext 時被賦值。

wg 屬性即為 sync.WaitGroup,承擔并發(fā)控制的主邏輯,errgroup.Go 和 errgroup.TryGo 內(nèi)部并發(fā)控制邏輯都會代理給 sync.WaitGroup。

sem屬性是 token 類型的 channel,用于限制并發(fā)數(shù)量,調用 errgroup.SetLimit 是被賦值。

err 會記錄所有 goroutine 中出現(xiàn)的第一個錯誤,由errOnce 確保錯誤錯誤僅處理一次,所以后面再出現(xiàn)更多的錯誤都會被忽略。

接下來我們先看 errgroup.SetLimit 方法定義:

// SetLimit 限制該 Group 中活動的協(xié)程數(shù)量最多為 n,負值表示沒有限制
//
// 任何后續(xù)對 Go 方法的調用都將阻塞,直到可以在不超過限額的情況下添加活動協(xié)程
//
// 在 Group 中存在任何活動的協(xié)程時,限制不得修改
func (g *Group) SetLimit(n int) { // 傳進來的 n 就是 channel 長度,以此來限制協(xié)程的并發(fā)數(shù)量
    if n < 0 { // 這里檢查如果小于 0 則不限制協(xié)程并發(fā)數(shù)量。此外,也不要將其設置為 0,會產(chǎn)生死鎖
        g.sem = nil
        return
    }
    if len(g.sem) != 0 { // 如果存在活動的協(xié)程,調用此方法將產(chǎn)生 panic
        panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
    }
    g.sem = make(chan token, n)
}

errgroup.SetLimit 方法可以限制并發(fā)屬性,其內(nèi)部邏輯很簡單,不過要注意在調用 errgroup.Go 或 errgroup.TryGo 方法前調用 errgroup.SetLimit,以防程序出現(xiàn) panic

然后看下主邏輯 errgroup.Go 方法實現(xiàn):

// Go 會在新的協(xié)程中調用給定的函數(shù)
// 它會阻塞,直到可以在不超過配置的活躍協(xié)程數(shù)量限制的情況下添加新的協(xié)程
//
// 首次返回非 nil 錯誤的調用會取消該 Group 的上下文(context),如果該 context 是通過調用 WithContext 創(chuàng)建的,該錯誤將由 Wait 返回
func (g *Group) Go(f func() error) {
    if g.sem != nil { // 這個是限制并發(fā)數(shù)的信號通道
        g.sem <- token{} // 如果超過了配置的活躍協(xié)程數(shù)量限制,向 channel 發(fā)送 token 會阻塞
    }

    g.wg.Add(1) // 轉發(fā)給 sync.WaitGroup.Add(1),將活動協(xié)程數(shù)加一
    go func() {
        defer g.done() // 當一個協(xié)程完成時,調用此方法,內(nèi)部會將調用轉發(fā)給 sync.WaitGroup.Done()

        if err := f(); err != nil { // f() 就是我們要執(zhí)行的任務
            g.errOnce.Do(func() { // 僅執(zhí)行一次,即只處理一次錯誤,所以會記錄第一個非 nil 的錯誤,與協(xié)程啟動順序無關
                g.err = err          // 記錄錯誤
                if g.cancel != nil { // 如果 cancel 不為 nil,則調用取消函數(shù),并設置 cause
                    g.cancel(g.err)
                }
            })
        }
    }()
}

首先會檢測是否使用 errgroup.SetLimit 方法設置了并發(fā)限制,如果有限制,則使用 channel 來控制并發(fā)數(shù)量。

否則執(zhí)行主邏輯,其實就是 sync.WaitGroup 的套路代碼。

在 defer 中調用了 g.done(),done 方法定義如下:

// 當一個協(xié)程完成時,調用此方法
func (g *Group) done() {
    // 如果設置了最大并發(fā)數(shù),則 sem 不為 nil,從 channel 中消費一個 token,表示一個協(xié)程已完成
    if g.sem != nil {
        <-g.sem
    }
    g.wg.Done() // 轉發(fā)給 sync.WaitGroup.Done(),將活動協(xié)程數(shù)減一
}

另外,如果某個任務返回了錯誤,則通過 errOnce 確保錯誤只被處理一次,處理方式就是先記錄錯誤,然后調用 cancel 方法。

cancel 實際上是在 errgroup.WithContext 方法中賦值的:

// WithContext 返回一個新的 Group 和一個從 ctx 派生的關聯(lián) Context
//
// 派生的 Context 會在傳遞給 Go 的函數(shù)首次返回非 nil 錯誤或 Wait 首次返回時被取消,以先發(fā)生者為準。
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := withCancelCause(ctx)
    return &Group{cancel: cancel}, ctx
}

這里的 withCancelCause 有兩種實現(xiàn)。

如果 Go 版本大于等于 1.20,提供的 withCancelCause 函數(shù)實現(xiàn)如下:

// 構建約束標識了這個文件是 Go 1.20 版本被加入的
//go:build go1.20

package errgroup

import "context"

// 代理到 context.WithCancelCause
func withCancelCause(parent context.Context) (context.Context, func(error)) {
    return context.WithCancelCause(parent)
}

如果 Go 版本小于 1.20,提供的 withCancelCause 函數(shù)實現(xiàn)如下:

//go:build !go1.20

package errgroup

import "context"

func withCancelCause(parent context.Context) (context.Context, func(error)) {
    ctx, cancel := context.WithCancel(parent)
    return ctx, func(error) { cancel() }
}

因為 context.WithCancelCause 方法是在 Go 1.20 版本加入的,你可以在 Go 1.20 Release Notes 中找到,你也可以在這個 Commit: 93782cc 中看到 withCancelCause 函數(shù)變更記錄。

調用 errgroup.Go 方法啟動任務后,我們會調用 errgroup.Wait 等待所有任務完成,其實現(xiàn)如下:

// Wait 會阻塞,直到來自 Go 方法的所有函數(shù)調用返回,然后返回它們中的第一個非 nil 錯誤(如果有的話)
func (g *Group) Wait() error {
    g.wg.Wait()          // 轉發(fā)給 sync.WaitGroup.Wait(),等待所有協(xié)程執(zhí)行完成
    if g.cancel != nil { // 如果 cancel 不為 nil,則調用取消函數(shù),并設置 cause
        g.cancel(g.err)
    }
    return g.err // 返回錯誤
}

所以,最終 errgroup.Wait 返回的錯誤其實就是 errgroup.Go 方法中記錄的第一個錯誤。

現(xiàn)在,我們還剩下最后一個方法 errgroup.TryGo 的源碼沒有分析,我把源碼貼在下面,并寫上了詳細的注釋:

// TryGo 僅在 Group 中活動的協(xié)程數(shù)量低于限額時,才在新的協(xié)程中調用給定的函數(shù)
//
// 返回值標識協(xié)程是否啟動
func (g *Group) TryGo(f func() error) bool {
    if g.sem != nil { // 如果設置了最大并發(fā)數(shù)
        select {
        case g.sem <- token{}: // 可以向 channel 寫入 token,說明沒有達到限額,可以啟動協(xié)程
            // Note: this allows barging iff channels in general allow barging.
        default: // 如果超過了配置的活躍協(xié)程數(shù)量限制,會走到這個 case
            return false
        }
    }

    // 接下來的代碼與 Go 中的邏輯相同
    g.wg.Add(1)
    go func() {
        defer g.done()

        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel(g.err)
                }
            })
        }
    }()
    return true
}

主邏輯與 errgroup.Go 方法一樣,不同的是 errgroup.Go 方法如果達到并發(fā)限額會阻塞,而 errgroup.TryGo 方法在達到并發(fā)限額時直接返回 false

其實 <font style="color:rgb(31, 35, 40);">errgroup.TryGo</font> 和 <font style="color:rgb(31, 35, 40);">errgroup.SetLimit</font> 兩個方法是后添加的功能,你可以在 issues/27837 中看到討論記錄。

至此,errgroup 源碼就都解讀完成了。

總結

errgroup 是官方為我們提供的擴展庫,在 sync.WaitGroup 基礎上,增加了處理任務返回錯誤的能力。提供了同步、錯誤傳播和上下文取消功能,用于一組 goroutines 處理共同任務的子任務。

errgroup.WithContext 方法可以附加取消功能,在任意一個 goroutine 返回錯誤時,立即取消其他正在運行的 goroutine,并在 Wait 方法中返回第一個非 nil 的錯誤。

errgroup.SetLimit 方法可以限制并發(fā)執(zhí)行的 goroutine 數(shù)量。

errgroup.TryGo 可以嘗試啟動一個任務,返回值標識啟動成功或失敗。

errgroup 源碼設計精妙,值得借鑒。

以上就是Golang并發(fā)控制之errgroup使用詳解的詳細內(nèi)容,更多關于Golang errgroup的資料請關注腳本之家其它相關文章!

相關文章

  • 簡單聊聊為什么說Go語言字符串是不可變的

    簡單聊聊為什么說Go語言字符串是不可變的

    最近有讀者留言說,平時在寫代碼的過程中,是會對字符串進行修改的,但網(wǎng)上都說 Go 語言字符串是不可變的,這是為什么呢,本文就來和大家簡單講講
    2023-05-05
  • 使用Go語言連接和操作數(shù)據(jù)庫的基本步驟

    使用Go語言連接和操作數(shù)據(jù)庫的基本步驟

    在Go語言中,連接和操作數(shù)據(jù)庫通常使用database/sql包,它提供了一個數(shù)據(jù)庫抽象層,支持多種數(shù)據(jù)庫引擎,如MySQL、PostgreSQL、SQLite等,下面我將以MySQL為例,詳細講解如何使用Go語言連接和操作數(shù)據(jù)庫,需要的朋友可以參考下
    2024-06-06
  • golang利用函數(shù)閉包實現(xiàn)簡單的中間件

    golang利用函數(shù)閉包實現(xiàn)簡單的中間件

    中間件設計模式是一種常見的軟件設計模式,它在許多編程語言和框架中被廣泛應用,這篇文章主要為大家介紹一下golang利用函數(shù)閉包實現(xiàn)一個簡單的中間件,感興趣的可以了解下
    2023-10-10
  • 全面解析Go語言中crypto/sha1庫

    全面解析Go語言中crypto/sha1庫

    crypto/sha1在Go語言標準庫中是一個強大且實用的工具,適用于多種應用場景,本文就詳細的介紹了Go語言中crypto/sha1庫,具有一定的參考價值,感興趣的可以了解一下
    2024-02-02
  • golang?select?機制和超時問題

    golang?select?機制和超時問題

    golang 中的協(xié)程使用非常方便,但是協(xié)程什么時候結束是一個控制問題,可以用 select 配合使用,這篇文章主要介紹了golang?select?機制和超時問題,需要的朋友可以參考下
    2022-06-06
  • 深入理解Go設計模式之代理模式

    深入理解Go設計模式之代理模式

    代理模式是一種結構型設計模式,?其中代理控制著對于原對象的訪問,?并允許在將請求提交給原對象的前后進行一些處理,從而增強原對象的邏輯處理,這篇文章主要來學習一下代理模式的構成和用法,需要的朋友可以參考下
    2023-05-05
  • go嵌套匿名結構體的初始化詳解

    go嵌套匿名結構體的初始化詳解

    這篇文章主要介紹了go嵌套匿名結構體的初始化詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go語言中Goroutine的設置方式

    Go語言中Goroutine的設置方式

    這篇文章介紹了Go語言中Goroutine的設置方式,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-07-07
  • go語言單例模式(Singleton)實例分析

    go語言單例模式(Singleton)實例分析

    這篇文章主要介紹了go語言單例模式(Singleton),實例分析了單例模式的原理與Go語言的實現(xiàn)技巧,需要的朋友可以參考下
    2015-03-03
  • 如何讓shell終端和goland控制臺輸出彩色的文字

    如何讓shell終端和goland控制臺輸出彩色的文字

    這篇文章主要介紹了如何讓shell終端和goland控制臺輸出彩色的文字的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05

最新評論