Go高級(jí)特性探究之協(xié)程池詳解
在并發(fā)編程中,協(xié)程是 Go 語(yǔ)言的核心特性之一,但是在實(shí)際應(yīng)用中,協(xié)程的創(chuàng)建和銷(xiāo)毀成本比較高。當(dāng)需要同時(shí)處理大量的任務(wù)時(shí),創(chuàng)建大量的協(xié)程會(huì)導(dǎo)致系統(tǒng)開(kāi)銷(xiāo)變大,進(jìn)而影響程序的性能。這時(shí)候,就需要使用協(xié)程池來(lái)管理協(xié)程的生命周期,將協(xié)程的創(chuàng)建和銷(xiāo)毀成本降至最小,提高程序的并發(fā)性能。
本文將介紹如何使用 Go 協(xié)程池構(gòu)造一個(gè)協(xié)程池,并解決函數(shù)傳參問(wèn)題、優(yōu)雅關(guān)閉協(xié)程池和保證協(xié)程安全的問(wèn)題。
Pool
type Pool struct { capacity uint64 // 最大協(xié)程數(shù) runningWorkers uint64 // 當(dāng)前正在運(yùn)行的協(xié)程數(shù) status int64 // 協(xié)程池的狀態(tài) chTask chan *Task // 執(zhí)行任務(wù)的 channel PanicHandler func(interface{}) // 處理協(xié)程中的 panic 異常 sync.Once sync.Mutex }
Pool 類(lèi)型是協(xié)程池的主要類(lèi)型,包含了以下屬性:
capacity
:最大協(xié)程數(shù)。runningWorkers
:當(dāng)前正在運(yùn)行的協(xié)程數(shù)。status
:協(xié)程池的狀態(tài)。chTask
:執(zhí)行任務(wù)的 channel。PanicHandler
:處理協(xié)程中的 panic 異常。sync.Once
:防止 Stop 函數(shù)被多次調(diào)用。sync.Mutex
: 用于鎖定協(xié)程池的狀態(tài)和 channel。
同時(shí) Pool 類(lèi)型包含以下函數(shù):
NewPool
:用于初始化協(xié)程池。Submit
:將任務(wù)放到 channel 中供協(xié)程進(jìn)行任務(wù)處理。createWorker
:用于創(chuàng)建并啟動(dòng)一個(gè)協(xié)程來(lái)執(zhí)行任務(wù)。incRunning
:增加協(xié)程池的運(yùn)行協(xié)程數(shù)。decRunning
:減少協(xié)程池的運(yùn)行協(xié)程數(shù)。Stop
:關(guān)閉協(xié)程池,停止接受任務(wù)并且等待所有任務(wù)執(zhí)行完畢后關(guān)閉協(xié)程池。
NewPool 函數(shù)
NewPool 函數(shù)用于創(chuàng)建和初始化一個(gè)協(xié)程池。將最大協(xié)程數(shù) n 和處理協(xié)程中 panic 異常的函數(shù) panicHandler 傳入函數(shù)中,創(chuàng)建一個(gè) Pool 類(lèi)型,并將屬性初始化后返回一個(gè) Pool 的指針類(lèi)型。
func NewPool(n uint64, panicHandler func(interface{})) *Pool { return &Pool{ capacity: n, status: Running, chTask: make(chan *Task, n), PanicHandler: panicHandler, } }
Submit 函數(shù)
Submit 函數(shù)用于將任務(wù)放到 channel 中供協(xié)程進(jìn)行任務(wù)處理。首先判斷協(xié)程池狀態(tài)是否為 Stopped,如果已經(jīng)關(guān)閉,則返回一個(gè)錯(cuò)誤;接著加鎖,并判斷 channel 中是否已滿(mǎn),如果已經(jīng)滿(mǎn)了,則返回一個(gè)錯(cuò)誤,否則將任務(wù)放到 channel 中并返回 nil。
// 將任務(wù)放到 channel 中供協(xié)程進(jìn)行任務(wù)處理 func (p *Pool) Submit(t *Task) error { if p.status == Stopped { return errors.New("協(xié)程池已關(guān)閉,不能提交任務(wù)") } p.Lock() defer p.Unlock() if len(p.chTask) == int(p.capacity) { return errors.New("協(xié)程池已滿(mǎn),不能接受新任務(wù)") } p.chTask <- t return nil }
createWorker 函數(shù)
createWorker 函數(shù)用于創(chuàng)建并啟動(dòng)一個(gè)協(xié)程來(lái)執(zhí)行任務(wù)。首先增加當(dāng)前運(yùn)行的協(xié)程數(shù),然后在一個(gè) go 協(xié)程內(nèi)執(zhí)行任務(wù)。如果在執(zhí)行任務(wù)的過(guò)程中出現(xiàn) panic 異常,則調(diào)用 PanicHandler
處理函數(shù),如果沒(méi)有設(shè)置 PanicHandler
處理函數(shù),則直接將異常信息打印出來(lái)。執(zhí)行完任務(wù)后,減少當(dāng)前運(yùn)行的協(xié)程數(shù)。
// 初始化協(xié)程池的協(xié)程數(shù)量 func (p *Pool) createWorker() { p.incRunning() // 每一個(gè)協(xié)程獲取一個(gè)任務(wù),執(zhí)行任務(wù) go func() { defer func() { if r := recover(); r != nil { if p.PanicHandler != nil { p.PanicHandler(r) } else { fmt.Println("Panic:", r) } } p.decRunning() }() for { select { case t := <-p.chTask: if t == nil { return } t.Handler(t.Params...) } } }() }
incRunning、decRunning 函數(shù)
incRunning、decRunning 函數(shù)用于增加和減少協(xié)程池的運(yùn)行協(xié)程數(shù),使用了 atomic.AddUint64 函數(shù)來(lái)保證操作的原子性。
// 增加協(xié)程池的運(yùn)行協(xié)程數(shù) func (p *Pool) incRunning() { atomic.AddUint64(&p.runningWorkers, 1) } // 減少協(xié)程池的運(yùn)行協(xié)程數(shù) func (p *Pool) decRunning() { atomic.AddUint64(&p.runningWorkers, ^uint64(0)) }
Stop 函數(shù)
Stop 函數(shù)用于關(guān)閉協(xié)程池,停止接受任務(wù)并且等待所有任務(wù)執(zhí)行完畢后關(guān)閉協(xié)程池。首先判斷協(xié)程池狀態(tài)是否為 Running,如果已經(jīng)關(guān)閉,則直接返回;接著將協(xié)程池狀態(tài)設(shè)置為 Stopped,然后使用 sync.Once 確保關(guān)閉 channel 的操作僅被執(zhí)行一次,同時(shí)創(chuàng)建運(yùn)行的協(xié)程數(shù)個(gè)協(xié)程,等待它們執(zhí)行完畢后關(guān)閉協(xié)程池。
// 關(guān)閉協(xié)程池 func (p *Pool) Stop() { if p.status == Running { p.status = Stopped p.Once.Do(func() { close(p.chTask) for i := uint64(0); i < p.runningWorkers; i++ { p.createWorker() } }) } }
解決函數(shù)傳參問(wèn)題
在使用協(xié)程池時(shí),需要向協(xié)程池提交任務(wù),但是協(xié)程池內(nèi)部的協(xié)程如何知道要執(zhí)行什么樣的任務(wù),參數(shù)又應(yīng)該如何傳遞呢?
為了解決這個(gè)問(wèn)題,可以定義一個(gè) Task 結(jié)構(gòu)體,用于存儲(chǔ)要執(zhí)行的函數(shù)和函數(shù)參數(shù),如下所示:
type Task struct { ? ? Handler func(v ...interface{}) ? ? Params? []interface{} }
Task 類(lèi)型是一個(gè)結(jié)構(gòu)體,用于封裝協(xié)程池的任務(wù)。其中 Handler 是一個(gè)函數(shù)類(lèi)型,用于任務(wù)執(zhí)行的函數(shù);Params 是一個(gè)可變參數(shù),調(diào)用 Handler 時(shí)傳遞給它的參數(shù)。
其中,Handler 是一個(gè)無(wú)返回值的函數(shù),且該函數(shù)可接受變長(zhǎng)參數(shù),Params 是一個(gè)任意類(lèi)型的切片,用于傳遞函數(shù)的參數(shù)列表。
在向協(xié)程池提交任務(wù)時(shí),可以將 Task 對(duì)象作為參數(shù)進(jìn)行提交。
pool.Submit(&Task{ ? ? Handler: func(v ...interface{}) { ? ? ? ? // 執(zhí)行任務(wù)的代碼 ? ? }, ? ? Params: []interface{}{...}, // 任務(wù)的參數(shù)列表 })
在協(xié)程內(nèi)部,可以通過(guò)調(diào)用 Task.Handler 方法,并將 Task.Params 作為參數(shù)傳遞進(jìn)去,來(lái)運(yùn)行具體的任務(wù)。
select { case t := <-p.chTask: ? ? if t == nil { ? ? ? ? return ? ? } ? ? t.Handler(t.Params...) }
通過(guò)這種方式,協(xié)程池就能夠動(dòng)態(tài)地執(zhí)行不同的任務(wù),并且傳遞任意類(lèi)型和數(shù)量的參數(shù)。
優(yōu)雅關(guān)閉協(xié)程池
在使用協(xié)程池時(shí),如何正確地關(guān)閉協(xié)程池,以避免因未正確關(guān)閉而導(dǎo)致的內(nèi)存泄漏和程序崩潰呢?
首先,需要明確協(xié)程池的運(yùn)行狀態(tài),通過(guò)內(nèi)部的 status
參數(shù)控制協(xié)程池的開(kāi)關(guān)。當(dāng)協(xié)程池處于運(yùn)行狀態(tài)時(shí),協(xié)程池才能夠接受新的任務(wù),否則應(yīng)該拒絕新的任務(wù)請(qǐng)求,并盡快釋放內(nèi)部的資源。
其次,在關(guān)閉協(xié)程池時(shí),需要確保所有的已運(yùn)行的協(xié)程都已經(jīng)執(zhí)行完任務(wù)并退出。這時(shí),可以使用 sync.Once 來(lái)執(zhí)行一次協(xié)程池的清理工作。當(dāng)協(xié)程池處于關(guān)閉狀態(tài)時(shí),不再接受新的任務(wù),并通知所有的協(xié)程退出任務(wù)循環(huán),最終實(shí)現(xiàn)協(xié)程池的優(yōu)雅關(guān)閉。
func (p *Pool) Stop() { ?if p.status == Running { ? p.status = Stopped ? p.Once.Do(func() { ?? close(p.chTask) ?? for i := uint64(0); i < p.runningWorkers; i++ { ? ? p.createWorker() ?? } ? }) ?} }
保證協(xié)程安全
在使用協(xié)程池時(shí),需要注意線(xiàn)程安全問(wèn)題,尤其是在多個(gè)協(xié)程同時(shí)訪問(wèn)協(xié)程池時(shí),需要保證協(xié)程池的內(nèi)部狀態(tài)是線(xiàn)程安全的。
同時(shí)對(duì)于狀態(tài)的變更以及數(shù)量的增減,還需要保證代碼的安全性。
為了保證線(xiàn)程安全,可以使用互斥鎖 sync.Mutex 來(lái)鎖定協(xié)程池,以避免多個(gè)協(xié)程同時(shí)讀寫(xiě)協(xié)程池的運(yùn)行狀態(tài)和其他內(nèi)部參數(shù)。
在協(xié)程池的內(nèi)部實(shí)現(xiàn)中,使用的 sync.Once
只會(huì)單次執(zhí)行的特性可以保證協(xié)程池只會(huì)初始化一次,防止因多次初始化而導(dǎo)致的內(nèi)存泄漏或其他異常。
測(cè)試用例
為了測(cè)試協(xié)程池的正確性,以下是一個(gè)簡(jiǎn)單的測(cè)試用例。該測(cè)試用例創(chuàng)建一個(gè)容量為 3 的協(xié)程池,并向其中提交 10 個(gè)任務(wù),每個(gè)任務(wù)隨機(jī)睡眠一段時(shí)間,并輸出當(dāng)前時(shí)間。
package main import ( ?"fmt" ?"math/rand" ?"sync" ?"testing" ?"time" ) func TestPool(t *testing.T) { ?pool := NewPool(3, func(err interface{}) { ? fmt.Println("發(fā)生 panic,錯(cuò)誤信息:", err) ?}) ?var wg sync.WaitGroup ?for i := 0; i < 10; i++ { ? wg.Add(1) ? go func(id int) { ?? defer wg.Done() ?? task := \&Task{ ? ? Handler: func(v ...interface{}) { ?? ? fmt.Printf("任務(wù) %d 開(kāi)始執(zhí)行,時(shí)間:%v\n", id, time.Now().Format("2006-01-02 15:04:05")) ?? ? rand.Seed(time.Now().UnixNano()) ?? ? time.Sleep(time.Duration(rand.Intn(5)) \* time.Second) ?? ? fmt.Printf("任務(wù) %d 執(zhí)行完畢,時(shí)間:%v\n", id, time.Now().Format("2006-01-02 15:04:05")) ? ? }, ? ? Params: \[]interface{}{}, ?? } ?? pool.Submit(task) ? }(i) ?} ?wg.Wait() }
輸出結(jié)果如下:
任務(wù) 0 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:22
任務(wù) 1 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:22
任務(wù) 2 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:22
任務(wù) 0 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:27
任務(wù) 3 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:27
任務(wù) 4 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:27
任務(wù) 1 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:28
任務(wù) 5 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:28
任務(wù) 6 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:28
任務(wù) 7 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:28
任務(wù) 4 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:29
任務(wù) 8 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:29
任務(wù) 9 開(kāi)始執(zhí)行,時(shí)間:2021-10-05 16:52:29
任務(wù) 2 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:32
任務(wù) 5 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:33
任務(wù) 7 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:33
任務(wù) 6 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:34
任務(wù) 3 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:35
任務(wù) 9 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:35
任務(wù) 8 執(zhí)行完畢,時(shí)間:2021-10-05 16:52:37
從輸出結(jié)果可以看出,協(xié)程池成功并行處理了所有的任務(wù),并且在容量限制的情況下,成功地保證了協(xié)程池的線(xiàn)程安全性。
改進(jìn)
可考慮增加對(duì)協(xié)程池容量的動(dòng)態(tài)調(diào)整算法,例如在高峰期時(shí)增加協(xié)程池的容量,低谷期時(shí)降低協(xié)程池的容量。另外可以增加協(xié)程池的超時(shí)控制機(jī)制,以避免任務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng)導(dǎo)致系統(tǒng)資源浪費(fèi)和性能下降。
總結(jié)
協(xié)程池是 Go 語(yǔ)言中一種重要的并發(fā)編程模式,通過(guò)協(xié)程池可以高效地管理協(xié)程的生命周期、避免協(xié)程的頻繁創(chuàng)建和銷(xiāo)毀,提高程序的并發(fā)性能。在使用協(xié)程池時(shí),需要注意解決函數(shù)傳參問(wèn)題、優(yōu)雅關(guān)閉協(xié)程池和保證協(xié)程安全的問(wèn)題,通過(guò)合理使用互斥鎖和 sync.Once 可以有效解決這些問(wèn)題,從而保證協(xié)程池的正確性和高效性。
以上就是Go高級(jí)特性探究之協(xié)程池詳解的詳細(xì)內(nèi)容,更多關(guān)于Go協(xié)程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- golang協(xié)程池設(shè)計(jì)詳解
- golang協(xié)程池模擬實(shí)現(xiàn)群發(fā)郵件功能
- GO實(shí)現(xiàn)協(xié)程池管理的方法
- Golang協(xié)程池gopool設(shè)計(jì)與實(shí)現(xiàn)
- Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例
- Golang協(xié)程池的實(shí)現(xiàn)與應(yīng)用
- 詳解Go語(yǔ)言如何實(shí)現(xiàn)一個(gè)最簡(jiǎn)化的協(xié)程池
- Golang線(xiàn)程池與協(xié)程池的使用
- golang實(shí)現(xiàn)協(xié)程池的方法示例
- go協(xié)程池實(shí)現(xiàn)原理小結(jié)
相關(guān)文章
golang sql語(yǔ)句超時(shí)控制方案及原理
一般應(yīng)用程序在執(zhí)行一條sql語(yǔ)句時(shí),都會(huì)給這條sql設(shè)置一個(gè)超時(shí)時(shí)間,本文主要介紹了golang sql語(yǔ)句超時(shí)控制方案及原理,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12Go語(yǔ)言循環(huán)遍歷含有中文的字符串的方法小結(jié)
這篇文章主要介紹了Go語(yǔ)言循環(huán)遍歷含有中文的字符串的幾種方法,文章通過(guò)代碼示例講解的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴跟著小編一起來(lái)看看吧2023-07-07Golang正整數(shù)指定規(guī)則排序算法問(wèn)題分析
這篇文章主要介紹了Golang正整數(shù)指定規(guī)則排序算法問(wèn)題,結(jié)合實(shí)例形式分析了Go語(yǔ)言排序算法操作技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2017-01-01Golang常見(jiàn)錯(cuò)誤之值拷貝和for循環(huán)中的單一變量詳解
這篇文章主要給大家介紹了關(guān)于Golang常見(jiàn)錯(cuò)誤之值拷貝和for循環(huán)中單一變量的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-11-11一文帶你了解Go語(yǔ)言中的類(lèi)型斷言和類(lèi)型轉(zhuǎn)換
在Go中,類(lèi)型斷言和類(lèi)型轉(zhuǎn)換是一個(gè)令人困惑的事情,他們似乎都在做同樣的事情。最明顯的不同點(diǎn)是他們具有不同的語(yǔ)法(variable.(type)?vs?type(variable)?)。本文我們就來(lái)深入研究一下二者的區(qū)別2022-09-09源碼解析gtoken替換jwt實(shí)現(xiàn)sso登錄
這篇文章主要為大家介紹了源碼解析gtoken替換jwt實(shí)現(xiàn)sso登錄的示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Go標(biāo)準(zhǔn)庫(kù)http?server優(yōu)雅啟動(dòng)深入理解
這篇文章主要介紹了Go標(biāo)準(zhǔn)庫(kù)http?server優(yōu)雅啟動(dòng)深入理解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01Go生成base64圖片驗(yàn)證碼實(shí)例(超詳細(xì)工具類(lèi))
這段時(shí)間需要使用圖片驗(yàn)證碼庫(kù),下面這篇文章主要給大家介紹了關(guān)于Go生成base64圖片驗(yàn)證碼的相關(guān)資料,文中給出了詳細(xì)的實(shí)例代碼,需要的朋友可以參考下2023-06-06