Go語(yǔ)言并發(fā)編程之控制并發(fā)數(shù)量實(shí)現(xiàn)實(shí)例
今天我主要分享下Go語(yǔ)言并發(fā)編程如何控制并發(fā)數(shù)量。
適用場(chǎng)景
有一批數(shù)據(jù)需要并發(fā)處理,不能開(kāi)啟協(xié)程數(shù)量過(guò)多,以免服務(wù)器資源耗盡或者對(duì)服務(wù)造成過(guò)大壓力,需要控制并發(fā)數(shù)量為N。
代碼
話(huà)不多說(shuō),直接上代碼,示例中采用三種方式進(jìn)行處理。
1、以int數(shù)據(jù)集為例,并發(fā)數(shù)量為num;
2、第一種方式,并發(fā)函數(shù)報(bào)錯(cuò)則終止任務(wù)執(zhí)行。后兩種方式會(huì)等待所有處理任務(wù)執(zhí)行完,再返回是否發(fā)生錯(cuò)誤。
代碼如下:
# utils.go package utils import ( "context" "fmt" "sync" "golang.org/x/sync/errgroup" ) // BatchDeal BatchDeal // TODO int類(lèi)型待后續(xù)修改為泛型T func BatchDeal(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) { ch := make(chan int, num) go func() { select { case <-ctx.Done(): return default: } for _, v := range records { ch <- v } close(ch) }() errCh := make(chan error, len(records)) go func() { goN(num, func(i int) { select { case <-ctx.Done(): errCh <- ctx.Err() return default: } for v := range ch { if er := f(ctx, v); er != nil { errCh <- er } } })() // 處理完關(guān)閉errCh close(errCh) }() // 有錯(cuò)誤就結(jié)束或者關(guān)閉errCh后執(zhí)行 err = <-errCh if err != nil { fmt.Printf("batch deal fail, err=%v", err) return } fmt.Println("batch deal end") return } func goN(n int, fn func(int)) func() { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func(i int) { fn(i) wg.Done() }(i) } return wg.Wait } // BatchDeal2 BatchDeal2 // TODO int類(lèi)型待后續(xù)修改為泛型T func BatchDeal2(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) { ch := make(chan int, num) go func() { select { case <-ctx.Done(): return default: } for _, v := range records { ch <- v } close(ch) }() err = groupN(ctx, num, func(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() default: } for v := range ch { if err := f(ctx, v); err != nil { return err } } return nil }) if err != nil { fmt.Printf("batch deal fail, err=%v", err) return } fmt.Println("batch deal end") return } // groupN n為并發(fā)數(shù)量 func groupN(ctx context.Context, n int, fn func(context.Context) error) error { group, ctx := errgroup.WithContext(ctx) for i := 0; i < n; i++ { group.Go(func() error { if err := fn(ctx); err != nil { return err } return nil }) } return group.Wait() } // BatchDeal3 BatchDeal3 // TODO int類(lèi)型待后續(xù)修改為泛型T func BatchDeal3(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) { group, ctx := errgroup.WithContext(ctx) // 并發(fā)控制channel,并發(fā)數(shù)量為num ch := make(chan struct{}, num) for _, v := range records { // 元素進(jìn)channel,并發(fā)超過(guò)10則阻塞 ch <- struct{}{} vCopy := v group.Go(func() error { // 釋放元素 defer func() { <-ch }() if err := f(ctx, vCopy); err != nil { return err } return nil }) } // 等待執(zhí)行完畢,全部執(zhí)行完畢才會(huì)結(jié)束 if err = group.Wait(); err != nil { fmt.Printf("batch deal failed, err=%v", err) } fmt.Println("batch deal end") return } # utils_test.go package utils import ( "context" "fmt" "testing" "time" "github.com/stretchr/testify/assert" ) // TestBatchDeal func TestBatchDeal(t *testing.T) { records := make([]int, 0) for i := 1; i < 100; i++ { records = append(records, i) } num := 10 testAssert := assert.New(t) testF := func(ctx context.Context, i int) error { fmt.Printf("args=%d", i) time.Sleep(time.Duration(i * int(time.Millisecond))) return nil } testFailF := func(ctx context.Context, i int) error { fmt.Printf("args=%d", i) time.Sleep(time.Duration(i * int(time.Millisecond))) var er error if i == 10 { fmt.Printf("error accour, i=%d\n", i) er = fmt.Errorf("err=%d", i) } return er } err := BatchDeal(context.Background(), records, num, testF) testAssert.Nil(err) err2 := BatchDeal(context.Background(), records, num, testFailF) testAssert.ErrorContains(err2, "err") err3 := BatchDeal2(context.Background(), records, num, testF) testAssert.Nil(err3) err4 := BatchDeal2(context.Background(), records, num, testFailF) testAssert.ErrorContains(err4, "err") err5 := BatchDeal3(context.Background(), records, num, testF) testAssert.Nil(err5) err6 := BatchDeal3(context.Background(), records, num, testFailF) testAssert.ErrorContains(err6, "err") }
以上就是Go語(yǔ)言并發(fā)編程之控制并發(fā)數(shù)量實(shí)現(xiàn)實(shí)例的詳細(xì)內(nèi)容,更多關(guān)于Go并發(fā)控制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang中crypto/rand庫(kù)的使用技巧與最佳實(shí)踐
在Golang的眾多隨機(jī)數(shù)生成庫(kù)中,crypto/rand?是一個(gè)專(zhuān)為加密安全設(shè)計(jì)的庫(kù),本文主要介紹了Golang中crypto/rand庫(kù)的使用技巧與最佳實(shí)踐,感興趣的可以了解一下2024-02-02GPT回答 go語(yǔ)言和C語(yǔ)言數(shù)組操作對(duì)比
這篇文章主要為大家介紹了GPT回答的go語(yǔ)言和C語(yǔ)言數(shù)組操作方法對(duì)比,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Golang實(shí)現(xiàn)AES對(duì)稱(chēng)加密的過(guò)程詳解
AES是一個(gè)對(duì)稱(chēng)密碼,旨在取代DES成為廣泛使用的標(biāo)準(zhǔn),本文給大家分享Golang實(shí)現(xiàn)AES對(duì)稱(chēng)加密的過(guò)程,本文附有Golang實(shí)現(xiàn)AES加密ECB模式的源碼,感興趣的朋友跟隨小編一起學(xué)習(xí)下吧2021-05-05