Go語(yǔ)言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫(kù)詳解
前言
前幾天逛github發(fā)現(xiàn)了一個(gè)有趣的并發(fā)庫(kù)-conc,其目標(biāo)是:
- 更難出現(xiàn)goroutine泄漏
- 處理panic更友好
- 并發(fā)代碼可讀性高
從簡(jiǎn)介上看主要封裝功能如下:
- 對(duì)
waitGroup
進(jìn)行封裝,避免了產(chǎn)生大量重復(fù)代碼,并且也封裝recover,安全性更高 - 提供
panics.Catcher
封裝recover
邏輯,統(tǒng)一捕獲panic
,打印調(diào)用棧一些信息 - 提供一個(gè)并發(fā)執(zhí)行任務(wù)的
worker
池,可以控制并發(fā)度、goroutine可以進(jìn)行復(fù)用,支持函數(shù)簽名,同時(shí)提供了stream
方法來(lái)保證結(jié)果有序 - 提供
ForEach
、map
方法優(yōu)雅的處理切片
接下來(lái)就區(qū)分模塊來(lái)介紹一下這個(gè)庫(kù);
倉(cāng)庫(kù)地址:github.com/sourcegraph…
WaitGroup的封裝
Go語(yǔ)言標(biāo)準(zhǔn)庫(kù)有提供sync.waitGroup
控制等待goroutine,我們一般會(huì)寫出如下代碼:
func main(){ var wg sync.WaitGroup for i:=0; i < 10; i++{ wg.Add(1) go func() { defer wg.Done() defer func() { // recover panic err := recover() if err != nil { fmt.Println(err) } } // do something handle() } } wg.Wait() }
上述代碼我們需要些一堆重復(fù)代碼,并且需要單獨(dú)在每一個(gè)func中處理recover邏輯,所以conc
庫(kù)對(duì)其進(jìn)行了封裝,代碼簡(jiǎn)化如下:
func main() { wg := conc.NewWaitGroup() for i := 0; i < 10; i++ { wg.Go(doSomething) } wg.Wait() } func doSomething() { fmt.Println("test") }
conc
庫(kù)封裝也比較簡(jiǎn)單,結(jié)構(gòu)如下:
type WaitGroup struct { wg sync.WaitGroup pc panics.Catcher }
其自己實(shí)現(xiàn)了Catcher
類型對(duì)recover邏輯進(jìn)行了封裝,封裝思路如下:
type Catcher struct { recovered atomic.Pointer[RecoveredPanic] }
recovered是原子指針類型,RecoveredPanic是捕獲的recover封裝,封裝了堆棧等信息:
type RecoveredPanic struct { // The original value of the panic. Value any // The caller list as returned by runtime.Callers when the panic was // recovered. Can be used to produce a more detailed stack information with // runtime.CallersFrames. Callers []uintptr // The formatted stacktrace from the goroutine where the panic was recovered. // Easier to use than Callers. Stack []byte }
提供了Try方法執(zhí)行方法,只會(huì)記錄第一個(gè)panic的goroutine信息:
func (p *Catcher) Try(f func()) { defer p.tryRecover() f() } func (p *Catcher) tryRecover() { if val := recover(); val != nil { rp := NewRecoveredPanic(1, val) // 只會(huì)記錄第一個(gè)panic的goroutine信息 p.recovered.CompareAndSwap(nil, &rp) } }
提供了Repanic()
方法用來(lái)重放捕獲的panic:
func (p *Catcher) Repanic() { if val := p.Recovered(); val != nil { panic(val) } } func (p *Catcher) Recovered() *RecoveredPanic { return p.recovered.Load() }
waitGroup
對(duì)此也分別提供了Wait()
、WaitAndRecover()
方法:
func (h *WaitGroup) Wait() { h.wg.Wait() // Propagate a panic if we caught one from a child goroutine. h.pc.Repanic() } func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic { h.wg.Wait() // Return a recovered panic if we caught one from a child goroutine. return h.pc.Recovered() }
wait
方法只要有一個(gè)goroutine發(fā)生panic就會(huì)向上拋出panic,比較簡(jiǎn)單粗暴;
waitAndRecover
方法只有有一個(gè)goroutine發(fā)生panic就會(huì)返回第一個(gè)recover的goroutine信息;
總結(jié):conc庫(kù)對(duì)waitGrouop
的封裝總體是比較不錯(cuò)的,可以減少重復(fù)的代碼;
worker池
conc
提供了幾種類型的worker池:
- ContextPool:可以傳遞context的pool,若有g(shù)oroutine發(fā)生錯(cuò)誤可以cancel其他goroutine
- ErrorPool:通過(guò)參數(shù)可以控制只收集第一個(gè)error還是所有error
- ResultContextPool:若有g(shù)oroutine發(fā)生錯(cuò)誤會(huì)cancel其他goroutine并且收集錯(cuò)誤
- RestultPool:收集work池中每個(gè)任務(wù)的執(zhí)行結(jié)果,并不能保證順序,保證順序需要使用stream或者iter.map;
我們來(lái)看一個(gè)簡(jiǎn)單的例子:
import "github.com/sourcegraph/conc/pool" func ExampleContextPool_WithCancelOnError() { p := pool.New(). WithMaxGoroutines(4). WithContext(context.Background()). WithCancelOnError() for i := 0; i < 3; i++ { i := i p.Go(func(ctx context.Context) error { if i == 2 { return errors.New("I will cancel all other tasks!") } <-ctx.Done() return nil }) } err := p.Wait() fmt.Println(err) // Output: // I will cancel all other tasks! }
在創(chuàng)建pool時(shí)有如下方法可以調(diào)用:
p.WithMaxGoroutines()
配置pool中g(shù)oroutine的最大數(shù)量p.WithErrors
:配置pool中的task是否返回errorp.WithContext(ctx)
:配置pool中運(yùn)行的task當(dāng)遇到第一個(gè)error要取消p.WithFirstError
:配置pool中的task只返回第一個(gè)errorp.WithCollectErrored
:配置pool的task收集所有error
pool的基礎(chǔ)結(jié)構(gòu)如下:
type Pool struct { handle conc.WaitGroup limiter limiter tasks chan func() initOnce sync.Once }
limiter是控制器,用chan來(lái)控制goroutine的數(shù)量:
type limiter chan struct{} func (l limiter) limit() int { return cap(l) } func (l limiter) release() { if l != nil { <-l } }
pool的核心邏輯也比較簡(jiǎn)單,如果沒(méi)有設(shè)置limiter,那么就看有沒(méi)有空閑的worker,否則就創(chuàng)建一個(gè)新的worker,然后投遞任務(wù)進(jìn)去;
如果設(shè)置了limiter,達(dá)到了limiter worker數(shù)量上限,就把任務(wù)投遞給空閑的worker,沒(méi)有空閑就阻塞等著;
func (p *Pool) Go(f func()) { p.init() if p.limiter == nil { // 沒(méi)有限制 select { case p.tasks <- f: // A goroutine was available to handle the task. default: // No goroutine was available to handle the task. // Spawn a new one and send it the task. p.handle.Go(p.worker) p.tasks <- f } } else { select { case p.limiter <- struct{}{}: // If we are below our limit, spawn a new worker rather // than waiting for one to become available. p.handle.Go(p.worker) // We know there is at least one worker running, so wait // for it to become available. This ensures we never spawn // more workers than the number of tasks. p.tasks <- f case p.tasks <- f: // A worker is available and has accepted the task. return } } }
這里work使用的是一個(gè)無(wú)緩沖的channel,這種復(fù)用方式很巧妙,如果goroutine執(zhí)行很快避免創(chuàng)建過(guò)多的goroutine;
使用pool處理任務(wù)不能保證有序性,conc庫(kù)又提供了Stream
方法,返回結(jié)果可以保持順序;
Stream
Steam的實(shí)現(xiàn)也是依賴于pool
,在此基礎(chǔ)上做了封裝保證結(jié)果的順序性,先看一個(gè)例子:
func ExampleStream() { times := []int{20, 52, 16, 45, 4, 80} stream := stream2.New() for _, millis := range times { dur := time.Duration(millis) * time.Millisecond stream.Go(func() stream2.Callback { time.Sleep(dur) // This will print in the order the tasks were submitted return func() { fmt.Println(dur) } }) } stream.Wait() // Output: // 20ms // 52ms // 16ms // 45ms // 4ms // 80ms }
stream
的結(jié)構(gòu)如下:
type Stream struct { pool pool.Pool callbackerHandle conc.WaitGroup queue chan callbackCh initOnce sync.Once }
queue
是一個(gè)channel類型,callbackCh也是channel類型 - chan func():
type callbackCh chan func()
在提交goroutine
時(shí)按照順序生成callbackCh傳遞結(jié)果:
func (s *Stream) Go(f Task) { s.init() // Get a channel from the cache. ch := getCh() // Queue the channel for the callbacker. s.queue <- ch // Submit the task for execution. s.pool.Go(func() { defer func() { // In the case of a panic from f, we don't want the callbacker to // starve waiting for a callback from this channel, so give it an // empty callback. if r := recover(); r != nil { ch <- func() {} panic(r) } }() // Run the task, sending its callback down this task's channel. callback := f() ch <- callback }) } var callbackChPool = sync.Pool{ New: func() any { return make(callbackCh, 1) }, } func getCh() callbackCh { return callbackChPool.Get().(callbackCh) } func putCh(ch callbackCh) { callbackChPool.Put(ch) }
ForEach和map
ForEach
conc庫(kù)提供了ForEach方法可以優(yōu)雅的并發(fā)處理切片,看一下官方的例子:
conc庫(kù)使用泛型進(jìn)行了封裝,我們只需要關(guān)注handle代碼即可,避免冗余代碼,我們自己動(dòng)手寫一個(gè)例子:
func main() { input := []int{1, 2, 3, 4} iterator := iter.Iterator[int]{ MaxGoroutines: len(input) / 2, } iterator.ForEach(input, func(v *int) { if *v%2 != 0 { *v = -1 } }) fmt.Println(input) }
ForEach內(nèi)部實(shí)現(xiàn)為Iterator結(jié)構(gòu)及核心邏輯如下:
type Iterator[T any] struct { MaxGoroutines int } func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) { if iter.MaxGoroutines == 0 { // iter is a value receiver and is hence safe to mutate iter.MaxGoroutines = defaultMaxGoroutines() } numInput := len(input) if iter.MaxGoroutines > numInput { // No more concurrent tasks than the number of input items. iter.MaxGoroutines = numInput } var idx atomic.Int64 // 通過(guò)atomic控制僅創(chuàng)建一個(gè)閉包 task := func() { i := int(idx.Add(1) - 1) for ; i < numInput; i = int(idx.Add(1) - 1) { f(i, &input[i]) } } var wg conc.WaitGroup for i := 0; i < iter.MaxGoroutines; i++ { wg.Go(task) } wg.Wait() }
可以設(shè)置并發(fā)的goroutine數(shù)量,默認(rèn)取的是GOMAXPROCS ,也可以自定義傳參;
并發(fā)執(zhí)行這塊設(shè)計(jì)的很巧妙,僅創(chuàng)建了一個(gè)閉包,通過(guò)atomic控制idx,避免頻繁觸發(fā)GC;
map
conc庫(kù)提供的map方法可以得到對(duì)切片中元素結(jié)果,官方例子:
使用map可以提高代碼的可讀性,并且減少了冗余代碼,自己寫個(gè)例子:
func main() { input := []int{1, 2, 3, 4} mapper := iter.Mapper[int, bool]{ MaxGoroutines: len(input) / 2, } results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 }) fmt.Println(results) // Output: // [false true false true] }
map的實(shí)現(xiàn)也依賴于Iterator,也是調(diào)用的ForEachIdx方法,區(qū)別于ForEach是記錄處理結(jié)果;
總結(jié)
花了小半天時(shí)間看了一下這個(gè)庫(kù),很多設(shè)計(jì)點(diǎn)值得我們學(xué)習(xí),總結(jié)一下我學(xué)習(xí)到的知識(shí)點(diǎn):
- conc.WatiGroup對(duì)Sync.WaitGroup進(jìn)行了封裝,對(duì)Add、Done、Recover進(jìn)行了封裝,提高了可讀性,避免了冗余代碼
- ForEach、Map方法可以更優(yōu)雅的并發(fā)處理切片,代碼簡(jiǎn)潔易讀,在實(shí)現(xiàn)上Iterator中的并發(fā)處理使用atomic來(lái)控制只創(chuàng)建一個(gè)閉包,避免了GC性能問(wèn)題
- pool是一個(gè)并發(fā)的協(xié)程隊(duì)列,可以控制協(xié)程的數(shù)量,實(shí)現(xiàn)上也很巧妙,使用一個(gè)無(wú)緩沖的channel作為worker,如果goroutine執(zhí)行速度快,避免了創(chuàng)建多個(gè)goroutine
- stream是一個(gè)保證順序的并發(fā)協(xié)程隊(duì)列,實(shí)現(xiàn)上也很巧妙,使用sync.Pool在提交goroutine時(shí)控制順序,值得我們學(xué)習(xí);
以上就是Go語(yǔ)言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫(kù)詳解的詳細(xì)內(nèi)容,更多關(guān)于Go語(yǔ)言可讀性并發(fā)庫(kù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
完美解決beego 根目錄不能訪問(wèn)靜態(tài)文件的問(wèn)題
下面小編就為大家?guī)?lái)一篇完美解決beego 根目錄不能訪問(wèn)靜態(tài)文件的問(wèn)題。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06golang生成RSA公鑰和密鑰的實(shí)現(xiàn)方法
本文主要介紹了golang生成RSA公鑰和密鑰的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-08-08使用Go語(yǔ)言實(shí)現(xiàn)敏感詞過(guò)濾功能
敏感詞過(guò)濾,算是一個(gè)比較常見(jiàn)的功能,尤其是在內(nèi)容、社交類應(yīng)用中更是如此,本文介紹如何使用Go語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的敏感詞過(guò)濾功能,文中通過(guò)代碼示例介紹的非常詳細(xì),需要的朋友可以參考下2023-12-12詳解golang中?work與?module?的區(qū)別與聯(lián)系
Go?模塊通常由一個(gè)項(xiàng)目或庫(kù)組成,并包含一組隨后一起發(fā)布的?Go?包,Go?模塊通過(guò)允許用戶將項(xiàng)目代碼放在他們選擇的目錄中并為每個(gè)模塊指定依賴項(xiàng)的版本,解決了原始系統(tǒng)的許多問(wèn)題,本文將給大家介紹一下golang中?work與?module?的區(qū)別與聯(lián)系,需要的朋友可以參考下2023-09-09golang中定時(shí)器cpu使用率高的現(xiàn)象詳析
這篇文章主要給大家介紹了關(guān)于golang中定時(shí)器cpu使用率高的現(xiàn)象的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2018-04-04golang中json小談之字符串轉(zhuǎn)浮點(diǎn)數(shù)的操作
這篇文章主要介紹了golang中json小談之字符串轉(zhuǎn)浮點(diǎn)數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理
這篇文章主要為大家介紹了go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08