Go實(shí)現(xiàn)一個(gè)輕量級(jí)并發(fā)任務(wù)調(diào)度器(支持限速)
前言
在日常開(kāi)發(fā)中,我們經(jīng)常會(huì)遇到這樣的場(chǎng)景:
- 有一堆任務(wù)要跑(比如:發(fā)請(qǐng)求、處理數(shù)據(jù)、爬蟲等)
- 不希望一次性全部跑完,擔(dān)心打爆服務(wù)端或者被封
- 想要設(shè)置并發(fā)數(shù)、限速,還能控制任務(wù)重試、失敗記錄
那么,能不能用 Go 實(shí)現(xiàn)一個(gè)“輕量級(jí)的并發(fā)任務(wù)調(diào)度器”?——答案是:當(dāng)然可以!
今天我們就來(lái)用 Go 從零實(shí)現(xiàn)一個(gè)可配置的任務(wù)調(diào)度器,支持:
- 最大并發(fā)數(shù)控制(worker pool)
- 每秒請(qǐng)求速率限制(rate limit)
- 簡(jiǎn)單的失敗重試機(jī)制
- 支持結(jié)果收集與錯(cuò)誤輸出
效果展示
你可以像這樣調(diào)用我們的調(diào)度器:
scheduler := NewScheduler(5, 10) // 并發(fā) 5,速率限制每秒 10 次
for i := 0; i < 100; i++ {
task := NewTask(func() error {
// 模擬網(wǎng)絡(luò)請(qǐng)求或業(yè)務(wù)邏輯
fmt.Println("正在處理任務(wù):", i)
time.Sleep(300 * time.Millisecond)
return nil
})
scheduler.Submit(task)
}
scheduler.Wait()
fmt.Println("全部任務(wù)完成")核心組件設(shè)計(jì)
1. 任務(wù)(Task)
我們將每個(gè)任務(wù)抽象為一個(gè)結(jié)構(gòu)體:
type Task struct {
fn func() error
retry int
}2. 調(diào)度器(Scheduler)
負(fù)責(zé)維護(hù)任務(wù)隊(duì)列、worker、速率限制器:
type Scheduler struct {
tasks chan *Task
wg sync.WaitGroup
rateLimiter <-chan time.Time
}實(shí)現(xiàn)代碼
下面是完整實(shí)現(xiàn)(可以直接復(fù)制使用):
type Task struct {
fn func() error
retry int
}
func NewTask(fn func() error) *Task {
return &Task{fn: fn, retry: 3}
}
type Scheduler struct {
tasks chan *Task
wg sync.WaitGroup
rateLimiter <-chan time.Time
}
func NewScheduler(concurrency int, ratePerSecond int) *Scheduler {
s := &Scheduler{
tasks: make(chan *Task, 100),
rateLimiter: time.Tick(time.Second / time.Duration(ratePerSecond)),
}
for i := 0; i < concurrency; i++ {
go s.worker()
}
return s
}
func (s *Scheduler) Submit(task *Task) {
s.wg.Add(1)
s.tasks <- task
}
func (s *Scheduler) worker() {
for task := range s.tasks {
<-s.rateLimiter // 限速
err := task.fn()
if err != nil && task.retry > 0 {
fmt.Println("任務(wù)失敗,重試中...")
task.retry--
s.Submit(task)
} else if err != nil {
fmt.Println("任務(wù)最終失敗:", err)
}
s.wg.Done()
}
}
func (s *Scheduler) Wait() {
s.wg.Wait()
close(s.tasks)
}實(shí)戰(zhàn)應(yīng)用場(chǎng)景
- 網(wǎng)絡(luò)爬蟲限速抓取
- 批量發(fā)送郵件/SMS/請(qǐng)求,防止接口限流
- 云服務(wù)任務(wù)調(diào)度、批量自動(dòng)化操作
- 異步數(shù)據(jù)采集和聚合
總結(jié)
Go 的并發(fā)模型非常適合處理“海量任務(wù) + 控制速率 + 錯(cuò)誤重試”的需求。本篇實(shí)現(xiàn)的調(diào)度器非常輕量,適合作為基礎(chǔ)組件集成到你自己的系統(tǒng)中。
如果你有更多需求,比如:
- 增加失敗回調(diào)
- 支持超時(shí)控制
- 任務(wù)優(yōu)先級(jí)
- 后臺(tái)監(jiān)控 dashboard
到此這篇關(guān)于Go實(shí)現(xiàn)一個(gè)輕量級(jí)并發(fā)任務(wù)調(diào)度器(支持限速)的文章就介紹到這了,更多相關(guān)Go 并發(fā)任務(wù)調(diào)度器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解golang開(kāi)發(fā)中http請(qǐng)求redirect的問(wèn)題
這篇文章主要介紹了詳解golang開(kāi)發(fā)中http請(qǐng)求redirect的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例
本文主要介紹了Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06
Linux系統(tǒng)下Go語(yǔ)言開(kāi)發(fā)環(huán)境搭建
這篇文章主要介紹了Linux系統(tǒng)下Go開(kāi)發(fā)環(huán)境搭建,需要的朋友可以參考下2022-04-04
gin使用自定義結(jié)構(gòu)綁定表單數(shù)據(jù)的示例代碼
這篇文章主要介紹了gin使用自定義結(jié)構(gòu)綁定表單數(shù)據(jù)的示例代碼,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11

