Go語(yǔ)言ants協(xié)程池的具體使用
在 Go 語(yǔ)言中,goroutine 的輕量特性使得高并發(fā)編程變得異常簡(jiǎn)單。然而,隨著并發(fā)量的增加,頻繁創(chuàng)建對(duì)象和無(wú)限制啟動(dòng) goroutine 也可能帶來(lái)內(nèi)存浪費(fèi)、GC 壓力和資源搶占等問(wèn)題。為了解決這些隱患,協(xié)程池成為常用的優(yōu)化手段。用于控制并發(fā)數(shù)量、避免系統(tǒng)過(guò)載。本文將簡(jiǎn)要介紹golang 中大名鼎鼎的 ants 協(xié)程池庫(kù)的實(shí)現(xiàn)原理。
ants包倉(cāng)庫(kù) : https://github.com/panjf2000/ants
為什么用協(xié)程池?
- 提升性能:主要面向一類場(chǎng)景:大批量輕量級(jí)并發(fā)任務(wù),任務(wù)執(zhí)行成本與協(xié)程創(chuàng)建/銷毀成本量級(jí)接近;
- 動(dòng)態(tài)調(diào)配并發(fā)資源 : 能夠動(dòng)態(tài)調(diào)整所需的協(xié)程數(shù)量以及各個(gè)模塊的并發(fā)度上限;
- 協(xié)程生命周期控制:實(shí)時(shí)查看當(dāng)前全局并發(fā)的協(xié)程數(shù)量;有一個(gè)統(tǒng)一的緊急入口釋放全局協(xié)程.
1. 使用方法
安裝ants庫(kù)
go get -u github.com/panjf2000/ants/v2
1.1 創(chuàng)建協(xié)程池 NewPool(size int)
用于創(chuàng)建一個(gè)容量為 size 的協(xié)程池。默認(rèn)情況下,協(xié)程池不會(huì)自動(dòng)擴(kuò)容,因此超出容量限制的任務(wù)會(huì)等待空閑 worker。
import "github.com/panjf2000/ants/v2"
var pool *ants.Pool
func init() {
var err error
pool, err = ants.NewPool(10) // 創(chuàng)建容量為10的協(xié)程池
if err != nil {
log.Fatalf("Failed to create goroutine pool: %v", err)
}
}
- NewPool() 返回的是一個(gè)可復(fù)用的固定容量協(xié)程池,內(nèi)部通過(guò)任務(wù)隊(duì)列與 worker 協(xié)同處理。
1.2 提交任務(wù) Submit(task func())
協(xié)程池的核心方法
// Submit submits a task to the pool. // // Note that you are allowed to call Pool.Submit() from the current Pool.Submit(), // but what calls for special attention is that you will get blocked with the last // Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this, // you should instantiate a Pool with ants.WithNonblocking(true). func (p *Pool) Submit(task func()) error
使用 Submit() 提交一個(gè)函數(shù)類型任務(wù)給協(xié)程池異步執(zhí)行
示例 :
err := pool.Submit(func() {
fmt.Println("Task executed by goroutine:", runtime.NumGoroutine())
})
if err != nil {
log.Println("Failed to submit task:", err)
}
- 每次調(diào)用 Submit() 不會(huì)阻塞主線程。
- 如果當(dāng)前運(yùn)行的 goroutine 已達(dá)到上限,任務(wù)將等待空閑 worker。
1.3 釋放協(xié)程池 Release()
釋放協(xié)程池資源,釋放后協(xié)程池不再接受新的任務(wù)提交。
pool.Release()
?? 注意:一旦調(diào)用 Release(),協(xié)程池將被永久關(guān)閉,不能再次使用。再次提交任務(wù)將 panic。
1.4 查詢當(dāng)前運(yùn)行數(shù) Running()
適合用于實(shí)時(shí)監(jiān)控協(xié)程池負(fù)載狀態(tài)。
fmt.Printf("Running goroutines: %d\n", pool.Running())
適合用于實(shí)時(shí)監(jiān)控協(xié)程池負(fù)載狀態(tài)。
1.5 池容量
獲取池容量 Cap()
返回協(xié)程池的最大容量(即最大 goroutine 數(shù)量)??捎糜谂c Running() 搭配分析使用率。
fmt.Printf("Pool capacity: %d\n", pool.Cap())
動(dòng)態(tài)調(diào)整容量 Tune(newSize int)
在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整協(xié)程池容量,適應(yīng)系統(tǒng)負(fù)載變化。
pool.Tune(20) // 將容量調(diào)整為20
- 擴(kuò)容會(huì)立即生效。
- 縮容后,多余的 worker 會(huì)在任務(wù)完成后自動(dòng)回收。
- Tune() 不會(huì)中斷正在執(zhí)行的任務(wù)。
流程

2. 底層實(shí)現(xiàn)
原理篇前置知識(shí)
詳細(xì)請(qǐng)看以往文章 : sync 包鎖與對(duì)象池
2.1 核心數(shù)據(jù)結(jié)構(gòu)
2.1.1 goWorker
type goWorker struct {
pool *Pool
task chan func()
recycleTime time.Time
}
goWorker 就是我們協(xié)程池里的實(shí)例 , 簡(jiǎn)單理解為一個(gè)長(zhǎng)時(shí)間運(yùn)行而不回收的協(xié)程,用于反復(fù)處理用戶提交的異步任務(wù)
- pool:goWorker 所屬的協(xié)程池;
- task:goWorker 用于接收異步任務(wù)包的管道;
- recycleTime:goWorker 回收到協(xié)程池的時(shí)間.
2.1.2 Pool
type Pool struct {
capacity int32
running int32
lock sync.Locker
workers workerArray
state int32
cond *sync.Cond
workerCache sync.Pool
waiting int32
heartbeatDone int32
stopHeartbeat context.CancelFunc
options *Options
}
- capacity:池子的容量
- running:出于運(yùn)行中的協(xié)程數(shù)量
- lock:自制的自旋鎖,保證取 goWorker 時(shí)并發(fā)安全
- workers:goWorker 列表,即“真正意義上的協(xié)程池”
- state:池子狀態(tài)標(biāo)識(shí),0-打開;1-關(guān)閉
- cond:并發(fā)協(xié)調(diào)器,用于阻塞模式下,掛起和喚醒等待資源的協(xié)程
- waiting:標(biāo)識(shí)出于等待狀態(tài)的協(xié)程數(shù)量;
- heartbeatDone:標(biāo)識(shí)回收協(xié)程是否關(guān)閉;
- stopHeartbeat:用于關(guān)閉回收協(xié)程的控制器函數(shù);
- options:一些定制化的配置.
- workerCache:存放 goWorker 的對(duì)象池,用于緩存釋放的 goworker 資源用于復(fù)用. 對(duì)象池需要區(qū)別于協(xié)程池,協(xié)程池中的
goWorker 仍存活,進(jìn)入對(duì)象池的 goWorker 邏輯意義已經(jīng)銷毀;

2.1.3 workerArray
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()
}
該 interface 主要定義了作為數(shù)據(jù)集合的幾個(gè)通用 api,以及用于回收過(guò)期 goWorker 的 api.
- insert 插入一個(gè)
goWorker - detach 取出一個(gè)
goWorker - retrieveExpiry 獲取池中空閑時(shí)間超過(guò) duration 的 已經(jīng)過(guò)期的
goWorker集合 ,其中goWorker的回收時(shí)間與入棧先后順序相關(guān),因此可以借助binarySearch方法基于二分法快速獲取到目標(biāo)集合.
2.2 核心方法的實(shí)現(xiàn)
2.2.1 NewPool 創(chuàng)建協(xié)程池
func NewPool(size int, options ...Option) (*Pool, error) {
// 讀取用戶配置,做一些前置校驗(yàn),默認(rèn)值賦值等前處理動(dòng)作...
opts := loadOptions(options...)
// 構(gòu)造好 Pool 數(shù)據(jù)結(jié)構(gòu);
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
// 構(gòu)造對(duì)象池
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
// 構(gòu)造好 goWorker 對(duì)象池 workerCache,聲明好工廠函數(shù);
p.workers = newWorkerArray(stackType, 0)
// golang 標(biāo)準(zhǔn)庫(kù)提供的并發(fā)協(xié)調(diào)器,用于實(shí)現(xiàn)指定條件下阻塞和喚醒協(xié)程的操作.
p.cond = sync.NewCond(p.lock)
// 異步啟動(dòng) goWorker 過(guò)期銷毀協(xié)程.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
go p.purgePeriodically(ctx)
return p, nil
}
2.2.2 pool.Submit 提交任務(wù)
func (p *Pool) Submit(task func()) error {
// 從 Pool 中取出一個(gè)可用的 goWorker;
var w *goWorker
if w = p.retrieveWorker(); w == nil {
return ErrPoolOverload
}
// 將用戶提交的任務(wù)包添加到 goWorker 的 channel 中.
w.task <- task
return nil
}
取出goWorker 的實(shí)現(xiàn):
func (p *Pool) retrieveWorker() (w *goWorker) {
// 聲明了一個(gè)構(gòu)造 goWorker 的函數(shù) spawnWorker 用于兜底,從對(duì)象池 workerCache 中獲取 goWorker;
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()
// 嘗試從池中取出一個(gè)空閑的 goWorker;
w = p.workers.detach()
if w != nil {
p.lock.Unlock()
// 倘若池子容量未超過(guò)上限, 從對(duì)象池中取出一個(gè) goWorker
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
p.lock.Unlock()
spawnWorker()
} else {
// 倘若池子容量超限,且池子為非阻塞模式,直接拋回錯(cuò)誤;
if p.options.Nonblocking {
p.lock.Unlock()
return
}
// 倘若池子容量超限,且池子為阻塞模式,則基于并發(fā)協(xié)調(diào)器 cond 掛起等待有空閑 worker;
retry:
// 若阻塞任務(wù)已達(dá)最大限制,也直接返回;
if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
// 增加等待數(shù)并使用 cond 條件變量掛起當(dāng)前協(xié)程;
p.addWaiting(1)
p.cond.Wait() // block and wait for an available worker
p.addWaiting(-1)
// 被喚醒后(可能是因?yàn)?scavenger 清理協(xié)程),判斷是否還有運(yùn)行中的 worker;
var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
spawnWorker()
return
}
// 再次嘗試重新獲取一個(gè)空閑 worker;
if w = p.workers.detach(); w == nil {
if nw < p.Cap() {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
// 獲取到了可用 worker,解鎖并返回;
p.lock.Unlock()
}
return
}
2.2.3 goWorker 運(yùn)行
func (w *goWorker) run() {
w.pool.addRunning(1)
go func() {
defer func() {
w.pool.addRunning(-1)
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
// panic 后處理
}
w.pool.cond.Signal()
}()
for f := range w.task {
if f == nil {
return
}
f()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
- 循環(huán) + 阻塞等待,直到獲取到用戶提交的異步任務(wù)包 task 并執(zhí)行;
- 執(zhí)行完成 task 后,會(huì)將自己交還給協(xié)程池;
- 倘若回歸協(xié)程池失敗,或者用戶提交了一個(gè)空的任務(wù)包,則該 goWorker 會(huì)被銷毀,銷毀方式是將自身放回協(xié)程池的對(duì)象池 workerCache. 并且會(huì)調(diào)用協(xié)調(diào)器 cond 喚醒一個(gè)阻塞等待的協(xié)程.
到此這篇關(guān)于Go語(yǔ)言ants協(xié)程池的具體使用的文章就介紹到這了,更多相關(guān)Go語(yǔ)言ants協(xié)程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go-zero創(chuàng)建RESTful API 服務(wù)的方法
文章介紹了如何使用go-zero框架和goctl工具快速創(chuàng)建RESTfulAPI服務(wù),通過(guò)定義.api文件并使用goctl命令,可以自動(dòng)生成項(xiàng)目結(jié)構(gòu)、路由、請(qǐng)求和響應(yīng)模型以及處理邏輯,感興趣的朋友一起看看吧2024-11-11
Golang實(shí)現(xiàn)不被復(fù)制的結(jié)構(gòu)體的方法
sync包中的許多結(jié)構(gòu)都是不允許拷貝的,因?yàn)樗鼈冏陨泶鎯?chǔ)了一些狀態(tài)(比如等待者的數(shù)量),如果你嘗試復(fù)制這些結(jié)構(gòu)體,就會(huì)在你的?IDE中看到警告,那這是怎么實(shí)現(xiàn)的呢,下文就來(lái)和大家詳細(xì)講講2023-03-03
go簡(jiǎn)介及國(guó)內(nèi)鏡像源配置全過(guò)程
本文介紹了Go語(yǔ)言的基本概念和環(huán)境配置,包括GOROOT、GOPATH和GOMODULE的設(shè)置,還展示了如何在IDEA中配置Go語(yǔ)言的開發(fā)環(huán)境,并通過(guò)一個(gè)簡(jiǎn)單的“HelloWorld”項(xiàng)目來(lái)熟悉Go語(yǔ)言的基本語(yǔ)法和開發(fā)流程2025-01-01
go程序中同一個(gè)包下為什么會(huì)存在多個(gè)同名的函數(shù)或變量(詳細(xì)解析)
這篇文章主要介紹了go程序中同一個(gè)包下為什么會(huì)存在多個(gè)同名的函數(shù)或變量(詳細(xì)解析),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2024-05-05
Golang迭代如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解
這篇文章主要為大家介紹了Golang迭代之如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10
Golang爬蟲及正則表達(dá)式的實(shí)現(xiàn)示例
本文主要介紹了Golang爬蟲及正則表達(dá)式的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)
本文主要介紹了淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04

