欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang協(xié)程池gopool設(shè)計(jì)與實(shí)現(xiàn)

 更新時(shí)間:2022年04月15日 15:41:09   作者:ag9920  
本文主要介紹了Golang協(xié)程池gopool設(shè)計(jì)與實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

Goroutine

Goroutine 是 Golang 提供的一種輕量級(jí)線程,我們通常稱之為「協(xié)程」,相比較線程,創(chuàng)建一個(gè)協(xié)程的成本是很低的。所以你會(huì)經(jīng)??吹?Golang 開發(fā)的應(yīng)用出現(xiàn)上千個(gè)協(xié)程并發(fā)的場(chǎng)景。

Goroutine 的優(yōu)勢(shì):

  • 與線程相比,Goroutines 成本很低。

它們的堆棧大小只有幾 kb,堆??梢愿鶕?jù)應(yīng)用程序的需要增長(zhǎng)和縮小,context switch 也很快,而在線程的情況下,堆棧大小必須指定并固定。

  • Goroutine 被多路復(fù)用到更少數(shù)量的 OS 線程。

一個(gè)包含數(shù)千個(gè) Goroutine 的程序中可能只有一個(gè)線程。如果該線程中的任何 Goroutine 阻塞等待用戶輸入,則創(chuàng)建另一個(gè) OS 線程并將剩余的 Goroutine 移動(dòng)到新的 OS 線程。所有這些都由運(yùn)行時(shí)處理,作為開發(fā)者無(wú)需耗費(fèi)心力關(guān)心,這也使得我們有很干凈的 API 來(lái)支持并發(fā)。

  • Goroutines 使用 channel 進(jìn)行通信。

channel 的設(shè)計(jì)有效防止了在使用 Goroutine 訪問(wèn)共享內(nèi)存時(shí)發(fā)生競(jìng)爭(zhēng)條件(race conditions) 。channel 可以被認(rèn)為是 Goroutine 進(jìn)行通信的管道。

下文中我們會(huì)以「協(xié)程」來(lái)代指 Goroutine。

協(xié)程池

在高并發(fā)場(chǎng)景下,我們可能會(huì)啟動(dòng)大量的協(xié)程來(lái)處理業(yè)務(wù)邏輯。協(xié)程池是一種利用池化技術(shù),復(fù)用對(duì)象,減少內(nèi)存分配的頻率以及協(xié)程創(chuàng)建開銷,從而提高協(xié)程執(zhí)行效率的技術(shù)。

最近抽空了解了字節(jié)官方開源的 gopkg 庫(kù)提供的 gopool 協(xié)程池實(shí)現(xiàn),感覺還是很高質(zhì)量的,代碼也非常簡(jiǎn)潔清晰,而且 Kitex 底層也在使用 gopool 來(lái)管理協(xié)程,這里我們梳理一下設(shè)計(jì)和實(shí)現(xiàn)。

gopool

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the go keyword.

了解官方 README 就會(huì)發(fā)現(xiàn)gopool的用法其實(shí)非常簡(jiǎn)單,將曾經(jīng)我們經(jīng)常使用的 go func(){...} 替換為 gopool.Go(func(){...}) 即可。

此時(shí) gopool 將會(huì)使用默認(rèn)的配置來(lái)管理你啟動(dòng)的協(xié)程,你也可以選擇針對(duì)業(yè)務(wù)場(chǎng)景配置池子大小,以及擴(kuò)容上限。

old:

go func() {
	// do your job
}()

new:

import (
    "github.com/bytedance/gopkg/util/gopool"
)

gopool.Go(func(){
	/// do your job
})

核心實(shí)現(xiàn)

下面我們來(lái)看看gopool是怎樣實(shí)現(xiàn)協(xié)程池管理的。

Pool

Pool 是一個(gè)定義了協(xié)程池能力的接口。

type Pool interface {
	// 池子的名稱
	Name() string
        
	// 設(shè)置池子內(nèi)Goroutine的容量
	SetCap(cap int32)
        
	// 執(zhí)行 f 函數(shù)
	Go(f func())
        
	// 帶 ctx,執(zhí)行 f 函數(shù)
	CtxGo(ctx context.Context, f func())
        
	// 設(shè)置發(fā)生panic時(shí)調(diào)用的函數(shù)
	SetPanicHandler(f func(context.Context, interface{}))
}

gopool 提供了這個(gè)接口的默認(rèn)實(shí)現(xiàn)(即下面即將介紹的pool),當(dāng)我們直接調(diào)用 gopool.CtxGo 時(shí)依賴的就是這個(gè)。

這樣的設(shè)計(jì)模式在 Kitex 中也經(jīng)常出現(xiàn),所有的依賴均設(shè)計(jì)為接口,便于隨后擴(kuò)展,底層提供一個(gè)默認(rèn)的實(shí)現(xiàn)暴露出去,這樣對(duì)調(diào)用方也很友好。

type pool struct {
	// 池子名稱
	name string

	// 池子的容量, 即最大并發(fā)工作的 goroutine 的數(shù)量
	cap int32
        
	// 池子配置
	config *Config
        
	// task 鏈表
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// 記錄當(dāng)前正在運(yùn)行的 worker 的數(shù)量
	workerCount int32

	// 當(dāng) worker 出現(xiàn)panic時(shí)被調(diào)用
	panicHandler func(context.Context, interface{})
}

// NewPool 創(chuàng)建一個(gè)新的協(xié)程池,初始化名稱,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
	p := &pool{
		name:   name,
		cap:    cap,
		config: config,
	}
	return p
}

調(diào)用 NewPool 獲取了以 Pool 的形式返回的 pool 結(jié)構(gòu)體。

Task

type task struct {
	ctx context.Context
	f   func()

	next *task
}

task 是一個(gè)鏈表結(jié)構(gòu),可以把它理解為一個(gè)待執(zhí)行的任務(wù),它包含了當(dāng)前節(jié)點(diǎn)需要執(zhí)行的函數(shù)f, 以及指向下一個(gè)task的指針。

綜合前一節(jié) pool 的定義,我們可以看到,一個(gè)協(xié)程池 pool 對(duì)應(yīng)了一組task。

pool 維護(hù)了指向鏈表的頭尾的兩個(gè)指針:taskHeadtaskTail,以及鏈表的長(zhǎng)度taskCount 和對(duì)應(yīng)的鎖 taskLock。

Worker

type worker struct {
	pool *pool
}

一個(gè) worker 就是邏輯上的一個(gè)執(zhí)行器,它唯一對(duì)應(yīng)到一個(gè)協(xié)程池 pool。當(dāng)一個(gè)worker被喚起,將會(huì)開啟一個(gè)goroutine ,不斷地從 pool 中的 task鏈表獲取任務(wù)并執(zhí)行。

func (w *worker) run() {
	go func() {
		for {
                        // 聲明即將執(zhí)行的 task
			var t *task
                        
                        // 操作 pool 中的 task 鏈表,加鎖
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
                                // 拿到 taskHead 準(zhǔn)備執(zhí)行
				t = w.pool.taskHead
                                
                                // 更新鏈表的 head 以及數(shù)量
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
                        // 如果前一步拿到的 taskHead 為空,說(shuō)明無(wú)任務(wù)需要執(zhí)行,清理后返回
			if t == nil {
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
                        
                        // 執(zhí)行任務(wù),針對(duì) panic 會(huì)recover,并調(diào)用配置的 handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}

整體來(lái)看

看到這里,其實(shí)就能把整個(gè)流程串起來(lái)了。我們來(lái)看看對(duì)外的接口 CtxGo(context.Context, f func()) 到底做了什么?

func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {

        // 創(chuàng)建一個(gè) task 對(duì)象,將 ctx 和待執(zhí)行的函數(shù)賦值
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
        
        // 將 task 插入 pool 的鏈表的尾部,更新鏈表數(shù)量
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
        
        
	// 以下兩個(gè)條件滿足時(shí),創(chuàng)建新的 worker 并喚起執(zhí)行:
	// 1. task的數(shù)量超過(guò)了配置的限制 
	// 2. 當(dāng)前運(yùn)行的worker數(shù)量小于上限(或無(wú)worker運(yùn)行)
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
        
                // worker數(shù)量+1
		p.incWorkerCount()
                
                // 創(chuàng)建一個(gè)新的worker,并把當(dāng)前 pool 賦值
		w := workerPool.Get().(*worker)
		w.pool = p
                
                // 喚起worker執(zhí)行
		w.run()
	}
}

相信看了代碼注釋,大家就能理解發(fā)生了什么。

gopool 會(huì)自行維護(hù)一個(gè) defaultPool,這是一個(gè)默認(rèn)的 pool 結(jié)構(gòu)體,在引入包的時(shí)候就進(jìn)行初始化。當(dāng)我們直接調(diào)用 gopool.CtxGo() 時(shí),本質(zhì)上是調(diào)用了 defaultPool 的同名方法

func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}

const (
	defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
	// 控制擴(kuò)容的門檻,一旦待執(zhí)行的 task 超過(guò)此值,且 worker 數(shù)量未達(dá)到上限,就開始啟動(dòng)新的 worker
	ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

defaultPool 的名稱為 gopool.DefaultPool,池子容量一萬(wàn),擴(kuò)容下限為 1。

當(dāng)我們調(diào)用 CtxGo時(shí),gopool 就會(huì)更新維護(hù)的任務(wù)鏈表,并且判斷是否需要擴(kuò)容 worker

  • 若此時(shí)已經(jīng)有很多 worker 啟動(dòng)(底層一個(gè) worker 對(duì)應(yīng)一個(gè) goroutine),不需要擴(kuò)容,就直接返回。
  • 若判斷需要擴(kuò)容,就創(chuàng)建一個(gè)新的worker,并調(diào)用 worker.run()方法啟動(dòng),各個(gè)worker會(huì)異步地檢查 pool 里面的任務(wù)鏈表是否還有待執(zhí)行的任務(wù),如果有就執(zhí)行。

三個(gè)角色的定位

  • task 是一個(gè)待執(zhí)行的任務(wù)節(jié)點(diǎn),同時(shí)還包含了指向下一個(gè)任務(wù)的指針,鏈表結(jié)構(gòu);
  • worker 是一個(gè)實(shí)際執(zhí)行任務(wù)的執(zhí)行器,它會(huì)異步啟動(dòng)一個(gè) goroutine 執(zhí)行協(xié)程池里面未執(zhí)行的task
  • pool 是一個(gè)邏輯上的協(xié)程池,對(duì)應(yīng)了一個(gè)task鏈表,同時(shí)負(fù)責(zé)維護(hù)task狀態(tài)的更新,以及在需要的時(shí)候創(chuàng)建新的 worker。

使用 sync.Pool 進(jìn)行性能優(yōu)化

其實(shí)到這個(gè)地方,gopool已經(jīng)是一個(gè)代碼簡(jiǎn)潔清晰的協(xié)程池庫(kù)了,但是性能上顯然有改進(jìn)空間,所以gopool的作者應(yīng)用了多次 sync.Pool 來(lái)池化對(duì)象的創(chuàng)建,復(fù)用woker和task對(duì)象。

這里建議大家直接看源碼,其實(shí)在上面的代碼中已經(jīng)有所涉及。

  • task 池化
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}
  • worker 池化
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}

到此這篇關(guān)于Golang協(xié)程池gopool設(shè)計(jì)與實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Golang協(xié)程池gopool內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • GO語(yǔ)言映射(Map)用法分析

    GO語(yǔ)言映射(Map)用法分析

    這篇文章主要介紹了GO語(yǔ)言映射(Map)用法,以實(shí)例形式較為詳細(xì)的分析了針對(duì)映射的創(chuàng)建、填充、遍歷及修改等操作的技巧,需要的朋友可以參考下
    2014-12-12
  • Go語(yǔ)言開發(fā)必知的一個(gè)內(nèi)存模型細(xì)節(jié)

    Go語(yǔ)言開發(fā)必知的一個(gè)內(nèi)存模型細(xì)節(jié)

    這篇文章主要為大家介紹了Go語(yǔ)言開發(fā)必知的一個(gè)內(nèi)存模型細(xì)節(jié)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • 使用Go語(yǔ)言實(shí)現(xiàn)benchmark解析器

    使用Go語(yǔ)言實(shí)現(xiàn)benchmark解析器

    這篇文章主要為大家詳細(xì)介紹了如何使用Go語(yǔ)言實(shí)現(xiàn)benchmark解析器并實(shí)現(xiàn)及Web UI 數(shù)據(jù)可視化,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考一下
    2025-04-04
  • 深入了解Go語(yǔ)言中database/sql是如何設(shè)計(jì)的

    深入了解Go語(yǔ)言中database/sql是如何設(shè)計(jì)的

    在?Go?語(yǔ)言中內(nèi)置了?database/sql?包,它只對(duì)外暴露了一套統(tǒng)一的編程接口,便可以操作不同數(shù)據(jù)庫(kù),那么database/sql?是如何設(shè)計(jì)的呢,下面就來(lái)和大家簡(jiǎn)單聊聊吧
    2023-07-07
  • GO excelize讀取excel進(jìn)行時(shí)間類型轉(zhuǎn)換的示例代碼(自動(dòng)轉(zhuǎn)換)

    GO excelize讀取excel進(jìn)行時(shí)間類型轉(zhuǎn)換的示例代碼(自動(dòng)轉(zhuǎn)換)

    我們經(jīng)常會(huì)遇到如何自動(dòng)識(shí)別excel中的時(shí)間類型數(shù)據(jù)并轉(zhuǎn)化成對(duì)應(yīng)的 "Y-m-d H:i:s"類型數(shù)據(jù),本文小編給大家介紹了GO excelize讀取excel進(jìn)行時(shí)間類型轉(zhuǎn)換的示例代碼(自動(dòng)轉(zhuǎn)換),需要的朋友可以參考下
    2024-10-10
  • Golang 性能基準(zhǔn)測(cè)試(benchmark)詳解

    Golang 性能基準(zhǔn)測(cè)試(benchmark)詳解

    Golang性能基準(zhǔn)測(cè)試可以幫助開發(fā)人員比較不同的實(shí)現(xiàn)方式對(duì)性能的影響,以便優(yōu)化程序,本文就來(lái)講解一下如何使用Golang的性能基準(zhǔn)測(cè)試功能,需要的朋友可以參考下
    2023-06-06
  • Golang?RPC的原理與簡(jiǎn)單調(diào)用詳解

    Golang?RPC的原理與簡(jiǎn)單調(diào)用詳解

    RPC(Remote?Procedure?Call),主要是幫助我們屏蔽網(wǎng)絡(luò)編程細(xì)節(jié)?,使我們更專注于業(yè)務(wù)邏輯,所以本文主要來(lái)和大家聊聊RPC的原理與簡(jiǎn)單調(diào)用,希望對(duì)大家有所幫助
    2023-05-05
  • golang搭建靜態(tài)web服務(wù)器的實(shí)現(xiàn)方法

    golang搭建靜態(tài)web服務(wù)器的實(shí)現(xiàn)方法

    這篇文章主要介紹了golang搭建靜態(tài)web服務(wù)器的實(shí)現(xiàn)方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-08-08
  • Go語(yǔ)言學(xué)習(xí)函數(shù)+結(jié)構(gòu)體+方法+接口

    Go語(yǔ)言學(xué)習(xí)函數(shù)+結(jié)構(gòu)體+方法+接口

    這篇文章主要介紹了Go語(yǔ)言學(xué)習(xí)函數(shù)+結(jié)構(gòu)體+方法+接口,文章圍繞主題的相關(guān)資料展開詳細(xì)的文章說(shuō)明,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-05-05
  • Go語(yǔ)言通過(guò)WaitGroup實(shí)現(xiàn)控制并發(fā)的示例詳解

    Go語(yǔ)言通過(guò)WaitGroup實(shí)現(xiàn)控制并發(fā)的示例詳解

    Channel能夠很好的幫助我們控制并發(fā),但是在開發(fā)習(xí)慣上與顯示的表達(dá)不太相同,所以在Go語(yǔ)言中可以利用sync包中的WaitGroup實(shí)現(xiàn)并發(fā)控制,本文就來(lái)和大家詳細(xì)聊聊WaitGroup如何實(shí)現(xiàn)控制并發(fā)
    2023-01-01

最新評(píng)論