Golang?手寫一個簡單的并發(fā)任務(wù)?manager
前言
今天也是偏實戰(zhàn)的內(nèi)容,作為一個并發(fā)復(fù)習(xí)課,很簡單,我們來看看怎樣實現(xiàn)一個并發(fā)任務(wù) manager。
在微服務(wù)的場景下,我們有很多任務(wù)的執(zhí)行是沒有明確的先后順序的,比如一個接口同時要做到任務(wù) A 和 任務(wù) B,兩個任務(wù)分別拿到一些數(shù)據(jù),最后組裝裁剪后通過接口下發(fā)。
此時,A 和 B 兩個任務(wù)沒有依賴關(guān)系,如果我們串行來執(zhí)行,會拖慢整個任務(wù)的執(zhí)行節(jié)奏,用并發(fā)的方式來優(yōu)化是一個方向。
那怎么實現(xiàn)呢?
errgroup
一個常見的想法是用 errgroup,我們之前也介紹過 Golang errgroup 設(shè)計和原理解析。
今天我們不打算用這種實現(xiàn),希望用更加基礎(chǔ)的組件來引發(fā)思考,看看如何活用 sync 包提供的基礎(chǔ)能力。另外一點是 errgroup 也有他的缺陷,如果在啟動的協(xié)程中沒有手動 recover,那么一旦在我們的任務(wù)中出現(xiàn) panic,整個程序就 crash 了。
這一點還是很有爭議的,很多開發(fā)者認(rèn)為這是符合預(yù)期的,也有一些開發(fā)者希望在 New 一個 errgroup 的時候能夠提供 option 控制是否來 recover。近期還有兩個 issue 在進(jìn)行激烈的討論,目前看沒有定論。
感興趣的同學(xué)可以看下這兩個 issue:
- x/sync/errgroup: why not recover the fn's err in errgroup #40484
- proposal: x/sync/errgroup: propagate panics and Goexits through Wait #53757
需求拆解
ok,我們來試著用 sync 包基礎(chǔ)能力來實現(xiàn)一個簡單的并行任務(wù) manager。首先我們分析下需求。
- 一定要能做到并發(fā)執(zhí)行各個任務(wù),開多個協(xié)程,而不是在一個 main goroutine 里串行執(zhí)行各個任務(wù);
- 并發(fā)安全,我們當(dāng)然不希望出現(xiàn)數(shù)據(jù)異常,不希望并發(fā)執(zhí)行任務(wù)導(dǎo)致最后程序因為 runtime error 而掛掉;
- 如果多個任務(wù)都失敗,只返回一個 error 即可;
- 能夠 recover from panic,不需要開發(fā)者使用的時候再手動去寫 recover 邏輯;
- 性能有保障。
并發(fā)執(zhí)行這一點我們可以借助 sync.WaitGroup 的能力,每次啟動一個goroutine,WaitGroup 就加 1,在 defer 里完成 Done,啟動所有 goroutine 之后,等著 Wait 返回結(jié)果即可。常規(guī)的能力復(fù)用。
需要額外處理的地方在于,怎么實現(xiàn)多個線程只有一個 error 能賦值,以及 recover 的適配。
實戰(zhàn)代碼
我們理一下思路,看看代碼怎么寫。
Job
首先一定需要定義一個通用的函數(shù)簽名,使得開發(fā)者能夠傳入自己要執(zhí)行的并發(fā)任務(wù)。
type Job interface { Do(ctx context.Context, param interface{}) error Name() string }
JobManager
我們的 job manager 現(xiàn)階段可以簡單實現(xiàn),只是一組 Job 的集合:
type JobManager []Job
錯誤處理
要達(dá)到只有一個 error 賦值,且不出現(xiàn) race condition,有兩個方案:
- sync.Mutex 加鎖;
- sync.Once 只執(zhí)行一次。
當(dāng)然,什么時候我們都可以用一把大鎖解決問題,但它的性能不會很好,能用原子操作解決的盡量還是不要用 Mutex,這里參照 errgroup,我們可以用一個 Once 對象來控制只賦值一次。
panic 恢復(fù)可以直接在 defer 里面 recover 即可,需要能帶出來 stack trace,把它變成一個 error 賦值
及時退出
有時候我們這個并發(fā)任務(wù)數(shù)量非常多,可能還沒創(chuàng)建完 goroutine,某個先創(chuàng)建的任務(wù)就已經(jīng)掛了,這時候需要有一個全局的信號,終止后續(xù)的 goroutine 創(chuàng)建。這一點用原子操作就能實現(xiàn)。
完整代碼
把上面的分析落地,這樣我們就實現(xiàn)了一個帶上了 recover 能力,以及終止能力的的 errgroup。
package main import ( "context" "errors" "fmt" "sync" "sync/atomic" ) type Job interface { Do(ctx context.Context, param interface{}) error Name() string } type JobManager []Job func (mgr JobManager) Execute(ctx context.Context, param interface{}) error { var ( stop int32 = 0 err error wg sync.WaitGroup errOnce sync.Once ) for _, job := range mgr { if atomic.LoadInt32(&stop) > 0 { break } wg.Add(1) go func(j Job) { defer func() { wg.Done() if r := recover(); r != nil { errMsg := fmt.Sprintf("JobManager panic: job: %v, reason: %v", j.Name(), r) nerr := errors.New(errMsg) errOnce.Do(func() { if err == nil { err = nerr } }) atomic.AddInt32(&stop, 1) } }() nerr := j.Do(ctx, param) if nerr != nil { atomic.AddInt32(&stop, 1) errOnce.Do(func() { if err == nil { err = nerr } }) } }(job) } wg.Wait() return err }
使用方法也很簡單:
var mgr = JobManager{ AJob, BJob, CJob, // 這里的各個 Job 需要實現(xiàn)一開始我們定義的接口 } err := mgr.Execute(ctx, param)
這里我們需要定義統(tǒng)一的 param interface{},建議是一個接口,各個 Job 執(zhí)行完畢后如果有需要寫入的數(shù)據(jù),可以調(diào)用 param 的 Setter 方法寫入,最后直接拿 param 來做后續(xù)邏輯。
小結(jié)
今天我們用 sync.Once,以及 sync.WaitGroup 的能力實現(xiàn)了一個簡易的并發(fā)任務(wù)調(diào)度器,希望能夠幫助大家回顧一下此前介紹的并發(fā)相關(guān)概念和用法。其實并發(fā)管理這一點很多時候我們會存在依賴,這時候可能需要將多個 job 分層,或者梳理出來拓?fù)潢P(guān)系來執(zhí)行,我們今天只是簡單入門,復(fù)習(xí)一下相關(guān)知識。
建議大家回顧一下此前對于 once 以及 errgroup 的源碼解析,相信你會更能融會貫通。
到此這篇關(guān)于Golang 手寫一個簡單的并發(fā)任務(wù) manager的文章就介紹到這了,更多相關(guān)Golang manager內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang?xorm?自定義日志記錄器之使用zap實現(xiàn)日志輸出、切割日志(最新)
這篇文章主要介紹了golang?xorm?自定義日志記錄器,使用zap實現(xiàn)日志輸出、切割日志,包括連接postgresql數(shù)據(jù)庫的操作方法及?zap日志工具?,本文結(jié)合實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-10-10golang 實現(xiàn)struct、json、map互相轉(zhuǎn)化
這篇文章主要介紹了golang 實現(xiàn)struct、json、map互相轉(zhuǎn)化,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12Golang并發(fā)繞不開的重要組件之Goroutine詳解
Goroutine、Channel、Context、Sync都是Golang并發(fā)編程中的幾個重要組件,這篇文中主要為大家介紹了Goroutine的相關(guān)知識,需要的可以參考一下2023-06-06Redis?BloomFilter布隆過濾器原理與實現(xiàn)
你在開發(fā)或者面試過程中,有沒有遇到過?海量數(shù)據(jù)需要查重,緩存穿透怎么避免等等這樣的問題呢?下面這個東西超棒,好好了解下,面試過關(guān)斬將,凸顯你的不一樣2022-10-10golang基于websocket通信tcp keepalive研究記錄
這篇文章主要為大家介紹了golang基于websocket通信tcp keepalive研究記錄,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06