欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go語(yǔ)言ants協(xié)程池的具體使用

 更新時(shí)間:2025年08月21日 10:32:27   作者:onlooker6666  
ants是Go語(yǔ)言中一款高效的協(xié)程池庫(kù),通過(guò)復(fù)用協(xié)程資源優(yōu)化高并發(fā)場(chǎng)景下的性能,本文就來(lái)介紹一下golang中大名鼎鼎的ants協(xié)程池庫(kù)的實(shí)現(xiàn)原理,感興趣的可以了解一下

在 Go 語(yǔ)言中,goroutine 的輕量特性使得高并發(fā)編程變得異常簡(jiǎn)單。然而,隨著并發(fā)量的增加,頻繁創(chuàng)建對(duì)象和無(wú)限制啟動(dòng) goroutine 也可能帶來(lái)內(nèi)存浪費(fèi)、GC 壓力和資源搶占等問(wèn)題。為了解決這些隱患,協(xié)程池成為常用的優(yōu)化手段。用于控制并發(fā)數(shù)量、避免系統(tǒng)過(guò)載。本文將簡(jiǎn)要介紹golang 中大名鼎鼎的 ants 協(xié)程池庫(kù)的實(shí)現(xiàn)原理。

ants包倉(cāng)庫(kù) : https://github.com/panjf2000/ants

為什么用協(xié)程池?

  • 提升性能:主要面向一類場(chǎng)景:大批量輕量級(jí)并發(fā)任務(wù),任務(wù)執(zhí)行成本與協(xié)程創(chuàng)建/銷毀成本量級(jí)接近;
  • 動(dòng)態(tài)調(diào)配并發(fā)資源 : 能夠動(dòng)態(tài)調(diào)整所需的協(xié)程數(shù)量以及各個(gè)模塊的并發(fā)度上限;
  • 協(xié)程生命周期控制:實(shí)時(shí)查看當(dāng)前全局并發(fā)的協(xié)程數(shù)量;有一個(gè)統(tǒng)一的緊急入口釋放全局協(xié)程.

1. 使用方法

安裝ants庫(kù)

go get -u github.com/panjf2000/ants/v2

1.1 創(chuàng)建協(xié)程池 NewPool(size int)

用于創(chuàng)建一個(gè)容量為 size 的協(xié)程池。默認(rèn)情況下,協(xié)程池不會(huì)自動(dòng)擴(kuò)容,因此超出容量限制的任務(wù)會(huì)等待空閑 worker。

import "github.com/panjf2000/ants/v2"

var pool *ants.Pool

func init() {
    var err error
    pool, err = ants.NewPool(10) // 創(chuàng)建容量為10的協(xié)程池
    if err != nil {
        log.Fatalf("Failed to create goroutine pool: %v", err)
    }
}
  • NewPool() 返回的是一個(gè)可復(fù)用的固定容量協(xié)程池,內(nèi)部通過(guò)任務(wù)隊(duì)列與 worker 協(xié)同處理。

1.2 提交任務(wù) Submit(task func())

協(xié)程池的核心方法

// Submit submits a task to the pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error 

使用 Submit() 提交一個(gè)函數(shù)類型任務(wù)給協(xié)程池異步執(zhí)行

示例 :

err := pool.Submit(func() {
    fmt.Println("Task executed by goroutine:", runtime.NumGoroutine())
})
if err != nil {
    log.Println("Failed to submit task:", err)
}
  • 每次調(diào)用 Submit() 不會(huì)阻塞主線程。
  • 如果當(dāng)前運(yùn)行的 goroutine 已達(dá)到上限,任務(wù)將等待空閑 worker。

1.3 釋放協(xié)程池 Release()

釋放協(xié)程池資源,釋放后協(xié)程池不再接受新的任務(wù)提交。

pool.Release()

?? 注意:一旦調(diào)用 Release(),協(xié)程池將被永久關(guān)閉,不能再次使用。再次提交任務(wù)將 panic。

1.4 查詢當(dāng)前運(yùn)行數(shù) Running()

適合用于實(shí)時(shí)監(jiān)控協(xié)程池負(fù)載狀態(tài)。

fmt.Printf("Running goroutines: %d\n", pool.Running())

適合用于實(shí)時(shí)監(jiān)控協(xié)程池負(fù)載狀態(tài)。

1.5 池容量

獲取池容量 Cap()
返回協(xié)程池的最大容量(即最大 goroutine 數(shù)量)??捎糜谂c Running() 搭配分析使用率。

fmt.Printf("Pool capacity: %d\n", pool.Cap())

動(dòng)態(tài)調(diào)整容量 Tune(newSize int)
在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整協(xié)程池容量,適應(yīng)系統(tǒng)負(fù)載變化。

pool.Tune(20) // 將容量調(diào)整為20
  • 擴(kuò)容會(huì)立即生效。
  • 縮容后,多余的 worker 會(huì)在任務(wù)完成后自動(dòng)回收。
  • Tune() 不會(huì)中斷正在執(zhí)行的任務(wù)。

流程

2. 底層實(shí)現(xiàn)

原理篇前置知識(shí)
詳細(xì)請(qǐng)看以往文章 :  sync 包鎖與對(duì)象池

2.1 核心數(shù)據(jù)結(jié)構(gòu)

2.1.1 goWorker

type goWorker struct {
    pool *Pool
    task chan func()
    recycleTime time.Time
}

goWorker 就是我們協(xié)程池里的實(shí)例 , 簡(jiǎn)單理解為一個(gè)長(zhǎng)時(shí)間運(yùn)行而不回收的協(xié)程,用于反復(fù)處理用戶提交的異步任務(wù)

  • pool:goWorker 所屬的協(xié)程池;
  • task:goWorker 用于接收異步任務(wù)包的管道;
  • recycleTime:goWorker 回收到協(xié)程池的時(shí)間.

2.1.2 Pool

type Pool struct {
    capacity int32
    running int32
    lock sync.Locker
    workers workerArray
    state int32
    cond *sync.Cond
    workerCache sync.Pool
    waiting int32
    heartbeatDone int32
    stopHeartbeat context.CancelFunc
    options *Options
}
  • capacity:池子的容量
  • running:出于運(yùn)行中的協(xié)程數(shù)量
  • lock:自制的自旋鎖,保證取 goWorker 時(shí)并發(fā)安全
  • workers:goWorker 列表,即“真正意義上的協(xié)程池”
  • state:池子狀態(tài)標(biāo)識(shí),0-打開;1-關(guān)閉
  • cond:并發(fā)協(xié)調(diào)器,用于阻塞模式下,掛起和喚醒等待資源的協(xié)程
  • waiting:標(biāo)識(shí)出于等待狀態(tài)的協(xié)程數(shù)量;
  • heartbeatDone:標(biāo)識(shí)回收協(xié)程是否關(guān)閉;
  • stopHeartbeat:用于關(guān)閉回收協(xié)程的控制器函數(shù);
  • options:一些定制化的配置.
  • workerCache:存放 goWorker 的對(duì)象池,用于緩存釋放的 goworker 資源用于復(fù)用. 對(duì)象池需要區(qū)別于協(xié)程池,協(xié)程池中的
    goWorker 仍存活,進(jìn)入對(duì)象池的 goWorker 邏輯意義已經(jīng)銷毀;

2.1.3 workerArray

type workerArray interface {
    len() int
    isEmpty() bool
    insert(worker *goWorker) error
    detach() *goWorker
    retrieveExpiry(duration time.Duration) []*goWorker
    reset()
}

該 interface 主要定義了作為數(shù)據(jù)集合的幾個(gè)通用 api,以及用于回收過(guò)期 goWorker 的 api.

  • insert 插入一個(gè) goWorker
  • detach 取出一個(gè) goWorker
  • retrieveExpiry 獲取池中空閑時(shí)間超過(guò) duration 的 已經(jīng)過(guò)期的 goWorker 集合 ,其中 goWorker 的回收時(shí)間與入棧先后順序相關(guān),因此可以借助 binarySearch 方法基于二分法快速獲取到目標(biāo)集合.

2.2 核心方法的實(shí)現(xiàn)

2.2.1 NewPool 創(chuàng)建協(xié)程池

func NewPool(size int, options ...Option) (*Pool, error) {
    // 讀取用戶配置,做一些前置校驗(yàn),默認(rèn)值賦值等前處理動(dòng)作...
    opts := loadOptions(options...)

	// 構(gòu)造好 Pool 數(shù)據(jù)結(jié)構(gòu);
    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
    // 構(gòu)造對(duì)象池
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    
   // 構(gòu)造好 goWorker 對(duì)象池 workerCache,聲明好工廠函數(shù);
    p.workers = newWorkerArray(stackType, 0)
	
  //  golang 標(biāo)準(zhǔn)庫(kù)提供的并發(fā)協(xié)調(diào)器,用于實(shí)現(xiàn)指定條件下阻塞和喚醒協(xié)程的操作.
    p.cond = sync.NewCond(p.lock)

 // 異步啟動(dòng) goWorker 過(guò)期銷毀協(xié)程.
    var ctx context.Context
    ctx, p.stopHeartbeat = context.WithCancel(context.Background())
    go p.purgePeriodically(ctx)
    return p, nil
}

2.2.2 pool.Submit 提交任務(wù)

func (p *Pool) Submit(task func()) error {

	// 從 Pool 中取出一個(gè)可用的 goWorker;
    var w *goWorker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
	
	// 將用戶提交的任務(wù)包添加到 goWorker 的 channel 中.
    w.task <- task
    return nil
}

取出goWorker 的實(shí)現(xiàn):

func (p *Pool) retrieveWorker() (w *goWorker) {
    // 聲明了一個(gè)構(gòu)造 goWorker 的函數(shù) spawnWorker 用于兜底,從對(duì)象池 workerCache 中獲取 goWorker;
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

	// 嘗試從池中取出一個(gè)空閑的 goWorker;
    w = p.workers.detach()
    if w != nil { 
        p.lock.Unlock()
    
    // 倘若池子容量未超過(guò)上限, 從對(duì)象池中取出一個(gè) goWorker 
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        p.lock.Unlock()
        spawnWorker()
    } else { 

	  // 倘若池子容量超限,且池子為非阻塞模式,直接拋回錯(cuò)誤;
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
	
	// 倘若池子容量超限,且池子為阻塞模式,則基于并發(fā)協(xié)調(diào)器 cond 掛起等待有空閑 worker;
    retry:
      // 若阻塞任務(wù)已達(dá)最大限制,也直接返回;
        if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }

		// 增加等待數(shù)并使用 cond 條件變量掛起當(dāng)前協(xié)程;
        p.addWaiting(1)
        p.cond.Wait() // block and wait for an available worker
        p.addWaiting(-1)

	 // 被喚醒后(可能是因?yàn)?scavenger 清理協(xié)程),判斷是否還有運(yùn)行中的 worker;
        var nw int
        if nw = p.Running(); nw == 0 { // awakened by the scavenger
            p.lock.Unlock()
            spawnWorker()
            return
        }
  
      // 再次嘗試重新獲取一個(gè)空閑 worker;
        if w = p.workers.detach(); w == nil {
            if nw < p.Cap() {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }
       // 獲取到了可用 worker,解鎖并返回;
        p.lock.Unlock()
    }
    return
}

2.2.3 goWorker 運(yùn)行

func (w *goWorker) run() {
    w.pool.addRunning(1)
    go func() {
        defer func() {
            w.pool.addRunning(-1)
            w.pool.workerCache.Put(w)
            if p := recover(); p != nil {
                // panic 后處理
            }
            w.pool.cond.Signal()
        }()

        for f := range w.task {
            if f == nil {
                return
            }
            f()
            if ok := w.pool.revertWorker(w); !ok {
                return
            }
        }
    }()
  • 循環(huán) + 阻塞等待,直到獲取到用戶提交的異步任務(wù)包 task 并執(zhí)行;
  • 執(zhí)行完成 task 后,會(huì)將自己交還給協(xié)程池;
  • 倘若回歸協(xié)程池失敗,或者用戶提交了一個(gè)空的任務(wù)包,則該 goWorker 會(huì)被銷毀,銷毀方式是將自身放回協(xié)程池的對(duì)象池 workerCache. 并且會(huì)調(diào)用協(xié)調(diào)器 cond 喚醒一個(gè)阻塞等待的協(xié)程.

到此這篇關(guān)于Go語(yǔ)言ants協(xié)程池的具體使用的文章就介紹到這了,更多相關(guān)Go語(yǔ)言ants協(xié)程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • go設(shè)置多個(gè)GOPATH的方式

    go設(shè)置多個(gè)GOPATH的方式

    這篇文章主要介紹了go設(shè)置多個(gè)GOPATH的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-05-05
  • go-zero創(chuàng)建RESTful API 服務(wù)的方法

    go-zero創(chuàng)建RESTful API 服務(wù)的方法

    文章介紹了如何使用go-zero框架和goctl工具快速創(chuàng)建RESTfulAPI服務(wù),通過(guò)定義.api文件并使用goctl命令,可以自動(dòng)生成項(xiàng)目結(jié)構(gòu)、路由、請(qǐng)求和響應(yīng)模型以及處理邏輯,感興趣的朋友一起看看吧
    2024-11-11
  • Golang實(shí)現(xiàn)不被復(fù)制的結(jié)構(gòu)體的方法

    Golang實(shí)現(xiàn)不被復(fù)制的結(jié)構(gòu)體的方法

    sync包中的許多結(jié)構(gòu)都是不允許拷貝的,因?yàn)樗鼈冏陨泶鎯?chǔ)了一些狀態(tài)(比如等待者的數(shù)量),如果你嘗試復(fù)制這些結(jié)構(gòu)體,就會(huì)在你的?IDE中看到警告,那這是怎么實(shí)現(xiàn)的呢,下文就來(lái)和大家詳細(xì)講講
    2023-03-03
  • go簡(jiǎn)介及國(guó)內(nèi)鏡像源配置全過(guò)程

    go簡(jiǎn)介及國(guó)內(nèi)鏡像源配置全過(guò)程

    本文介紹了Go語(yǔ)言的基本概念和環(huán)境配置,包括GOROOT、GOPATH和GOMODULE的設(shè)置,還展示了如何在IDEA中配置Go語(yǔ)言的開發(fā)環(huán)境,并通過(guò)一個(gè)簡(jiǎn)單的“HelloWorld”項(xiàng)目來(lái)熟悉Go語(yǔ)言的基本語(yǔ)法和開發(fā)流程
    2025-01-01
  • Go使用chan或context退出協(xié)程示例詳解

    Go使用chan或context退出協(xié)程示例詳解

    這篇文章主要為大家介紹了Go使用chan或context退出協(xié)程示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-08-08
  • GO語(yǔ)言中接口和接口型函數(shù)的具體使用

    GO語(yǔ)言中接口和接口型函數(shù)的具體使用

    本文主要介紹了GO語(yǔ)言中接口和接口型函數(shù)的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • go程序中同一個(gè)包下為什么會(huì)存在多個(gè)同名的函數(shù)或變量(詳細(xì)解析)

    go程序中同一個(gè)包下為什么會(huì)存在多個(gè)同名的函數(shù)或變量(詳細(xì)解析)

    這篇文章主要介紹了go程序中同一個(gè)包下為什么會(huì)存在多個(gè)同名的函數(shù)或變量(詳細(xì)解析),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2024-05-05
  • Golang迭代如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解

    Golang迭代如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解

    這篇文章主要為大家介紹了Golang迭代之如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-10-10
  • Golang爬蟲及正則表達(dá)式的實(shí)現(xiàn)示例

    Golang爬蟲及正則表達(dá)式的實(shí)現(xiàn)示例

    本文主要介紹了Golang爬蟲及正則表達(dá)式的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • 淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)

    淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)

    本文主要介紹了淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04

最新評(píng)論