Go語言同步與異步執(zhí)行多個(gè)任務(wù)封裝詳解(Runner和RunnerAsync)
前言
同步適合多個(gè)連續(xù)執(zhí)行的,每一步的執(zhí)行依賴于上一步操作,異步執(zhí)行則和任務(wù)執(zhí)行順序無關(guān)(如從10個(gè)站點(diǎn)抓取數(shù)據(jù))
同步執(zhí)行類RunnerAsync
支持返回超時(shí)檢測,系統(tǒng)中斷檢測
錯(cuò)誤常量定義
//超時(shí)錯(cuò)誤 var ErrTimeout = errors.New("received timeout") //操作系統(tǒng)系統(tǒng)中斷錯(cuò)誤 var ErrInterrupt = errors.New("received interrupt")
實(shí)現(xiàn)代碼如下
package task import ( "os" "time" "os/signal" "sync" ) //異步執(zhí)行任務(wù) type Runner struct { //操作系統(tǒng)的信號(hào)檢測 interrupt chan os.Signal //記錄執(zhí)行完成的狀態(tài) complete chan error //超時(shí)檢測 timeout <-chan time.Time //保存所有要執(zhí)行的任務(wù),順序執(zhí)行 tasks []func(id int) error waitGroup sync.WaitGroup lock sync.Mutex errs []error } //new一個(gè)Runner對象 func NewRunner(d time.Duration) *Runner { return &Runner{ interrupt: make(chan os.Signal, 1), complete: make(chan error), timeout: time.After(d), waitGroup: sync.WaitGroup{}, lock: sync.Mutex{}, } } //添加一個(gè)任務(wù) func (this *Runner) Add(tasks ...func(id int) error) { this.tasks = append(this.tasks, tasks...) } //啟動(dòng)Runner,監(jiān)聽錯(cuò)誤信息 func (this *Runner) Start() error { //接收操作系統(tǒng)信號(hào) signal.Notify(this.interrupt, os.Interrupt) //并發(fā)執(zhí)行任務(wù) go func() { this.complete <- this.Run() }() select { //返回執(zhí)行結(jié)果 case err := <-this.complete: return err //超時(shí)返回 case <-this.timeout: return ErrTimeout } } //異步執(zhí)行所有的任務(wù) func (this *Runner) Run() error { for id, task := range this.tasks { if this.gotInterrupt() { return ErrInterrupt } this.waitGroup.Add(1) go func(id int) { this.lock.Lock() //執(zhí)行任務(wù) err := task(id) //加鎖保存到結(jié)果集中 this.errs = append(this.errs, err) this.lock.Unlock() this.waitGroup.Done() }(id) } this.waitGroup.Wait() return nil } //判斷是否接收到操作系統(tǒng)中斷信號(hào) func (this *Runner) gotInterrupt() bool { select { case <-this.interrupt: //停止接收別的信號(hào) signal.Stop(this.interrupt) return true //正常執(zhí)行 default: return false } } //獲取執(zhí)行完的error func (this *Runner) GetErrs() []error { return this.errs }
使用方法
Add添加一個(gè)任務(wù),任務(wù)為接收int類型的一個(gè)閉包
Start開始執(zhí)行傷,返回一個(gè)error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時(shí),ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)
測試示例代碼
package task import ( "testing" "time" "fmt" "os" "runtime" ) func TestRunnerAsync_Start(t *testing.T) { //開啟多核 runtime.GOMAXPROCS(runtime.NumCPU()) //創(chuàng)建runner對象,設(shè)置超時(shí)時(shí)間 runner := NewRunnerAsync(8 * time.Second) //添加運(yùn)行的任務(wù) runner.Add( createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(), ) fmt.Println("同步執(zhí)行任務(wù)") //開始執(zhí)行任務(wù) if err := runner.Start(); err != nil { switch err { case ErrTimeout: fmt.Println("執(zhí)行超時(shí)") os.Exit(1) case ErrInterrupt: fmt.Println("任務(wù)被中斷") os.Exit(2) } } t.Log("執(zhí)行結(jié)束") } //創(chuàng)建要執(zhí)行的任務(wù) func createTaskAsync() func(id int) { return func(id int) { fmt.Printf("正在執(zhí)行%v個(gè)任務(wù)\n", id) //模擬任務(wù)執(zhí)行,sleep兩秒 //time.Sleep(1 * time.Second) } }
執(zhí)行結(jié)果
同步執(zhí)行任務(wù) 正在執(zhí)行0個(gè)任務(wù) 正在執(zhí)行1個(gè)任務(wù) 正在執(zhí)行2個(gè)任務(wù) 正在執(zhí)行3個(gè)任務(wù) 正在執(zhí)行4個(gè)任務(wù) 正在執(zhí)行5個(gè)任務(wù) 正在執(zhí)行6個(gè)任務(wù) 正在執(zhí)行7個(gè)任務(wù) 正在執(zhí)行8個(gè)任務(wù) 正在執(zhí)行9個(gè)任務(wù) 正在執(zhí)行10個(gè)任務(wù) 正在執(zhí)行11個(gè)任務(wù) 正在執(zhí)行12個(gè)任務(wù) runnerAsync_test.go:49: 執(zhí)行結(jié)束
異步執(zhí)行類Runner
支持返回超時(shí)檢測,系統(tǒng)中斷檢測
實(shí)現(xiàn)代碼如下
package task import ( "os" "time" "os/signal" "sync" ) //異步執(zhí)行任務(wù) type Runner struct { //操作系統(tǒng)的信號(hào)檢測 interrupt chan os.Signal //記錄執(zhí)行完成的狀態(tài) complete chan error //超時(shí)檢測 timeout <-chan time.Time //保存所有要執(zhí)行的任務(wù),順序執(zhí)行 tasks []func(id int) error waitGroup sync.WaitGroup lock sync.Mutex errs []error } //new一個(gè)Runner對象 func NewRunner(d time.Duration) *Runner { return &Runner{ interrupt: make(chan os.Signal, 1), complete: make(chan error), timeout: time.After(d), waitGroup: sync.WaitGroup{}, lock: sync.Mutex{}, } } //添加一個(gè)任務(wù) func (this *Runner) Add(tasks ...func(id int) error) { this.tasks = append(this.tasks, tasks...) } //啟動(dòng)Runner,監(jiān)聽錯(cuò)誤信息 func (this *Runner) Start() error { //接收操作系統(tǒng)信號(hào) signal.Notify(this.interrupt, os.Interrupt) //并發(fā)執(zhí)行任務(wù) go func() { this.complete <- this.Run() }() select { //返回執(zhí)行結(jié)果 case err := <-this.complete: return err //超時(shí)返回 case <-this.timeout: return ErrTimeout } } //異步執(zhí)行所有的任務(wù) func (this *Runner) Run() error { for id, task := range this.tasks { if this.gotInterrupt() { return ErrInterrupt } this.waitGroup.Add(1) go func(id int) { this.lock.Lock() //執(zhí)行任務(wù) err := task(id) //加鎖保存到結(jié)果集中 this.errs = append(this.errs, err) this.lock.Unlock() this.waitGroup.Done() }(id) } this.waitGroup.Wait() return nil } //判斷是否接收到操作系統(tǒng)中斷信號(hào) func (this *Runner) gotInterrupt() bool { select { case <-this.interrupt: //停止接收別的信號(hào) signal.Stop(this.interrupt) return true //正常執(zhí)行 default: return false } } //獲取執(zhí)行完的error func (this *Runner) GetErrs() []error { return this.errs }
使用方法
Add添加一個(gè)任務(wù),任務(wù)為接收int類型,返回類型error的一個(gè)閉包
Start開始執(zhí)行傷,返回一個(gè)error類型,nil為執(zhí)行完畢, ErrTimeout代表執(zhí)行超時(shí),ErrInterrupt代表執(zhí)行被中斷(類似Ctrl + C操作)
getErrs獲取所有的任務(wù)執(zhí)行結(jié)果
測試示例代碼
package task import ( "testing" "time" "fmt" "os" "runtime" ) func TestRunner_Start(t *testing.T) { //開啟多核心 runtime.GOMAXPROCS(runtime.NumCPU()) //創(chuàng)建runner對象,設(shè)置超時(shí)時(shí)間 runner := NewRunner(18 * time.Second) //添加運(yùn)行的任務(wù) runner.Add( createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), createTask(), ) fmt.Println("異步執(zhí)行任務(wù)") //開始執(zhí)行任務(wù) if err := runner.Start(); err != nil { switch err { case ErrTimeout: fmt.Println("執(zhí)行超時(shí)") os.Exit(1) case ErrInterrupt: fmt.Println("任務(wù)被中斷") os.Exit(2) } } t.Log("執(zhí)行結(jié)束") t.Log(runner.GetErrs()) } //創(chuàng)建要執(zhí)行的任務(wù) func createTask() func(id int) error { return func(id int) error { fmt.Printf("正在執(zhí)行%v個(gè)任務(wù)\n", id) //模擬任務(wù)執(zhí)行,sleep //time.Sleep(1 * time.Second) return nil } }
執(zhí)行結(jié)果
異步執(zhí)行任務(wù) 正在執(zhí)行2個(gè)任務(wù) 正在執(zhí)行1個(gè)任務(wù) 正在執(zhí)行4個(gè)任務(wù) 正在執(zhí)行3個(gè)任務(wù) 正在執(zhí)行6個(gè)任務(wù) 正在執(zhí)行5個(gè)任務(wù) 正在執(zhí)行9個(gè)任務(wù) 正在執(zhí)行7個(gè)任務(wù) 正在執(zhí)行10個(gè)任務(wù) 正在執(zhí)行13個(gè)任務(wù) 正在執(zhí)行8個(gè)任務(wù) 正在執(zhí)行11個(gè)任務(wù) 正在執(zhí)行12個(gè)任務(wù) 正在執(zhí)行0個(gè)任務(wù) runner_test.go:49: 執(zhí)行結(jié)束 runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關(guān)文章
gin解析json格式的數(shù)據(jù)出錯(cuò)的處理方案
這篇文章主要介紹了gin解析json格式的數(shù)據(jù)出錯(cuò)的處理方案,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-03-03Go標(biāo)準(zhǔn)庫http與fasthttp服務(wù)端性能對比場景分析
這篇文章主要介紹了Go標(biāo)準(zhǔn)庫http與fasthttp服務(wù)端性能比較,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06GOLANG使用Context實(shí)現(xiàn)傳值、超時(shí)和取消的方法
這篇文章主要介紹了GOLANG使用Context實(shí)現(xiàn)傳值、超時(shí)和取消的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-01-01Go語言學(xué)習(xí)之函數(shù)的定義與使用詳解
這篇文章主要為大家詳細(xì)介紹Go語言中函數(shù)的定義與使用,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)Go語言有一定幫助,需要的可以參考一下2022-04-04