欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Go語言中ErrGroup的使用

 更新時間:2023年07月17日 10:58:07   作者:242030  
本文主要為大家詳細介紹了Go語言中errGroup的使用,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下

在并發(fā)編程里,sync.WaitGroup 并發(fā)原語的使用頻率非常高,它經常用于協(xié)同等待的場景:goroutine A 在檢查點等待一組執(zhí)行任務的 worker goroutine 全部完成,如果在執(zhí)行任務的這些 goroutine 還沒全部完成,goroutine A 就會阻塞在檢查點,直到所有 woker goroutine 都完成后才能繼續(xù)執(zhí)行。

如果在 woker goroutine 的執(zhí)行過程中遇到錯誤并想要處理該怎么辦? WaitGroup 并沒有提供傳播錯誤的功能,遇到這種場景我們該怎么辦? Go 語言在擴展庫提供了 ErrorGroup 并發(fā)原語正好適合在這種場景下使用,它在WaitGroup 的基礎上還提供了,錯誤傳播以及上下文取消的功能。

Go 擴展庫通過 errorgroup.Group 提供 ErrorGroup 原語的功能,它有三個方法可調用:

func WithContext(ctx context.Context) (*Group, context.Context)
func (g *Group) Go(f func() error)
func (g *Group) Wait() error

接下來我們讓主 goroutine 使用 ErrorGroup 代替 WaitGroup 等待所以子任務的完成,ErrorGroup 有一個特點是

會返回所以執(zhí)行任務的 goroutine 遇到的第一個錯誤。試著執(zhí)行一下下面的程序,觀察程序的輸出。

package main
import (
	"fmt"
	"golang.org/x/sync/errgroup"
	"net/http"
)
func main() {
	var urls = []string{
		"http://www.golang.org/",
		"http://www.baidu.com/",
		"http://www.noexist11111111.com/",
	}
	g := new(errgroup.Group)
	for _, url := range urls {
		url := url
		g.Go(func() error {
			resp, err := http.Get(url)
			if err != nil {
				fmt.Println(err)
				return err
			}
			fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
			return resp.Body.Close()
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println(err)
	} else {
		fmt.Println("All success!")
	}
}

輸出:

Get "http://www.noexist11111111.com/": dial tcp: lookup www.noexist11111111.com: no such host
get [http://www.baidu.com/] success: [200]
Get "http://www.golang.org/": dial tcp 172.217.24.113:80: connectex: A connection attempt failed because the connected party did not properly respond after a period o
f time, or established connection failed because connected host has failed to respond.
Get "http://www.noexist11111111.com/": dial tcp: lookup www.noexist11111111.com: no such host

ErrorGroup 有一個特點是會返回所以執(zhí)行任務的 goroutine 遇到的第一個錯誤:

package main
import (
	"fmt"
	"golang.org/x/sync/errgroup"
	"log"
	"time"
)
func main() {
	var eg errgroup.Group
	for i := 0; i < 100; i++ {
		i := i
		eg.Go(func() error {
			time.Sleep(2 * time.Second)
			if i > 90 {
				fmt.Println("Error:", i)
				return fmt.Errorf("Error occurred: %d", i)
			}
			fmt.Println("End:", i)
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatal(err)
	}
}

上面程序,遇到 i 大于 90 的都會產生錯誤結束執(zhí)行,但是只有第一個執(zhí)行時產生的錯誤被 ErrorGroup 返回,程序的輸出大概如下:

輸出:

......
End: 35
End: 38
End: 28
End: 37
End:38;2;127;0;0m2023/06/29 14:18:03 Error occurred: 98
32
Error: 92
End: 23
End: 30
Error: 95
Error: 94
End: 74
End: 25
......

最早執(zhí)行遇到錯誤的 goroutine 輸出了Error: 98 但是所有未執(zhí)行完的其他任務并沒有停止執(zhí)行,那么想讓程序遇到錯誤就終止其他子任務該怎么辦呢?我們可以用 errgroup.Group 提供的 WithContext 方法創(chuàng)建一個帶可取消上下文功能的 ErrorGroup。

使用 errorgroup.Group 時注意它的兩個特點:

  • errgroup.Group 在出現(xiàn)錯誤或者等待結束后都會調用 Context 對象的 cancel 方法同步取消信號。
  • 只有第一個出現(xiàn)的錯誤才會被返回,剩余的錯誤都會被直接拋棄。
package main
import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
	"log"
	"time"
)
func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 100; i++ {
		i := i
		eg.Go(func() error {
			time.Sleep(2 * time.Second)
			select {
			case <-ctx.Done():
				fmt.Println("Canceled:", i)
				return nil
			default:
				if i > 90 {
					fmt.Println("Error:", i)
					return fmt.Errorf("Error: %d", i)
				}
				fmt.Println("End:", i)
				return nil
			}
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatal(err)
	}
}

Go 方法單獨開啟的 gouroutine 在執(zhí)行參數(shù)傳遞進來的函數(shù)時,如果函數(shù)返回了錯誤,會對 ErrorGroup 持有的err 字段進行賦值并及時調用 cancel 函數(shù),通過上下文通知其他子任務取消執(zhí)行任務。所以上面更新后的程序運行后有如下類似的輸出。

......
Canceled: 87
Canceled: 34
Canceled: 92
Canceled: 86
Cancled: 78
Canceled: 46
Cancel[38;2;127;0;0m2023/06/29 14:22:07 Error: 99
ed: 45
Canceled: 44
Canceled: 77
Canceled: 43
Canceled: 50
Canceled: 42
Canceled: 25
Canceled: 76
Canceled: 24
Canceled: 75
Canceled: 40
......

errorgroup源碼:

在上面的例子中,子 goroutine 出現(xiàn)錯誤后,會 cancle 到其他的子任務,但是我們并沒有看到調用 ctx 的 cancel方法,下面我們看下源碼,看看內部是怎么處理的。errgroup 的設計非常精練,全部代碼如下:

package errgroup
import (
    "context"
    "sync"
)
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
    cancel func()
    wg sync.WaitGroup
    errOnce sync.Once
    err     error
}
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

可以看到,errgroup 的實現(xiàn)依靠于結構體 Group,它通過封裝 sync.WaitGroup,繼承了 WaitGroup 的特性,在Go() 方法中新起一個子任務 goroutine,并在 Wait() 方法中通過 sync.WaitGroup 的 Wait 進行阻塞等待。

同時 Group 利用 sync.Once 保證了它有且僅會保留第一個子 goroutine 錯誤。

Group 通過嵌入 context.WithCancel 方法產生的 cancel 函數(shù),能夠在子 goroutine 發(fā)生錯誤時,及時通過調用cancle 函數(shù),將 Context 的取消信號及時傳播出去。

再看一個實際應用的例子:

package main
import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
)
func main() {
	g, ctx := errgroup.WithContext(context.Background())
	dataChan := make(chan int, 20)
	// 數(shù)據(jù)生產端任務子goroutine
	g.Go(func() error {
		defer close(dataChan)
		for i := 1; ; i++ {
			if i == 10 {
				return fmt.Errorf("data 10 is wrong")
			}
			dataChan <- i
			fmt.Println(fmt.Sprintf("sending %d", i))
		}
	})
	// 數(shù)據(jù)消費端任務子goroutine
	for i := 0; i < 3; i++ {
		g.Go(func() error {
			for j := 1; ; j++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case number := <-dataChan:
					fmt.Println(fmt.Sprintf("receiving %d", number))
				}
			}
		})
	}
	// 主任務goroutine等待pipeline結束數(shù)據(jù)流
	err := g.Wait()
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("main goroutine done!")
}

輸出

sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 2
receiving 1
receiving 3
data 10 is wrong
main goroutine done!

自己實現(xiàn)一個 ErrGroup:

package main
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
const (
	M = 2
	N = 8
)
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
	defer cancel()
	result := make([]int, N+1)
	errCh := make(chan error, 1)
	var firstSendErr int32
	wg := new(sync.WaitGroup)
	done := make(chan struct{}, 1)
	limit := make(chan struct{}, M)
	for i := 1; i <= N; i++ {
		limit <- struct{}{}
		var quit bool
		select {
		// context已經被cancel,不需要起新的goroutine了
		case <-ctx.Done():
			quit = true
		default:
		}
		if quit {
			break
		}
		wg.Add(1)
		go func(x int) {
			defer func() {
				wg.Done()
				<-limit
			}()
			if ret, err := doTask(ctx, x); err != nil {
				if atomic.CompareAndSwapInt32(&firstSendErr, 0, 1) {
					errCh <- err
					// cancel其他的請求
					cancel()
				}
			} else {
				result[x] = ret
			}
		}(i)
	}
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case err := <-errCh:
		handleErr(err, result[1:])
		<-done
	case <-done:
		if len(errCh) > 0 {
			err := <-errCh
			handleErr(err, result[1:])
			return
		}
		fmt.Println("success handle all task:", result[1:])
	}
}
func handleErr(err error, result []int) {
	fmt.Println("task err occurs: ", err, "result", result)
}
func doTask(ctx context.Context, i int) (ret int, err error) {
	fmt.Println("task start", i)
	defer func() {
		fmt.Println("task done", i, "err", err)
	}()
	select {
	// 模擬處理任務時間
	case <-time.After(time.Second * time.Duration(i)):
	// 處理任務要支持被context cancel,不然就一直等到處理完再返回了
	case <-ctx.Done():
		fmt.Println("task canceled", i)
		return -1, ctx.Err()
	}
	// 模擬出現(xiàn)錯誤
	if i == 6 {
		return -1, errors.New("err test")
	}
	return i, nil
}

輸出

task start 2
task start 1
task done 1 err <nil>
task start 3
task done 2 err <nil>
task start 4
task done 3 err <nil>
task start 5
task done 4 err <nil>
task start 6
task done 5 err <nil>
task start 7
task done 6 err err test
task canceled 7
task done 7 err context canceled
task err occurs:  err test result [1 2 3 4 5 0 0 0]

總結:

使用 errorgroup.Group 時注意它的特點:

繼承了 WaitGroup 的功能

errgroup.Group 在出現(xiàn)錯誤或者等待結束后都會調用 Context 對象 的 cancel 方法同步取消信號。

只有第一個出現(xiàn)的錯誤才會被返回,剩余的錯誤都會被直接拋棄。

context 信號傳播:如果子任務 goroutine 中有循環(huán)邏輯,則可以添加 ctx.Done 邏輯,此時通過 context 的取消信號,提前結束子任務執(zhí)行。

以上就是詳解Go語言中ErrGroup的使用的詳細內容,更多關于Go ErrGroup的資料請關注腳本之家其它相關文章!

相關文章

  • Go1.20?arena新特性示例詳解

    Go1.20?arena新特性示例詳解

    這篇文章主要為大家介紹了Go1.20?arena新特性示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-11-11
  • Golang中切片的用法與本質詳解

    Golang中切片的用法與本質詳解

    Go的切片類型為處理同類型數(shù)據(jù)序列提供一個方便而高效的方式,下面這篇文章就來給大家介紹了關于Golang中切片的用法與本質的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考下
    2018-07-07
  • 如何在VScode 中編譯多個Go文件

    如何在VScode 中編譯多個Go文件

    這篇文章主要介紹了VScode 中編譯多個Go文件的實現(xiàn)方法,本文通過實例圖文并茂的形式給大家介紹的非常詳細,需要的朋友可以參考下
    2021-08-08
  • 基于Go語言實現(xiàn)應用IP防火墻

    基于Go語言實現(xiàn)應用IP防火墻

    在公司里面經常會聽到某應用有安全漏洞問題,沒有做安全加固,IP防火墻就是一個典型的安全加固解決方案,下面我們就來學習一下如何使用go語言實現(xiàn)IP防火墻吧
    2023-11-11
  • Go?Gin框架優(yōu)雅重啟和停止實現(xiàn)方法示例

    Go?Gin框架優(yōu)雅重啟和停止實現(xiàn)方法示例

    Web應用程序中,有時需要重啟或停止服務器,無論是因為更新代碼還是進行例行維護,這時需要保證應用程序的可用性和數(shù)據(jù)的一致性,就需要優(yōu)雅地關閉和重啟應用程序,即不丟失正在處理的請求和不拒絕新的請求,本文將詳解如何在Go語言中使用Gin這個框架實現(xiàn)優(yōu)雅的重啟停止
    2024-01-01
  • 淺談Golang 嵌套 interface 的賦值問題

    淺談Golang 嵌套 interface 的賦值問題

    這篇文章主要介紹了淺談Golang 嵌套 interface 的賦值問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • Golang異常處理之優(yōu)雅地控制和處理異常

    Golang異常處理之優(yōu)雅地控制和處理異常

    在Golang中,異常處理是非常重要的一部分,能夠有效地控制和處理代碼中的異常情況。通過Golang的異常處理機制,可以優(yōu)雅地捕獲和處理異常,保障代碼的可靠性和穩(wěn)定性。同時,Golang還提供了豐富的工具和API,幫助開發(fā)者更加輕松地進行異常處理
    2023-04-04
  • 詳解Go語言中上下文context的理解與使用

    詳解Go語言中上下文context的理解與使用

    在Go的日常開發(fā)中,Context上下文對象無處不在,這篇文章小編就來帶大家深入了解一下上下文context的理解與使用,文中的示例代碼講解詳細,需要的可以參考下
    2023-10-10
  • go swagger生成接口文檔使用教程

    go swagger生成接口文檔使用教程

    這篇文章主要為大家介紹了go swagger生成接口文檔使用教程示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-08-08
  • Golang?sync.Once實現(xiàn)單例模式的方法詳解

    Golang?sync.Once實現(xiàn)單例模式的方法詳解

    Go?語言的?sync?包提供了一系列同步原語,其中?sync.Once?就是其中之一。本文將深入探討?sync.Once?的實現(xiàn)原理和使用方法,幫助大家更好地理解和應用?sync.Once,需要的可以參考一下
    2023-05-05

最新評論