欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文帶你了解Go語言實現(xiàn)的并發(fā)神庫conc

 更新時間:2023年01月31日 08:33:02   作者:asong2020  
前幾天逛github發(fā)現(xiàn)了一個有趣的并發(fā)庫-conc,這篇文章將為大家詳細介紹一下這個庫的實現(xiàn),文中的示例代碼講解詳細,感興趣的可以了解一下

前言

哈嘍,大家好,我是asong;前幾天逛github發(fā)現(xiàn)了一個有趣的并發(fā)庫-conc,其目標(biāo)是:

  • 更難出現(xiàn)goroutine泄漏
  • 處理panic更友好
  • 并發(fā)代碼可讀性高

從簡介上看主要封裝功能如下:

  • waitGroup進行封裝,避免了產(chǎn)生大量重復(fù)代碼,并且也封裝recover,安全性更高
  • 提供panics.Catcher封裝recover邏輯,統(tǒng)一捕獲panic,打印調(diào)用棧一些信息
  • 提供一個并發(fā)執(zhí)行任務(wù)的worker池,可以控制并發(fā)度、goroutine可以進行復(fù)用,支持函數(shù)簽名,同時提供了stream方法來保證結(jié)果有序
  • 提供ForEach、map方法優(yōu)雅的處理切片

接下來就區(qū)分模塊來介紹一下這個庫;

倉庫地址:https://github.com/sourcegraph/conc

Go語言標(biāo)準(zhǔn)庫有提供sync.waitGroup控制等待goroutine,我們一般會寫出如下代碼:

func main(){
    var wg sync.WaitGroup
    for i:=0; i < 10; i++{
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover panic
                err := recover()
                if err != nil {
                    fmt.Println(err)
                }
            }
            // do something
            handle()
        }
    }
    wg.Wait()
}

上述代碼我們需要些一堆重復(fù)代碼,并且需要單獨在每一個func中處理recover邏輯,所以conc庫對其進行了封裝,代碼簡化如下:

func main() {
	wg := conc.NewWaitGroup()
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}

func doSomething() {
	fmt.Println("test")
}

conc庫封裝也比較簡單,結(jié)構(gòu)如下:

type WaitGroup struct {
	wg sync.WaitGroup
	pc panics.Catcher
}

其自己實現(xiàn)了Catcher類型對recover邏輯進行了封裝,封裝思路如下:

type Catcher struct {
	recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指針類型,RecoveredPanic是捕獲的recover封裝,封裝了堆棧等信息:

type RecoveredPanic struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

提供了Try方法執(zhí)行方法,只會記錄第一個panic的goroutine信息:

func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}

func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecoveredPanic(1, val)
        // 只會記錄第一個panic的goroutine信息
		p.recovered.CompareAndSwap(nil, &rp)
	}
}

提供了Repanic()方法用來重放捕獲的panic:

func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}

func (p *Catcher) Recovered() *RecoveredPanic {
	return p.recovered.Load()
}

waitGroup對此也分別提供了Wait()、WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
	h.wg.Wait()

	// Propagate a panic if we caught one from a child goroutine.
	h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
	h.wg.Wait()

	// Return a recovered panic if we caught one from a child goroutine.
	return h.pc.Recovered()
}

wait方法只要有一個goroutine發(fā)生panic就會向上拋出panic,比較簡單粗暴;

waitAndRecover方法只有有一個goroutine發(fā)生panic就會返回第一個recover的goroutine信息;

總結(jié):conc庫對waitGrouop的封裝總體是比較不錯的,可以減少重復(fù)的代碼;

worker池

conc提供了幾種類型的worker池:

  • ContextPool:可以傳遞context的pool,若有g(shù)oroutine發(fā)生錯誤可以cancel其他goroutine
  • ErrorPool:通過參數(shù)可以控制只收集第一個error還是所有error
  • ResultContextPool:若有g(shù)oroutine發(fā)生錯誤會cancel其他goroutine并且收集錯誤
  • RestultPool:收集work池中每個任務(wù)的執(zhí)行結(jié)果,并不能保證順序,保證順序需要使用stream或者iter.map;

我們來看一個簡單的例子:

import "github.com/sourcegraph/conc/pool"

func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
	// Output:
	// I will cancel all other tasks!
}

在創(chuàng)建pool時有如下方法可以調(diào)用:

  • p.WithMaxGoroutines()配置pool中g(shù)oroutine的最大數(shù)量
  • p.WithErrors:配置pool中的task是否返回error
  • p.WithContext(ctx):配置pool中運行的task當(dāng)遇到第一個error要取消
  • p.WithFirstError:配置pool中的task只返回第一個error
  • p.WithCollectErrored:配置pool的task收集所有error

pool的基礎(chǔ)結(jié)構(gòu)如下:

type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

limiter是控制器,用chan來控制goroutine的數(shù)量:

type limiter chan struct{}

func (l limiter) limit() int {
	return cap(l)
}

func (l limiter) release() {
	if l != nil {
		<-l
	}
}

pool的核心邏輯也比較簡單,如果沒有設(shè)置limiter,那么就看有沒有空閑的worker,否則就創(chuàng)建一個新的worker,然后投遞任務(wù)進去;

如果設(shè)置了limiter,達到了limiter worker數(shù)量上限,就把任務(wù)投遞給空閑的worker,沒有空閑就阻塞等著;

func (p *Pool) Go(f func()) {
	p.init()

	if p.limiter == nil {
		// 沒有限制
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(p.worker)
			p.tasks <- f
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(p.worker)

			// We know there is at least one worker running, so wait
			// for it to become available. This ensures we never spawn
			// more workers than the number of tasks.
			p.tasks <- f
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}

}

這里work使用的是一個無緩沖的channel,這種復(fù)用方式很巧妙,如果goroutine執(zhí)行很快避免創(chuàng)建過多的goroutine;

使用pool處理任務(wù)不能保證有序性,conc庫又提供了Stream方法,返回結(jié)果可以保持順序;

Stream

Steam的實現(xiàn)也是依賴于pool,在此基礎(chǔ)上做了封裝保證結(jié)果的順序性,先看一個例子:

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}

	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()

	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

stream的結(jié)構(gòu)如下:

type Stream struct {
	pool             pool.Pool
	callbackerHandle conc.WaitGroup
	queue            chan callbackCh

	initOnce sync.Once
}

queue是一個channel類型,callbackCh也是channel類型 - chan func():

type callbackCh chan func()

在提交goroutine時按照順序生成callbackCh傳遞結(jié)果:

func (s *Stream) Go(f Task) {
	s.init()

	// Get a channel from the cache.
	ch := getCh()

	// Queue the channel for the callbacker.
	s.queue <- ch

	// Submit the task for execution.
	s.pool.Go(func() {
		defer func() {
			// In the case of a panic from f, we don't want the callbacker to
			// starve waiting for a callback from this channel, so give it an
			// empty callback.
			if r := recover(); r != nil {
				ch <- func() {}
				panic(r)
			}
		}()

		// Run the task, sending its callback down this task's channel.
		callback := f()
		ch <- callback
	})
}

var callbackChPool = sync.Pool{
	New: func() any {
		return make(callbackCh, 1)
	},
}

func getCh() callbackCh {
	return callbackChPool.Get().(callbackCh)
}

func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc庫提供了ForEach方法可以優(yōu)雅的并發(fā)處理切片,看一下官方的例子:

conc庫使用泛型進行了封裝,我們只需要關(guān)注handle代碼即可,避免冗余代碼,我們自己動手寫一個例子:

func main() {
	input := []int{1, 2, 3, 4}
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}

	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})

	fmt.Println(input)
}

ForEach內(nèi)部實現(xiàn)為Iterator結(jié)構(gòu)及核心邏輯如下:

type Iterator[T any] struct {
	MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
	if iter.MaxGoroutines == 0 {
		// iter is a value receiver and is hence safe to mutate
		iter.MaxGoroutines = defaultMaxGoroutines()
	}

	numInput := len(input)
	if iter.MaxGoroutines > numInput {
		// No more concurrent tasks than the number of input items.
		iter.MaxGoroutines = numInput
	}

	var idx atomic.Int64
	// 通過atomic控制僅創(chuàng)建一個閉包
	task := func() {
		i := int(idx.Add(1) - 1)
		for ; i < numInput; i = int(idx.Add(1) - 1) {
			f(i, &input[i])
		}
	}

	var wg conc.WaitGroup
	for i := 0; i < iter.MaxGoroutines; i++ {
		wg.Go(task)
	}
	wg.Wait()
}

可以設(shè)置并發(fā)的goroutine數(shù)量,默認取的是GOMAXPROCS ,也可以自定義傳參;

并發(fā)執(zhí)行這塊設(shè)計的很巧妙,僅創(chuàng)建了一個閉包,通過atomic控制idx,避免頻繁觸發(fā)GC;

map

conc庫提供的map方法可以得到對切片中元素結(jié)果,官方例子:

使用map可以提高代碼的可讀性,并且減少了冗余代碼,自己寫個例子:

func main() {
	input := []int{1, 2, 3, 4}
	mapper := iter.Mapper[int, bool]{
		MaxGoroutines: len(input) / 2,
	}

	results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results)
	// Output:
	// [false true false true]
}

map的實現(xiàn)也依賴于Iterator,也是調(diào)用的ForEachIdx方法,區(qū)別于ForEach是記錄處理結(jié)果;

總結(jié)

花了小半天時間看了一下這個庫,很多設(shè)計點值得我們學(xué)習(xí),總結(jié)一下我學(xué)習(xí)到的知識點:

  • conc.WatiGroup對Sync.WaitGroup進行了封裝,對Add、Done、Recover進行了封裝,提高了可讀性,避免了冗余代碼
  • ForEach、Map方法可以更優(yōu)雅的并發(fā)處理切片,代碼簡潔易讀,在實現(xiàn)上Iterator中的并發(fā)處理使用atomic來控制只創(chuàng)建一個閉包,避免了GC性能問題
  • pool是一個并發(fā)的協(xié)程隊列,可以控制協(xié)程的數(shù)量,實現(xiàn)上也很巧妙,使用一個無緩沖的channel作為worker,如果goroutine執(zhí)行速度快,避免了創(chuàng)建多個goroutine
  • stream是一個保證順序的并發(fā)協(xié)程隊列,實現(xiàn)上也很巧妙,使用sync.Pool在提交goroutine時控制順序,值得我們學(xué)習(xí);

到此這篇關(guān)于一文帶你了解Go語言實現(xiàn)的并發(fā)神庫conc的文章就介紹到這了,更多相關(guān)Go語言并發(fā)庫conc內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go語言的接口詳解

    Go語言的接口詳解

    這篇文章主要介紹了go語言的接口,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧,希望能夠給你帶來幫助
    2021-10-10
  • Golang實現(xiàn)單鏈表的示例代碼

    Golang實現(xiàn)單鏈表的示例代碼

    本文主要介紹了Golang實現(xiàn)單鏈表的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • Golang標(biāo)準(zhǔn)庫time包日常用法小結(jié)

    Golang標(biāo)準(zhǔn)庫time包日常用法小結(jié)

    本文主要介紹了Golang標(biāo)準(zhǔn)庫time包日常用法小結(jié),可以通過它們來獲取當(dāng)前時間、創(chuàng)建指定時間、解析時間字符串、控制時間間隔等操作,感興趣的可以了解一下
    2023-11-11
  • Goland支持泛型了(上機實操)

    Goland支持泛型了(上機實操)

    Go的泛型不是還在設(shè)計草圖嗎?最樂觀估計也要2021年8月份。你說Go語言現(xiàn)在都沒開發(fā)好泛型,你支持這個特性有什么用呢?感興趣的朋友跟隨小編一起看看吧
    2020-12-12
  • Go泛型之泛型約束示例詳解

    Go泛型之泛型約束示例詳解

    這篇文章主要給大家介紹了關(guān)于Go泛型之泛型約束的相關(guān)資料,泛型是靜態(tài)語言中的一種編程方式,這種編程方式可以讓算法不再依賴于某個具體的數(shù)據(jù)類型,而是通過將數(shù)據(jù)類型進行參數(shù)化,以達到算法可復(fù)用的目的,需要的朋友可以參考下
    2023-12-12
  • 用Go+WebSocket快速實現(xiàn)一個chat服務(wù)

    用Go+WebSocket快速實現(xiàn)一個chat服務(wù)

    這篇文章主要介紹了用Go+WebSocket快速實現(xiàn)一個chat服務(wù),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • Golang利用casbin實現(xiàn)權(quán)限驗證詳解

    Golang利用casbin實現(xiàn)權(quán)限驗證詳解

    Casbin是一個強大的、高效的開源訪問控制框架,其權(quán)限管理機制支持多種訪問控制模型,Casbin只負責(zé)訪問控制。本文將利用casbin實現(xiàn)權(quán)限驗證功能,需要的可以參考一下
    2023-02-02
  • 深入理解Golang的單元測試和性能測試

    深入理解Golang的單元測試和性能測試

    Go語言提供了強大的測試工具,下面這篇文章主要給大家介紹了關(guān)于Golang單元測試和性能測試的相關(guān)資料,文中通過示例代碼給大家詳細介紹了單元測試和性能測試的相關(guān)內(nèi)容,需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-08-08
  • golang之?dāng)?shù)據(jù)驗證validator的實現(xiàn)

    golang之?dāng)?shù)據(jù)驗證validator的實現(xiàn)

    這篇文章主要介紹了golang之?dāng)?shù)據(jù)驗證validator的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • Gin框架限流實現(xiàn)示例

    Gin框架限流實現(xiàn)示例

    本文主要介紹了Gin框架限流實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03

最新評論