Golang協(xié)程池gopool設(shè)計與實現(xiàn)
Goroutine
Goroutine 是 Golang 提供的一種輕量級線程,我們通常稱之為「協(xié)程」,相比較線程,創(chuàng)建一個協(xié)程的成本是很低的。所以你會經(jīng)??吹?Golang 開發(fā)的應(yīng)用出現(xiàn)上千個協(xié)程并發(fā)的場景。
Goroutine 的優(yōu)勢:
- 與線程相比,Goroutines 成本很低。
它們的堆棧大小只有幾 kb,堆??梢愿鶕?jù)應(yīng)用程序的需要增長和縮小,context switch 也很快,而在線程的情況下,堆棧大小必須指定并固定。
- Goroutine 被多路復(fù)用到更少數(shù)量的 OS 線程。
一個包含數(shù)千個 Goroutine 的程序中可能只有一個線程。如果該線程中的任何 Goroutine 阻塞等待用戶輸入,則創(chuàng)建另一個 OS 線程并將剩余的 Goroutine 移動到新的 OS 線程。所有這些都由運行時處理,作為開發(fā)者無需耗費心力關(guān)心,這也使得我們有很干凈的 API 來支持并發(fā)。
- Goroutines 使用 channel 進行通信。
channel 的設(shè)計有效防止了在使用 Goroutine 訪問共享內(nèi)存時發(fā)生競爭條件(race conditions) 。channel 可以被認(rèn)為是 Goroutine 進行通信的管道。
下文中我們會以「協(xié)程」來代指 Goroutine。
協(xié)程池
在高并發(fā)場景下,我們可能會啟動大量的協(xié)程來處理業(yè)務(wù)邏輯。協(xié)程池是一種利用池化技術(shù),復(fù)用對象,減少內(nèi)存分配的頻率以及協(xié)程創(chuàng)建開銷,從而提高協(xié)程執(zhí)行效率的技術(shù)。
最近抽空了解了字節(jié)官方開源的 gopkg 庫提供的 gopool 協(xié)程池實現(xiàn),感覺還是很高質(zhì)量的,代碼也非常簡潔清晰,而且 Kitex 底層也在使用 gopool 來管理協(xié)程,這里我們梳理一下設(shè)計和實現(xiàn)。
gopool
Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool
gopoolis a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to thegokeyword.
了解官方 README 就會發(fā)現(xiàn)gopool的用法其實非常簡單,將曾經(jīng)我們經(jīng)常使用的 go func(){...} 替換為 gopool.Go(func(){...}) 即可。
此時 gopool 將會使用默認(rèn)的配置來管理你啟動的協(xié)程,你也可以選擇針對業(yè)務(wù)場景配置池子大小,以及擴容上限。
old:
go func() {
// do your job
}()new:
import (
"github.com/bytedance/gopkg/util/gopool"
)
gopool.Go(func(){
/// do your job
})核心實現(xiàn)
下面我們來看看gopool是怎樣實現(xiàn)協(xié)程池管理的。
Pool
Pool 是一個定義了協(xié)程池能力的接口。
type Pool interface {
// 池子的名稱
Name() string
// 設(shè)置池子內(nèi)Goroutine的容量
SetCap(cap int32)
// 執(zhí)行 f 函數(shù)
Go(f func())
// 帶 ctx,執(zhí)行 f 函數(shù)
CtxGo(ctx context.Context, f func())
// 設(shè)置發(fā)生panic時調(diào)用的函數(shù)
SetPanicHandler(f func(context.Context, interface{}))
}gopool 提供了這個接口的默認(rèn)實現(xiàn)(即下面即將介紹的pool),當(dāng)我們直接調(diào)用 gopool.CtxGo 時依賴的就是這個。
這樣的設(shè)計模式在 Kitex 中也經(jīng)常出現(xiàn),所有的依賴均設(shè)計為接口,便于隨后擴展,底層提供一個默認(rèn)的實現(xiàn)暴露出去,這樣對調(diào)用方也很友好。
type pool struct {
// 池子名稱
name string
// 池子的容量, 即最大并發(fā)工作的 goroutine 的數(shù)量
cap int32
// 池子配置
config *Config
// task 鏈表
taskHead *task
taskTail *task
taskLock sync.Mutex
taskCount int32
// 記錄當(dāng)前正在運行的 worker 的數(shù)量
workerCount int32
// 當(dāng) worker 出現(xiàn)panic時被調(diào)用
panicHandler func(context.Context, interface{})
}
// NewPool 創(chuàng)建一個新的協(xié)程池,初始化名稱,容量,配置
func NewPool(name string, cap int32, config *Config) Pool {
p := &pool{
name: name,
cap: cap,
config: config,
}
return p
}調(diào)用 NewPool 獲取了以 Pool 的形式返回的 pool 結(jié)構(gòu)體。
Task
type task struct {
ctx context.Context
f func()
next *task
}task 是一個鏈表結(jié)構(gòu),可以把它理解為一個待執(zhí)行的任務(wù),它包含了當(dāng)前節(jié)點需要執(zhí)行的函數(shù)f, 以及指向下一個task的指針。
綜合前一節(jié) pool 的定義,我們可以看到,一個協(xié)程池 pool 對應(yīng)了一組task。
pool 維護了指向鏈表的頭尾的兩個指針:taskHead 和 taskTail,以及鏈表的長度taskCount 和對應(yīng)的鎖 taskLock。
Worker
type worker struct {
pool *pool
}一個 worker 就是邏輯上的一個執(zhí)行器,它唯一對應(yīng)到一個協(xié)程池 pool。當(dāng)一個worker被喚起,將會開啟一個goroutine ,不斷地從 pool 中的 task鏈表獲取任務(wù)并執(zhí)行。
func (w *worker) run() {
go func() {
for {
// 聲明即將執(zhí)行的 task
var t *task
// 操作 pool 中的 task 鏈表,加鎖
w.pool.taskLock.Lock()
if w.pool.taskHead != nil {
// 拿到 taskHead 準(zhǔn)備執(zhí)行
t = w.pool.taskHead
// 更新鏈表的 head 以及數(shù)量
w.pool.taskHead = w.pool.taskHead.next
atomic.AddInt32(&w.pool.taskCount, -1)
}
// 如果前一步拿到的 taskHead 為空,說明無任務(wù)需要執(zhí)行,清理后返回
if t == nil {
w.close()
w.pool.taskLock.Unlock()
w.Recycle()
return
}
w.pool.taskLock.Unlock()
// 執(zhí)行任務(wù),針對 panic 會recover,并調(diào)用配置的 handler
func() {
defer func() {
if r := recover(); r != nil {
msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
logger.CtxErrorf(t.ctx, msg)
if w.pool.panicHandler != nil {
w.pool.panicHandler(t.ctx, r)
}
}
}()
t.f()
}()
t.Recycle()
}
}()
}整體來看
看到這里,其實就能把整個流程串起來了。我們來看看對外的接口 CtxGo(context.Context, f func()) 到底做了什么?
func Go(f func()) {
CtxGo(context.Background(), f)
}
func CtxGo(ctx context.Context, f func()) {
defaultPool.CtxGo(ctx, f)
}
func (p *pool) CtxGo(ctx context.Context, f func()) {
// 創(chuàng)建一個 task 對象,將 ctx 和待執(zhí)行的函數(shù)賦值
t := taskPool.Get().(*task)
t.ctx = ctx
t.f = f
// 將 task 插入 pool 的鏈表的尾部,更新鏈表數(shù)量
p.taskLock.Lock()
if p.taskHead == nil {
p.taskHead = t
p.taskTail = t
} else {
p.taskTail.next = t
p.taskTail = t
}
p.taskLock.Unlock()
atomic.AddInt32(&p.taskCount, 1)
// 以下兩個條件滿足時,創(chuàng)建新的 worker 并喚起執(zhí)行:
// 1. task的數(shù)量超過了配置的限制
// 2. 當(dāng)前運行的worker數(shù)量小于上限(或無worker運行)
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
// worker數(shù)量+1
p.incWorkerCount()
// 創(chuàng)建一個新的worker,并把當(dāng)前 pool 賦值
w := workerPool.Get().(*worker)
w.pool = p
// 喚起worker執(zhí)行
w.run()
}
}相信看了代碼注釋,大家就能理解發(fā)生了什么。
gopool 會自行維護一個 defaultPool,這是一個默認(rèn)的 pool 結(jié)構(gòu)體,在引入包的時候就進行初始化。當(dāng)我們直接調(diào)用 gopool.CtxGo() 時,本質(zhì)上是調(diào)用了 defaultPool 的同名方法
func init() {
defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
const (
defaultScalaThreshold = 1
)
// Config is used to config pool.
type Config struct {
// 控制擴容的門檻,一旦待執(zhí)行的 task 超過此值,且 worker 數(shù)量未達(dá)到上限,就開始啟動新的 worker
ScaleThreshold int32
}
// NewConfig creates a default Config.
func NewConfig() *Config {
c := &Config{
ScaleThreshold: defaultScalaThreshold,
}
return c
}defaultPool 的名稱為 gopool.DefaultPool,池子容量一萬,擴容下限為 1。
當(dāng)我們調(diào)用 CtxGo時,gopool 就會更新維護的任務(wù)鏈表,并且判斷是否需要擴容 worker:
- 若此時已經(jīng)有很多
worker啟動(底層一個worker對應(yīng)一個goroutine),不需要擴容,就直接返回。 - 若判斷需要擴容,就創(chuàng)建一個新的
worker,并調(diào)用worker.run()方法啟動,各個worker會異步地檢查pool里面的任務(wù)鏈表是否還有待執(zhí)行的任務(wù),如果有就執(zhí)行。
三個角色的定位
task是一個待執(zhí)行的任務(wù)節(jié)點,同時還包含了指向下一個任務(wù)的指針,鏈表結(jié)構(gòu);worker是一個實際執(zhí)行任務(wù)的執(zhí)行器,它會異步啟動一個goroutine執(zhí)行協(xié)程池里面未執(zhí)行的task;pool是一個邏輯上的協(xié)程池,對應(yīng)了一個task鏈表,同時負(fù)責(zé)維護task狀態(tài)的更新,以及在需要的時候創(chuàng)建新的worker。
使用 sync.Pool 進行性能優(yōu)化
其實到這個地方,gopool已經(jīng)是一個代碼簡潔清晰的協(xié)程池庫了,但是性能上顯然有改進空間,所以gopool的作者應(yīng)用了多次 sync.Pool 來池化對象的創(chuàng)建,復(fù)用woker和task對象。
這里建議大家直接看源碼,其實在上面的代碼中已經(jīng)有所涉及。
- task 池化
var taskPool sync.Pool
func init() {
taskPool.New = newTask
}
func newTask() interface{} {
return &task{}
}
func (t *task) Recycle() {
t.zero()
taskPool.Put(t)
}- worker 池化
var workerPool sync.Pool
func init() {
workerPool.New = newWorker
}
func newWorker() interface{} {
return &worker{}
}
func (w *worker) Recycle() {
w.zero()
workerPool.Put(w)
}到此這篇關(guān)于Golang協(xié)程池gopool設(shè)計與實現(xiàn)的文章就介紹到這了,更多相關(guān)Golang協(xié)程池gopool內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言開發(fā)必知的一個內(nèi)存模型細(xì)節(jié)
這篇文章主要為大家介紹了Go語言開發(fā)必知的一個內(nèi)存模型細(xì)節(jié)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-07-07
深入了解Go語言中database/sql是如何設(shè)計的
在?Go?語言中內(nèi)置了?database/sql?包,它只對外暴露了一套統(tǒng)一的編程接口,便可以操作不同數(shù)據(jù)庫,那么database/sql?是如何設(shè)計的呢,下面就來和大家簡單聊聊吧2023-07-07
GO excelize讀取excel進行時間類型轉(zhuǎn)換的示例代碼(自動轉(zhuǎn)換)
我們經(jīng)常會遇到如何自動識別excel中的時間類型數(shù)據(jù)并轉(zhuǎn)化成對應(yīng)的 "Y-m-d H:i:s"類型數(shù)據(jù),本文小編給大家介紹了GO excelize讀取excel進行時間類型轉(zhuǎn)換的示例代碼(自動轉(zhuǎn)換),需要的朋友可以參考下2024-10-10
Golang 性能基準(zhǔn)測試(benchmark)詳解
Golang性能基準(zhǔn)測試可以幫助開發(fā)人員比較不同的實現(xiàn)方式對性能的影響,以便優(yōu)化程序,本文就來講解一下如何使用Golang的性能基準(zhǔn)測試功能,需要的朋友可以參考下2023-06-06
golang搭建靜態(tài)web服務(wù)器的實現(xiàn)方法
這篇文章主要介紹了golang搭建靜態(tài)web服務(wù)器的實現(xiàn)方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08
Go語言學(xué)習(xí)函數(shù)+結(jié)構(gòu)體+方法+接口
這篇文章主要介紹了Go語言學(xué)習(xí)函數(shù)+結(jié)構(gòu)體+方法+接口,文章圍繞主題的相關(guān)資料展開詳細(xì)的文章說明,具有一定的參考價值,需要的小伙伴可以參考一下2022-05-05
Go語言通過WaitGroup實現(xiàn)控制并發(fā)的示例詳解
Channel能夠很好的幫助我們控制并發(fā),但是在開發(fā)習(xí)慣上與顯示的表達(dá)不太相同,所以在Go語言中可以利用sync包中的WaitGroup實現(xiàn)并發(fā)控制,本文就來和大家詳細(xì)聊聊WaitGroup如何實現(xiàn)控制并發(fā)2023-01-01

