深入理解Go工作池
前言
我們使用Go語言開發(fā)項(xiàng)目,常常會(huì)使用到goroutine;goroutine太多會(huì)造成系統(tǒng)占用過高或其他系統(tǒng)異常,我們可以將goroutine控制指定數(shù)量,且減少goroutine的創(chuàng)建,這就運(yùn)用到Go工作池,下面就介紹和使用一下。
一、概念
我們可以將工作池理解為線程池。線程池的創(chuàng)建和銷毀非常消耗資源,所以專門寫一個(gè)pool,每次用過的線程池再放回pool中而不是銷毀。不過在Go語言中不會(huì)使用系統(tǒng)的線程,而是使用goroutine。gorotine的創(chuàng)建和銷毀比系統(tǒng)線程的消耗要小的多,而且goroutine沒有標(biāo)號(hào)。所以goroutine的pool就不再時(shí)線程池,而是work pool(工作池)。
雖然goroutine的系統(tǒng)消耗較小,但也不能隨意在編碼時(shí)使用go func(),如果程序頻繁啟動(dòng)goroutine,會(huì)造成極其不可控性能問題。對(duì)于可以提前預(yù)知的大量異步處理的任務(wù)就要考慮使用工作池。
工作池的作用控制goroutine的規(guī)模,或者說是goroutine的數(shù)量。在Go語言中,控制goroutine的數(shù)量最好方式就是使用緩存通道。
二、實(shí)例
1.簡單示例
下面是Go語言解決工作池的經(jīng)典用法。
func worker(id int, jobs <-chan int, results chan<- int) { for job := range jobs { fmt.Printf("worker(%d) start to do job(%d)\n", id, job) time.Sleep(time.Second) fmt.Printf("worker(%d) finished job(%d)\n", id, job) results <- job } } func main() { // 為了使用我們的工作池,我們需要發(fā)送工作和接受工作的結(jié)果, // 這里我們定義兩個(gè)通道,一個(gè)jobs,一個(gè)results jobs := make(chan int, 100) results := make(chan int, 100) // 開啟3個(gè)goroutine for id := 1; id <= 3; id++ { go worker(id, jobs, results) } // 創(chuàng)建5個(gè)任務(wù) for job := 1; job <= 5; job++ { jobs <- job } close(jobs) // 輸出結(jié)果 for i := 1; i <= 5; i++ { <-results } }
上述代碼工作池思想主要體現(xiàn)在jobs的通道上,因?yàn)槎x了一個(gè)緩存長度為100的通道,所以在通道到100以后,新任務(wù)就會(huì)阻塞,只有等worker從通道取走一個(gè)工作以后才能繼續(xù)分配新工作。
本案例較為簡單,如果worker的數(shù)量較大,業(yè)務(wù)執(zhí)行時(shí)間較長的話,我們需要在程序設(shè)計(jì)上將jobs和worker的模式進(jìn)行優(yōu)化,每個(gè)worker處理一項(xiàng)工作,工作池可以自定義最大數(shù)量的worker;這樣可以保證goroutine的最大數(shù)量,可程序更加可控,避免代碼消耗壓垮系統(tǒng)。
2.讀入數(shù)據(jù)
下面時(shí)改良之后代碼
1package main import ( "fmt" "reflect" "time" ) // Job 任務(wù)內(nèi)容 type Job struct { ID int Name string } // Worker 工作 type Worker struct { id int // id WorkerPool chan chan Job // 工作者池(通道的通道),每個(gè)元素都是一個(gè)job通道, 公共的job JobChannel chan Job // 工作通道,每個(gè)元素是一個(gè)job,worker私有的job exit chan bool // 結(jié)束信號(hào) } var ( MaxWorker = 5 // 最大worker數(shù)量 JobQueue = make(chan Job, 5) // 工作通道,模擬需處理的工作 ) // Scheduler 排程中心 type Scheduler struct { WorkerPool chan chan Job // 工作池 WorkerMaxNum int // 最大工作者數(shù) Workers []*Worker // worker隊(duì)列 } // NewScheduler 創(chuàng)建排程中心 func NewScheduler(workerMaxNum int) *Scheduler { workerPool := make(chan chan Job, workerMaxNum) // 工作池 return &Scheduler{WorkerPool: workerPool, WorkerMaxNum: workerMaxNum} } // Start 工作池開始 func (s *Scheduler) Start() { Workers := make([]*Worker, s.WorkerMaxNum) for i := 0; i < s.WorkerMaxNum; i++ { worker := NewWorker(s.WorkerPool, i) worker.Start() Workers[i] = &worker } s.Workers = Workers go s.schedule() } // Stop 工作池的關(guān)閉 func (s *Scheduler) Stop() { Workers := s.Workers for _, w := range Workers { w.Stop() } time.Sleep(time.Second) close(s.WorkerPool) } func NewWorker(WorkerPool chan chan Job, id int) Worker { fmt.Printf("new a worker(%d)\n", id) return Worker{ id: id, WorkerPool: WorkerPool, JobChannel: make(chan Job), exit: make(chan bool), } } // Start 監(jiān)聽任務(wù)和結(jié)束信號(hào) func (w Worker) Start() { go func() { for { select { case job := <-w.JobChannel: // 收到任務(wù) fmt.Println("get a job from private w.JobChannel") fmt.Println(job) case <-w.exit: // 收到結(jié)束信號(hào) fmt.Println("worker exit", w) return } } }() } func (w Worker) Stop() { go func() { w.exit <- true }() } // 排程 func (s *Scheduler) schedule() { for { select { case job := <-JobQueue: fmt.Println("get a job from JobQueue") go func(job Job) { //從WorkerPool獲取jobChannel,忙時(shí)阻塞 jobChannel := <-s.WorkerPool fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel)) jobChannel <- job fmt.Println("worker's private jobChannel add one job") }(job) } } } func main() { scheduler := NewScheduler(MaxWorker) scheduler.Start() jobQueue() scheduler.Stop() } // 模擬Job任務(wù) func jobQueue() { for i := 1; i <= 30; i++ { JobQueue <- Job{ID: i, Name: fmt.Sprintf("Job【%d】", i)} fmt.Printf("jobQueue add %d job\n", i) } }
定義了兩個(gè)結(jié)構(gòu)體:Task任務(wù)和Job工作,Task并沒有實(shí)質(zhì)性的內(nèi)容,這里僅僅定義了一個(gè)整型變量;
定義兩個(gè)全局變量:MaxWorker是最大的worker數(shù)量;JobQueue是Job的通道。這兩個(gè)變量都用于后面的模擬,在真實(shí)場(chǎng)景中可以不設(shè)置這兩個(gè)變量。
定義了一個(gè)Worker結(jié)構(gòu)體,與上一個(gè)簡單工作池的示例不同,本例的Worker不再是簡單的一個(gè)goroutine,而是一個(gè)結(jié)構(gòu)體。結(jié)構(gòu)體內(nèi)定義了如下四個(gè)變量。?id:worker編號(hào)。?exit:這是一個(gè)bool類型的通道,當(dāng)有數(shù)據(jù)寫入時(shí)worker結(jié)束運(yùn)行。?JobChannel:Job類型的通道,該通道是專屬于當(dāng)前worker的私有工作隊(duì)列。?WorkerPool:注意看,定義的時(shí)候使用了兩個(gè)Channel,每一個(gè)元素是一個(gè)Job通道,其實(shí)每一個(gè)元素是一個(gè)JobChannel。
NewWorker方法用于創(chuàng)建一個(gè)新的worker,要注意該方法的參數(shù)workerPool用于創(chuàng)建worker時(shí)傳入,這就說明每個(gè)worker與其他worker的WorkerPool是共享的,或者說多個(gè)worker使用一個(gè)WorkerPool。這一點(diǎn)很重要,這是本示例代碼在上一個(gè)簡單示例代碼基礎(chǔ)上的優(yōu)化。而JobChannel和exit變量則是隨著Worker的新建而新建的。
Worker的Start方法,該方法用于監(jiān)聽任務(wù)或者結(jié)束信號(hào)。Start方法一開始就用goroutine運(yùn)行一個(gè)匿名函數(shù),而函數(shù)內(nèi)部是一個(gè)無限循環(huán)。在循環(huán)內(nèi)部,首先是把當(dāng)前的JobChannel注冊(cè)到WorkerPool里,一旦注冊(cè)進(jìn)去也就說明該worker可以接收任務(wù)了。然后通過select判斷JobChannel是否可以讀取,也就是其中是否有Job,或者exit通道是否可以讀取。如果JobChannel可讀取,證明有Job,后續(xù)開始處理Job;而如果exit可讀,則結(jié)束當(dāng)前的無限循環(huán)。所以,后面的代碼中要特別注意對(duì)WorkerPool的操作,Worker是從WorkerPool領(lǐng)取工作的。Worker的Stop方法,用于為exit通道寫入數(shù)據(jù),在Start方法內(nèi)Worker會(huì)讀取到寫入的數(shù)據(jù),進(jìn)而結(jié)束無限循環(huán)。
NewScheduler函數(shù)用于創(chuàng)建一個(gè)Scheduler,可以看到函數(shù)內(nèi)部的WorkerPool是通過make函數(shù)新建的,NewWorker函數(shù)一樣靠參數(shù)傳入。注意WorkerPool是有緩存通道的,緩存長度是MaxWorkers。
Scheduler的Create方法,該方法根據(jù)MaxWorkers最大數(shù)創(chuàng)建Worker,并且把引用存入Workers切片。創(chuàng)建好Worker后,馬上調(diào)用Worker的Start方法,最后通過goroutine運(yùn)行Schedule方法。Scheduler的Shutdown方法,用于關(guān)閉工作池,調(diào)用所有worker的Stop方法并且關(guān)閉WorkerPool工作池。
Scheduler的Schedule方法,該方法內(nèi)也是一個(gè)無限循環(huán),循環(huán)內(nèi)部就是不停地讀取JobQueue,然后運(yùn)行一個(gè)goroutine。在新運(yùn)行的goroutine內(nèi)從s.WorkerPool讀取一個(gè)JobChannel,注意,Worker注冊(cè)到WorkerPool以后此處才可以讀取到,如果WorkerPool的緩存通道內(nèi)沒有JobChannel,則會(huì)阻塞,直到讀取到JobChannel,才把Job寫入。
到此這篇關(guān)于深入理解Go工作池的文章就介紹到這了,更多相關(guān)Go工作池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang連接并操作PostgreSQL數(shù)據(jù)庫基本操作
PostgreSQL是常見的免費(fèi)的大型關(guān)系型數(shù)據(jù)庫,具有豐富的數(shù)據(jù)類型,也是軟件項(xiàng)目常用的數(shù)據(jù)庫之一,下面這篇文章主要給大家介紹了關(guān)于Golang連接并操作PostgreSQL數(shù)據(jù)庫基本操作的相關(guān)資料,需要的朋友可以參考下2022-09-09Golang在整潔架構(gòu)基礎(chǔ)上實(shí)現(xiàn)事務(wù)操作
這篇文章在 go-kratos 官方的 layout 項(xiàng)目的整潔架構(gòu)基礎(chǔ)上,實(shí)現(xiàn)優(yōu)雅的數(shù)據(jù)庫事務(wù)操作,需要的朋友可以參考下2024-08-08解決golang 反射interface{}做零值判斷的一個(gè)重大坑
這篇文章主要介紹了解決golang 反射interface{}做零值判斷的一個(gè)重大坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-04-04Golang服務(wù)中context超時(shí)處理的方法詳解
在Go語言中,Context是一個(gè)非常重要的概念,它存在于一個(gè)完整的業(yè)務(wù)生命周期內(nèi),Context類型是一個(gè)接口類型,在實(shí)際應(yīng)用中,我們可以使用Context包來傳遞請(qǐng)求的元數(shù)據(jù),本文將給大家介紹Golang服務(wù)中context超時(shí)處理的方法和超時(shí)原因,需要的朋友可以參考下2023-05-05淺析Golang開發(fā)中g(shù)oroutine的正確使用姿勢(shì)
很多初級(jí)的Gopher在學(xué)習(xí)了goroutine之后,在項(xiàng)目中其實(shí)使用率不高,所以這篇文章小編主要來帶大家深入了解一下goroutine的常見使用方法,希望對(duì)大家有所幫助2024-03-03Golang哈希算法實(shí)現(xiàn)配置文件的監(jiān)控功能詳解
這篇文章主要介紹了Golang哈希算法實(shí)現(xiàn)配置文件的監(jiān)控功能,哈希和加密類似,唯一區(qū)別是哈希是單項(xiàng)的,即哈希后的數(shù)據(jù)無法解密,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-03-03