欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go語(yǔ)言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫(kù)詳解

 更新時(shí)間:2023年01月31日 09:11:35   作者:asong2020  
這篇文章主要為大家介紹了Go語(yǔ)言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫(kù)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

前幾天逛github發(fā)現(xiàn)了一個(gè)有趣的并發(fā)庫(kù)-conc,其目標(biāo)是:

  • 更難出現(xiàn)goroutine泄漏
  • 處理panic更友好
  • 并發(fā)代碼可讀性高

從簡(jiǎn)介上看主要封裝功能如下:

  • 對(duì)waitGroup進(jìn)行封裝,避免了產(chǎn)生大量重復(fù)代碼,并且也封裝recover,安全性更高
  • 提供panics.Catcher封裝recover邏輯,統(tǒng)一捕獲panic,打印調(diào)用棧一些信息
  • 提供一個(gè)并發(fā)執(zhí)行任務(wù)的worker池,可以控制并發(fā)度、goroutine可以進(jìn)行復(fù)用,支持函數(shù)簽名,同時(shí)提供了stream方法來(lái)保證結(jié)果有序
  • 提供ForEach、map方法優(yōu)雅的處理切片

接下來(lái)就區(qū)分模塊來(lái)介紹一下這個(gè)庫(kù);

倉(cāng)庫(kù)地址:github.com/sourcegraph…

WaitGroup的封裝

Go語(yǔ)言標(biāo)準(zhǔn)庫(kù)有提供sync.waitGroup控制等待goroutine,我們一般會(huì)寫出如下代碼:

func main(){
    var wg sync.WaitGroup
    for i:=0; i < 10; i++{
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover panic
                err := recover()
                if err != nil {
                    fmt.Println(err)
                }
            }
            // do something
            handle()
        }
    }
    wg.Wait()
}

上述代碼我們需要些一堆重復(fù)代碼,并且需要單獨(dú)在每一個(gè)func中處理recover邏輯,所以conc庫(kù)對(duì)其進(jìn)行了封裝,代碼簡(jiǎn)化如下:

func main() {
	wg := conc.NewWaitGroup()
	for i := 0; i &lt; 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}
func doSomething() {
	fmt.Println("test")
}

conc庫(kù)封裝也比較簡(jiǎn)單,結(jié)構(gòu)如下:

type WaitGroup struct {
	wg sync.WaitGroup
	pc panics.Catcher
}

其自己實(shí)現(xiàn)了Catcher類型對(duì)recover邏輯進(jìn)行了封裝,封裝思路如下:

type Catcher struct {
	recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指針類型,RecoveredPanic是捕獲的recover封裝,封裝了堆棧等信息:

type RecoveredPanic struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

提供了Try方法執(zhí)行方法,只會(huì)記錄第一個(gè)panic的goroutine信息:

func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}
func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecoveredPanic(1, val)
        // 只會(huì)記錄第一個(gè)panic的goroutine信息
		p.recovered.CompareAndSwap(nil, &amp;rp)
	}
}

提供了Repanic()方法用來(lái)重放捕獲的panic:

func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}
func (p *Catcher) Recovered() *RecoveredPanic {
	return p.recovered.Load()
}

waitGroup對(duì)此也分別提供了Wait()、WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
	h.wg.Wait()
	// Propagate a panic if we caught one from a child goroutine.
	h.pc.Repanic()
}
func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
	h.wg.Wait()
	// Return a recovered panic if we caught one from a child goroutine.
	return h.pc.Recovered()
}

wait方法只要有一個(gè)goroutine發(fā)生panic就會(huì)向上拋出panic,比較簡(jiǎn)單粗暴;

waitAndRecover方法只有有一個(gè)goroutine發(fā)生panic就會(huì)返回第一個(gè)recover的goroutine信息;

總結(jié):conc庫(kù)對(duì)waitGrouop的封裝總體是比較不錯(cuò)的,可以減少重復(fù)的代碼;

worker池

conc提供了幾種類型的worker池:

  • ContextPool:可以傳遞context的pool,若有g(shù)oroutine發(fā)生錯(cuò)誤可以cancel其他goroutine
  • ErrorPool:通過(guò)參數(shù)可以控制只收集第一個(gè)error還是所有error
  • ResultContextPool:若有g(shù)oroutine發(fā)生錯(cuò)誤會(huì)cancel其他goroutine并且收集錯(cuò)誤
  • RestultPool:收集work池中每個(gè)任務(wù)的執(zhí)行結(jié)果,并不能保證順序,保證順序需要使用stream或者iter.map;

我們來(lái)看一個(gè)簡(jiǎn)單的例子:

import "github.com/sourcegraph/conc/pool"
func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
	// Output:
	// I will cancel all other tasks!
}

在創(chuàng)建pool時(shí)有如下方法可以調(diào)用:

  • p.WithMaxGoroutines()配置pool中g(shù)oroutine的最大數(shù)量
  • p.WithErrors:配置pool中的task是否返回error
  • p.WithContext(ctx):配置pool中運(yùn)行的task當(dāng)遇到第一個(gè)error要取消
  • p.WithFirstError:配置pool中的task只返回第一個(gè)error
  • p.WithCollectErrored:配置pool的task收集所有error

pool的基礎(chǔ)結(jié)構(gòu)如下:

type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

limiter是控制器,用chan來(lái)控制goroutine的數(shù)量:

type limiter chan struct{}
func (l limiter) limit() int {
	return cap(l)
}
func (l limiter) release() {
	if l != nil {
		&lt;-l
	}
}

pool的核心邏輯也比較簡(jiǎn)單,如果沒(méi)有設(shè)置limiter,那么就看有沒(méi)有空閑的worker,否則就創(chuàng)建一個(gè)新的worker,然后投遞任務(wù)進(jìn)去;

如果設(shè)置了limiter,達(dá)到了limiter worker數(shù)量上限,就把任務(wù)投遞給空閑的worker,沒(méi)有空閑就阻塞等著;

func (p *Pool) Go(f func()) {
	p.init()
	if p.limiter == nil {
		// 沒(méi)有限制
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(p.worker)
			p.tasks <- f
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(p.worker)
			// We know there is at least one worker running, so wait
			// for it to become available. This ensures we never spawn
			// more workers than the number of tasks.
			p.tasks <- f
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}
}

這里work使用的是一個(gè)無(wú)緩沖的channel,這種復(fù)用方式很巧妙,如果goroutine執(zhí)行很快避免創(chuàng)建過(guò)多的goroutine;

使用pool處理任務(wù)不能保證有序性,conc庫(kù)又提供了Stream方法,返回結(jié)果可以保持順序;

Stream

Steam的實(shí)現(xiàn)也是依賴于pool,在此基礎(chǔ)上做了封裝保證結(jié)果的順序性,先看一個(gè)例子:

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}
	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()
	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

stream的結(jié)構(gòu)如下:

type Stream struct {
	pool             pool.Pool
	callbackerHandle conc.WaitGroup
	queue            chan callbackCh
	initOnce sync.Once
}

queue是一個(gè)channel類型,callbackCh也是channel類型 - chan func():

type callbackCh chan func()

在提交goroutine時(shí)按照順序生成callbackCh傳遞結(jié)果:

func (s *Stream) Go(f Task) {
	s.init()
	// Get a channel from the cache.
	ch := getCh()
	// Queue the channel for the callbacker.
	s.queue <- ch
	// Submit the task for execution.
	s.pool.Go(func() {
		defer func() {
			// In the case of a panic from f, we don't want the callbacker to
			// starve waiting for a callback from this channel, so give it an
			// empty callback.
			if r := recover(); r != nil {
				ch <- func() {}
				panic(r)
			}
		}()
		// Run the task, sending its callback down this task's channel.
		callback := f()
		ch <- callback
	})
}
var callbackChPool = sync.Pool{
	New: func() any {
		return make(callbackCh, 1)
	},
}
func getCh() callbackCh {
	return callbackChPool.Get().(callbackCh)
}
func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc庫(kù)提供了ForEach方法可以優(yōu)雅的并發(fā)處理切片,看一下官方的例子:

conc庫(kù)使用泛型進(jìn)行了封裝,我們只需要關(guān)注handle代碼即可,避免冗余代碼,我們自己動(dòng)手寫一個(gè)例子:

func main() {
	input := []int{1, 2, 3, 4}
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}
	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})
	fmt.Println(input)
}

ForEach內(nèi)部實(shí)現(xiàn)為Iterator結(jié)構(gòu)及核心邏輯如下:

type Iterator[T any] struct {
	MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
	if iter.MaxGoroutines == 0 {
		// iter is a value receiver and is hence safe to mutate
		iter.MaxGoroutines = defaultMaxGoroutines()
	}
	numInput := len(input)
	if iter.MaxGoroutines > numInput {
		// No more concurrent tasks than the number of input items.
		iter.MaxGoroutines = numInput
	}
	var idx atomic.Int64
	// 通過(guò)atomic控制僅創(chuàng)建一個(gè)閉包
	task := func() {
		i := int(idx.Add(1) - 1)
		for ; i < numInput; i = int(idx.Add(1) - 1) {
			f(i, &input[i])
		}
	}
	var wg conc.WaitGroup
	for i := 0; i < iter.MaxGoroutines; i++ {
		wg.Go(task)
	}
	wg.Wait()
}

可以設(shè)置并發(fā)的goroutine數(shù)量,默認(rèn)取的是GOMAXPROCS ,也可以自定義傳參;

并發(fā)執(zhí)行這塊設(shè)計(jì)的很巧妙,僅創(chuàng)建了一個(gè)閉包,通過(guò)atomic控制idx,避免頻繁觸發(fā)GC;

map

conc庫(kù)提供的map方法可以得到對(duì)切片中元素結(jié)果,官方例子:

使用map可以提高代碼的可讀性,并且減少了冗余代碼,自己寫個(gè)例子:

func main() {
	input := []int{1, 2, 3, 4}
	mapper := iter.Mapper[int, bool]{
		MaxGoroutines: len(input) / 2,
	}
	results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results)
	// Output:
	// [false true false true]
}

map的實(shí)現(xiàn)也依賴于Iterator,也是調(diào)用的ForEachIdx方法,區(qū)別于ForEach是記錄處理結(jié)果;

總結(jié)

花了小半天時(shí)間看了一下這個(gè)庫(kù),很多設(shè)計(jì)點(diǎn)值得我們學(xué)習(xí),總結(jié)一下我學(xué)習(xí)到的知識(shí)點(diǎn):

  • conc.WatiGroup對(duì)Sync.WaitGroup進(jìn)行了封裝,對(duì)Add、Done、Recover進(jìn)行了封裝,提高了可讀性,避免了冗余代碼
  • ForEach、Map方法可以更優(yōu)雅的并發(fā)處理切片,代碼簡(jiǎn)潔易讀,在實(shí)現(xiàn)上Iterator中的并發(fā)處理使用atomic來(lái)控制只創(chuàng)建一個(gè)閉包,避免了GC性能問(wèn)題
  • pool是一個(gè)并發(fā)的協(xié)程隊(duì)列,可以控制協(xié)程的數(shù)量,實(shí)現(xiàn)上也很巧妙,使用一個(gè)無(wú)緩沖的channel作為worker,如果goroutine執(zhí)行速度快,避免了創(chuàng)建多個(gè)goroutine
  • stream是一個(gè)保證順序的并發(fā)協(xié)程隊(duì)列,實(shí)現(xiàn)上也很巧妙,使用sync.Pool在提交goroutine時(shí)控制順序,值得我們學(xué)習(xí);

以上就是Go語(yǔ)言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫(kù)詳解的詳細(xì)內(nèi)容,更多關(guān)于Go語(yǔ)言可讀性并發(fā)庫(kù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go語(yǔ)言中工作池的原理與實(shí)現(xiàn)

    Go語(yǔ)言中工作池的原理與實(shí)現(xiàn)

    工作池是一種并發(fā)編程模式,它使用一組固定數(shù)量的工作線程來(lái)執(zhí)行任務(wù)隊(duì)列中的工作單元,本文將介紹工作池的工作原理,并通過(guò)代碼示例演示其在實(shí)際應(yīng)用中的用途,有需要的可以參考下
    2023-10-10
  • 一文了解Go 并發(fā)與并行

    一文了解Go 并發(fā)與并行

    并發(fā)性和并行性是是兩個(gè)既有聯(lián)系又有所區(qū)別的概念,本文主要介紹了Go并發(fā)與并行,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-05-05
  • 完美解決beego 根目錄不能訪問(wèn)靜態(tài)文件的問(wèn)題

    完美解決beego 根目錄不能訪問(wèn)靜態(tài)文件的問(wèn)題

    下面小編就為大家?guī)?lái)一篇完美解決beego 根目錄不能訪問(wèn)靜態(tài)文件的問(wèn)題。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-06-06
  • golang生成RSA公鑰和密鑰的實(shí)現(xiàn)方法

    golang生成RSA公鑰和密鑰的實(shí)現(xiàn)方法

    本文主要介紹了golang生成RSA公鑰和密鑰的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-08-08
  • 使用Go語(yǔ)言實(shí)現(xiàn)敏感詞過(guò)濾功能

    使用Go語(yǔ)言實(shí)現(xiàn)敏感詞過(guò)濾功能

    敏感詞過(guò)濾,算是一個(gè)比較常見(jiàn)的功能,尤其是在內(nèi)容、社交類應(yīng)用中更是如此,本文介紹如何使用Go語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的敏感詞過(guò)濾功能,文中通過(guò)代碼示例介紹的非常詳細(xì),需要的朋友可以參考下
    2023-12-12
  • GoLang與Java各自生成grpc代碼流程介紹

    GoLang與Java各自生成grpc代碼流程介紹

    這篇文章主要介紹了GoLang與Java各自生成grpc代碼流程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧
    2023-03-03
  • 詳解golang中?work與?module?的區(qū)別與聯(lián)系

    詳解golang中?work與?module?的區(qū)別與聯(lián)系

    Go?模塊通常由一個(gè)項(xiàng)目或庫(kù)組成,并包含一組隨后一起發(fā)布的?Go?包,Go?模塊通過(guò)允許用戶將項(xiàng)目代碼放在他們選擇的目錄中并為每個(gè)模塊指定依賴項(xiàng)的版本,解決了原始系統(tǒng)的許多問(wèn)題,本文將給大家介紹一下golang中?work與?module?的區(qū)別與聯(lián)系,需要的朋友可以參考下
    2023-09-09
  • golang中定時(shí)器cpu使用率高的現(xiàn)象詳析

    golang中定時(shí)器cpu使用率高的現(xiàn)象詳析

    這篇文章主要給大家介紹了關(guān)于golang中定時(shí)器cpu使用率高的現(xiàn)象的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-04-04
  • golang中json小談之字符串轉(zhuǎn)浮點(diǎn)數(shù)的操作

    golang中json小談之字符串轉(zhuǎn)浮點(diǎn)數(shù)的操作

    這篇文章主要介紹了golang中json小談之字符串轉(zhuǎn)浮點(diǎn)數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-03-03
  • go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理

    go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理

    這篇文章主要為大家介紹了go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08

最新評(píng)論