Go 控制協(xié)程(goroutine)的并發(fā)數(shù)量
在使用協(xié)程并發(fā)處理某些任務(wù)時(shí), 其并發(fā)數(shù)量往往因?yàn)楦鞣N因素的限制不能無限的增大. 例如網(wǎng)絡(luò)請(qǐng)求、數(shù)據(jù)庫查詢等等。
從運(yùn)行效率角度考慮,在相關(guān)服務(wù)可以負(fù)載的前提下(限制最大并發(fā)數(shù)),盡可能高的并發(fā)。
在Go語言中,可以使用一些方法來控制協(xié)程(goroutine)的并發(fā)數(shù)量,以防止并發(fā)過多導(dǎo)致資源耗盡或性能下降
1、使用信號(hào)量(Semaphore)
可以使用 Go 語言中的 channel 來實(shí)現(xiàn)簡(jiǎn)單的信號(hào)量,限制并發(fā)數(shù)量
package main
import (
"fmt"
"sync"
)
func worker(id int, sem chan struct{}) {
sem <- struct{}{} // 占用一個(gè)信號(hào)量
defer func() {
<-sem // 方法運(yùn)行結(jié)束釋放信號(hào)量
}()
// 執(zhí)行工作任務(wù)
fmt.Printf("Worker %d: Working...\n", id)
}
func main() {
concurrency := 3
sem := make(chan struct{}, concurrency)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, sem)
}(i)
}
wg.Wait()
close(sem)
}sem 是一個(gè)有緩沖的 channel,通過控制 channel 中元素的數(shù)量,實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的信號(hào)量機(jī)制
2、使用協(xié)程池
可以創(chuàng)建一個(gè)固定數(shù)量的協(xié)程池,將任務(wù)分發(fā)給這些協(xié)程執(zhí)行。
package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int) {
//jobs等待主要協(xié)程往jobs放數(shù)據(jù)
for j := range jobs {
fmt.Printf("協(xié)程池 %d: 協(xié)程池正在工作 %d\n", id, j)
results <- j
}
}
func main() {
const numJobs = 5 //協(xié)程要做的工作數(shù)量
const numWorkers = 3 //協(xié)程池?cái)?shù)量
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 啟動(dòng)協(xié)程池
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}
// 提交任務(wù)
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 等待所有工作完成
go func() {
wg.Wait()
close(results)
}()
// 處理結(jié)果
for result := range results {
fmt.Println("Result:", result)
}
}
jobs 通道用于存儲(chǔ)任務(wù),results 通道用于存儲(chǔ)處理結(jié)果。通過創(chuàng)建固定數(shù)量的工作協(xié)程,可以有效地控制并發(fā)數(shù)量。
3、使用其他包
Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一個(gè)更為靈活的信號(hào)量實(shí)現(xiàn)。
案例一
限制對(duì)外部API的并發(fā)請(qǐng)求
假設(shè)我們有一個(gè)外部API,它對(duì)并發(fā)請(qǐng)求有限制,我們希望確保不超過這個(gè)限制。我們可以使用semaphore.Weighted來控制對(duì)API的并發(fā)訪問
package main
import (
"context"
"fmt"
"golang.org/x/sync/semaphore"
"sync"
"time"
)
func main() {
/*
1、在并發(fā)量一定的情況下,通過改變?cè)试S并發(fā)請(qǐng)求數(shù)可以更快處理請(qǐng)求任務(wù)(在CPU夠用的前提下)
2、sem := semaphore.NewWeighted(n),參數(shù)n就是權(quán)重量
3、當(dāng)一個(gè)協(xié)程需要獲取的單位的權(quán)重越多,運(yùn)行就會(huì)慢(比如權(quán)重總量n=5個(gè),一個(gè)協(xié)程分配了2個(gè),跟一個(gè)協(xié)程分配1個(gè)效率是不一樣的)
4、信號(hào)量沒有足夠的可用權(quán)重的情況發(fā)生在所有已分配的權(quán)重單位都已經(jīng)被占用,即信號(hào)量的當(dāng)前權(quán)重計(jì)數(shù)達(dá)到了它的總?cè)萘?。在這種情況下,任何嘗試通過Acquire方法獲取更多權(quán)重的調(diào)用都將無法立即完成,從而導(dǎo)致調(diào)用者(通常是goroutine)阻塞,直到其他調(diào)用者釋放一些權(quán)重單位。
*/
/*
1、權(quán)權(quán)重較大的任務(wù)在資源競(jìng)爭(zhēng)時(shí)有更高的優(yōu)先級(jí),更容易獲得執(zhí)行的機(jī)會(huì)
2、如果當(dāng)前資源足夠滿足高權(quán)重任務(wù)的需求,這些任務(wù)將立即執(zhí)行;若資源不足,則按照權(quán)重高低順序排隊(duì)等待
3、一旦任務(wù)開始執(zhí)行,其完成的速度主要取決于任務(wù)自身的邏輯復(fù)雜度、所需資源以及系統(tǒng)的當(dāng)前負(fù)載等因素,與任務(wù)在信號(hào)量中的權(quán)重?zé)o關(guān)
3、高權(quán)重的任務(wù)并不會(huì)中斷已經(jīng)在執(zhí)行的低權(quán)重任務(wù),而是等待這些任務(wù)自行釋放資源。一旦資源釋放,等待隊(duì)列中的高權(quán)重任務(wù)將優(yōu)先被喚醒
4、Acquire 方法會(huì)檢查當(dāng)前信號(hào)量的可用資源量是否滿足請(qǐng)求的權(quán)重,如果滿足,則立即減少信號(hào)量的資源計(jì)數(shù)并返回,允許任務(wù)繼續(xù)執(zhí)行。如果不滿足,任務(wù)將阻塞等待,直到有足夠的資源被釋放
*/
// 記錄開始時(shí)間
startTime := time.Now()
// 假設(shè)外部API允許的最大并發(fā)請(qǐng)求為5(信號(hào)量的總?cè)萘渴?個(gè)權(quán)重單位)
const (
maxConcurrentRequests = 5
)
sem := semaphore.NewWeighted(maxConcurrentRequests)
var wg sync.WaitGroup
// 模擬對(duì)外部API的10個(gè)并發(fā)請(qǐng)求
for i := 0; i < 10; i++ {
wg.Add(1)
go func(requestId int) {
defer wg.Done()
// 假設(shè)我們想要獲取2個(gè)單位的權(quán)重
if err := sem.Acquire(context.Background(), 2); err != nil {
fmt.Printf("請(qǐng)求 %d 無法獲取信號(hào)量: %v\n", requestId, err)
return
}
defer sem.Release(2) // 請(qǐng)求完成后釋放信號(hào)量
// 模擬對(duì)API的請(qǐng)求處理
fmt.Printf("請(qǐng)求 %d 開始...\n", requestId)
time.Sleep(2 * time.Second) // 模擬網(wǎng)絡(luò)延遲
fmt.Printf("請(qǐng)求 %d 完成。\n", requestId)
}(i)
}
wg.Wait()
// 記錄結(jié)束時(shí)間
endTime := time.Now()
// 計(jì)算并打印總耗時(shí)
fmt.Printf("程序總耗時(shí): %v\n", endTime.Sub(startTime))
}
信號(hào)量沒有足夠的可用權(quán)重的情況發(fā)生在所有已分配的權(quán)重單位都已經(jīng)被占用,即信號(hào)量的當(dāng)前權(quán)重計(jì)數(shù)達(dá)到了它的總?cè)萘?。在這種情況下,任何嘗試通過Acquire方法獲取更多權(quán)重的調(diào)用都將無法立即完成,從而導(dǎo)致調(diào)用者(通常是goroutine)阻塞,直到其他調(diào)用者釋放一些權(quán)重單位。
以下是一些導(dǎo)致信號(hào)量沒有足夠可用權(quán)重的具體情況:
信號(hào)量初始化容量較小:如果信號(hào)量的總?cè)萘吭O(shè)置得較小,而并發(fā)請(qǐng)求的數(shù)量較大,那么很快就會(huì)出現(xiàn)權(quán)重不足的情況。
長(zhǎng)時(shí)間占用權(quán)重:如果某些goroutine長(zhǎng)時(shí)間占用權(quán)重單位而不釋放,這會(huì)導(dǎo)致其他goroutine無法獲取到權(quán)重,即使這些goroutine只是少數(shù)。
權(quán)重分配不均:在某些情況下,可能存在一些goroutine占用了不成比例的權(quán)重單位,導(dǎo)致其他goroutine無法獲取足夠的權(quán)重。
權(quán)重釋放不及時(shí):如果goroutine因?yàn)殄e(cuò)誤或異常情況提前退出,而沒有正確釋放它們所占用的權(quán)重,那么這些權(quán)重單位將不會(huì)被回收到信號(hào)量中。
高頻率的請(qǐng)求:在短時(shí)間內(nèi)有大量goroutine請(qǐng)求權(quán)重,即使它們請(qǐng)求的權(quán)重不大,累積起來也可能超過信號(hào)量的總?cè)萘俊?/p>
信號(hào)量權(quán)重未正確管理:如果信號(hào)量的權(quán)重管理邏輯存在缺陷,例如錯(cuò)誤地釋放了過多的權(quán)重,或者在錯(cuò)誤的時(shí)間點(diǎn)釋放權(quán)重,也可能導(dǎo)致可用權(quán)重不足。
為了避免信號(hào)量沒有足夠的可用權(quán)重,可以采取以下措施:
- 合理設(shè)置信號(hào)量容量:根據(jù)資源限制和并發(fā)需求合理設(shè)置信號(hào)量的總?cè)萘俊?/li>
- 及時(shí)釋放權(quán)重:確保在goroutine完成工作后及時(shí)釋放權(quán)重。
- 使用超時(shí):在
Acquire調(diào)用中使用超時(shí),避免無限期地等待權(quán)重。 - 監(jiān)控和日志記錄:監(jiān)控信號(hào)量的使用情況,并記錄關(guān)鍵信息,以便及時(shí)發(fā)現(xiàn)和解決問題。
- 權(quán)重分配策略:設(shè)計(jì)合理的權(quán)重分配策略,確保權(quán)重的公平和高效分配。
通過這些措施,可以更好地管理信號(hào)量的使用,避免因權(quán)重不足導(dǎo)致的并發(fā)問題。
案例二
假設(shè)有一個(gè)在線視頻平臺(tái),它需要處理不同分辨率的視頻轉(zhuǎn)碼任務(wù)。由于高清視頻轉(zhuǎn)碼比標(biāo)清視頻更消耗計(jì)算資源,因此平臺(tái)希望設(shè)計(jì)一個(gè)系統(tǒng),能夠優(yōu)先處理更多標(biāo)清視頻轉(zhuǎn)碼請(qǐng)求,同時(shí)又不完全阻塞高清視頻的轉(zhuǎn)碼,以保持整體服務(wù)質(zhì)量和資源的有效利用。
package main
import (
"fmt"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
"runtime"
"sync"
"time"
)
// VideoTranscodeJob 視頻轉(zhuǎn)碼任務(wù)
type VideoTranscodeJob struct {
resolution string
weight int64
}
func main() {
cpuCount := runtime.NumCPU()
fmt.Printf("當(dāng)前CPU個(gè)數(shù)%v\n", cpuCount)
/*
1、權(quán)權(quán)重較大的任務(wù)在資源競(jìng)爭(zhēng)時(shí)有更高的優(yōu)先級(jí),更容易獲得執(zhí)行的機(jī)會(huì)
2、如果當(dāng)前資源足夠滿足高權(quán)重任務(wù)的需求,這些任務(wù)將立即執(zhí)行;若資源不足,則按照權(quán)重高低順序排隊(duì)等待
3、一旦任務(wù)開始執(zhí)行,其完成的速度主要取決于任務(wù)自身的邏輯復(fù)雜度、所需資源以及系統(tǒng)的當(dāng)前負(fù)載等因素,與任務(wù)在信號(hào)量中的權(quán)重?zé)o關(guān)
3、高權(quán)重的任務(wù)并不會(huì)中斷已經(jīng)在執(zhí)行的低權(quán)重任務(wù),而是等待這些任務(wù)自行釋放資源。一旦資源釋放,等待隊(duì)列中的高權(quán)重任務(wù)將優(yōu)先被喚醒
4、Acquire 方法會(huì)檢查當(dāng)前信號(hào)量的可用資源量是否滿足請(qǐng)求的權(quán)重,如果滿足,則立即減少信號(hào)量的資源計(jì)數(shù)并返回,允許任務(wù)繼續(xù)執(zhí)行。如果不滿足,任務(wù)將阻塞等待,直到有足夠的資源被釋放
*/
// 初始化兩個(gè)信號(hào)量,一個(gè)用于標(biāo)清,一個(gè)用于高清,假設(shè)總共有8個(gè)CPU核心可用
normalSem := semaphore.NewWeighted(6) // 標(biāo)清任務(wù),分配6個(gè)單位權(quán)重,因?yàn)樗鼈兿馁Y源較少
highDefSem := semaphore.NewWeighted(2) // 高清任務(wù),分配2個(gè)單位權(quán)重,因?yàn)樗鼈兏馁Y源
var wg sync.WaitGroup
//假設(shè)有20個(gè)需要轉(zhuǎn)碼的視頻
videoJobs := []VideoTranscodeJob{
{"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},
{"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},
{"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},
{"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},
}
for _, job := range videoJobs {
wg.Add(1)
go func(job VideoTranscodeJob) {
defer wg.Done()
var sem *semaphore.Weighted
switch job.resolution {
case "SD":
sem = normalSem //分配權(quán)重大,當(dāng)前為6,任務(wù)在獲取執(zhí)行機(jī)會(huì)上有優(yōu)勢(shì),但并不直接意味著執(zhí)行速度快
case "HD":
sem = highDefSem
default:
panic("無效的分辨率")
}
if err := sem.Acquire(context.Background(), job.weight); err != nil {
fmt.Printf("名為 %s 視頻無法獲取信號(hào)量: %v\n", job.resolution, err)
return
}
defer sem.Release(job.weight) //釋放權(quán)重對(duì)應(yīng)的信號(hào)量
// 模擬轉(zhuǎn)碼任務(wù)執(zhí)行
fmt.Printf("轉(zhuǎn)碼 %s 視頻 (權(quán)重: %d)...\n", job.resolution, job.weight)
//通過利用VideoTranscodeJob的weight值來模擬轉(zhuǎn)碼時(shí)間的長(zhǎng)短,HD用時(shí)長(zhǎng)則設(shè)置2比SD的1大,*時(shí)間就自然長(zhǎng),運(yùn)行就時(shí)間長(zhǎng)
time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模擬不同分辨率視頻轉(zhuǎn)碼所需時(shí)間
fmt.Printf("------------------------%s 視頻轉(zhuǎn)碼完成。。。\n", job.resolution)
}(job)
}
wg.Wait()
}
標(biāo)清(SD)和高清(HD),分別分配了不同的權(quán)重(1和2)。通過創(chuàng)建兩個(gè)不同權(quán)重的信號(hào)量,我們可以控制不同類型任務(wù)的同時(shí)執(zhí)行數(shù)量,從而優(yōu)先保證標(biāo)清視頻的快速處理,同時(shí)也確保高清視頻能夠在不影響系統(tǒng)穩(wěn)定性的情況下進(jìn)行轉(zhuǎn)碼。這展示了帶權(quán)重的并發(fā)控制如何幫助在資源有限的情況下優(yōu)化任務(wù)調(diào)度和執(zhí)行效率。
注意:對(duì)協(xié)程分配的權(quán)重單位數(shù)不能大于對(duì)應(yīng)上下文semaphore.NewWeighted(n)中參數(shù)n的單位數(shù)
案例三
package main
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
type weightedTask struct {
id int
weight int64
}
func main() {
const (
maxTotalWeight = 20 // 最大總權(quán)重
)
sem := semaphore.NewWeighted(maxTotalWeight)
var wg sync.WaitGroup
tasksCh := make(chan weightedTask, 10)
// 發(fā)送任務(wù)
for i := 1; i <= 10; i++ {
tasksCh <- weightedTask{id: i, weight: int64(i)} // 假設(shè)任務(wù)ID即為其權(quán)重
}
close(tasksCh)
// 啟動(dòng)任務(wù)處理器
for task := range tasksCh {
wg.Add(1)
go func(task weightedTask) {
defer wg.Done()
if err := sem.Acquire(context.Background(), int64(task.id)); err != nil {
fmt.Printf("任務(wù) %d 無法獲取信號(hào)量: %v\n", task.id, err)
return
}
defer sem.Release(int64(task.id)) //釋放
// 模擬任務(wù)執(zhí)行
fmt.Printf("任務(wù) %d (權(quán)重: %d) 正在運(yùn)行...\n", task.id, task.weight)
time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中簡(jiǎn)單用時(shí)間模擬權(quán)重影響
fmt.Printf("任務(wù) %d 完成.\n", task.id)
}(task)
}
wg.Wait()
}
總結(jié)
選擇哪種方法取決于具體的應(yīng)用場(chǎng)景和需求。使用信號(hào)量是一種簡(jiǎn)單而靈活的方法,而協(xié)程池則更適用于需要批量處理任務(wù)的情況。golang.org/x/sync/semaphore 包提供了一個(gè)標(biāo)準(zhǔn)庫外的更靈活的信號(hào)量實(shí)現(xiàn)
到此這篇關(guān)于Go 控制協(xié)程(goroutine)的并發(fā)數(shù)量的文章就介紹到這了,更多相關(guān)Go 控制協(xié)程并發(fā)數(shù)量?jī)?nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- golang gin 框架 異步同步 goroutine 并發(fā)操作
- Go語言中的并發(fā)goroutine底層原理
- Go并發(fā)編程之goroutine使用正確方法
- Go 并發(fā)編程Goroutine的實(shí)現(xiàn)示例
- Golang 語言控制并發(fā) Goroutine的方法
- golang并發(fā)編程中Goroutine 協(xié)程的實(shí)現(xiàn)
- Go中Goroutines輕量級(jí)并發(fā)的特性及效率探究
- 使用Go?goroutine實(shí)現(xiàn)并發(fā)的Clock服務(wù)
- Go語言使用goroutine及通道實(shí)現(xiàn)并發(fā)詳解
- Go語言使用Goroutine并發(fā)打印的項(xiàng)目實(shí)踐
相關(guān)文章
Golang HTTP 服務(wù)平滑重啟及升級(jí)的思路
Golang HTTP服務(wù)在上線時(shí),需要重新編譯可執(zhí)行文件,關(guān)閉正在運(yùn)行的進(jìn)程,然后再啟動(dòng)新的運(yùn)行進(jìn)程。這篇文章主要介紹了Golang HTTP 服務(wù)平滑重啟及升級(jí),需要的朋友可以參考下2020-04-04
Go語言Gin框架中使用MySQL數(shù)據(jù)庫的三種方式
本文主要介紹了Go語言Gin框架中使用MySQL數(shù)據(jù)庫的三種方式,通過三種方式實(shí)現(xiàn)增刪改查的操作,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11
Golang中調(diào)用deepseekr1的教程詳解
這篇文章主要為大家詳細(xì)介紹了Golang中調(diào)用deepseekr1的相關(guān)教程,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的小伙伴可以了解下2025-02-02

