Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法詳解
需求
第三方的接口,限制接口請求的QPS,每秒5次
需要控制job「訪問接口」的次數(shù),每秒不能同時超過5次,包括 進(jìn)行中的任務(wù)、剛啟動的任務(wù)
使用限流器來控制任務(wù)的啟動頻率
要確保單位時間內(nèi)(例如每秒)運行的任務(wù)數(shù)量不超過特定的上限(如5個任務(wù)),并且在任務(wù)執(zhí)行完成得很快時,考慮已完成的任務(wù)和正在執(zhí)行的任務(wù)作為正在運行的任務(wù)總數(shù),可以使用限流器來控制任務(wù)的啟動頻率,并結(jié)合使用信號量來管理同時運行的任務(wù)數(shù)量。
具體來說,使用一個信號量來限制同時進(jìn)行的任務(wù)數(shù)量,并且在任務(wù)完成時,僅在下一秒鐘允許新的任務(wù)開始,以確保即使某些任務(wù)快速完成,也不會在同一秒鐘內(nèi)啟動超過限制數(shù)量的任務(wù)
package main import ( "context" "fmt" "math/rand" "sync" "sync/atomic" "time" "golang.org/x/time/rate" ) func RateLimit() { const maxJobsPerSecond = 5 const numJobs = 22 var wg sync.WaitGroup // 計數(shù)器 var runningJobs int32 // 當(dāng)前正在執(zhí)行的任務(wù)數(shù)量 var startedJobs int32 // 啟動后的任務(wù)數(shù)量 var finishedJobs int32 // 剛完成的任務(wù)數(shù)量 limiter := rate.NewLimiter(rate.Every(time.Second/time.Duration(maxJobsPerSecond)), maxJobsPerSecond) semaphore := make(chan struct{}, maxJobsPerSecond) for i := 1; i <= numJobs; i++ { wg.Add(1) go func(jobID int) { defer wg.Done() limiter.Wait(context.Background()) // 等待限流器允許進(jìn)行下一個任務(wù) semaphore <- struct{}{} // 獲取信號量 atomic.AddInt32(&startedJobs, 1) atomic.AddInt32(&runningJobs, 1) executeJob(jobID) // 執(zhí)行任務(wù) atomic.AddInt32(&finishedJobs, 1) atomic.AddInt32(&runningJobs, -1) <-time.After(time.Second) // 等待一秒鐘后釋放信號量 <-semaphore // 打印當(dāng)前狀態(tài) printStatus(&runningJobs, &startedJobs, &finishedJobs) }(i) } wg.Wait() fmt.Println("所有工作完成") }
注意事項
- 限流器rate.NewLimiter用于控制任務(wù)啟動的頻率,以確保每秒不超過maxJobsPerSecond個任務(wù)開始執(zhí)行。
- 使用信號量semaphore來控制同時進(jìn)行的任務(wù)數(shù)量。
- 為了確保在任何一秒內(nèi)同時進(jìn)行的任務(wù)數(shù)量不超過限制,在任務(wù)完成后等待一秒鐘,然后再釋放信號量。這樣做可以保證即使任務(wù)很快完成,也不會立即啟動新的任務(wù)。
這種實現(xiàn)方式確保了即使任務(wù)執(zhí)行得很快,每秒鐘啟動的新任務(wù)數(shù)量也不會超過限制,并且同時考慮了正在執(zhí)行和剛剛完成的任務(wù)。
動態(tài)創(chuàng)建協(xié)程
- 協(xié)程的啟動是動態(tài)的。在代碼中,每個任務(wù)對應(yīng)于一個動態(tài)創(chuàng)建的協(xié)程。這些協(xié)程是在循環(huán)中根據(jù)任務(wù)數(shù)量(numJobs)動態(tài)生成的。
- 具體來說,每當(dāng)有一個新的任務(wù)需要執(zhí)行時,都會創(chuàng)建一個新的協(xié)程來處理這個任務(wù)。這是通過在main函數(shù)的循環(huán)中調(diào)用go關(guān)鍵字實現(xiàn)的。這個過程在每次循環(huán)迭代中發(fā)生,從而為每個任務(wù)動態(tài)創(chuàng)建一個新的協(xié)程
由于使用了限流器(rate.Limiter),這些協(xié)程不是一次性全部創(chuàng)建,而是根據(jù)限流器允許的速率逐個創(chuàng)建。每個協(xié)程在開始執(zhí)行任務(wù)之前會等待限流器的許可,以此確保每秒啟動的任務(wù)數(shù)量不超過設(shè)定的最大值
func executeJob(jobID int) { startTime := time.Now() // 記錄任務(wù)開始時間 // 模擬任務(wù)執(zhí)行時間 fmt.Printf("%v Job %d started\n",time.Now().Format("2006-01-02 15:04:05.000"), jobID) // 初始化隨機(jī)數(shù)種子 rand.Seed(time.Now().UnixNano()) // 隨機(jī)生成一個時間間隔(例如,1到5000毫秒之間) min := 1 max := 5000 duration := time.Duration(rand.Intn(max-min+1)+min) * time.Millisecond time.Sleep(duration) durationCost := time.Since(startTime) // 計算任務(wù)耗時 fmt.Printf("%v Job %d finished Cost:%v\n", time.Now().Format("2006-01-02 15:04:05.000"),jobID, durationCost) } func printStatus(runningJobs, startedJobs, finishedJobs *int32) { fmt.Printf("Current status - Running: %d, Started: %d, Finished: %d\n", atomic.LoadInt32(runningJobs), atomic.LoadInt32(startedJobs), atomic.LoadInt32(finishedJobs)) }
可以在代碼中添加額外的邏輯來跟蹤和打印正在執(zhí)行、進(jìn)行中、剛啟動和剛完成的任務(wù)數(shù)量。使用原子操作(來自sync/atomic包)來確保在并發(fā)環(huán)境下對這些計數(shù)器的操作是安全的。
在這個示例中:
使用sync/atomic包中的AddInt32和LoadInt32來安全地增加和讀取計數(shù)器的值。
在每個任務(wù)開始時,增加startedJobs和runningJobs計數(shù)器。
在每個任務(wù)完成時,增加finishedJobs計數(shù)器,并減少runningJobs計數(shù)器。
在任務(wù)完成后和釋放信號量前,打印當(dāng)前的任務(wù)狀態(tài)。
注意事項
這種方法可以幫助我們跟蹤不同狀態(tài)下的任務(wù)數(shù)量。
使用原子操作確保在并發(fā)環(huán)境中對計數(shù)器的讀寫是安全的。
printStatus函數(shù)在每個任務(wù)的結(jié)束時被調(diào)用,以打印當(dāng)前的任務(wù)狀態(tài)
以上就是Golang根據(jù)job數(shù)量動態(tài)控制每秒?yún)f(xié)程的最大創(chuàng)建數(shù)量方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang job控制協(xié)程創(chuàng)建數(shù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang中基于HTTP協(xié)議的網(wǎng)絡(luò)服務(wù)
HTTP協(xié)議是基于TCP/IP協(xié)議棧的,并且它也是一個面向普通文本的協(xié)議。這篇文章主要詳細(xì)介紹了Golang中基于HTTP協(xié)議的網(wǎng)絡(luò)服務(wù),感興趣的小伙伴可以借鑒一下2023-04-04Go?panic的三種產(chǎn)生方式細(xì)節(jié)探究
這篇文章主要介紹了Go?panic的三種產(chǎn)生方式細(xì)節(jié)探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12Golang實現(xiàn)數(shù)據(jù)結(jié)構(gòu)Stack(堆棧)的示例詳解
在計算機(jī)科學(xué)中,stack(棧)是一種基本的數(shù)據(jù)結(jié)構(gòu),它是一種線性結(jié)構(gòu),具有后進(jìn)先出(Last In First Out)的特點。本文將通過Golang實現(xiàn)堆棧,需要的可以參考一下2023-04-04深入理解Golang中的Protocol Buffers及其應(yīng)用
本篇文章將深入探討 Go 語言中使用 Protobuf 的基礎(chǔ)知識、常見應(yīng)用以及最佳實踐,希望能幫大家了解如何在項目中高效利用 Protobuf2024-11-11golang協(xié)程設(shè)計及調(diào)度原理
這篇文章主要介紹了golang協(xié)程設(shè)計及調(diào)度原理,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,感興趣的小伙伴可以參考一下2022-06-06