Go實(shí)現(xiàn)一個(gè)輕量級(jí)并發(fā)任務(wù)調(diào)度器(支持限速)
前言
在日常開(kāi)發(fā)中,我們經(jīng)常會(huì)遇到這樣的場(chǎng)景:
- 有一堆任務(wù)要跑(比如:發(fā)請(qǐng)求、處理數(shù)據(jù)、爬蟲(chóng)等)
- 不希望一次性全部跑完,擔(dān)心打爆服務(wù)端或者被封
- 想要設(shè)置并發(fā)數(shù)、限速,還能控制任務(wù)重試、失敗記錄
那么,能不能用 Go 實(shí)現(xiàn)一個(gè)“輕量級(jí)的并發(fā)任務(wù)調(diào)度器”?——答案是:當(dāng)然可以!
今天我們就來(lái)用 Go 從零實(shí)現(xiàn)一個(gè)可配置的任務(wù)調(diào)度器,支持:
- 最大并發(fā)數(shù)控制(worker pool)
- 每秒請(qǐng)求速率限制(rate limit)
- 簡(jiǎn)單的失敗重試機(jī)制
- 支持結(jié)果收集與錯(cuò)誤輸出
效果展示
你可以像這樣調(diào)用我們的調(diào)度器:
scheduler := NewScheduler(5, 10) // 并發(fā) 5,速率限制每秒 10 次 for i := 0; i < 100; i++ { task := NewTask(func() error { // 模擬網(wǎng)絡(luò)請(qǐng)求或業(yè)務(wù)邏輯 fmt.Println("正在處理任務(wù):", i) time.Sleep(300 * time.Millisecond) return nil }) scheduler.Submit(task) } scheduler.Wait() fmt.Println("全部任務(wù)完成")
核心組件設(shè)計(jì)
1. 任務(wù)(Task)
我們將每個(gè)任務(wù)抽象為一個(gè)結(jié)構(gòu)體:
type Task struct { fn func() error retry int }
2. 調(diào)度器(Scheduler)
負(fù)責(zé)維護(hù)任務(wù)隊(duì)列、worker、速率限制器:
type Scheduler struct { tasks chan *Task wg sync.WaitGroup rateLimiter <-chan time.Time }
實(shí)現(xiàn)代碼
下面是完整實(shí)現(xiàn)(可以直接復(fù)制使用):
type Task struct { fn func() error retry int } func NewTask(fn func() error) *Task { return &Task{fn: fn, retry: 3} } type Scheduler struct { tasks chan *Task wg sync.WaitGroup rateLimiter <-chan time.Time } func NewScheduler(concurrency int, ratePerSecond int) *Scheduler { s := &Scheduler{ tasks: make(chan *Task, 100), rateLimiter: time.Tick(time.Second / time.Duration(ratePerSecond)), } for i := 0; i < concurrency; i++ { go s.worker() } return s } func (s *Scheduler) Submit(task *Task) { s.wg.Add(1) s.tasks <- task } func (s *Scheduler) worker() { for task := range s.tasks { <-s.rateLimiter // 限速 err := task.fn() if err != nil && task.retry > 0 { fmt.Println("任務(wù)失敗,重試中...") task.retry-- s.Submit(task) } else if err != nil { fmt.Println("任務(wù)最終失敗:", err) } s.wg.Done() } } func (s *Scheduler) Wait() { s.wg.Wait() close(s.tasks) }
實(shí)戰(zhàn)應(yīng)用場(chǎng)景
- 網(wǎng)絡(luò)爬蟲(chóng)限速抓取
- 批量發(fā)送郵件/SMS/請(qǐng)求,防止接口限流
- 云服務(wù)任務(wù)調(diào)度、批量自動(dòng)化操作
- 異步數(shù)據(jù)采集和聚合
總結(jié)
Go 的并發(fā)模型非常適合處理“海量任務(wù) + 控制速率 + 錯(cuò)誤重試”的需求。本篇實(shí)現(xiàn)的調(diào)度器非常輕量,適合作為基礎(chǔ)組件集成到你自己的系統(tǒng)中。
如果你有更多需求,比如:
- 增加失敗回調(diào)
- 支持超時(shí)控制
- 任務(wù)優(yōu)先級(jí)
- 后臺(tái)監(jiān)控 dashboard
到此這篇關(guān)于Go實(shí)現(xiàn)一個(gè)輕量級(jí)并發(fā)任務(wù)調(diào)度器(支持限速)的文章就介紹到這了,更多相關(guān)Go 并發(fā)任務(wù)調(diào)度器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解golang開(kāi)發(fā)中http請(qǐng)求redirect的問(wèn)題
這篇文章主要介紹了詳解golang開(kāi)發(fā)中http請(qǐng)求redirect的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例
本文主要介紹了Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程池的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06Linux系統(tǒng)下Go語(yǔ)言開(kāi)發(fā)環(huán)境搭建
這篇文章主要介紹了Linux系統(tǒng)下Go開(kāi)發(fā)環(huán)境搭建,需要的朋友可以參考下2022-04-04gin使用自定義結(jié)構(gòu)綁定表單數(shù)據(jù)的示例代碼
這篇文章主要介紹了gin使用自定義結(jié)構(gòu)綁定表單數(shù)據(jù)的示例代碼,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11