欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Go語言如何實現(xiàn)一個最簡化的協(xié)程池

 更新時間:2023年10月24日 08:02:14   作者:yesAnd92  
這篇文章主要為大家詳細介紹了Go語言如何實現(xiàn)一個最簡化的協(xié)程池,文中的示例代碼講解詳細,具有一定的參考價值,有需要的小伙伴可以了解一下

背景

學習完優(yōu)秀的協(xié)程池開源項目ants之后,想要對協(xié)程池做了一個總結,在ants的基礎上做了簡化,保留了一個協(xié)程池的核心功能,旨在幫助協(xié)程池的理解和使用。

為什么要用協(xié)程池

  • 降低資源開銷:協(xié)程池可以重復使用協(xié)程,減少創(chuàng)建、銷毀協(xié)程的開銷,從而提高系統(tǒng)性能。
  • 提高響應速度:接收任務后可以立即執(zhí)行,無需等待創(chuàng)建協(xié)程時間。
  • 增強可管理型:可以對協(xié)程進行集中調(diào)度,統(tǒng)一管理,方便調(diào)優(yōu)。

實現(xiàn)協(xié)程池都需要實現(xiàn)哪些功能

思考一下,如果如果讓你實現(xiàn)一個協(xié)程池,有哪些必要的核心功能呢?

  • 協(xié)程池需要提供一個對外接收任務提交的接口
  • 如何創(chuàng)建一個協(xié)程,創(chuàng)建好的協(xié)程存放在哪里?
  • 協(xié)程池中的協(xié)程在沒有task時,是如何存在的?
  • 如何為使用者提交的task分配一個協(xié)程?
  • 協(xié)程執(zhí)行完task后,如何返還到協(xié)程池中?

協(xié)程池內(nèi)部需要維護一個裝有一個個協(xié)程的隊列,用于存放管理的協(xié)程,為了拓展功能方便,我們把每個協(xié)程都封裝一個worker,這個worker隊列需要具備幾個核心功能:

協(xié)程池整體的架構

帶著這些如何實現(xiàn)一個簡單協(xié)程池必要核心功能的問題,我們來看下,一個協(xié)程池的核心流程,用圖來表示就是:

從圖上可以看出協(xié)程池主要包括3個組件

協(xié)程池(gorutine-pool) :它是整個協(xié)程池的入口和主體,內(nèi)部持有一個協(xié)程隊列,用于存放、調(diào)度worker。

協(xié)程隊列(worker-queue) :持有協(xié)程池維護的所有的協(xié)程,為了拓展方便,將協(xié)程封裝成worker,一個worker對應一個協(xié)程。

worker:每個worker對應一個協(xié)程,它能夠運行一個任務,通常是個函數(shù),是真正干活的地方。

主要流程:

當一個使用者一個task提交后,協(xié)程池從workerQueue中獲取一個可用的worker負責執(zhí)行此task,如果worker隊列中沒有可用的worker,并且worker的數(shù)量還沒有達到隊列設置最大數(shù)量,可以新建一個worker補充到隊列中,worker執(zhí)行完任務后,還需要能夠將自己返還到workder隊列中,才能達到復用的目的

三個組件的實現(xiàn)

分別來看下三個組件是如何實現(xiàn)協(xié)程池的

gorutine-pool實現(xiàn)

// Pool pool負責worker的調(diào)度
type Pool struct {
	//pool最大最大線程數(shù)量
	cap int32
	//當前運行的worker數(shù)量
	running int32
	//worker隊列
	workers workerQueue
	//控制并發(fā)訪問臨界資源
	lock sync.Mutex
}

pool結構體中caprunning兩個屬性用來管理協(xié)程池的數(shù)量,workers存放創(chuàng)建的協(xié)程,lock控制并發(fā)訪問臨界資源。

從上述的架構圖中可以看出,pool需要對外提供接收task的方法,以及兩個內(nèi)部從workerQueue獲取worker、返還worker到workerQueue的方法。

Submit

// Submit 提交任務
func (p *Pool) Submit(f func()) error {

   if worker := p.retrieveWorker(); worker != nil {
      worker.inputFunc(f)
      return nil
   }
   return ErrPoolOverload
}

Submit()是給調(diào)用者提交task任務的方法,它的入?yún)⑹且粋€函數(shù),這個函數(shù)就是協(xié)程池使用者想讓協(xié)程池執(zhí)行的內(nèi)容。協(xié)程池pool會嘗試為這個task分配一個worker來處理task,但是,如果協(xié)程池的worker都被占用,并且有數(shù)量限制無法再創(chuàng)建新的worker,pool也無能為力,這里會返回給調(diào)用者一個"過載"的異常,當然這里可以拓展其它的拒絕策略。

retrieveWorker()

//從workerQueue中獲取一個worker
func (p *Pool) retrieveWorker() worker {

	p.lock.Lock()
	w := p.workers.detach()
	if w != nil {
		p.lock.Unlock()
	} else {
		//沒有拿到可用的worker
		//如果容量還沒耗盡,再創(chuàng)建一個worker
		if p.running < p.cap {
			w = &goWorker{
				pool: p,
				task: make(chan func()),
			}
			w.run()
		}
		p.lock.Unlock()
	}
	return w

}

retrieveWorker()是從pool中的workerQueue中獲取一個worker具體實現(xiàn)。獲取worker是一個并發(fā)操作,這里使用鎖控制并發(fā)。調(diào)用workers.detach()從workerQueue中獲取worker,如果沒有拿到可用的worker,這時候還需要看看目前pool中現(xiàn)存活的worker數(shù)量是否已經(jīng)達到上限,未達上限,則可以創(chuàng)建新的worker加入到pool中。

revertWorker()

// 執(zhí)行完任務的worker返還到workerQueue
func (p *Pool) revertWorker(w *goWorker) bool {

	defer func() {
		p.lock.Unlock()
	}()

	p.lock.Lock()
	//判斷容量,如果協(xié)程存活數(shù)量大于容量,銷毀
	if p.running < p.cap {
		p.workers.insert(w)
		return true
	}
	return false
}

返還當前worker到pool是在worker執(zhí)行完task之后,返回時需要判斷當前存活的worker數(shù)量是否到達pool的上限,已達上限則返回失敗。另外,由于running屬性存在并發(fā)訪問的問題,返還操作也需要加鎖。

workerQueue實現(xiàn)

為了提高拓展性,我們將workerQueue抽象成接口,也就是說可以有多種協(xié)程隊列實現(xiàn)來適配更多的使用場景

workerQueue接口

// 定義一個協(xié)程隊列的接口
type workerQueue interface {

   //隊列長度
   len() int

   //插入worker
   insert(w worker)

   //分派一個worker
   detach() worker
}

插入insert()和分配detach()是實現(xiàn)協(xié)程隊列的核心方法。這里我們以底層基于“棧”思想的結構作為workerQueue的默認實現(xiàn),也即后進入隊列的協(xié)程優(yōu)先被分配使用。

// 底層構造一個棧類型的隊列來管理多個worker
type workerStack struct {
   items []worker
}

我們使用數(shù)組來存放worker,用數(shù)組來模擬先進后出的協(xié)程隊列

// 新創(chuàng)建一個worker
func (ws *workerStack) insert(w worker) {
   ws.items = append(ws.items, w)
}

workerStack的insert()實現(xiàn)很簡單,直接在數(shù)組尾巴追加一個worker

// 分配一個worker
func (ws *workerStack) detach() worker {

   l := ws.len()

   if l == 0 {
      return nil
   }
   w := ws.items[l-1]
   ws.items[l-1] = nil
   ws.items = ws.items[:l-1]

   return w
}

detach()負責從數(shù)組中獲取一個可用的空閑worker,每次獲取時取用的是數(shù)組的最后一個元素,也就是協(xié)程隊列末尾的worker優(yōu)先被分配出去了。

注意這里將下標l-1位置的對象置為nil,可以防止內(nèi)存泄露

worker實現(xiàn)

type worker interface {
	workId() string

	run()
	//接收函數(shù)執(zhí)行任務
	inputFunc(func())
}

type goWorker struct {
	workerId string

	//需要持有自己所屬的 Pool 因為要和它進行交互
	pool *Pool

	task chan func()
}

這里同樣了為了拓展,將worker抽象成了一個接口,goWorker是它的一個默認實現(xiàn),worker最核心的工作就是等待著task到來,接到task后執(zhí)行,task具體來說就是一個函數(shù)。這里其實是一個很簡單的生產(chǎn)者/消費者模型,我們想到使用管道來實現(xiàn)生產(chǎn)消費模型,定義一個函數(shù)類型的管道,你或許要問為什么使用管道,還有別的方式可以實現(xiàn)這個功能么?不急,我們來看看worker要實現(xiàn)什么功能:

  • 創(chuàng)建出來的worker,未必馬上就有task分配過來執(zhí)行,它肯定要把自己“阻塞”住,隨時等待task到來
  • task傳遞過來終歸需要一個介質(zhì)

鑒于這個場景,使用管道式非常合適的,管道內(nèi)沒有元素時,worker阻塞等待,當管道內(nèi)有task進來時,worker被喚醒,從管道中取出task進行處理。當然,我們使用一個死循環(huán),不斷自旋的從一個容器中讀取task,也能達到同樣的目的,但卻沒有使用管道合適、優(yōu)雅!

func (g *goWorker) run() {

	go func() {

		defer func() {
			atomic.AddInt32(&g.pool.running, -1)
		}()

		//running+1
		atomic.AddInt32(&g.pool.running, 1)

		for {

			select {
			case f := <-g.task:
				if f == nil {
					return
				}
				//執(zhí)行提交的任務
				f()
			}
			//worker返還到queue中
			if ok := g.pool.revertWorker(g); !ok {
				return
			}
		}
	}()
}

func (g *goWorker) inputFunc(f func()) {
	g.task <- f
}

run()是worker的核心方法,worker通常被創(chuàng)建后就會調(diào)用run(),一起來看下主要做了那些內(nèi)容:

  • 使用select 監(jiān)聽task管道,阻塞當前協(xié)程,回到管道內(nèi)有task進入
  • 監(jiān)聽到task,則會執(zhí)行task的內(nèi)容
  • task執(zhí)行完畢后,會調(diào)用poo.revertWorker()將當前worker返還到協(xié)程池中待用,當然這里未必會返成功。

很容易忽略的一個點是:為什么要新啟動一個協(xié)程來完成以上工作?因為worker中沒有task時,要阻塞等待任務,如果不是在一個新的協(xié)程中,整個程序都阻塞在第一個worker的run()中,所謂協(xié)程池,就是指每個worker對應的這個協(xié)程。

另外,pool維護一個running屬性來表示存活的worker數(shù)量,當調(diào)用run()方法后,表示worker是可用的了,running值+1。如果worker返回協(xié)程池失敗,run()執(zhí)行完畢,worker對應的協(xié)程被系統(tǒng)銷毀,表示當前worker生命周期結束了,對應的寫成會將running值-1。由于多個worker并發(fā)修改running值,使用了atomic.AddInt32控制臨界資源的修改。

至此,實現(xiàn)一個簡單協(xié)程池的核心的功能都已經(jīng)完成,和ants相比,這是一個相當精簡的協(xié)程池,旨在幫助我們加深對協(xié)程池、線程池這類組件模型的理解,離真正可用有段距離。ants中更完備的功能,比如:ants實現(xiàn)了定期清理空閑worker,以及對鎖的優(yōu)化、worker的池化等等,感興趣的可以看看ants,短小精悍的開源項目!

到此這篇關于詳解Go語言如何實現(xiàn)一個最簡化的協(xié)程池的文章就介紹到這了,更多相關Go協(xié)程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • golang croncli 定時器命令詳解

    golang croncli 定時器命令詳解

    定時器是執(zhí)行任務時的常用功能,配置系統(tǒng)的定時任務太麻煩,所以就想用golang簡單實現(xiàn)一個定時器命令,包括定時器命令格式、定時執(zhí)行命令的相關知識,感興趣的朋友跟隨小編一起看看吧
    2022-03-03
  • Golang打印復雜結構體兩種方法詳解

    Golang打印復雜結構體兩種方法詳解

    在?Golang?語言開發(fā)中,我們經(jīng)常會使用結構體類型,如果我們使用的結構體類型的變量包含指針類型的字段,我們在記錄日志的時候,指針類型的字段的值是指針地址,將會給我們?debug?代碼造成不便
    2022-10-10
  • golang順時針打印矩陣的方法示例

    golang順時針打印矩陣的方法示例

    這篇文章主要介紹了golang順時針打印矩陣的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • 使用Golang打印特定的日期時間的操作

    使用Golang打印特定的日期時間的操作

    這篇文章主要給大家詳細介紹了如何使用Golang打印特定的日期時間的操作,文中有詳細的代碼示例,具有一定的參考價值,需要的朋友可以參考下
    2023-07-07
  • gin自定義中間件解決requestBody不可重讀(請求體取值)

    gin自定義中間件解決requestBody不可重讀(請求體取值)

    這篇文章主要介紹了gin自定義中間件解決requestBody不可重讀,確??刂破髂軌颢@取請求體值,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-10-10
  • golang使用map支持高并發(fā)的方法(1000萬次操作14ms)

    golang使用map支持高并發(fā)的方法(1000萬次操作14ms)

    這篇文章主要介紹了golang使用map支持高并發(fā)的方法(1000萬次操作14ms),本文給大家詳細講解,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-11-11
  • 使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄

    使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄

    gorm是目前用得最多的go語言orm庫,本文主要介紹了使用Gorm操作Oracle數(shù)據(jù)庫踩坑記錄,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-06-06
  • Golang定制化zap日志庫使用過程分析

    Golang定制化zap日志庫使用過程分析

    Zap是我個人比較喜歡的日志庫,是uber開源的,有較好的性能,在項目開發(fā)中,經(jīng)常需要把程序運行過程中各種信息記錄下來,有了詳細的日志有助于問題排查和功能優(yōu)化,但如何選擇和使用性能好功能強大的日志庫,這個就需要我們從多角度考慮
    2023-03-03
  • golang return省略用法說明

    golang return省略用法說明

    這篇文章主要介紹了golang return省略用法說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Gin框架使用panic處理中間件問題詳解

    Gin框架使用panic處理中間件問題詳解

    這篇文章主要介紹了Gin框架使用panic處理中間件問題,在 Gin 框架中,錯誤處理和 panic 處理是非常重要的功能。當處理 HTTP 請求時,可能會出現(xiàn)各種各樣的錯誤,例如數(shù)據(jù)庫連接錯誤、網(wǎng)絡錯誤、權限問題等等
    2023-04-04

最新評論