golang批量執(zhí)行任務(wù)的通用模板分享
需求
一個(gè)接口調(diào)用時(shí),接收到一個(gè)列表,十個(gè)元素,需要并發(fā)執(zhí)行十個(gè)任務(wù),每個(gè)任務(wù)都要返回執(zhí)行的結(jié)果和異常,然后對(duì)返回的結(jié)果裝填到一個(gè)切片列表里,統(tǒng)一返回結(jié)果。
需要協(xié)程處理的結(jié)構(gòu)體
type Order struct {
Name string `json:"name"`
Id int `json:"id"`
}
確定通道數(shù)量
一般按入?yún)⒌男枰幚淼脑財(cái)?shù)量為準(zhǔn)
taskNum := 10
初始化通道
orderCh := make(chan Order, taskNum) //接收返回的結(jié)果 errCh := make(chan error, taskNum) //接收返回的異常
發(fā)起執(zhí)行
我們使用sync.WaitGroup來監(jiān)聽執(zhí)行情況
wg := sync.WaitGroup{}
for i:=0; i < taskNum; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if i == 3 {//模擬當(dāng)i=3的時(shí)候,返回一個(gè)異常
err := errors.New("there is an error")
errCh <- err
return
}
//組裝返回結(jié)果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <- res
}()
}
wg.Wait() //等待所有任務(wù)執(zhí)行完畢
使用for-select接收?qǐng)?zhí)行結(jié)果
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderList, order)
}
case err := <-errCh: //接收errCh
if err != nil {
return err //看需求,這里設(shè)計(jì)發(fā)現(xiàn)一個(gè)錯(cuò)誤就直接停止執(zhí)行,返回錯(cuò)誤
}
default:
fmt.Println("done")
}
}
//處理完數(shù)據(jù),關(guān)閉通道
close(orderCh)
close(errCh)
超時(shí)問題
任務(wù)執(zhí)行過程中,需要控制每個(gè)任務(wù)的執(zhí)行時(shí)間,不能超過一定范圍,我們用定時(shí)器來解決這個(gè)問題
timeoutTime := time.Second * 3 //超時(shí)時(shí)間
taskTimer := time.NewTimer(timeoutTime) //初始化定時(shí)器
orderList := make([]Order, taskNum)
for i:=0; i<taskNum; i++ {
select {
....
case <-taskTimeout.C: //處理超時(shí)
err := errors.New("task timeout") //此處我們認(rèn)為超時(shí)是錯(cuò)誤的一種,賦值給了err
return
...
}
//每次執(zhí)行都需要重置定時(shí)器
taskTimer.Reset(timeoutTime)
}
協(xié)程panic問題
主程序是無法捕捉協(xié)程內(nèi)的panic,因此如果不手動(dòng)處理,就會(huì)發(fā)生協(xié)程內(nèi)panic導(dǎo)致整個(gè)程序中止的情況,我們?cè)赿efer里處理
for i:=0; i < taskNum; i++ {
wg.Add(1)
go func() {
defer func () {
wg.Done()
//協(xié)程內(nèi)單獨(dú)捕捉異常
if r := recover(); r != nil {
err := errors.New(fmt.Sprintf("System panic:%v", r))
errCh <- err //此處將panic信息轉(zhuǎn)為err返回,也可以按需求和異常等級(jí)進(jìn)行處理
return
}
}()
........
}()
}
順序問題
返回的列表元素的順序,需要跟傳參的列表順序保持一致,這時(shí)我們需要定義個(gè)帶序號(hào)的結(jié)構(gòu)體
// 需要記錄原始順序的時(shí)候,定義個(gè)帶編號(hào)的結(jié)構(gòu)體
type OrderWithSeq struct {
Seq int
OrderItem Order
}
//重寫相關(guān)排序類型
type BySeq []OrderWithSeq
func (a BySeq) Len() int {
return len(a)
}
func (a BySeq) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a BySeq) Less(i, j int) bool {
return a[i].Seq < a[j].Seq
}
// 調(diào)整返回結(jié)果
orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號(hào)的結(jié)構(gòu)體
//在執(zhí)行任務(wù)時(shí),加入序號(hào)
for i:=0; i < taskNum; i++ {
i:= i
wg.Add(1)
go func() {
····
//組裝返回結(jié)果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <-OrderWithSeq {
Seq: i, //帶上i這個(gè)序號(hào)
OrderItem: res,
}
}()
//接收信息,也按帶序號(hào)的結(jié)構(gòu)體進(jìn)行組裝
orderSeqList := make([]OrderWithSeq, taskNum)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderSeqList, order)
}
.....
}
}
//按原始順序進(jìn)行排序
sort.Sort(BySeq(orderSeqList))
....重新組裝數(shù)據(jù)返回
總結(jié)
標(biāo)準(zhǔn)模板如下:
type Order struct {
Name string `json:"name"`
Id int `json:"id"`
}
// 需要記錄原始順序的時(shí)候,定義個(gè)帶編號(hào)的結(jié)構(gòu)體
type OrderWithSeq struct {
Seq int
OrderItem Order
}
//重寫相關(guān)排序類型
type BySeq []OrderWithSeq
func (a BySeq) Len() int {
return len(a)
}
func (a BySeq) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a BySeq) Less(i, j int) bool {
return a[i].Seq < a[j].Seq
}
taskNum := 10
orderCh := make(chan OrderWithSeq, taskNum) //接收帶序號(hào)的結(jié)構(gòu)體
errCh := make(chan error, taskNum) //接收返回的異常
wg := sync.WaitGroup{}
//在執(zhí)行任務(wù)時(shí),加入序號(hào)
for i:=0; i < taskNum; i++ {
i:= i
wg.Add(1)
go func() {
defer func () {
wg.Done()
//協(xié)程內(nèi)單獨(dú)捕捉異常
if r := recover(); r != nil {
err := errors.New(fmt.Sprintf("System panic:%v", r))
errCh <- err //此處將panic信息轉(zhuǎn)為err返回,也可以按需求和異常等級(jí)進(jìn)行處理
return
}
}()
//組裝返回結(jié)果
res := Order{
Name: "num: " + strconv.Itoa(i),
Id: i,
}
orderCh <-OrderWithSeq {
Seq: i, //帶上i這個(gè)序號(hào)
OrderItem: res,
}
}()
wg.Wait()
//接收信息,也按帶序號(hào)的結(jié)構(gòu)體進(jìn)行組裝
orderSeqList := make([]OrderWithSeq, taskNum)
timeoutTime := time.Second * 3
taskTimer := time.NewTimer(timeoutTime)
for i:=0; i<taskNum; i++ {
select {
case order, ok := <-orderCh: //接收orderCh
if ok {
orderList = append(orderSeqList, order)
}
case err := <-errCh: //接收errCh
if err != nil {
return err
}
case <-taskTimer.C: //處理超時(shí)
err := errors.New("task timeout")
return
default:
fmt.Println("done")
}
taskTimer.Reset(timeoutTime)
}
close(orderCh)
close(errCh)
//按原始順序進(jìn)行排序
sort.Sort(BySeq(orderSeqList))
到此這篇關(guān)于golang批量執(zhí)行任務(wù)的通用模板分享的文章就介紹到這了,更多相關(guān)golang批量執(zhí)行任務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang實(shí)現(xiàn)webgis后端開發(fā)的步驟詳解
這篇文章主要介紹如何用golang結(jié)合postgis數(shù)據(jù)庫,使用gin、grom框架實(shí)現(xiàn)后端的MVC的接口搭建,文中有詳細(xì)的流程步驟及代碼示例,需要的朋友可以參考下2023-06-06
Go語言中g(shù)oroutine和WaitGroup的使用示例詳解
goroutine 是Go中一個(gè)輕量級(jí)的線程, 只需要一個(gè)go關(guān)鍵字就可以創(chuàng)建一個(gè)goroutine,這篇文章主要介紹了Go語言中g(shù)oroutine和WaitGroup的使用,需要的朋友可以參考下2023-03-03
Go語言的Windows下環(huán)境配置以及簡(jiǎn)單的程序結(jié)構(gòu)講解
這篇文章主要介紹了Go語言的Windows下環(huán)境配置以及簡(jiǎn)單的程序結(jié)構(gòu)講解,從編程語言約定俗成的hellow world開始,需要的朋友可以參考下2015-10-10
golang中select語句的簡(jiǎn)單實(shí)例
Go的select語句是一種僅能用于channl發(fā)送和接收消息的專用語句,此語句運(yùn)行期間是阻塞的,下面這篇文章主要給大家介紹了關(guān)于golang中select語句的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06
Golang實(shí)現(xiàn)復(fù)合數(shù)據(jù)類型
Go語言的復(fù)合數(shù)據(jù)類型包括數(shù)組、切片、映射、結(jié)構(gòu)體和接口,本文就來介紹一下Golang實(shí)現(xiàn)復(fù)合數(shù)據(jù)類型,具有一定的參考價(jià)值,感興趣的可以了解一下2025-02-02
golang實(shí)現(xiàn)mysql數(shù)據(jù)庫事務(wù)的提交與回滾
這篇文章主要介紹了golang實(shí)現(xiàn)mysql數(shù)據(jù)庫事務(wù)的提交與回滾,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-04-04

