Go語言實現(xiàn)并發(fā)控制的常見方式詳解
一、Channel并發(fā)控制
1.1 channel切片控制攜程執(zhí)行
通過創(chuàng)建一個切片channel 控制多個攜程地并發(fā)執(zhí)行,并收集攜程執(zhí)行獲取的數(shù)據(jù)及錯誤信息
type ResultDto struct { Err error Data interface{} } ? func main() { channel := make([]chan *ResultDto, 10) for i := 0; i < 10; i++ { channel[i] = make(chan *ResultDto) temp := i go Process(temp, channel[i]) } ? for _, ch := range channel { fmt.Println(<-ch) } } ? func Process(i int, ch chan *ResultDto) { // Do some work... if i == 1 { ch <- &ResultDto{Err: errors.New("do work err")} } else { ch <- &ResultDto{Data: i} } }
1.2 channel控制并發(fā)數(shù)量
通過帶緩沖區(qū)的channel控制并發(fā)執(zhí)行攜程的數(shù)量 , 注意這里需要配合 sync.WaitGroup
一起使用,不然當執(zhí)行到i為7 8 9 時,子攜程還沒有執(zhí)行完,主攜程就退出了
func main() { wg := &sync.WaitGroup{} ch := make(chan struct{}, 3) for i := 0; i < 10; i++ { ch <- struct{}{} wg.Add(1) // 執(zhí)行攜程 temp := i go Process(wg, temp, ch) } wg.Wait() } ? func Process(wg *sync.WaitGroup, i int, ch chan struct{}) { defer func() { <-ch wg.Done() }() // Do some work... time.Sleep(1 * time.Second) fmt.Println(i) }
二、WaitGroup并發(fā)控制
2.1 WaitGroup 控制協(xié)程并行
WaitGroup是Golang應用開發(fā)過程中經(jīng)常使用的并發(fā)控制技術(shù)。
WaitGroup,可理解為Wait-Goroutine-Group,即等待一組goroutine結(jié)束。比如某個goroutine需要等待其他幾個goroutine全部完成,那么使用WaitGroup可以輕松實現(xiàn)。
func main() { wg := &sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) temp := i go Process(wg, temp) } wg.Wait() } ? func Process(wg *sync.WaitGroup, i int) { defer func() { wg.Done() }() // Do some work... time.Sleep(1 * time.Second) fmt.Println(i) }
簡單的說,上面程序中wg內(nèi)部維護了一個計數(shù)器:
- 啟動goroutine前將計數(shù)器通過Add(2)將計數(shù)器設置為待啟動的goroutine個數(shù)。
- 啟動goroutine后,使用Wait()方法阻塞自己,等待計數(shù)器變?yōu)?。
- 每個goroutine執(zhí)行結(jié)束通過Done()方法將計數(shù)器減1。
- 計數(shù)器變?yōu)?后,阻塞的goroutine被喚醒。
2.2 WaitGroup封裝通用函數(shù)
waitGroup控制并發(fā)執(zhí)行,limit 并發(fā)上限,收集錯誤返回
func main() { funcList := []ExeFunc{ func(ctx context.Context) error { fmt.Println("5 開始") time.Sleep(5 * time.Second) fmt.Println("5 結(jié)束") return nil }, func(ctx context.Context) error { fmt.Println("3 開始") time.Sleep(3 * time.Second) fmt.Println("3 結(jié)束") return nil }, } err := GoExeAll(context.Background(), 2, funcList...) if err != nil { fmt.Println(err) } } ? type ExeFunc func(ctx context.Context) error ? // GoExeAll 并發(fā)執(zhí)行所有,limit 為并發(fā)上限,收集所有錯誤返回 func GoExeAll(ctx context.Context, limit int, fs ...ExeFunc) (errs []error) { wg := &sync.WaitGroup{} ch := make(chan struct{}, limit) errCh := make(chan error, len(fs)) for _, f := range fs { fTmp := f wg.Add(1) ch <- struct{}{} go func() { defer func() { if panicErr := recover(); panicErr != nil { errCh <- errors.New("execution panic:" + fmt.Sprintf("%v", panicErr)) } wg.Done() <-ch }() if err := fTmp(ctx); err != nil { errCh <- err } }() } wg.Wait() close(errCh) close(ch) for chErr := range errCh { errs = append(errs, chErr) } return }
三、Context
Golang context是Golang應用開發(fā)常用的并發(fā)控制技術(shù),它與WaitGroup最大的不同點是context對于派生goroutine有更強的控制力,它可以控制多級的goroutine。
3.1 Context定義的接口
context實際上只定義了接口,凡是實現(xiàn)該接口的類都可稱為是一種context,官方包中實現(xiàn)了幾個常用的context,分別可用于不同的場景。
type Context interface { Deadline() (deadline time.Time, ok bool) ? Done() <-chan struct{} ? Err() error ? Value(key interface{}) interface{} }
Deadline()
該方法返回一個deadline和標識是否已設置deadline的bool值,如果沒有設置deadline,則ok == false,此時deadline為一個初始值的time.Time值
Done()
該方法返回一個channel,需要在select-case語句中使用,如”case <-context.Done():”。
當context關(guān)閉后,Done()返回一個被關(guān)閉的管道,關(guān)閉的管道仍然是可讀的,據(jù)此goroutine可以收到關(guān)閉請求;當context還未關(guān)閉時,Done()返回nil。
Err()
該方法描述context關(guān)閉的原因。關(guān)閉原因由context實現(xiàn)控制,不需要用戶設置。比如Deadline context,關(guān)閉原因可能是因為deadline,也可能提前被主動關(guān)閉,那么關(guān)閉原因就會不同:
Value()
有一種context,它不是用于控制呈樹狀分布的goroutine,而是用于在樹狀分布的goroutine間傳遞信息
3.2 Context控制協(xié)程結(jié)束
func main() { wg := &sync.WaitGroup{} ctx, cancelFunc := context.WithCancel(context.Background()) for i := 0; i < 10; i++ { wg.Add(1) temp := i go Process(ctx, wg, temp) } time.Sleep(5 * time.Second) cancelFunc() wg.Wait() } ? func Process(ctx context.Context, wg *sync.WaitGroup, i int) { defer wg.Done() ch := make(chan error) go DoWork(ctx, ch, i) select { case <-ctx.Done(): fmt.Println("cancelFunc") return case <-ch: return } } ? func DoWork(ctx context.Context, ch chan error, i int) { defer func() { ch <- nil }() time.Sleep(time.Duration(i) * time.Second) fmt.Println(i) }
四、 ErrorGroup
可采用第三方庫golang.org/x/sync/errgroup
堆多個協(xié)助并發(fā)執(zhí)行進行控制
4.1 errorGroup并發(fā)執(zhí)行,limit 為并發(fā)上限,timeout超時
func main() { funcList := []ExeFunc{ func(ctx context.Context) error { fmt.Println("5 開始") time.Sleep(5 * time.Second) fmt.Println("5 結(jié)束") return nil }, func(ctx context.Context) error { fmt.Println("3 開始") time.Sleep(3 * time.Second) fmt.Println("3 結(jié)束") return nil }, } ? err := GoExe(context.Background(), 2, 10*time.Second, funcList...) if err != nil { fmt.Println(err) } } ? type ExeFunc func(ctx context.Context) error ? // GoExe 并發(fā)執(zhí)行,limit 為并發(fā)上限,其中任意一個報錯,其他中斷,timeout為0不超時 func GoExe(ctx context.Context, limit int, timeout time.Duration, fs ...ExeFunc) error { eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(limit) var timeCh <-chan time.Time if timeout > 0 { timeCh = time.After(timeout) } for _, f := range fs { fTmp := f eg.Go(func() (err error) { ch := make(chan error) defer close(ch) go DoWorkFunc(ctx, ch, fTmp) select { case <-ctx.Done(): return ctx.Err() case <-timeCh: return errors.New("execution timeout") case err = <-ch: return err } }) } if err := eg.Wait(); err != nil { return err } return nil } ? func DoWorkFunc(ctx context.Context, ch chan error, fs ExeFunc) { var err error defer func() { if panicErr := recover(); panicErr != nil { err = errors.New("execution panic:" + fmt.Sprintf("%v", panicErr)) } ch <- err }() err = fs(ctx) return }
五、通用協(xié)程控制工具封裝
import ( "context" "errors" "fmt" "golang.org/x/sync/errgroup" "sync" "time" ) ? ? // ExeFunc 要被執(zhí)行的函數(shù)或方法 type ExeFunc func(ctx context.Context) error ? // SeqExe 順序執(zhí)行,遇到錯誤就返回 func SeqExe(ctx context.Context, fs ...ExeFunc) error { for _, f := range fs { if err := f(ctx); err != nil { return err } } return nil } ? // GoExe 并發(fā)執(zhí)行,limit 為并發(fā)上限,其中任意一個報錯,其他中斷,timeout為0不超時 func GoExe(ctx context.Context, limit int, timeout time.Duration, fs ...ExeFunc) error { eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(limit) var timeCh <-chan time.Time if timeout > 0 { timeCh = time.After(timeout) } for _, f := range fs { fTmp := f eg.Go(func() (err error) { ch := make(chan error) defer close(ch) go DoWorkFunc(ctx, ch, fTmp) select { case <-ctx.Done(): return ctx.Err() case <-timeCh: return errors.New("execution timeout") case err = <-ch: return err } }) } if err := eg.Wait(); err != nil { return err } return nil } ? func DoWorkFunc(ctx context.Context, ch chan error, fs ExeFunc) { var err error defer func() { if panicErr := recover(); panicErr != nil { err = errors.New("execution panic:" + fmt.Sprintf("%v", panicErr)) } ch <- err }() err = fs(ctx) return } ? // SeqExeAll 順序執(zhí)行所有,收集所有錯誤返回 func SeqExeAll(ctx context.Context, fs ...ExeFunc) (errs []error) { for _, f := range fs { if err := f(ctx); err != nil { errs = append(errs, err) } } return errs } ? // GoExeAll 并發(fā)執(zhí)行所有,limit 為并發(fā)上限,收集所有錯誤返回 func GoExeAll(ctx context.Context, limit int, fs ...ExeFunc) (errs []error) { wg := &sync.WaitGroup{} ch := make(chan struct{}, limit) errCh := make(chan error, len(fs)) for _, f := range fs { fTmp := f wg.Add(1) ch <- struct{}{} go func() { defer func() { if panicErr := recover(); panicErr != nil { errCh <- errors.New("execution panic:" + fmt.Sprintf("%v", panicErr)) } wg.Done() <-ch }() if err := fTmp(ctx); err != nil { errCh <- err } }() } wg.Wait() close(errCh) close(ch) for chErr := range errCh { errs = append(errs, chErr) } return }
以上就是Go語言實現(xiàn)并發(fā)控制的常見方式詳解的詳細內(nèi)容,更多關(guān)于Go并發(fā)控制的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
GO 函數(shù)式選項模式(Functional Options Pattern)
Option模式支持傳遞多個參數(shù),并且在參數(shù)個數(shù)、類型發(fā)生變化時保持兼容性,任意順序傳遞參數(shù),下面給大家介紹GO 函數(shù)式選項模式(Functional Options Pattern)的相關(guān)知識,感興趣的朋友一起看看吧2021-10-10Golang 數(shù)據(jù)庫操作(sqlx)和不定字段結(jié)果查詢
本文主要介紹了Golang 數(shù)據(jù)庫操作(sqlx)和不定字段結(jié)果查詢,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09詳解Golang如何實現(xiàn)一個環(huán)形緩沖器
環(huán)形緩沖器(ringr?buffer)是一種用于表示一個固定尺寸、頭尾相連的緩沖區(qū)的數(shù)據(jù)結(jié)構(gòu),適合緩存數(shù)據(jù)流。本文將利用Golang實現(xiàn)一個環(huán)形緩沖器,需要的可以參考一下2022-09-09