Go高級特性探究之協(xié)程池詳解
在并發(fā)編程中,協(xié)程是 Go 語言的核心特性之一,但是在實際應用中,協(xié)程的創(chuàng)建和銷毀成本比較高。當需要同時處理大量的任務時,創(chuàng)建大量的協(xié)程會導致系統(tǒng)開銷變大,進而影響程序的性能。這時候,就需要使用協(xié)程池來管理協(xié)程的生命周期,將協(xié)程的創(chuàng)建和銷毀成本降至最小,提高程序的并發(fā)性能。
本文將介紹如何使用 Go 協(xié)程池構造一個協(xié)程池,并解決函數(shù)傳參問題、優(yōu)雅關閉協(xié)程池和保證協(xié)程安全的問題。
Pool
type Pool struct { capacity uint64 // 最大協(xié)程數(shù) runningWorkers uint64 // 當前正在運行的協(xié)程數(shù) status int64 // 協(xié)程池的狀態(tài) chTask chan *Task // 執(zhí)行任務的 channel PanicHandler func(interface{}) // 處理協(xié)程中的 panic 異常 sync.Once sync.Mutex }
Pool 類型是協(xié)程池的主要類型,包含了以下屬性:
capacity
:最大協(xié)程數(shù)。runningWorkers
:當前正在運行的協(xié)程數(shù)。status
:協(xié)程池的狀態(tài)。chTask
:執(zhí)行任務的 channel。PanicHandler
:處理協(xié)程中的 panic 異常。sync.Once
:防止 Stop 函數(shù)被多次調用。sync.Mutex
: 用于鎖定協(xié)程池的狀態(tài)和 channel。
同時 Pool 類型包含以下函數(shù):
NewPool
:用于初始化協(xié)程池。Submit
:將任務放到 channel 中供協(xié)程進行任務處理。createWorker
:用于創(chuàng)建并啟動一個協(xié)程來執(zhí)行任務。incRunning
:增加協(xié)程池的運行協(xié)程數(shù)。decRunning
:減少協(xié)程池的運行協(xié)程數(shù)。Stop
:關閉協(xié)程池,停止接受任務并且等待所有任務執(zhí)行完畢后關閉協(xié)程池。
NewPool 函數(shù)
NewPool 函數(shù)用于創(chuàng)建和初始化一個協(xié)程池。將最大協(xié)程數(shù) n 和處理協(xié)程中 panic 異常的函數(shù) panicHandler 傳入函數(shù)中,創(chuàng)建一個 Pool 類型,并將屬性初始化后返回一個 Pool 的指針類型。
func NewPool(n uint64, panicHandler func(interface{})) *Pool { return &Pool{ capacity: n, status: Running, chTask: make(chan *Task, n), PanicHandler: panicHandler, } }
Submit 函數(shù)
Submit 函數(shù)用于將任務放到 channel 中供協(xié)程進行任務處理。首先判斷協(xié)程池狀態(tài)是否為 Stopped,如果已經關閉,則返回一個錯誤;接著加鎖,并判斷 channel 中是否已滿,如果已經滿了,則返回一個錯誤,否則將任務放到 channel 中并返回 nil。
// 將任務放到 channel 中供協(xié)程進行任務處理 func (p *Pool) Submit(t *Task) error { if p.status == Stopped { return errors.New("協(xié)程池已關閉,不能提交任務") } p.Lock() defer p.Unlock() if len(p.chTask) == int(p.capacity) { return errors.New("協(xié)程池已滿,不能接受新任務") } p.chTask <- t return nil }
createWorker 函數(shù)
createWorker 函數(shù)用于創(chuàng)建并啟動一個協(xié)程來執(zhí)行任務。首先增加當前運行的協(xié)程數(shù),然后在一個 go 協(xié)程內執(zhí)行任務。如果在執(zhí)行任務的過程中出現(xiàn) panic 異常,則調用 PanicHandler
處理函數(shù),如果沒有設置 PanicHandler
處理函數(shù),則直接將異常信息打印出來。執(zhí)行完任務后,減少當前運行的協(xié)程數(shù)。
// 初始化協(xié)程池的協(xié)程數(shù)量 func (p *Pool) createWorker() { p.incRunning() // 每一個協(xié)程獲取一個任務,執(zhí)行任務 go func() { defer func() { if r := recover(); r != nil { if p.PanicHandler != nil { p.PanicHandler(r) } else { fmt.Println("Panic:", r) } } p.decRunning() }() for { select { case t := <-p.chTask: if t == nil { return } t.Handler(t.Params...) } } }() }
incRunning、decRunning 函數(shù)
incRunning、decRunning 函數(shù)用于增加和減少協(xié)程池的運行協(xié)程數(shù),使用了 atomic.AddUint64 函數(shù)來保證操作的原子性。
// 增加協(xié)程池的運行協(xié)程數(shù) func (p *Pool) incRunning() { atomic.AddUint64(&p.runningWorkers, 1) } // 減少協(xié)程池的運行協(xié)程數(shù) func (p *Pool) decRunning() { atomic.AddUint64(&p.runningWorkers, ^uint64(0)) }
Stop 函數(shù)
Stop 函數(shù)用于關閉協(xié)程池,停止接受任務并且等待所有任務執(zhí)行完畢后關閉協(xié)程池。首先判斷協(xié)程池狀態(tài)是否為 Running,如果已經關閉,則直接返回;接著將協(xié)程池狀態(tài)設置為 Stopped,然后使用 sync.Once 確保關閉 channel 的操作僅被執(zhí)行一次,同時創(chuàng)建運行的協(xié)程數(shù)個協(xié)程,等待它們執(zhí)行完畢后關閉協(xié)程池。
// 關閉協(xié)程池 func (p *Pool) Stop() { if p.status == Running { p.status = Stopped p.Once.Do(func() { close(p.chTask) for i := uint64(0); i < p.runningWorkers; i++ { p.createWorker() } }) } }
解決函數(shù)傳參問題
在使用協(xié)程池時,需要向協(xié)程池提交任務,但是協(xié)程池內部的協(xié)程如何知道要執(zhí)行什么樣的任務,參數(shù)又應該如何傳遞呢?
為了解決這個問題,可以定義一個 Task 結構體,用于存儲要執(zhí)行的函數(shù)和函數(shù)參數(shù),如下所示:
type Task struct { ? ? Handler func(v ...interface{}) ? ? Params? []interface{} }
Task 類型是一個結構體,用于封裝協(xié)程池的任務。其中 Handler 是一個函數(shù)類型,用于任務執(zhí)行的函數(shù);Params 是一個可變參數(shù),調用 Handler 時傳遞給它的參數(shù)。
其中,Handler 是一個無返回值的函數(shù),且該函數(shù)可接受變長參數(shù),Params 是一個任意類型的切片,用于傳遞函數(shù)的參數(shù)列表。
在向協(xié)程池提交任務時,可以將 Task 對象作為參數(shù)進行提交。
pool.Submit(&Task{ ? ? Handler: func(v ...interface{}) { ? ? ? ? // 執(zhí)行任務的代碼 ? ? }, ? ? Params: []interface{}{...}, // 任務的參數(shù)列表 })
在協(xié)程內部,可以通過調用 Task.Handler 方法,并將 Task.Params 作為參數(shù)傳遞進去,來運行具體的任務。
select { case t := <-p.chTask: ? ? if t == nil { ? ? ? ? return ? ? } ? ? t.Handler(t.Params...) }
通過這種方式,協(xié)程池就能夠動態(tài)地執(zhí)行不同的任務,并且傳遞任意類型和數(shù)量的參數(shù)。
優(yōu)雅關閉協(xié)程池
在使用協(xié)程池時,如何正確地關閉協(xié)程池,以避免因未正確關閉而導致的內存泄漏和程序崩潰呢?
首先,需要明確協(xié)程池的運行狀態(tài),通過內部的 status
參數(shù)控制協(xié)程池的開關。當協(xié)程池處于運行狀態(tài)時,協(xié)程池才能夠接受新的任務,否則應該拒絕新的任務請求,并盡快釋放內部的資源。
其次,在關閉協(xié)程池時,需要確保所有的已運行的協(xié)程都已經執(zhí)行完任務并退出。這時,可以使用 sync.Once 來執(zhí)行一次協(xié)程池的清理工作。當協(xié)程池處于關閉狀態(tài)時,不再接受新的任務,并通知所有的協(xié)程退出任務循環(huán),最終實現(xiàn)協(xié)程池的優(yōu)雅關閉。
func (p *Pool) Stop() { ?if p.status == Running { ? p.status = Stopped ? p.Once.Do(func() { ?? close(p.chTask) ?? for i := uint64(0); i < p.runningWorkers; i++ { ? ? p.createWorker() ?? } ? }) ?} }
保證協(xié)程安全
在使用協(xié)程池時,需要注意線程安全問題,尤其是在多個協(xié)程同時訪問協(xié)程池時,需要保證協(xié)程池的內部狀態(tài)是線程安全的。
同時對于狀態(tài)的變更以及數(shù)量的增減,還需要保證代碼的安全性。
為了保證線程安全,可以使用互斥鎖 sync.Mutex 來鎖定協(xié)程池,以避免多個協(xié)程同時讀寫協(xié)程池的運行狀態(tài)和其他內部參數(shù)。
在協(xié)程池的內部實現(xiàn)中,使用的 sync.Once
只會單次執(zhí)行的特性可以保證協(xié)程池只會初始化一次,防止因多次初始化而導致的內存泄漏或其他異常。
測試用例
為了測試協(xié)程池的正確性,以下是一個簡單的測試用例。該測試用例創(chuàng)建一個容量為 3 的協(xié)程池,并向其中提交 10 個任務,每個任務隨機睡眠一段時間,并輸出當前時間。
package main import ( ?"fmt" ?"math/rand" ?"sync" ?"testing" ?"time" ) func TestPool(t *testing.T) { ?pool := NewPool(3, func(err interface{}) { ? fmt.Println("發(fā)生 panic,錯誤信息:", err) ?}) ?var wg sync.WaitGroup ?for i := 0; i < 10; i++ { ? wg.Add(1) ? go func(id int) { ?? defer wg.Done() ?? task := \&Task{ ? ? Handler: func(v ...interface{}) { ?? ? fmt.Printf("任務 %d 開始執(zhí)行,時間:%v\n", id, time.Now().Format("2006-01-02 15:04:05")) ?? ? rand.Seed(time.Now().UnixNano()) ?? ? time.Sleep(time.Duration(rand.Intn(5)) \* time.Second) ?? ? fmt.Printf("任務 %d 執(zhí)行完畢,時間:%v\n", id, time.Now().Format("2006-01-02 15:04:05")) ? ? }, ? ? Params: \[]interface{}{}, ?? } ?? pool.Submit(task) ? }(i) ?} ?wg.Wait() }
輸出結果如下:
任務 0 開始執(zhí)行,時間:2021-10-05 16:52:22
任務 1 開始執(zhí)行,時間:2021-10-05 16:52:22
任務 2 開始執(zhí)行,時間:2021-10-05 16:52:22
任務 0 執(zhí)行完畢,時間:2021-10-05 16:52:27
任務 3 開始執(zhí)行,時間:2021-10-05 16:52:27
任務 4 開始執(zhí)行,時間:2021-10-05 16:52:27
任務 1 執(zhí)行完畢,時間:2021-10-05 16:52:28
任務 5 開始執(zhí)行,時間:2021-10-05 16:52:28
任務 6 開始執(zhí)行,時間:2021-10-05 16:52:28
任務 7 開始執(zhí)行,時間:2021-10-05 16:52:28
任務 4 執(zhí)行完畢,時間:2021-10-05 16:52:29
任務 8 開始執(zhí)行,時間:2021-10-05 16:52:29
任務 9 開始執(zhí)行,時間:2021-10-05 16:52:29
任務 2 執(zhí)行完畢,時間:2021-10-05 16:52:32
任務 5 執(zhí)行完畢,時間:2021-10-05 16:52:33
任務 7 執(zhí)行完畢,時間:2021-10-05 16:52:33
任務 6 執(zhí)行完畢,時間:2021-10-05 16:52:34
任務 3 執(zhí)行完畢,時間:2021-10-05 16:52:35
任務 9 執(zhí)行完畢,時間:2021-10-05 16:52:35
任務 8 執(zhí)行完畢,時間:2021-10-05 16:52:37
從輸出結果可以看出,協(xié)程池成功并行處理了所有的任務,并且在容量限制的情況下,成功地保證了協(xié)程池的線程安全性。
改進
可考慮增加對協(xié)程池容量的動態(tài)調整算法,例如在高峰期時增加協(xié)程池的容量,低谷期時降低協(xié)程池的容量。另外可以增加協(xié)程池的超時控制機制,以避免任務執(zhí)行時間過長導致系統(tǒng)資源浪費和性能下降。
總結
協(xié)程池是 Go 語言中一種重要的并發(fā)編程模式,通過協(xié)程池可以高效地管理協(xié)程的生命周期、避免協(xié)程的頻繁創(chuàng)建和銷毀,提高程序的并發(fā)性能。在使用協(xié)程池時,需要注意解決函數(shù)傳參問題、優(yōu)雅關閉協(xié)程池和保證協(xié)程安全的問題,通過合理使用互斥鎖和 sync.Once 可以有效解決這些問題,從而保證協(xié)程池的正確性和高效性。
以上就是Go高級特性探究之協(xié)程池詳解的詳細內容,更多關于Go協(xié)程池的資料請關注腳本之家其它相關文章!
相關文章
go語言之給定英語文章統(tǒng)計單詞數(shù)量(go語言小練習)
這篇文章給大家分享go語言小練習給定英語文章統(tǒng)計單詞數(shù)量,實現(xiàn)思路大概是利用go語言的map類型,以每個單詞作為關鍵字存儲數(shù)量信息,本文通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧2020-01-01Golang連接PostgreSQL基本操作的實現(xiàn)
PostgreSQL是常見的免費的大型關系型數(shù)據(jù)庫,本文主要介紹了Golang連接PostgreSQL基本操作的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-02-02