golang中ants協(xié)程池使用和實現(xiàn)邏輯
golang中goroutine由運行時管理,使用go關鍵字就可以方便快捷的創(chuàng)建一個goroutine,受限于服務器硬件內存大小,如果不對goroutine數(shù)量進行限制,會出現(xiàn)Out of Memory錯誤。但是goroutine泄漏引發(fā)的血案,想必各位gopher都經(jīng)歷過,通過協(xié)程池限制goroutine數(shù)一個有效避免泄漏的手段,但是自己手動實現(xiàn)一個協(xié)程池,總是會兼顧不到各種場景,比如釋放,處理panic,動態(tài)擴容等。那么ants是公認的優(yōu)秀實現(xiàn)協(xié)程池。
ants簡介
ants是一個高性能的 goroutine 池,實現(xiàn)了對大規(guī)模 goroutine 的調度管理、goroutine 復用,允許使用者在開發(fā)并發(fā)程序的時候限制 goroutine 數(shù)量,復用資源,達到更高效執(zhí)行任務的效果
功能
- 自動調度海量的 goroutines,復用 goroutines
- 定期清理過期的 goroutines,進一步節(jié)省資源
- 提供了大量有用的接口:任務提交、獲取運行中的 goroutine 數(shù)量、動態(tài)調整 Pool 大小、釋放 Pool、重啟 Pool
- 優(yōu)雅處理 panic,防止程序崩潰
- 資源復用,極大節(jié)省內存使用量;在大規(guī)模批量并發(fā)任務場景下比原生 goroutine 并發(fā)具有更高的性能
- 非阻塞機制
1.ants庫結構
學習一個庫先從結構看起吧,pool、pool_func、ants初始化一個pool等操作都在這里

ants庫代碼結構
- pool.go提供了ants.NewPool(創(chuàng)建協(xié)程池)、Submit(task func())提交任務
- pool_func.go使用NewPoolWithFunc(創(chuàng)建pool對象需要帶具體的函數(shù)),并且使用Invoke(args interface{})進行調用,arg就是傳給池函數(shù)func(interface{})的參數(shù)
- options.go使用函數(shù)選項模式進行參數(shù)配置
- ants.go給初始化默認協(xié)程池對象defaultAntsPool(默認的pool容量是math.MaxInt32)提供了公共函數(shù)
介紹完了主要的庫文件后,我們進行逐個的了解,具體的使用,我們可以結合官方的使用案例進行了解,這里就不進行展開了。
2.ants中Pool創(chuàng)建對象
創(chuàng)建Pool對象需調用ants.NewPool(size, options)函數(shù),返回一個pool的指針
先看Pool的接口,對我們創(chuàng)建的Pool先做個初步印象

Pool結構體
// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...)
if size <= 0 {
size = -1
}
if expiry := opts.ExpiryDuration; expiry < 0 {
return nil, ErrInvalidPoolExpiry
} else if expiry == 0 {
opts.ExpiryDuration = DefaultCleanIntervalTime
}
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &Pool{
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
p.cond = sync.NewCond(p.lock)
// Start a goroutine to clean up expired workers periodically.
go p.purgePeriodically()
return p, nil
}
ants.NewPool創(chuàng)建Pool過程
- 接收size參數(shù)作為pool的容量,如果size<=0,那么不對池子容量進行限制
- loadOptions對Pool的配置,比如是否阻塞模式,
- workerCache這個sync.Pool對象的New方法,在調用sync.Pool的Get()方法時,如果為nil,則返回workerCache.New()的結果
- 是否初始化Pool是進行內存預分配(size > 0),來創(chuàng)建不同的worker(stack、loopQueue兩種模式)
- 使用p.lock鎖創(chuàng)建一個條件變量
- 開啟一個協(xié)程定期清理過期的workers
3.ants中的PoolWithFunc
ants.PoolWithFunc創(chuàng)建PoolWithFunc和New.Pool整體的結構很像,多了個poolFunc func(interface{})字段,也就是提交到池子的函數(shù),然后workers的類型不一樣
4.理解worker
可以查看出pool中的worker在整個流程起著很重要的作用,也就是ants中為每個任務都是由 worker 對象來處理的,每個work都會創(chuàng)建一個goroutine來處理任務,ants中的worker結構如下
type goWorker struct {
//work的所屬者
pool *Pool
//任務通道,通過這個發(fā)送給goWorker
task chan func()
//將work放入到隊列時更新
recycleTime time.Time
}
從ants.Pool創(chuàng)建對象Pool的過程第四步可以看出,通過newWorkerArray創(chuàng)建workers,因為workerArray是個接口,有如下方法。
type workerArray interface {
len() int
isEmpty() bool
insert(worker *goWorker) error
detach() *goWorker
retrieveExpiry(duration time.Duration) []*goWorker
reset()
}
通過newWorkerArray,返回實現(xiàn)了workerArray接口的workerStack,這里newWorkerArray其實是用了個工廠方法來實現(xiàn)的,根據(jù)傳入的類型,并不需要知道具體實現(xiàn)了接口的結構體,只要實現(xiàn)了workerArray接口就可以返回實現(xiàn)者的結構體,然后調用具體的實現(xiàn)
5.提交任務Submit
Submit(task func())接收一個func作為參數(shù),將task通過通道task將類型為func的函數(shù)給到goWorker,然后調用retrieveWorker返回一個可用的worker給task
func (p *Pool) retrieveWorker() (w *goWorker) {
spawnWorker := func() {
w = p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()
w = p.workers.detach()
if w != nil { // first try to fetch the worker from the queue
p.lock.Unlock()
} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,
// then just spawn a new worker goroutine.
p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
if p.options.Nonblocking {
p.lock.Unlock()
return
}
retry:
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.blockingNum++
p.cond.Wait() // block and wait for an available worker
p.blockingNum--
var nw int
if nw = p.Running(); nw == 0 { // awakened by the scavenger
p.lock.Unlock()
if !p.IsClosed() {
spawnWorker()
}
return
}
if w = p.workers.detach(); w == nil {
if nw < capacity {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
執(zhí)行過程分析:
- spawnWorker是一個func,從p.workerCache這個sync.Pool獲取一個goWorker對象(在New.Pool中有講到),用sync.Locker上鎖
- 調用p.workers.detach方法(前面提到p.workers實現(xiàn)了workerArray接口)
- 如果獲取到了goWorker對象就直接返回
- 如果worker隊列為空,并且Pool還有容量,那么調用spawnWorker,調用worker的run方法啟動一個新的協(xié)程處理任務
- run方法的實現(xiàn)如下,從goWorker的channel中遍歷待執(zhí)行的func(),執(zhí)行,并且在執(zhí)行完后調用revertWorker放回workers
func (w *goWorker) run() {
w.pool.incRunning()
go func() {
for f := range w.task {
if f == nil {
return
}
f()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
6.釋放和重啟Pool
釋放和重啟Pool分別調用了Release和Reboot,這兩個函數(shù)都在ants.Pool這個文件中可以找到,具體實現(xiàn)這里做個簡單說明
- Release調用p.workers.reset()結束loopQueue或wokerStack中的 goroutine。都是通過發(fā)送nil到goWorker的task通道中,然后重置各個字段的值
- Reboot調用purgePeriodically,檢測到Pool關閉了就直接退出了
7.細節(jié)
task緩沖通道
下面這個是NewPool變量workerCachesyn類型sync.Pool創(chuàng)建goWorker對象的代碼
p.workerCache.New = func() interface{} {
return &goWorker{
pool: p,
task: make(chan func(), workerChanCap),
}
}
workerChanCap作為容量,這個變量定義在ants.go文件中的定義如下:
// workerChanCap determines whether the channel of a worker should be a buffered channel
// to get the best performance. Inspired by fasthttp at
// https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
workerChanCap = func() int {
// Use blocking channel if GOMAXPROCS=1.
// This switches context from sender to receiver immediately,
// which results in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the sender might be dragged down if the receiver is CPU-bound.
return 1
}()
ants參考了著名的 Web框架fasthttp的實現(xiàn)。當GOMAXPROCS為 1時(即操作系統(tǒng)線程數(shù)為1),向通道task發(fā)送會掛起發(fā)送 goroutine,將執(zhí)行流程轉向接收goroutine,這能提升接收處理性能。如果GOMAXPROCS大于1,ants使用帶緩沖的通道,為了防止接收 goroutine 是 CPU密集的,導致發(fā)送 goroutine 被阻塞。
自旋鎖 SpinLock
在NewPool中l(wèi)ock,其實給lock初始化了一個自旋鎖,這里是利用atomic.CompareAndSwapUint32()這個原子操作實現(xiàn)的,在加鎖失敗后不會等待,而是繼續(xù)嘗試,提高了加鎖減鎖的性能
在開發(fā)中剛好遇到需要ants,這次也做個記錄作為分享,其實慢慢的會發(fā)現(xiàn)三方庫的xx_test用例是最好的學習例子,希望能和大家一起知其然知其所以然,加油!
到此這篇關于golang中ants協(xié)程池使用和實現(xiàn)邏輯的文章就介紹到這了,更多相關golang ants協(xié)程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Golang中goroutine和channel使用介紹深入分析
一次只做一件事情并不是完成任務最快的方法,一些大的任務可以拆解成若干個小任務,goroutine可以讓程序同時處理幾個不同的任務,goroutine使用channel來協(xié)調它們的工作,channel允許goroutine互相發(fā)送數(shù)據(jù)并同步,這樣一個goroutine就不會領先于另一個goroutine2023-01-01
gorm 結構體中 binding 和 msg 結構體標簽示例詳解
文章介紹了Gin框架中binding和msg結構體標簽的使用,包括基本用法、常用驗證規(guī)則、自定義驗證器、錯誤信息自定義、控制器使用示例、組合驗證規(guī)則、跨字段驗證和初始化驗證器等,這些標簽主要用于數(shù)據(jù)驗證、自定義錯誤信息、參數(shù)綁定和表單驗證2024-11-11

