詳解Go語言如何實現(xiàn)一個最簡化的協(xié)程池
背景
學習完優(yōu)秀的協(xié)程池開源項目ants之后,想要對協(xié)程池做了一個總結,在ants的基礎上做了簡化,保留了一個協(xié)程池的核心功能,旨在幫助協(xié)程池的理解和使用。
為什么要用協(xié)程池
- 降低資源開銷:協(xié)程池可以重復使用協(xié)程,減少創(chuàng)建、銷毀協(xié)程的開銷,從而提高系統(tǒng)性能。
- 提高響應速度:接收任務后可以立即執(zhí)行,無需等待創(chuàng)建協(xié)程時間。
- 增強可管理型:可以對協(xié)程進行集中調(diào)度,統(tǒng)一管理,方便調(diào)優(yōu)。
實現(xiàn)協(xié)程池都需要實現(xiàn)哪些功能
思考一下,如果如果讓你實現(xiàn)一個協(xié)程池,有哪些必要的核心功能呢?
- 協(xié)程池需要提供一個對外接收任務提交的接口
- 如何創(chuàng)建一個協(xié)程,創(chuàng)建好的協(xié)程存放在哪里?
- 協(xié)程池中的協(xié)程在沒有task時,是如何存在的?
- 如何為使用者提交的task分配一個協(xié)程?
- 協(xié)程執(zhí)行完task后,如何返還到協(xié)程池中?
協(xié)程池內(nèi)部需要維護一個裝有一個個協(xié)程的隊列,用于存放管理的協(xié)程,為了拓展功能方便,我們把每個協(xié)程都封裝一個worker,這個worker隊列需要具備幾個核心功能:
協(xié)程池整體的架構
帶著這些如何實現(xiàn)一個簡單協(xié)程池必要核心功能的問題,我們來看下,一個協(xié)程池的核心流程,用圖來表示就是:
從圖上可以看出協(xié)程池主要包括3個組件:
協(xié)程池(gorutine-pool) :它是整個協(xié)程池的入口和主體,內(nèi)部持有一個協(xié)程隊列,用于存放、調(diào)度worker。
協(xié)程隊列(worker-queue) :持有協(xié)程池維護的所有的協(xié)程,為了拓展方便,將協(xié)程封裝成worker,一個worker對應一個協(xié)程。
worker:每個worker對應一個協(xié)程,它能夠運行一個任務,通常是個函數(shù),是真正干活的地方。
主要流程:
當一個使用者一個task提交后,協(xié)程池從workerQueue中獲取一個可用的worker負責執(zhí)行此task,如果worker隊列中沒有可用的worker,并且worker的數(shù)量還沒有達到隊列設置最大數(shù)量,可以新建一個worker補充到隊列中,worker執(zhí)行完任務后,還需要能夠將自己返還到workder隊列中,才能達到復用的目的
三個組件的實現(xiàn)
分別來看下三個組件是如何實現(xiàn)協(xié)程池的
gorutine-pool實現(xiàn)
// Pool pool負責worker的調(diào)度 type Pool struct { //pool最大最大線程數(shù)量 cap int32 //當前運行的worker數(shù)量 running int32 //worker隊列 workers workerQueue //控制并發(fā)訪問臨界資源 lock sync.Mutex }
pool結構體中cap
和running
兩個屬性用來管理協(xié)程池的數(shù)量,workers
存放創(chuàng)建的協(xié)程,lock
控制并發(fā)訪問臨界資源。
從上述的架構圖中可以看出,pool需要對外提供接收task的方法,以及兩個內(nèi)部從workerQueue獲取worker、返還worker到workerQueue的方法。
Submit
// Submit 提交任務 func (p *Pool) Submit(f func()) error { if worker := p.retrieveWorker(); worker != nil { worker.inputFunc(f) return nil } return ErrPoolOverload }
Submit()是給調(diào)用者提交task任務的方法,它的入?yún)⑹且粋€函數(shù),這個函數(shù)就是協(xié)程池使用者想讓協(xié)程池執(zhí)行的內(nèi)容。協(xié)程池pool會嘗試為這個task分配一個worker來處理task,但是,如果協(xié)程池的worker都被占用,并且有數(shù)量限制無法再創(chuàng)建新的worker,pool也無能為力,這里會返回給調(diào)用者一個"過載"的異常,當然這里可以拓展其它的拒絕策略。
retrieveWorker()
//從workerQueue中獲取一個worker func (p *Pool) retrieveWorker() worker { p.lock.Lock() w := p.workers.detach() if w != nil { p.lock.Unlock() } else { //沒有拿到可用的worker //如果容量還沒耗盡,再創(chuàng)建一個worker if p.running < p.cap { w = &goWorker{ pool: p, task: make(chan func()), } w.run() } p.lock.Unlock() } return w }
retrieveWorker()
是從pool中的workerQueue中獲取一個worker具體實現(xiàn)。獲取worker是一個并發(fā)操作,這里使用鎖控制并發(fā)。調(diào)用workers.detach()
從workerQueue中獲取worker,如果沒有拿到可用的worker,這時候還需要看看目前pool中現(xiàn)存活的worker數(shù)量是否已經(jīng)達到上限,未達上限,則可以創(chuàng)建新的worker加入到pool中。
revertWorker()
// 執(zhí)行完任務的worker返還到workerQueue func (p *Pool) revertWorker(w *goWorker) bool { defer func() { p.lock.Unlock() }() p.lock.Lock() //判斷容量,如果協(xié)程存活數(shù)量大于容量,銷毀 if p.running < p.cap { p.workers.insert(w) return true } return false }
返還當前worker到pool是在worker執(zhí)行完task之后,返回時需要判斷當前存活的worker數(shù)量是否到達pool的上限,已達上限則返回失敗。另外,由于running
屬性存在并發(fā)訪問的問題,返還操作也需要加鎖。
workerQueue實現(xiàn)
為了提高拓展性,我們將workerQueue抽象成接口,也就是說可以有多種協(xié)程隊列實現(xiàn)來適配更多的使用場景
workerQueue接口
// 定義一個協(xié)程隊列的接口 type workerQueue interface { //隊列長度 len() int //插入worker insert(w worker) //分派一個worker detach() worker }
插入insert()
和分配detach()
是實現(xiàn)協(xié)程隊列的核心方法。這里我們以底層基于“棧”思想的結構作為workerQueue的默認實現(xiàn),也即后進入隊列的協(xié)程優(yōu)先被分配使用。
// 底層構造一個棧類型的隊列來管理多個worker type workerStack struct { items []worker }
我們使用數(shù)組來存放worker,用數(shù)組來模擬先進后出的協(xié)程隊列
// 新創(chuàng)建一個worker func (ws *workerStack) insert(w worker) { ws.items = append(ws.items, w) }
workerStack的insert()
實現(xiàn)很簡單,直接在數(shù)組尾巴追加一個worker
// 分配一個worker func (ws *workerStack) detach() worker { l := ws.len() if l == 0 { return nil } w := ws.items[l-1] ws.items[l-1] = nil ws.items = ws.items[:l-1] return w }
detach()
負責從數(shù)組中獲取一個可用的空閑worker,每次獲取時取用的是數(shù)組的最后一個元素,也就是協(xié)程隊列末尾的worker優(yōu)先被分配出去了。
注意這里將下標l-1
位置的對象置為nil,可以防止內(nèi)存泄露
worker實現(xiàn)
type worker interface { workId() string run() //接收函數(shù)執(zhí)行任務 inputFunc(func()) } type goWorker struct { workerId string //需要持有自己所屬的 Pool 因為要和它進行交互 pool *Pool task chan func() }
這里同樣了為了拓展,將worker抽象成了一個接口,goWorker是它的一個默認實現(xiàn),worker最核心的工作就是等待著task到來,接到task后執(zhí)行,task具體來說就是一個函數(shù)。這里其實是一個很簡單的生產(chǎn)者/消費者模型,我們想到使用管道來實現(xiàn)生產(chǎn)消費模型,定義一個函數(shù)類型的管道,你或許要問為什么使用管道,還有別的方式可以實現(xiàn)這個功能么?不急,我們來看看worker要實現(xiàn)什么功能:
- 創(chuàng)建出來的worker,未必馬上就有task分配過來執(zhí)行,它肯定要把自己“阻塞”住,隨時等待task到來
- task傳遞過來終歸需要一個介質(zhì)
鑒于這個場景,使用管道式非常合適的,管道內(nèi)沒有元素時,worker阻塞等待,當管道內(nèi)有task進來時,worker被喚醒,從管道中取出task進行處理。當然,我們使用一個死循環(huán),不斷自旋的從一個容器中讀取task,也能達到同樣的目的,但卻沒有使用管道合適、優(yōu)雅!
func (g *goWorker) run() { go func() { defer func() { atomic.AddInt32(&g.pool.running, -1) }() //running+1 atomic.AddInt32(&g.pool.running, 1) for { select { case f := <-g.task: if f == nil { return } //執(zhí)行提交的任務 f() } //worker返還到queue中 if ok := g.pool.revertWorker(g); !ok { return } } }() } func (g *goWorker) inputFunc(f func()) { g.task <- f }
run()
是worker的核心方法,worker通常被創(chuàng)建后就會調(diào)用run()
,一起來看下主要做了那些內(nèi)容:
- 使用select 監(jiān)聽task管道,阻塞當前協(xié)程,回到管道內(nèi)有task進入
- 監(jiān)聽到task,則會執(zhí)行task的內(nèi)容
- task執(zhí)行完畢后,會調(diào)用
poo.revertWorker()
將當前worker返還到協(xié)程池中待用,當然這里未必會返成功。
很容易忽略的一個點是:為什么要新啟動一個協(xié)程來完成以上工作?因為worker中沒有task時,要阻塞等待任務,如果不是在一個新的協(xié)程中,整個程序都阻塞在第一個worker的run()
中,所謂協(xié)程池,就是指每個worker對應的這個協(xié)程。
另外,pool維護一個running
屬性來表示存活的worker數(shù)量,當調(diào)用run()
方法后,表示worker是可用的了,running
值+1。如果worker返回協(xié)程池失敗,run()
執(zhí)行完畢,worker對應的協(xié)程被系統(tǒng)銷毀,表示當前worker生命周期結束了,對應的寫成會將running
值-1。由于多個worker并發(fā)修改running
值,使用了atomic.AddInt32
控制臨界資源的修改。
至此,實現(xiàn)一個簡單協(xié)程池的核心的功能都已經(jīng)完成,和ants相比,這是一個相當精簡的協(xié)程池,旨在幫助我們加深對協(xié)程池、線程池這類組件模型的理解,離真正可用有段距離。ants中更完備的功能,比如:ants實現(xiàn)了定期清理空閑worker,以及對鎖的優(yōu)化、worker的池化等等,感興趣的可以看看ants,短小精悍的開源項目!
到此這篇關于詳解Go語言如何實現(xiàn)一個最簡化的協(xié)程池的文章就介紹到這了,更多相關Go協(xié)程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
gin自定義中間件解決requestBody不可重讀(請求體取值)
這篇文章主要介紹了gin自定義中間件解決requestBody不可重讀,確??刂破髂軌颢@取請求體值,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-10-10golang使用map支持高并發(fā)的方法(1000萬次操作14ms)
這篇文章主要介紹了golang使用map支持高并發(fā)的方法(1000萬次操作14ms),本文給大家詳細講解,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-11-11使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄
gorm是目前用得最多的go語言orm庫,本文主要介紹了使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2022-06-06