Go語(yǔ)言實(shí)現(xiàn)并發(fā)控制的常見方式詳解
一、Channel并發(fā)控制
1.1 channel切片控制攜程執(zhí)行
通過創(chuàng)建一個(gè)切片channel 控制多個(gè)攜程地并發(fā)執(zhí)行,并收集攜程執(zhí)行獲取的數(shù)據(jù)及錯(cuò)誤信息
type ResultDto struct {
Err error
Data interface{}
}
?
func main() {
channel := make([]chan *ResultDto, 10)
for i := 0; i < 10; i++ {
channel[i] = make(chan *ResultDto)
temp := i
go Process(temp, channel[i])
}
?
for _, ch := range channel {
fmt.Println(<-ch)
}
}
?
func Process(i int, ch chan *ResultDto) {
// Do some work...
if i == 1 {
ch <- &ResultDto{Err: errors.New("do work err")}
} else {
ch <- &ResultDto{Data: i}
}
}1.2 channel控制并發(fā)數(shù)量
通過帶緩沖區(qū)的channel控制并發(fā)執(zhí)行攜程的數(shù)量 , 注意這里需要配合 sync.WaitGroup 一起使用,不然當(dāng)執(zhí)行到i為7 8 9 時(shí),子攜程還沒有執(zhí)行完,主攜程就退出了
func main() {
wg := &sync.WaitGroup{}
ch := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
ch <- struct{}{}
wg.Add(1)
// 執(zhí)行攜程
temp := i
go Process(wg, temp, ch)
}
wg.Wait()
}
?
func Process(wg *sync.WaitGroup, i int, ch chan struct{}) {
defer func() {
<-ch
wg.Done()
}()
// Do some work...
time.Sleep(1 * time.Second)
fmt.Println(i)
}二、WaitGroup并發(fā)控制
2.1 WaitGroup 控制協(xié)程并行
WaitGroup是Golang應(yīng)用開發(fā)過程中經(jīng)常使用的并發(fā)控制技術(shù)。
WaitGroup,可理解為Wait-Goroutine-Group,即等待一組goroutine結(jié)束。比如某個(gè)goroutine需要等待其他幾個(gè)goroutine全部完成,那么使用WaitGroup可以輕松實(shí)現(xiàn)。
func main() {
wg := &sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
temp := i
go Process(wg, temp)
}
wg.Wait()
}
?
func Process(wg *sync.WaitGroup, i int) {
defer func() {
wg.Done()
}()
// Do some work...
time.Sleep(1 * time.Second)
fmt.Println(i)
}
簡(jiǎn)單的說,上面程序中wg內(nèi)部維護(hù)了一個(gè)計(jì)數(shù)器:
- 啟動(dòng)goroutine前將計(jì)數(shù)器通過Add(2)將計(jì)數(shù)器設(shè)置為待啟動(dòng)的goroutine個(gè)數(shù)。
- 啟動(dòng)goroutine后,使用Wait()方法阻塞自己,等待計(jì)數(shù)器變?yōu)?。
- 每個(gè)goroutine執(zhí)行結(jié)束通過Done()方法將計(jì)數(shù)器減1。
- 計(jì)數(shù)器變?yōu)?后,阻塞的goroutine被喚醒。
2.2 WaitGroup封裝通用函數(shù)
waitGroup控制并發(fā)執(zhí)行,limit 并發(fā)上限,收集錯(cuò)誤返回
func main() {
funcList := []ExeFunc{
func(ctx context.Context) error {
fmt.Println("5 開始")
time.Sleep(5 * time.Second)
fmt.Println("5 結(jié)束")
return nil
},
func(ctx context.Context) error {
fmt.Println("3 開始")
time.Sleep(3 * time.Second)
fmt.Println("3 結(jié)束")
return nil
},
}
err := GoExeAll(context.Background(), 2, funcList...)
if err != nil {
fmt.Println(err)
}
}
?
type ExeFunc func(ctx context.Context) error
?
// GoExeAll 并發(fā)執(zhí)行所有,limit 為并發(fā)上限,收集所有錯(cuò)誤返回
func GoExeAll(ctx context.Context, limit int, fs ...ExeFunc) (errs []error) {
wg := &sync.WaitGroup{}
ch := make(chan struct{}, limit)
errCh := make(chan error, len(fs))
for _, f := range fs {
fTmp := f
wg.Add(1)
ch <- struct{}{}
go func() {
defer func() {
if panicErr := recover(); panicErr != nil {
errCh <- errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
}
wg.Done()
<-ch
}()
if err := fTmp(ctx); err != nil {
errCh <- err
}
}()
}
wg.Wait()
close(errCh)
close(ch)
for chErr := range errCh {
errs = append(errs, chErr)
}
return
}三、Context
Golang context是Golang應(yīng)用開發(fā)常用的并發(fā)控制技術(shù),它與WaitGroup最大的不同點(diǎn)是context對(duì)于派生goroutine有更強(qiáng)的控制力,它可以控制多級(jí)的goroutine。
3.1 Context定義的接口
context實(shí)際上只定義了接口,凡是實(shí)現(xiàn)該接口的類都可稱為是一種context,官方包中實(shí)現(xiàn)了幾個(gè)常用的context,分別可用于不同的場(chǎng)景。
type Context interface {
Deadline() (deadline time.Time, ok bool)
?
Done() <-chan struct{}
?
Err() error
?
Value(key interface{}) interface{}
}
Deadline()
該方法返回一個(gè)deadline和標(biāo)識(shí)是否已設(shè)置deadline的bool值,如果沒有設(shè)置deadline,則ok == false,此時(shí)deadline為一個(gè)初始值的time.Time值
Done()
該方法返回一個(gè)channel,需要在select-case語(yǔ)句中使用,如”case <-context.Done():”。
當(dāng)context關(guān)閉后,Done()返回一個(gè)被關(guān)閉的管道,關(guān)閉的管道仍然是可讀的,據(jù)此goroutine可以收到關(guān)閉請(qǐng)求;當(dāng)context還未關(guān)閉時(shí),Done()返回nil。
Err()
該方法描述context關(guān)閉的原因。關(guān)閉原因由context實(shí)現(xiàn)控制,不需要用戶設(shè)置。比如Deadline context,關(guān)閉原因可能是因?yàn)閐eadline,也可能提前被主動(dòng)關(guān)閉,那么關(guān)閉原因就會(huì)不同:
Value()
有一種context,它不是用于控制呈樹狀分布的goroutine,而是用于在樹狀分布的goroutine間傳遞信息
3.2 Context控制協(xié)程結(jié)束
func main() {
wg := &sync.WaitGroup{}
ctx, cancelFunc := context.WithCancel(context.Background())
for i := 0; i < 10; i++ {
wg.Add(1)
temp := i
go Process(ctx, wg, temp)
}
time.Sleep(5 * time.Second)
cancelFunc()
wg.Wait()
}
?
func Process(ctx context.Context, wg *sync.WaitGroup, i int) {
defer wg.Done()
ch := make(chan error)
go DoWork(ctx, ch, i)
select {
case <-ctx.Done():
fmt.Println("cancelFunc")
return
case <-ch:
return
}
}
?
func DoWork(ctx context.Context, ch chan error, i int) {
defer func() {
ch <- nil
}()
time.Sleep(time.Duration(i) * time.Second)
fmt.Println(i)
}
四、 ErrorGroup
可采用第三方庫(kù)golang.org/x/sync/errgroup堆多個(gè)協(xié)助并發(fā)執(zhí)行進(jìn)行控制
4.1 errorGroup并發(fā)執(zhí)行,limit 為并發(fā)上限,timeout超時(shí)
func main() {
funcList := []ExeFunc{
func(ctx context.Context) error {
fmt.Println("5 開始")
time.Sleep(5 * time.Second)
fmt.Println("5 結(jié)束")
return nil
},
func(ctx context.Context) error {
fmt.Println("3 開始")
time.Sleep(3 * time.Second)
fmt.Println("3 結(jié)束")
return nil
},
}
?
err := GoExe(context.Background(), 2, 10*time.Second, funcList...)
if err != nil {
fmt.Println(err)
}
}
?
type ExeFunc func(ctx context.Context) error
?
// GoExe 并發(fā)執(zhí)行,limit 為并發(fā)上限,其中任意一個(gè)報(bào)錯(cuò),其他中斷,timeout為0不超時(shí)
func GoExe(ctx context.Context, limit int, timeout time.Duration, fs ...ExeFunc) error {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(limit)
var timeCh <-chan time.Time
if timeout > 0 {
timeCh = time.After(timeout)
}
for _, f := range fs {
fTmp := f
eg.Go(func() (err error) {
ch := make(chan error)
defer close(ch)
go DoWorkFunc(ctx, ch, fTmp)
select {
case <-ctx.Done():
return ctx.Err()
case <-timeCh:
return errors.New("execution timeout")
case err = <-ch:
return err
}
})
}
if err := eg.Wait(); err != nil {
return err
}
return nil
}
?
func DoWorkFunc(ctx context.Context, ch chan error, fs ExeFunc) {
var err error
defer func() {
if panicErr := recover(); panicErr != nil {
err = errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
}
ch <- err
}()
err = fs(ctx)
return
}五、通用協(xié)程控制工具封裝
import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
?
?
// ExeFunc 要被執(zhí)行的函數(shù)或方法
type ExeFunc func(ctx context.Context) error
?
// SeqExe 順序執(zhí)行,遇到錯(cuò)誤就返回
func SeqExe(ctx context.Context, fs ...ExeFunc) error {
for _, f := range fs {
if err := f(ctx); err != nil {
return err
}
}
return nil
}
?
// GoExe 并發(fā)執(zhí)行,limit 為并發(fā)上限,其中任意一個(gè)報(bào)錯(cuò),其他中斷,timeout為0不超時(shí)
func GoExe(ctx context.Context, limit int, timeout time.Duration, fs ...ExeFunc) error {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(limit)
var timeCh <-chan time.Time
if timeout > 0 {
timeCh = time.After(timeout)
}
for _, f := range fs {
fTmp := f
eg.Go(func() (err error) {
ch := make(chan error)
defer close(ch)
go DoWorkFunc(ctx, ch, fTmp)
select {
case <-ctx.Done():
return ctx.Err()
case <-timeCh:
return errors.New("execution timeout")
case err = <-ch:
return err
}
})
}
if err := eg.Wait(); err != nil {
return err
}
return nil
}
?
func DoWorkFunc(ctx context.Context, ch chan error, fs ExeFunc) {
var err error
defer func() {
if panicErr := recover(); panicErr != nil {
err = errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
}
ch <- err
}()
err = fs(ctx)
return
}
?
// SeqExeAll 順序執(zhí)行所有,收集所有錯(cuò)誤返回
func SeqExeAll(ctx context.Context, fs ...ExeFunc) (errs []error) {
for _, f := range fs {
if err := f(ctx); err != nil {
errs = append(errs, err)
}
}
return errs
}
?
// GoExeAll 并發(fā)執(zhí)行所有,limit 為并發(fā)上限,收集所有錯(cuò)誤返回
func GoExeAll(ctx context.Context, limit int, fs ...ExeFunc) (errs []error) {
wg := &sync.WaitGroup{}
ch := make(chan struct{}, limit)
errCh := make(chan error, len(fs))
for _, f := range fs {
fTmp := f
wg.Add(1)
ch <- struct{}{}
go func() {
defer func() {
if panicErr := recover(); panicErr != nil {
errCh <- errors.New("execution panic:" + fmt.Sprintf("%v", panicErr))
}
wg.Done()
<-ch
}()
if err := fTmp(ctx); err != nil {
errCh <- err
}
}()
}
wg.Wait()
close(errCh)
close(ch)
for chErr := range errCh {
errs = append(errs, chErr)
}
return
}以上就是Go語(yǔ)言實(shí)現(xiàn)并發(fā)控制的常見方式詳解的詳細(xì)內(nèi)容,更多關(guān)于Go并發(fā)控制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
GO 函數(shù)式選項(xiàng)模式(Functional Options Pattern)
Option模式支持傳遞多個(gè)參數(shù),并且在參數(shù)個(gè)數(shù)、類型發(fā)生變化時(shí)保持兼容性,任意順序傳遞參數(shù),下面給大家介紹GO 函數(shù)式選項(xiàng)模式(Functional Options Pattern)的相關(guān)知識(shí),感興趣的朋友一起看看吧2021-10-10
Golang 數(shù)據(jù)庫(kù)操作(sqlx)和不定字段結(jié)果查詢
本文主要介紹了Golang 數(shù)據(jù)庫(kù)操作(sqlx)和不定字段結(jié)果查詢,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09
詳解Golang如何實(shí)現(xiàn)一個(gè)環(huán)形緩沖器
環(huán)形緩沖器(ringr?buffer)是一種用于表示一個(gè)固定尺寸、頭尾相連的緩沖區(qū)的數(shù)據(jù)結(jié)構(gòu),適合緩存數(shù)據(jù)流。本文將利用Golang實(shí)現(xiàn)一個(gè)環(huán)形緩沖器,需要的可以參考一下2022-09-09
讓GPT教你用go語(yǔ)言和C語(yǔ)言開發(fā)IDE配置學(xué)習(xí)
這篇文章主要介紹了讓GPT教你用go語(yǔ)言和C語(yǔ)言開發(fā)IDE配置學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
基于Go語(yǔ)言實(shí)現(xiàn)插入排序算法及優(yōu)化
插入排序是一種簡(jiǎn)單的排序算法。這篇文章將利用Go語(yǔ)言實(shí)現(xiàn)冒泡排序算法,文中的示例代碼講解詳細(xì),對(duì)學(xué)習(xí)Go語(yǔ)言有一定的幫助,需要的可以參考一下2022-12-12

