Go語言并發(fā)編程之控制并發(fā)數(shù)量實現(xiàn)實例
今天我主要分享下Go語言并發(fā)編程如何控制并發(fā)數(shù)量。
適用場景
有一批數(shù)據(jù)需要并發(fā)處理,不能開啟協(xié)程數(shù)量過多,以免服務(wù)器資源耗盡或者對服務(wù)造成過大壓力,需要控制并發(fā)數(shù)量為N。
代碼
話不多說,直接上代碼,示例中采用三種方式進行處理。
1、以int數(shù)據(jù)集為例,并發(fā)數(shù)量為num;
2、第一種方式,并發(fā)函數(shù)報錯則終止任務(wù)執(zhí)行。后兩種方式會等待所有處理任務(wù)執(zhí)行完,再返回是否發(fā)生錯誤。
代碼如下:
# utils.go
package utils
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
// BatchDeal BatchDeal
// TODO int類型待后續(xù)修改為泛型T
func BatchDeal(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) {
ch := make(chan int, num)
go func() {
select {
case <-ctx.Done():
return
default:
}
for _, v := range records {
ch <- v
}
close(ch)
}()
errCh := make(chan error, len(records))
go func() {
goN(num, func(i int) {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
default:
}
for v := range ch {
if er := f(ctx, v); er != nil {
errCh <- er
}
}
})()
// 處理完關(guān)閉errCh
close(errCh)
}()
// 有錯誤就結(jié)束或者關(guān)閉errCh后執(zhí)行
err = <-errCh
if err != nil {
fmt.Printf("batch deal fail, err=%v", err)
return
}
fmt.Println("batch deal end")
return
}
func goN(n int, fn func(int)) func() {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
fn(i)
wg.Done()
}(i)
}
return wg.Wait
}
// BatchDeal2 BatchDeal2
// TODO int類型待后續(xù)修改為泛型T
func BatchDeal2(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) {
ch := make(chan int, num)
go func() {
select {
case <-ctx.Done():
return
default:
}
for _, v := range records {
ch <- v
}
close(ch)
}()
err = groupN(ctx, num, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
for v := range ch {
if err := f(ctx, v); err != nil {
return err
}
}
return nil
})
if err != nil {
fmt.Printf("batch deal fail, err=%v", err)
return
}
fmt.Println("batch deal end")
return
}
// groupN n為并發(fā)數(shù)量
func groupN(ctx context.Context, n int, fn func(context.Context) error) error {
group, ctx := errgroup.WithContext(ctx)
for i := 0; i < n; i++ {
group.Go(func() error {
if err := fn(ctx); err != nil {
return err
}
return nil
})
}
return group.Wait()
}
// BatchDeal3 BatchDeal3
// TODO int類型待后續(xù)修改為泛型T
func BatchDeal3(ctx context.Context, records []int, num int, f func(context.Context, int) error) (err error) {
group, ctx := errgroup.WithContext(ctx)
// 并發(fā)控制channel,并發(fā)數(shù)量為num
ch := make(chan struct{}, num)
for _, v := range records {
// 元素進channel,并發(fā)超過10則阻塞
ch <- struct{}{}
vCopy := v
group.Go(func() error {
// 釋放元素
defer func() {
<-ch
}()
if err := f(ctx, vCopy); err != nil {
return err
}
return nil
})
}
// 等待執(zhí)行完畢,全部執(zhí)行完畢才會結(jié)束
if err = group.Wait(); err != nil {
fmt.Printf("batch deal failed, err=%v", err)
}
fmt.Println("batch deal end")
return
}
# utils_test.go
package utils
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestBatchDeal
func TestBatchDeal(t *testing.T) {
records := make([]int, 0)
for i := 1; i < 100; i++ {
records = append(records, i)
}
num := 10
testAssert := assert.New(t)
testF := func(ctx context.Context, i int) error {
fmt.Printf("args=%d", i)
time.Sleep(time.Duration(i * int(time.Millisecond)))
return nil
}
testFailF := func(ctx context.Context, i int) error {
fmt.Printf("args=%d", i)
time.Sleep(time.Duration(i * int(time.Millisecond)))
var er error
if i == 10 {
fmt.Printf("error accour, i=%d\n", i)
er = fmt.Errorf("err=%d", i)
}
return er
}
err := BatchDeal(context.Background(), records, num, testF)
testAssert.Nil(err)
err2 := BatchDeal(context.Background(), records, num, testFailF)
testAssert.ErrorContains(err2, "err")
err3 := BatchDeal2(context.Background(), records, num, testF)
testAssert.Nil(err3)
err4 := BatchDeal2(context.Background(), records, num, testFailF)
testAssert.ErrorContains(err4, "err")
err5 := BatchDeal3(context.Background(), records, num, testF)
testAssert.Nil(err5)
err6 := BatchDeal3(context.Background(), records, num, testFailF)
testAssert.ErrorContains(err6, "err")
}以上就是Go語言并發(fā)編程之控制并發(fā)數(shù)量實現(xiàn)實例的詳細(xì)內(nèi)容,更多關(guān)于Go并發(fā)控制的資料請關(guān)注腳本之家其它相關(guān)文章!

