Go實現(xiàn)后臺任務(wù)調(diào)度系統(tǒng)的實例代碼
一、背景
平常我們在開發(fā)API的時候,前端傳遞過來的大批數(shù)據(jù)需要經(jīng)過后端處理,如果后端處理的速度快,前端響應(yīng)就快,反之則很慢,影響用戶體驗。針對這種場景我們一般都是后臺異步處理,不需要前端等待所有的都執(zhí)行完才返回。為了解決這一問題,需要我們自己實現(xiàn)后臺任務(wù)調(diào)度系統(tǒng)。
二、任務(wù)調(diào)度器實現(xiàn)
poll.go
package poller import ( "context" "fmt" "log" "sync" "time" ) type Poller struct { routineGroup *goroutineGroup // 并發(fā)控制 workerNum int // 記錄同時在運行的最大goroutine數(shù) sync.Mutex ready chan struct{} // 某個goroutine已經(jīng)準備好了 metric *metric // 統(tǒng)計當前在運行中的goroutine數(shù)量 } func NewPoller(workerNum int) *Poller { return &Poller{ routineGroup: newRoutineGroup(), workerNum: workerNum, ready: make(chan struct{}, 1), metric: newMetric(), } } // 調(diào)度器 func (p *Poller) schedule() { p.Lock() defer p.Unlock() if int(p.metric.BusyWorkers()) >= p.workerNum { return } select { case p.ready <- struct{}{}: // 只要滿足當前goroutine數(shù)量小于最大goroutine數(shù)量 那么就通知poll去調(diào)度goroutine執(zhí)行任務(wù) default: } } func (p *Poller) Poll(ctx context.Context) error { for { // step01 p.schedule() // 調(diào)度 select { case <-p.ready: // goroutine準備好之后 這里就會有消息 case <-ctx.Done(): return nil } LOOP: for { select { case <-ctx.Done(): break LOOP default: // step02 task, err := p.fetch(ctx) // 獲取任務(wù) if err != nil { log.Println("fetch task error:", err.Error()) break } fmt.Println(task) p.metric.IncBusyWorker() // 當前正在運行的goroutine+1 // step03 p.routineGroup.Run(func() { // 執(zhí)行任務(wù) if err := p.execute(ctx, task); err != nil { log.Println("execute task error:", err.Error()) } }) break LOOP } } } } func (p *Poller) fetch(ctx context.Context) (string, error) { time.Sleep(1000 * time.Millisecond) return "task", nil } func (p *Poller) execute(ctx context.Context, task string) error { defer func() { p.metric.DecBusyWorker() // 執(zhí)行完成之后 goroutine數(shù)量-1 p.schedule() // 重新調(diào)度下一個goroutine去執(zhí)行任務(wù) 這一步是必須的 }() return nil }
metric.go
package poller import "sync/atomic" type metric struct { busyWorkers uint64 } func newMetric() *metric { return &metric{} } func (m *metric) IncBusyWorker() uint64 { return atomic.AddUint64(&m.busyWorkers, 1) } func (m *metric) DecBusyWorker() uint64 { return atomic.AddUint64(&m.busyWorkers, ^uint64(0)) } func (m *metric) BusyWorkers() uint64 { return atomic.LoadUint64(&m.busyWorkers) }
goroutine_group.go
package poller import "sync" type goroutineGroup struct { waitGroup sync.WaitGroup } func newRoutineGroup() *goroutineGroup { return new(goroutineGroup) } func (g *goroutineGroup) Run(fn func()) { g.waitGroup.Add(1) go func() { defer g.waitGroup.Done() fn() }() } func (g *goroutineGroup) Wait() { g.waitGroup.Wait() }
三、測試
package main import ( "context" "fmt" "ta/poller" "go.uber.org/goleak" "testing" ) func TestMain(m *testing.M) { fmt.Println("start") goleak.VerifyTestMain(m) } func TestPoller(t *testing.T) { producer := poller.NewPoller(5) producer.Poll(context.Background()) }
結(jié)果:
四、總結(jié)
大家用別的方式也可以實現(xiàn),核心要點就是控制并發(fā)節(jié)奏,防止大量請求打到task service
,在這里起到核心作用的就是schedule
,它控制著整個任務(wù)系統(tǒng)的調(diào)度。同時還封裝了WaitGroup
,這在大多數(shù)開源代碼中都比較常見,大家可以去嘗試。另外就是test case
一定得跟上,防止goroutine
泄漏。
以上就是Go實現(xiàn)后臺任務(wù)調(diào)度系統(tǒng)的實例代碼的詳細內(nèi)容,更多關(guān)于Go后臺任務(wù)調(diào)度系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go?slice切片make生成append追加copy復(fù)制示例
這篇文章主要為大家介紹了Go使用make生成切片、使用append追加切片元素、使用copy復(fù)制切片使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06一文帶你了解Golang中select的實現(xiàn)原理
select是go提供的一種跟并發(fā)相關(guān)的語法,非常有用。本文將介紹?Go?語言中的?select?的實現(xiàn)原理,包括?select?的結(jié)構(gòu)和常見問題、編譯期間的多種優(yōu)化以及運行時的執(zhí)行過程2023-02-02Go語言實現(xiàn)一個Http?Server框架(一)?http庫的使用
本文主要介紹用Go語言實現(xiàn)一個Http?Server框架中對http庫的基本使用說明,文中有詳細的代碼示例,感興趣的同學(xué)可以借鑒一下2023-04-04Go并發(fā)編程結(jié)構(gòu)體多字段原子操作示例詳解
這篇文章主要為大家介紹了Go并發(fā)編程結(jié)構(gòu)體多字段原子操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12