Go 控制協(xié)程(goroutine)的并發(fā)數(shù)量
在使用協(xié)程并發(fā)處理某些任務(wù)時(shí), 其并發(fā)數(shù)量往往因?yàn)楦鞣N因素的限制不能無(wú)限的增大. 例如網(wǎng)絡(luò)請(qǐng)求、數(shù)據(jù)庫(kù)查詢等等。
從運(yùn)行效率角度考慮,在相關(guān)服務(wù)可以負(fù)載的前提下(限制最大并發(fā)數(shù)),盡可能高的并發(fā)。
在Go語(yǔ)言中,可以使用一些方法來(lái)控制協(xié)程(goroutine)的并發(fā)數(shù)量,以防止并發(fā)過(guò)多導(dǎo)致資源耗盡或性能下降
1、使用信號(hào)量(Semaphore)
可以使用 Go 語(yǔ)言中的 channel 來(lái)實(shí)現(xiàn)簡(jiǎn)單的信號(hào)量,限制并發(fā)數(shù)量
package main import ( "fmt" "sync" ) func worker(id int, sem chan struct{}) { sem <- struct{}{} // 占用一個(gè)信號(hào)量 defer func() { <-sem // 方法運(yùn)行結(jié)束釋放信號(hào)量 }() // 執(zhí)行工作任務(wù) fmt.Printf("Worker %d: Working...\n", id) } func main() { concurrency := 3 sem := make(chan struct{}, concurrency) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() worker(id, sem) }(i) } wg.Wait() close(sem) }
sem
是一個(gè)有緩沖的 channel,通過(guò)控制 channel 中元素的數(shù)量,實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的信號(hào)量機(jī)制
2、使用協(xié)程池
可以創(chuàng)建一個(gè)固定數(shù)量的協(xié)程池,將任務(wù)分發(fā)給這些協(xié)程執(zhí)行。
package main import ( "fmt" "sync" ) func worker(id int, jobs <-chan int, results chan<- int) { //jobs等待主要協(xié)程往jobs放數(shù)據(jù) for j := range jobs { fmt.Printf("協(xié)程池 %d: 協(xié)程池正在工作 %d\n", id, j) results <- j } } func main() { const numJobs = 5 //協(xié)程要做的工作數(shù)量 const numWorkers = 3 //協(xié)程池?cái)?shù)量 jobs := make(chan int, numJobs) results := make(chan int, numJobs) var wg sync.WaitGroup // 啟動(dòng)協(xié)程池 for i := 1; i <= numWorkers; i++ { wg.Add(1) go func(id int) { defer wg.Done() worker(id, jobs, results) }(i) } // 提交任務(wù) for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 等待所有工作完成 go func() { wg.Wait() close(results) }() // 處理結(jié)果 for result := range results { fmt.Println("Result:", result) } }
jobs
通道用于存儲(chǔ)任務(wù),results
通道用于存儲(chǔ)處理結(jié)果。通過(guò)創(chuàng)建固定數(shù)量的工作協(xié)程,可以有效地控制并發(fā)數(shù)量。
3、使用其他包
Go 1.16 引入了 golang.org/x/sync/semaphore
包,它提供了一個(gè)更為靈活的信號(hào)量實(shí)現(xiàn)。
案例一
限制對(duì)外部API的并發(fā)請(qǐng)求
假設(shè)我們有一個(gè)外部API,它對(duì)并發(fā)請(qǐng)求有限制,我們希望確保不超過(guò)這個(gè)限制。我們可以使用semaphore.Weighted
來(lái)控制對(duì)API的并發(fā)訪問(wèn)
package main import ( "context" "fmt" "golang.org/x/sync/semaphore" "sync" "time" ) func main() { /* 1、在并發(fā)量一定的情況下,通過(guò)改變?cè)试S并發(fā)請(qǐng)求數(shù)可以更快處理請(qǐng)求任務(wù)(在CPU夠用的前提下) 2、sem := semaphore.NewWeighted(n),參數(shù)n就是權(quán)重量 3、當(dāng)一個(gè)協(xié)程需要獲取的單位的權(quán)重越多,運(yùn)行就會(huì)慢(比如權(quán)重總量n=5個(gè),一個(gè)協(xié)程分配了2個(gè),跟一個(gè)協(xié)程分配1個(gè)效率是不一樣的) 4、信號(hào)量沒(méi)有足夠的可用權(quán)重的情況發(fā)生在所有已分配的權(quán)重單位都已經(jīng)被占用,即信號(hào)量的當(dāng)前權(quán)重計(jì)數(shù)達(dá)到了它的總?cè)萘?。在這種情況下,任何嘗試通過(guò)Acquire方法獲取更多權(quán)重的調(diào)用都將無(wú)法立即完成,從而導(dǎo)致調(diào)用者(通常是goroutine)阻塞,直到其他調(diào)用者釋放一些權(quán)重單位。 */ /* 1、權(quán)權(quán)重較大的任務(wù)在資源競(jìng)爭(zhēng)時(shí)有更高的優(yōu)先級(jí),更容易獲得執(zhí)行的機(jī)會(huì) 2、如果當(dāng)前資源足夠滿足高權(quán)重任務(wù)的需求,這些任務(wù)將立即執(zhí)行;若資源不足,則按照權(quán)重高低順序排隊(duì)等待 3、一旦任務(wù)開(kāi)始執(zhí)行,其完成的速度主要取決于任務(wù)自身的邏輯復(fù)雜度、所需資源以及系統(tǒng)的當(dāng)前負(fù)載等因素,與任務(wù)在信號(hào)量中的權(quán)重?zé)o關(guān) 3、高權(quán)重的任務(wù)并不會(huì)中斷已經(jīng)在執(zhí)行的低權(quán)重任務(wù),而是等待這些任務(wù)自行釋放資源。一旦資源釋放,等待隊(duì)列中的高權(quán)重任務(wù)將優(yōu)先被喚醒 4、Acquire 方法會(huì)檢查當(dāng)前信號(hào)量的可用資源量是否滿足請(qǐng)求的權(quán)重,如果滿足,則立即減少信號(hào)量的資源計(jì)數(shù)并返回,允許任務(wù)繼續(xù)執(zhí)行。如果不滿足,任務(wù)將阻塞等待,直到有足夠的資源被釋放 */ // 記錄開(kāi)始時(shí)間 startTime := time.Now() // 假設(shè)外部API允許的最大并發(fā)請(qǐng)求為5(信號(hào)量的總?cè)萘渴?個(gè)權(quán)重單位) const ( maxConcurrentRequests = 5 ) sem := semaphore.NewWeighted(maxConcurrentRequests) var wg sync.WaitGroup // 模擬對(duì)外部API的10個(gè)并發(fā)請(qǐng)求 for i := 0; i < 10; i++ { wg.Add(1) go func(requestId int) { defer wg.Done() // 假設(shè)我們想要獲取2個(gè)單位的權(quán)重 if err := sem.Acquire(context.Background(), 2); err != nil { fmt.Printf("請(qǐng)求 %d 無(wú)法獲取信號(hào)量: %v\n", requestId, err) return } defer sem.Release(2) // 請(qǐng)求完成后釋放信號(hào)量 // 模擬對(duì)API的請(qǐng)求處理 fmt.Printf("請(qǐng)求 %d 開(kāi)始...\n", requestId) time.Sleep(2 * time.Second) // 模擬網(wǎng)絡(luò)延遲 fmt.Printf("請(qǐng)求 %d 完成。\n", requestId) }(i) } wg.Wait() // 記錄結(jié)束時(shí)間 endTime := time.Now() // 計(jì)算并打印總耗時(shí) fmt.Printf("程序總耗時(shí): %v\n", endTime.Sub(startTime)) }
信號(hào)量沒(méi)有足夠的可用權(quán)重的情況發(fā)生在所有已分配的權(quán)重單位都已經(jīng)被占用,即信號(hào)量的當(dāng)前權(quán)重計(jì)數(shù)達(dá)到了它的總?cè)萘俊T谶@種情況下,任何嘗試通過(guò)Acquire
方法獲取更多權(quán)重的調(diào)用都將無(wú)法立即完成,從而導(dǎo)致調(diào)用者(通常是goroutine)阻塞,直到其他調(diào)用者釋放一些權(quán)重單位。
以下是一些導(dǎo)致信號(hào)量沒(méi)有足夠可用權(quán)重的具體情況:
信號(hào)量初始化容量較小:如果信號(hào)量的總?cè)萘吭O(shè)置得較小,而并發(fā)請(qǐng)求的數(shù)量較大,那么很快就會(huì)出現(xiàn)權(quán)重不足的情況。
長(zhǎng)時(shí)間占用權(quán)重:如果某些goroutine長(zhǎng)時(shí)間占用權(quán)重單位而不釋放,這會(huì)導(dǎo)致其他goroutine無(wú)法獲取到權(quán)重,即使這些goroutine只是少數(shù)。
權(quán)重分配不均:在某些情況下,可能存在一些goroutine占用了不成比例的權(quán)重單位,導(dǎo)致其他goroutine無(wú)法獲取足夠的權(quán)重。
權(quán)重釋放不及時(shí):如果goroutine因?yàn)殄e(cuò)誤或異常情況提前退出,而沒(méi)有正確釋放它們所占用的權(quán)重,那么這些權(quán)重單位將不會(huì)被回收到信號(hào)量中。
高頻率的請(qǐng)求:在短時(shí)間內(nèi)有大量goroutine請(qǐng)求權(quán)重,即使它們請(qǐng)求的權(quán)重不大,累積起來(lái)也可能超過(guò)信號(hào)量的總?cè)萘俊?/p>
信號(hào)量權(quán)重未正確管理:如果信號(hào)量的權(quán)重管理邏輯存在缺陷,例如錯(cuò)誤地釋放了過(guò)多的權(quán)重,或者在錯(cuò)誤的時(shí)間點(diǎn)釋放權(quán)重,也可能導(dǎo)致可用權(quán)重不足。
為了避免信號(hào)量沒(méi)有足夠的可用權(quán)重,可以采取以下措施:
- 合理設(shè)置信號(hào)量容量:根據(jù)資源限制和并發(fā)需求合理設(shè)置信號(hào)量的總?cè)萘俊?/li>
- 及時(shí)釋放權(quán)重:確保在goroutine完成工作后及時(shí)釋放權(quán)重。
- 使用超時(shí):在
Acquire
調(diào)用中使用超時(shí),避免無(wú)限期地等待權(quán)重。 - 監(jiān)控和日志記錄:監(jiān)控信號(hào)量的使用情況,并記錄關(guān)鍵信息,以便及時(shí)發(fā)現(xiàn)和解決問(wèn)題。
- 權(quán)重分配策略:設(shè)計(jì)合理的權(quán)重分配策略,確保權(quán)重的公平和高效分配。
通過(guò)這些措施,可以更好地管理信號(hào)量的使用,避免因權(quán)重不足導(dǎo)致的并發(fā)問(wèn)題。
案例二
假設(shè)有一個(gè)在線視頻平臺(tái),它需要處理不同分辨率的視頻轉(zhuǎn)碼任務(wù)。由于高清視頻轉(zhuǎn)碼比標(biāo)清視頻更消耗計(jì)算資源,因此平臺(tái)希望設(shè)計(jì)一個(gè)系統(tǒng),能夠優(yōu)先處理更多標(biāo)清視頻轉(zhuǎn)碼請(qǐng)求,同時(shí)又不完全阻塞高清視頻的轉(zhuǎn)碼,以保持整體服務(wù)質(zhì)量和資源的有效利用。
package main import ( "fmt" "golang.org/x/net/context" "golang.org/x/sync/semaphore" "runtime" "sync" "time" ) // VideoTranscodeJob 視頻轉(zhuǎn)碼任務(wù) type VideoTranscodeJob struct { resolution string weight int64 } func main() { cpuCount := runtime.NumCPU() fmt.Printf("當(dāng)前CPU個(gè)數(shù)%v\n", cpuCount) /* 1、權(quán)權(quán)重較大的任務(wù)在資源競(jìng)爭(zhēng)時(shí)有更高的優(yōu)先級(jí),更容易獲得執(zhí)行的機(jī)會(huì) 2、如果當(dāng)前資源足夠滿足高權(quán)重任務(wù)的需求,這些任務(wù)將立即執(zhí)行;若資源不足,則按照權(quán)重高低順序排隊(duì)等待 3、一旦任務(wù)開(kāi)始執(zhí)行,其完成的速度主要取決于任務(wù)自身的邏輯復(fù)雜度、所需資源以及系統(tǒng)的當(dāng)前負(fù)載等因素,與任務(wù)在信號(hào)量中的權(quán)重?zé)o關(guān) 3、高權(quán)重的任務(wù)并不會(huì)中斷已經(jīng)在執(zhí)行的低權(quán)重任務(wù),而是等待這些任務(wù)自行釋放資源。一旦資源釋放,等待隊(duì)列中的高權(quán)重任務(wù)將優(yōu)先被喚醒 4、Acquire 方法會(huì)檢查當(dāng)前信號(hào)量的可用資源量是否滿足請(qǐng)求的權(quán)重,如果滿足,則立即減少信號(hào)量的資源計(jì)數(shù)并返回,允許任務(wù)繼續(xù)執(zhí)行。如果不滿足,任務(wù)將阻塞等待,直到有足夠的資源被釋放 */ // 初始化兩個(gè)信號(hào)量,一個(gè)用于標(biāo)清,一個(gè)用于高清,假設(shè)總共有8個(gè)CPU核心可用 normalSem := semaphore.NewWeighted(6) // 標(biāo)清任務(wù),分配6個(gè)單位權(quán)重,因?yàn)樗鼈兿馁Y源較少 highDefSem := semaphore.NewWeighted(2) // 高清任務(wù),分配2個(gè)單位權(quán)重,因?yàn)樗鼈兏馁Y源 var wg sync.WaitGroup //假設(shè)有20個(gè)需要轉(zhuǎn)碼的視頻 videoJobs := []VideoTranscodeJob{ {"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2}, {"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2}, } for _, job := range videoJobs { wg.Add(1) go func(job VideoTranscodeJob) { defer wg.Done() var sem *semaphore.Weighted switch job.resolution { case "SD": sem = normalSem //分配權(quán)重大,當(dāng)前為6,任務(wù)在獲取執(zhí)行機(jī)會(huì)上有優(yōu)勢(shì),但并不直接意味著執(zhí)行速度快 case "HD": sem = highDefSem default: panic("無(wú)效的分辨率") } if err := sem.Acquire(context.Background(), job.weight); err != nil { fmt.Printf("名為 %s 視頻無(wú)法獲取信號(hào)量: %v\n", job.resolution, err) return } defer sem.Release(job.weight) //釋放權(quán)重對(duì)應(yīng)的信號(hào)量 // 模擬轉(zhuǎn)碼任務(wù)執(zhí)行 fmt.Printf("轉(zhuǎn)碼 %s 視頻 (權(quán)重: %d)...\n", job.resolution, job.weight) //通過(guò)利用VideoTranscodeJob的weight值來(lái)模擬轉(zhuǎn)碼時(shí)間的長(zhǎng)短,HD用時(shí)長(zhǎng)則設(shè)置2比SD的1大,*時(shí)間就自然長(zhǎng),運(yùn)行就時(shí)間長(zhǎng) time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模擬不同分辨率視頻轉(zhuǎn)碼所需時(shí)間 fmt.Printf("------------------------%s 視頻轉(zhuǎn)碼完成。。。\n", job.resolution) }(job) } wg.Wait() }
標(biāo)清(SD)和高清(HD),分別分配了不同的權(quán)重(1和2)。通過(guò)創(chuàng)建兩個(gè)不同權(quán)重的信號(hào)量,我們可以控制不同類型任務(wù)的同時(shí)執(zhí)行數(shù)量,從而優(yōu)先保證標(biāo)清視頻的快速處理,同時(shí)也確保高清視頻能夠在不影響系統(tǒng)穩(wěn)定性的情況下進(jìn)行轉(zhuǎn)碼。這展示了帶權(quán)重的并發(fā)控制如何幫助在資源有限的情況下優(yōu)化任務(wù)調(diào)度和執(zhí)行效率。
注意:對(duì)協(xié)程分配的權(quán)重單位數(shù)不能大于對(duì)應(yīng)上下文semaphore.NewWeighted(n)中參數(shù)n的單位數(shù)
案例三
package main import ( "context" "fmt" "sync" "time" "golang.org/x/sync/semaphore" ) type weightedTask struct { id int weight int64 } func main() { const ( maxTotalWeight = 20 // 最大總權(quán)重 ) sem := semaphore.NewWeighted(maxTotalWeight) var wg sync.WaitGroup tasksCh := make(chan weightedTask, 10) // 發(fā)送任務(wù) for i := 1; i <= 10; i++ { tasksCh <- weightedTask{id: i, weight: int64(i)} // 假設(shè)任務(wù)ID即為其權(quán)重 } close(tasksCh) // 啟動(dòng)任務(wù)處理器 for task := range tasksCh { wg.Add(1) go func(task weightedTask) { defer wg.Done() if err := sem.Acquire(context.Background(), int64(task.id)); err != nil { fmt.Printf("任務(wù) %d 無(wú)法獲取信號(hào)量: %v\n", task.id, err) return } defer sem.Release(int64(task.id)) //釋放 // 模擬任務(wù)執(zhí)行 fmt.Printf("任務(wù) %d (權(quán)重: %d) 正在運(yùn)行...\n", task.id, task.weight) time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中簡(jiǎn)單用時(shí)間模擬權(quán)重影響 fmt.Printf("任務(wù) %d 完成.\n", task.id) }(task) } wg.Wait() }
總結(jié)
選擇哪種方法取決于具體的應(yīng)用場(chǎng)景和需求。使用信號(hào)量是一種簡(jiǎn)單而靈活的方法,而協(xié)程池則更適用于需要批量處理任務(wù)的情況。golang.org/x/sync/semaphore
包提供了一個(gè)標(biāo)準(zhǔn)庫(kù)外的更靈活的信號(hào)量實(shí)現(xiàn)
到此這篇關(guān)于Go 控制協(xié)程(goroutine)的并發(fā)數(shù)量的文章就介紹到這了,更多相關(guān)Go 控制協(xié)程并發(fā)數(shù)量?jī)?nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- golang并發(fā)編程中Goroutine 協(xié)程的實(shí)現(xiàn)
- 并發(fā)安全本地化存儲(chǔ)go-cache讀寫鎖實(shí)現(xiàn)多協(xié)程并發(fā)訪問(wèn)
- 詳解Go如何實(shí)現(xiàn)協(xié)程并發(fā)執(zhí)行
- Go?并發(fā)編程協(xié)程及調(diào)度機(jī)制詳情
- go語(yǔ)言限制協(xié)程并發(fā)數(shù)的方案詳情
- Go并發(fā):使用sync.WaitGroup實(shí)現(xiàn)協(xié)程同步方式
- 詳解Go多協(xié)程并發(fā)環(huán)境下的錯(cuò)誤處理
- Go 并發(fā)實(shí)現(xiàn)協(xié)程同步的多種解決方法
相關(guān)文章
go日志系統(tǒng)logrus顯示文件和行號(hào)的操作
這篇文章主要介紹了go日志系統(tǒng)logrus顯示文件和行號(hào)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-11-11golang使用信號(hào)量熱更新的實(shí)現(xiàn)示例
這篇文章主要介紹了golang使用信號(hào)量熱更新的實(shí)現(xiàn)示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-04-04Gorm存在時(shí)更新,不存在時(shí)創(chuàng)建的問(wèn)題
這篇文章主要介紹了Gorm存在時(shí)更新,不存在時(shí)創(chuàng)建的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08go HTTP2 的頭部壓縮算法hpack實(shí)現(xiàn)詳解
這篇文章主要為大家介紹了go HTTP2 的頭部壓縮算法hpack實(shí)現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10golang 監(jiān)聽(tīng)服務(wù)的信號(hào),實(shí)現(xiàn)平滑啟動(dòng),linux信號(hào)說(shuō)明詳解
這篇文章主要介紹了golang 監(jiān)聽(tīng)服務(wù)的信號(hào),實(shí)現(xiàn)平滑啟動(dòng),linux信號(hào)說(shuō)明詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-05-05使用?gomonkey?Mock?函數(shù)及方法示例詳解
在 Golang 語(yǔ)言中,寫單元測(cè)試的時(shí)候,不可避免的會(huì)涉及到對(duì)其他函數(shù)及方法的 Mock,即在假設(shè)其他函數(shù)及方法響應(yīng)預(yù)期結(jié)果的同時(shí),校驗(yàn)被測(cè)函數(shù)的響應(yīng)是否符合預(yù)期,這篇文章主要介紹了使用?gomonkey?Mock?函數(shù)及方法,需要的朋友可以參考下2022-06-06Golang設(shè)計(jì)模式中的橋接模式詳細(xì)講解
橋接模式是一種結(jié)構(gòu)型設(shè)計(jì)模式,通過(guò)橋接模式可以將抽象部分和它的實(shí)現(xiàn)部分分離,本文主要介紹了GoLang橋接模式,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2023-01-01