詳解Go語言如何實現(xiàn)一個最簡化的協(xié)程池
背景
學(xué)習(xí)完優(yōu)秀的協(xié)程池開源項目ants之后,想要對協(xié)程池做了一個總結(jié),在ants的基礎(chǔ)上做了簡化,保留了一個協(xié)程池的核心功能,旨在幫助協(xié)程池的理解和使用。
為什么要用協(xié)程池
- 降低資源開銷:協(xié)程池可以重復(fù)使用協(xié)程,減少創(chuàng)建、銷毀協(xié)程的開銷,從而提高系統(tǒng)性能。
- 提高響應(yīng)速度:接收任務(wù)后可以立即執(zhí)行,無需等待創(chuàng)建協(xié)程時間。
- 增強可管理型:可以對協(xié)程進行集中調(diào)度,統(tǒng)一管理,方便調(diào)優(yōu)。
實現(xiàn)協(xié)程池都需要實現(xiàn)哪些功能
思考一下,如果如果讓你實現(xiàn)一個協(xié)程池,有哪些必要的核心功能呢?
- 協(xié)程池需要提供一個對外接收任務(wù)提交的接口
- 如何創(chuàng)建一個協(xié)程,創(chuàng)建好的協(xié)程存放在哪里?
- 協(xié)程池中的協(xié)程在沒有task時,是如何存在的?
- 如何為使用者提交的task分配一個協(xié)程?
- 協(xié)程執(zhí)行完task后,如何返還到協(xié)程池中?
協(xié)程池內(nèi)部需要維護一個裝有一個個協(xié)程的隊列,用于存放管理的協(xié)程,為了拓展功能方便,我們把每個協(xié)程都封裝一個worker,這個worker隊列需要具備幾個核心功能:
協(xié)程池整體的架構(gòu)
帶著這些如何實現(xiàn)一個簡單協(xié)程池必要核心功能的問題,我們來看下,一個協(xié)程池的核心流程,用圖來表示就是:

從圖上可以看出協(xié)程池主要包括3個組件:
協(xié)程池(gorutine-pool) :它是整個協(xié)程池的入口和主體,內(nèi)部持有一個協(xié)程隊列,用于存放、調(diào)度worker。
協(xié)程隊列(worker-queue) :持有協(xié)程池維護的所有的協(xié)程,為了拓展方便,將協(xié)程封裝成worker,一個worker對應(yīng)一個協(xié)程。
worker:每個worker對應(yīng)一個協(xié)程,它能夠運行一個任務(wù),通常是個函數(shù),是真正干活的地方。
主要流程:
當一個使用者一個task提交后,協(xié)程池從workerQueue中獲取一個可用的worker負責(zé)執(zhí)行此task,如果worker隊列中沒有可用的worker,并且worker的數(shù)量還沒有達到隊列設(shè)置最大數(shù)量,可以新建一個worker補充到隊列中,worker執(zhí)行完任務(wù)后,還需要能夠?qū)⒆约悍颠€到workder隊列中,才能達到復(fù)用的目的
三個組件的實現(xiàn)
分別來看下三個組件是如何實現(xiàn)協(xié)程池的
gorutine-pool實現(xiàn)
// Pool pool負責(zé)worker的調(diào)度
type Pool struct {
//pool最大最大線程數(shù)量
cap int32
//當前運行的worker數(shù)量
running int32
//worker隊列
workers workerQueue
//控制并發(fā)訪問臨界資源
lock sync.Mutex
}
pool結(jié)構(gòu)體中cap和running兩個屬性用來管理協(xié)程池的數(shù)量,workers存放創(chuàng)建的協(xié)程,lock控制并發(fā)訪問臨界資源。
從上述的架構(gòu)圖中可以看出,pool需要對外提供接收task的方法,以及兩個內(nèi)部從workerQueue獲取worker、返還worker到workerQueue的方法。
Submit
// Submit 提交任務(wù)
func (p *Pool) Submit(f func()) error {
if worker := p.retrieveWorker(); worker != nil {
worker.inputFunc(f)
return nil
}
return ErrPoolOverload
}
Submit()是給調(diào)用者提交task任務(wù)的方法,它的入?yún)⑹且粋€函數(shù),這個函數(shù)就是協(xié)程池使用者想讓協(xié)程池執(zhí)行的內(nèi)容。協(xié)程池pool會嘗試為這個task分配一個worker來處理task,但是,如果協(xié)程池的worker都被占用,并且有數(shù)量限制無法再創(chuàng)建新的worker,pool也無能為力,這里會返回給調(diào)用者一個"過載"的異常,當然這里可以拓展其它的拒絕策略。
retrieveWorker()
//從workerQueue中獲取一個worker
func (p *Pool) retrieveWorker() worker {
p.lock.Lock()
w := p.workers.detach()
if w != nil {
p.lock.Unlock()
} else {
//沒有拿到可用的worker
//如果容量還沒耗盡,再創(chuàng)建一個worker
if p.running < p.cap {
w = &goWorker{
pool: p,
task: make(chan func()),
}
w.run()
}
p.lock.Unlock()
}
return w
}
retrieveWorker()是從pool中的workerQueue中獲取一個worker具體實現(xiàn)。獲取worker是一個并發(fā)操作,這里使用鎖控制并發(fā)。調(diào)用workers.detach()從workerQueue中獲取worker,如果沒有拿到可用的worker,這時候還需要看看目前pool中現(xiàn)存活的worker數(shù)量是否已經(jīng)達到上限,未達上限,則可以創(chuàng)建新的worker加入到pool中。
revertWorker()
// 執(zhí)行完任務(wù)的worker返還到workerQueue
func (p *Pool) revertWorker(w *goWorker) bool {
defer func() {
p.lock.Unlock()
}()
p.lock.Lock()
//判斷容量,如果協(xié)程存活數(shù)量大于容量,銷毀
if p.running < p.cap {
p.workers.insert(w)
return true
}
return false
}
返還當前worker到pool是在worker執(zhí)行完task之后,返回時需要判斷當前存活的worker數(shù)量是否到達pool的上限,已達上限則返回失敗。另外,由于running屬性存在并發(fā)訪問的問題,返還操作也需要加鎖。
workerQueue實現(xiàn)
為了提高拓展性,我們將workerQueue抽象成接口,也就是說可以有多種協(xié)程隊列實現(xiàn)來適配更多的使用場景
workerQueue接口
// 定義一個協(xié)程隊列的接口
type workerQueue interface {
//隊列長度
len() int
//插入worker
insert(w worker)
//分派一個worker
detach() worker
}
插入insert()和分配detach()是實現(xiàn)協(xié)程隊列的核心方法。這里我們以底層基于“棧”思想的結(jié)構(gòu)作為workerQueue的默認實現(xiàn),也即后進入隊列的協(xié)程優(yōu)先被分配使用。
// 底層構(gòu)造一個棧類型的隊列來管理多個worker
type workerStack struct {
items []worker
}
我們使用數(shù)組來存放worker,用數(shù)組來模擬先進后出的協(xié)程隊列
// 新創(chuàng)建一個worker
func (ws *workerStack) insert(w worker) {
ws.items = append(ws.items, w)
}
workerStack的insert()實現(xiàn)很簡單,直接在數(shù)組尾巴追加一個worker
// 分配一個worker
func (ws *workerStack) detach() worker {
l := ws.len()
if l == 0 {
return nil
}
w := ws.items[l-1]
ws.items[l-1] = nil
ws.items = ws.items[:l-1]
return w
}
detach()負責(zé)從數(shù)組中獲取一個可用的空閑worker,每次獲取時取用的是數(shù)組的最后一個元素,也就是協(xié)程隊列末尾的worker優(yōu)先被分配出去了。
注意這里將下標l-1位置的對象置為nil,可以防止內(nèi)存泄露
worker實現(xiàn)
type worker interface {
workId() string
run()
//接收函數(shù)執(zhí)行任務(wù)
inputFunc(func())
}
type goWorker struct {
workerId string
//需要持有自己所屬的 Pool 因為要和它進行交互
pool *Pool
task chan func()
}
這里同樣了為了拓展,將worker抽象成了一個接口,goWorker是它的一個默認實現(xiàn),worker最核心的工作就是等待著task到來,接到task后執(zhí)行,task具體來說就是一個函數(shù)。這里其實是一個很簡單的生產(chǎn)者/消費者模型,我們想到使用管道來實現(xiàn)生產(chǎn)消費模型,定義一個函數(shù)類型的管道,你或許要問為什么使用管道,還有別的方式可以實現(xiàn)這個功能么?不急,我們來看看worker要實現(xiàn)什么功能:
- 創(chuàng)建出來的worker,未必馬上就有task分配過來執(zhí)行,它肯定要把自己“阻塞”住,隨時等待task到來
- task傳遞過來終歸需要一個介質(zhì)
鑒于這個場景,使用管道式非常合適的,管道內(nèi)沒有元素時,worker阻塞等待,當管道內(nèi)有task進來時,worker被喚醒,從管道中取出task進行處理。當然,我們使用一個死循環(huán),不斷自旋的從一個容器中讀取task,也能達到同樣的目的,但卻沒有使用管道合適、優(yōu)雅!
func (g *goWorker) run() {
go func() {
defer func() {
atomic.AddInt32(&g.pool.running, -1)
}()
//running+1
atomic.AddInt32(&g.pool.running, 1)
for {
select {
case f := <-g.task:
if f == nil {
return
}
//執(zhí)行提交的任務(wù)
f()
}
//worker返還到queue中
if ok := g.pool.revertWorker(g); !ok {
return
}
}
}()
}
func (g *goWorker) inputFunc(f func()) {
g.task <- f
}
run()是worker的核心方法,worker通常被創(chuàng)建后就會調(diào)用run(),一起來看下主要做了那些內(nèi)容:
- 使用select 監(jiān)聽task管道,阻塞當前協(xié)程,回到管道內(nèi)有task進入
- 監(jiān)聽到task,則會執(zhí)行task的內(nèi)容
- task執(zhí)行完畢后,會調(diào)用
poo.revertWorker()將當前worker返還到協(xié)程池中待用,當然這里未必會返成功。
很容易忽略的一個點是:為什么要新啟動一個協(xié)程來完成以上工作?因為worker中沒有task時,要阻塞等待任務(wù),如果不是在一個新的協(xié)程中,整個程序都阻塞在第一個worker的run()中,所謂協(xié)程池,就是指每個worker對應(yīng)的這個協(xié)程。
另外,pool維護一個running屬性來表示存活的worker數(shù)量,當調(diào)用run()方法后,表示worker是可用的了,running值+1。如果worker返回協(xié)程池失敗,run()執(zhí)行完畢,worker對應(yīng)的協(xié)程被系統(tǒng)銷毀,表示當前worker生命周期結(jié)束了,對應(yīng)的寫成會將running值-1。由于多個worker并發(fā)修改running值,使用了atomic.AddInt32控制臨界資源的修改。
至此,實現(xiàn)一個簡單協(xié)程池的核心的功能都已經(jīng)完成,和ants相比,這是一個相當精簡的協(xié)程池,旨在幫助我們加深對協(xié)程池、線程池這類組件模型的理解,離真正可用有段距離。ants中更完備的功能,比如:ants實現(xiàn)了定期清理空閑worker,以及對鎖的優(yōu)化、worker的池化等等,感興趣的可以看看ants,短小精悍的開源項目!
到此這篇關(guān)于詳解Go語言如何實現(xiàn)一個最簡化的協(xié)程池的文章就介紹到這了,更多相關(guān)Go協(xié)程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang打印復(fù)雜結(jié)構(gòu)體兩種方法詳解
在?Golang?語言開發(fā)中,我們經(jīng)常會使用結(jié)構(gòu)體類型,如果我們使用的結(jié)構(gòu)體類型的變量包含指針類型的字段,我們在記錄日志的時候,指針類型的字段的值是指針地址,將會給我們?debug?代碼造成不便2022-10-10
gin自定義中間件解決requestBody不可重讀(請求體取值)
這篇文章主要介紹了gin自定義中間件解決requestBody不可重讀,確??刂破髂軌颢@取請求體值,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10
golang使用map支持高并發(fā)的方法(1000萬次操作14ms)
這篇文章主要介紹了golang使用map支持高并發(fā)的方法(1000萬次操作14ms),本文給大家詳細講解,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-11-11
使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄
gorm是目前用得最多的go語言orm庫,本文主要介紹了使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06

