golang批量執(zhí)行任務(wù)的通用模板分享
需求
一個(gè)接口調(diào)用時(shí),接收到一個(gè)列表,十個(gè)元素,需要并發(fā)執(zhí)行十個(gè)任務(wù),每個(gè)任務(wù)都要返回執(zhí)行的結(jié)果和異常,然后對(duì)返回的結(jié)果裝填到一個(gè)切片列表里,統(tǒng)一返回結(jié)果。
需要協(xié)程處理的結(jié)構(gòu)體
type Order struct { Name string `json:"name"` Id int `json:"id"` }
確定通道數(shù)量
一般按入?yún)⒌男枰幚淼脑財(cái)?shù)量為準(zhǔn)
taskNum := 10
初始化通道
orderCh := make(chan Order, taskNum) //接收返回的結(jié)果 errCh := make(chan error, taskNum) //接收返回的異常
發(fā)起執(zhí)行
我們使用sync.WaitGroup來(lái)監(jiān)聽執(zhí)行情況
wg := sync.WaitGroup{} for i:=0; i < taskNum; i++ { wg.Add(1) go func() { defer wg.Done() if i == 3 {//模擬當(dāng)i=3的時(shí)候,返回一個(gè)異常 err := errors.New("there is an error") errCh <- err return } //組裝返回結(jié)果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <- res }() } wg.Wait() //等待所有任務(wù)執(zhí)行完畢
使用for-select接收?qǐng)?zhí)行結(jié)果
orderList := make([]Order, taskNum) for i:=0; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append(orderList, order) } case err := <-errCh: //接收errCh if err != nil { return err //看需求,這里設(shè)計(jì)發(fā)現(xiàn)一個(gè)錯(cuò)誤就直接停止執(zhí)行,返回錯(cuò)誤 } default: fmt.Println("done") } } //處理完數(shù)據(jù),關(guān)閉通道 close(orderCh) close(errCh)
超時(shí)問題
任務(wù)執(zhí)行過程中,需要控制每個(gè)任務(wù)的執(zhí)行時(shí)間,不能超過一定范圍,我們用定時(shí)器來(lái)解決這個(gè)問題
timeoutTime := time.Second * 3 //超時(shí)時(shí)間 taskTimer := time.NewTimer(timeoutTime) //初始化定時(shí)器 orderList := make([]Order, taskNum) for i:=0; i<taskNum; i++ { select { .... case <-taskTimeout.C: //處理超時(shí) err := errors.New("task timeout") //此處我們認(rèn)為超時(shí)是錯(cuò)誤的一種,賦值給了err return ... } //每次執(zhí)行都需要重置定時(shí)器 taskTimer.Reset(timeoutTime) }
協(xié)程panic問題
主程序是無(wú)法捕捉協(xié)程內(nèi)的panic,因此如果不手動(dòng)處理,就會(huì)發(fā)生協(xié)程內(nèi)panic導(dǎo)致整個(gè)程序中止的情況,我們?cè)赿efer里處理
for i:=0; i < taskNum; i++ { wg.Add(1) go func() { defer func () { wg.Done() //協(xié)程內(nèi)單獨(dú)捕捉異常 if r := recover(); r != nil { err := errors.New(fmt.Sprintf("System panic:%v", r)) errCh <- err //此處將panic信息轉(zhuǎn)為err返回,也可以按需求和異常等級(jí)進(jìn)行處理 return } }() ........ }() }
順序問題
返回的列表元素的順序,需要跟傳參的列表順序保持一致,這時(shí)我們需要定義個(gè)帶序號(hào)的結(jié)構(gòu)體
// 需要記錄原始順序的時(shí)候,定義個(gè)帶編號(hào)的結(jié)構(gòu)體 type OrderWithSeq struct { Seq int OrderItem Order } //重寫相關(guān)排序類型 type BySeq []OrderWithSeq func (a BySeq) Len() int { return len(a) } func (a BySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq } // 調(diào)整返回結(jié)果 orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號(hào)的結(jié)構(gòu)體 //在執(zhí)行任務(wù)時(shí),加入序號(hào) for i:=0; i < taskNum; i++ { i:= i wg.Add(1) go func() { ···· //組裝返回結(jié)果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //帶上i這個(gè)序號(hào) OrderItem: res, } }() //接收信息,也按帶序號(hào)的結(jié)構(gòu)體進(jìn)行組裝 orderSeqList := make([]OrderWithSeq, taskNum) for i:=0; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append(orderSeqList, order) } ..... } } //按原始順序進(jìn)行排序 sort.Sort(BySeq(orderSeqList)) ....重新組裝數(shù)據(jù)返回
總結(jié)
標(biāo)準(zhǔn)模板如下:
type Order struct { Name string `json:"name"` Id int `json:"id"` } // 需要記錄原始順序的時(shí)候,定義個(gè)帶編號(hào)的結(jié)構(gòu)體 type OrderWithSeq struct { Seq int OrderItem Order } //重寫相關(guān)排序類型 type BySeq []OrderWithSeq func (a BySeq) Len() int { return len(a) } func (a BySeq) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a BySeq) Less(i, j int) bool { return a[i].Seq < a[j].Seq } taskNum := 10 orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號(hào)的結(jié)構(gòu)體 errCh := make(chan error, taskNum) //接收返回的異常 wg := sync.WaitGroup{} //在執(zhí)行任務(wù)時(shí),加入序號(hào) for i:=0; i < taskNum; i++ { i:= i wg.Add(1) go func() { defer func () { wg.Done() //協(xié)程內(nèi)單獨(dú)捕捉異常 if r := recover(); r != nil { err := errors.New(fmt.Sprintf("System panic:%v", r)) errCh <- err //此處將panic信息轉(zhuǎn)為err返回,也可以按需求和異常等級(jí)進(jìn)行處理 return } }() //組裝返回結(jié)果 res := Order{ Name: "num: " + strconv.Itoa(i), Id: i, } orderCh <-OrderWithSeq { Seq: i, //帶上i這個(gè)序號(hào) OrderItem: res, } }() wg.Wait() //接收信息,也按帶序號(hào)的結(jié)構(gòu)體進(jìn)行組裝 orderSeqList := make([]OrderWithSeq, taskNum) timeoutTime := time.Second * 3 taskTimer := time.NewTimer(timeoutTime) for i:=0; i<taskNum; i++ { select { case order, ok := <-orderCh: //接收orderCh if ok { orderList = append(orderSeqList, order) } case err := <-errCh: //接收errCh if err != nil { return err } case <-taskTimer.C: //處理超時(shí) err := errors.New("task timeout") return default: fmt.Println("done") } taskTimer.Reset(timeoutTime) } close(orderCh) close(errCh) //按原始順序進(jìn)行排序 sort.Sort(BySeq(orderSeqList))
到此這篇關(guān)于golang批量執(zhí)行任務(wù)的通用模板分享的文章就介紹到這了,更多相關(guān)golang批量執(zhí)行任務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang實(shí)現(xiàn)webgis后端開發(fā)的步驟詳解
這篇文章主要介紹如何用golang結(jié)合postgis數(shù)據(jù)庫(kù),使用gin、grom框架實(shí)現(xiàn)后端的MVC的接口搭建,文中有詳細(xì)的流程步驟及代碼示例,需要的朋友可以參考下2023-06-06Go語(yǔ)言中g(shù)oroutine和WaitGroup的使用示例詳解
goroutine 是Go中一個(gè)輕量級(jí)的線程, 只需要一個(gè)go關(guān)鍵字就可以創(chuàng)建一個(gè)goroutine,這篇文章主要介紹了Go語(yǔ)言中g(shù)oroutine和WaitGroup的使用,需要的朋友可以參考下2023-03-03Go語(yǔ)言的Windows下環(huán)境配置以及簡(jiǎn)單的程序結(jié)構(gòu)講解
這篇文章主要介紹了Go語(yǔ)言的Windows下環(huán)境配置以及簡(jiǎn)單的程序結(jié)構(gòu)講解,從編程語(yǔ)言約定俗成的hellow world開始,需要的朋友可以參考下2015-10-10golang中select語(yǔ)句的簡(jiǎn)單實(shí)例
Go的select語(yǔ)句是一種僅能用于channl發(fā)送和接收消息的專用語(yǔ)句,此語(yǔ)句運(yùn)行期間是阻塞的,下面這篇文章主要給大家介紹了關(guān)于golang中select語(yǔ)句的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06Golang實(shí)現(xiàn)復(fù)合數(shù)據(jù)類型
Go語(yǔ)言的復(fù)合數(shù)據(jù)類型包括數(shù)組、切片、映射、結(jié)構(gòu)體和接口,本文就來(lái)介紹一下Golang實(shí)現(xiàn)復(fù)合數(shù)據(jù)類型,具有一定的參考價(jià)值,感興趣的可以了解一下2025-02-02golang實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)事務(wù)的提交與回滾
這篇文章主要介紹了golang實(shí)現(xiàn)mysql數(shù)據(jù)庫(kù)事務(wù)的提交與回滾,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2021-04-04