Go 控制協(xié)程(goroutine)的并發(fā)數量
在使用協(xié)程并發(fā)處理某些任務時, 其并發(fā)數量往往因為各種因素的限制不能無限的增大. 例如網絡請求、數據庫查詢等等。
從運行效率角度考慮,在相關服務可以負載的前提下(限制最大并發(fā)數),盡可能高的并發(fā)。
在Go語言中,可以使用一些方法來控制協(xié)程(goroutine)的并發(fā)數量,以防止并發(fā)過多導致資源耗盡或性能下降
1、使用信號量(Semaphore)
可以使用 Go 語言中的 channel 來實現簡單的信號量,限制并發(fā)數量
package main import ( "fmt" "sync" ) func worker(id int, sem chan struct{}) { sem <- struct{}{} // 占用一個信號量 defer func() { <-sem // 方法運行結束釋放信號量 }() // 執(zhí)行工作任務 fmt.Printf("Worker %d: Working...\n", id) } func main() { concurrency := 3 sem := make(chan struct{}, concurrency) var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() worker(id, sem) }(i) } wg.Wait() close(sem) }
sem
是一個有緩沖的 channel,通過控制 channel 中元素的數量,實現了一個簡單的信號量機制
2、使用協(xié)程池
可以創(chuàng)建一個固定數量的協(xié)程池,將任務分發(fā)給這些協(xié)程執(zhí)行。
package main import ( "fmt" "sync" ) func worker(id int, jobs <-chan int, results chan<- int) { //jobs等待主要協(xié)程往jobs放數據 for j := range jobs { fmt.Printf("協(xié)程池 %d: 協(xié)程池正在工作 %d\n", id, j) results <- j } } func main() { const numJobs = 5 //協(xié)程要做的工作數量 const numWorkers = 3 //協(xié)程池數量 jobs := make(chan int, numJobs) results := make(chan int, numJobs) var wg sync.WaitGroup // 啟動協(xié)程池 for i := 1; i <= numWorkers; i++ { wg.Add(1) go func(id int) { defer wg.Done() worker(id, jobs, results) }(i) } // 提交任務 for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 等待所有工作完成 go func() { wg.Wait() close(results) }() // 處理結果 for result := range results { fmt.Println("Result:", result) } }
jobs
通道用于存儲任務,results
通道用于存儲處理結果。通過創(chuàng)建固定數量的工作協(xié)程,可以有效地控制并發(fā)數量。
3、使用其他包
Go 1.16 引入了 golang.org/x/sync/semaphore
包,它提供了一個更為靈活的信號量實現。
案例一
限制對外部API的并發(fā)請求
假設我們有一個外部API,它對并發(fā)請求有限制,我們希望確保不超過這個限制。我們可以使用semaphore.Weighted
來控制對API的并發(fā)訪問
package main import ( "context" "fmt" "golang.org/x/sync/semaphore" "sync" "time" ) func main() { /* 1、在并發(fā)量一定的情況下,通過改變允許并發(fā)請求數可以更快處理請求任務(在CPU夠用的前提下) 2、sem := semaphore.NewWeighted(n),參數n就是權重量 3、當一個協(xié)程需要獲取的單位的權重越多,運行就會慢(比如權重總量n=5個,一個協(xié)程分配了2個,跟一個協(xié)程分配1個效率是不一樣的) 4、信號量沒有足夠的可用權重的情況發(fā)生在所有已分配的權重單位都已經被占用,即信號量的當前權重計數達到了它的總容量。在這種情況下,任何嘗試通過Acquire方法獲取更多權重的調用都將無法立即完成,從而導致調用者(通常是goroutine)阻塞,直到其他調用者釋放一些權重單位。 */ /* 1、權權重較大的任務在資源競爭時有更高的優(yōu)先級,更容易獲得執(zhí)行的機會 2、如果當前資源足夠滿足高權重任務的需求,這些任務將立即執(zhí)行;若資源不足,則按照權重高低順序排隊等待 3、一旦任務開始執(zhí)行,其完成的速度主要取決于任務自身的邏輯復雜度、所需資源以及系統(tǒng)的當前負載等因素,與任務在信號量中的權重無關 3、高權重的任務并不會中斷已經在執(zhí)行的低權重任務,而是等待這些任務自行釋放資源。一旦資源釋放,等待隊列中的高權重任務將優(yōu)先被喚醒 4、Acquire 方法會檢查當前信號量的可用資源量是否滿足請求的權重,如果滿足,則立即減少信號量的資源計數并返回,允許任務繼續(xù)執(zhí)行。如果不滿足,任務將阻塞等待,直到有足夠的資源被釋放 */ // 記錄開始時間 startTime := time.Now() // 假設外部API允許的最大并發(fā)請求為5(信號量的總容量是5個權重單位) const ( maxConcurrentRequests = 5 ) sem := semaphore.NewWeighted(maxConcurrentRequests) var wg sync.WaitGroup // 模擬對外部API的10個并發(fā)請求 for i := 0; i < 10; i++ { wg.Add(1) go func(requestId int) { defer wg.Done() // 假設我們想要獲取2個單位的權重 if err := sem.Acquire(context.Background(), 2); err != nil { fmt.Printf("請求 %d 無法獲取信號量: %v\n", requestId, err) return } defer sem.Release(2) // 請求完成后釋放信號量 // 模擬對API的請求處理 fmt.Printf("請求 %d 開始...\n", requestId) time.Sleep(2 * time.Second) // 模擬網絡延遲 fmt.Printf("請求 %d 完成。\n", requestId) }(i) } wg.Wait() // 記錄結束時間 endTime := time.Now() // 計算并打印總耗時 fmt.Printf("程序總耗時: %v\n", endTime.Sub(startTime)) }
信號量沒有足夠的可用權重的情況發(fā)生在所有已分配的權重單位都已經被占用,即信號量的當前權重計數達到了它的總容量。在這種情況下,任何嘗試通過Acquire
方法獲取更多權重的調用都將無法立即完成,從而導致調用者(通常是goroutine)阻塞,直到其他調用者釋放一些權重單位。
以下是一些導致信號量沒有足夠可用權重的具體情況:
信號量初始化容量較小:如果信號量的總容量設置得較小,而并發(fā)請求的數量較大,那么很快就會出現權重不足的情況。
長時間占用權重:如果某些goroutine長時間占用權重單位而不釋放,這會導致其他goroutine無法獲取到權重,即使這些goroutine只是少數。
權重分配不均:在某些情況下,可能存在一些goroutine占用了不成比例的權重單位,導致其他goroutine無法獲取足夠的權重。
權重釋放不及時:如果goroutine因為錯誤或異常情況提前退出,而沒有正確釋放它們所占用的權重,那么這些權重單位將不會被回收到信號量中。
高頻率的請求:在短時間內有大量goroutine請求權重,即使它們請求的權重不大,累積起來也可能超過信號量的總容量。
信號量權重未正確管理:如果信號量的權重管理邏輯存在缺陷,例如錯誤地釋放了過多的權重,或者在錯誤的時間點釋放權重,也可能導致可用權重不足。
為了避免信號量沒有足夠的可用權重,可以采取以下措施:
- 合理設置信號量容量:根據資源限制和并發(fā)需求合理設置信號量的總容量。
- 及時釋放權重:確保在goroutine完成工作后及時釋放權重。
- 使用超時:在
Acquire
調用中使用超時,避免無限期地等待權重。 - 監(jiān)控和日志記錄:監(jiān)控信號量的使用情況,并記錄關鍵信息,以便及時發(fā)現和解決問題。
- 權重分配策略:設計合理的權重分配策略,確保權重的公平和高效分配。
通過這些措施,可以更好地管理信號量的使用,避免因權重不足導致的并發(fā)問題。
案例二
假設有一個在線視頻平臺,它需要處理不同分辨率的視頻轉碼任務。由于高清視頻轉碼比標清視頻更消耗計算資源,因此平臺希望設計一個系統(tǒng),能夠優(yōu)先處理更多標清視頻轉碼請求,同時又不完全阻塞高清視頻的轉碼,以保持整體服務質量和資源的有效利用。
package main import ( "fmt" "golang.org/x/net/context" "golang.org/x/sync/semaphore" "runtime" "sync" "time" ) // VideoTranscodeJob 視頻轉碼任務 type VideoTranscodeJob struct { resolution string weight int64 } func main() { cpuCount := runtime.NumCPU() fmt.Printf("當前CPU個數%v\n", cpuCount) /* 1、權權重較大的任務在資源競爭時有更高的優(yōu)先級,更容易獲得執(zhí)行的機會 2、如果當前資源足夠滿足高權重任務的需求,這些任務將立即執(zhí)行;若資源不足,則按照權重高低順序排隊等待 3、一旦任務開始執(zhí)行,其完成的速度主要取決于任務自身的邏輯復雜度、所需資源以及系統(tǒng)的當前負載等因素,與任務在信號量中的權重無關 3、高權重的任務并不會中斷已經在執(zhí)行的低權重任務,而是等待這些任務自行釋放資源。一旦資源釋放,等待隊列中的高權重任務將優(yōu)先被喚醒 4、Acquire 方法會檢查當前信號量的可用資源量是否滿足請求的權重,如果滿足,則立即減少信號量的資源計數并返回,允許任務繼續(xù)執(zhí)行。如果不滿足,任務將阻塞等待,直到有足夠的資源被釋放 */ // 初始化兩個信號量,一個用于標清,一個用于高清,假設總共有8個CPU核心可用 normalSem := semaphore.NewWeighted(6) // 標清任務,分配6個單位權重,因為它們消耗資源較少 highDefSem := semaphore.NewWeighted(2) // 高清任務,分配2個單位權重,因為它們更消耗資源 var wg sync.WaitGroup //假設有20個需要轉碼的視頻 videoJobs := []VideoTranscodeJob{ {"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2}, {"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2}, } for _, job := range videoJobs { wg.Add(1) go func(job VideoTranscodeJob) { defer wg.Done() var sem *semaphore.Weighted switch job.resolution { case "SD": sem = normalSem //分配權重大,當前為6,任務在獲取執(zhí)行機會上有優(yōu)勢,但并不直接意味著執(zhí)行速度快 case "HD": sem = highDefSem default: panic("無效的分辨率") } if err := sem.Acquire(context.Background(), job.weight); err != nil { fmt.Printf("名為 %s 視頻無法獲取信號量: %v\n", job.resolution, err) return } defer sem.Release(job.weight) //釋放權重對應的信號量 // 模擬轉碼任務執(zhí)行 fmt.Printf("轉碼 %s 視頻 (權重: %d)...\n", job.resolution, job.weight) //通過利用VideoTranscodeJob的weight值來模擬轉碼時間的長短,HD用時長則設置2比SD的1大,*時間就自然長,運行就時間長 time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模擬不同分辨率視頻轉碼所需時間 fmt.Printf("------------------------%s 視頻轉碼完成。。。\n", job.resolution) }(job) } wg.Wait() }
標清(SD)和高清(HD),分別分配了不同的權重(1和2)。通過創(chuàng)建兩個不同權重的信號量,我們可以控制不同類型任務的同時執(zhí)行數量,從而優(yōu)先保證標清視頻的快速處理,同時也確保高清視頻能夠在不影響系統(tǒng)穩(wěn)定性的情況下進行轉碼。這展示了帶權重的并發(fā)控制如何幫助在資源有限的情況下優(yōu)化任務調度和執(zhí)行效率。
注意:對協(xié)程分配的權重單位數不能大于對應上下文semaphore.NewWeighted(n)中參數n的單位數
案例三
package main import ( "context" "fmt" "sync" "time" "golang.org/x/sync/semaphore" ) type weightedTask struct { id int weight int64 } func main() { const ( maxTotalWeight = 20 // 最大總權重 ) sem := semaphore.NewWeighted(maxTotalWeight) var wg sync.WaitGroup tasksCh := make(chan weightedTask, 10) // 發(fā)送任務 for i := 1; i <= 10; i++ { tasksCh <- weightedTask{id: i, weight: int64(i)} // 假設任務ID即為其權重 } close(tasksCh) // 啟動任務處理器 for task := range tasksCh { wg.Add(1) go func(task weightedTask) { defer wg.Done() if err := sem.Acquire(context.Background(), int64(task.id)); err != nil { fmt.Printf("任務 %d 無法獲取信號量: %v\n", task.id, err) return } defer sem.Release(int64(task.id)) //釋放 // 模擬任務執(zhí)行 fmt.Printf("任務 %d (權重: %d) 正在運行...\n", task.id, task.weight) time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中簡單用時間模擬權重影響 fmt.Printf("任務 %d 完成.\n", task.id) }(task) } wg.Wait() }
總結
選擇哪種方法取決于具體的應用場景和需求。使用信號量是一種簡單而靈活的方法,而協(xié)程池則更適用于需要批量處理任務的情況。golang.org/x/sync/semaphore
包提供了一個標準庫外的更靈活的信號量實現
到此這篇關于Go 控制協(xié)程(goroutine)的并發(fā)數量的文章就介紹到這了,更多相關Go 控制協(xié)程并發(fā)數量內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
golang 監(jiān)聽服務的信號,實現平滑啟動,linux信號說明詳解
這篇文章主要介紹了golang 監(jiān)聽服務的信號,實現平滑啟動,linux信號說明詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05