Go實現(xiàn)后臺任務(wù)調(diào)度系統(tǒng)的實例代碼
一、背景
平常我們在開發(fā)API的時候,前端傳遞過來的大批數(shù)據(jù)需要經(jīng)過后端處理,如果后端處理的速度快,前端響應(yīng)就快,反之則很慢,影響用戶體驗。針對這種場景我們一般都是后臺異步處理,不需要前端等待所有的都執(zhí)行完才返回。為了解決這一問題,需要我們自己實現(xiàn)后臺任務(wù)調(diào)度系統(tǒng)。
二、任務(wù)調(diào)度器實現(xiàn)

poll.go
package poller
import (
"context"
"fmt"
"log"
"sync"
"time"
)
type Poller struct {
routineGroup *goroutineGroup // 并發(fā)控制
workerNum int // 記錄同時在運行的最大goroutine數(shù)
sync.Mutex
ready chan struct{} // 某個goroutine已經(jīng)準(zhǔn)備好了
metric *metric // 統(tǒng)計當(dāng)前在運行中的goroutine數(shù)量
}
func NewPoller(workerNum int) *Poller {
return &Poller{
routineGroup: newRoutineGroup(),
workerNum: workerNum,
ready: make(chan struct{}, 1),
metric: newMetric(),
}
}
// 調(diào)度器
func (p *Poller) schedule() {
p.Lock()
defer p.Unlock()
if int(p.metric.BusyWorkers()) >= p.workerNum {
return
}
select {
case p.ready <- struct{}{}: // 只要滿足當(dāng)前goroutine數(shù)量小于最大goroutine數(shù)量 那么就通知poll去調(diào)度goroutine執(zhí)行任務(wù)
default:
}
}
func (p *Poller) Poll(ctx context.Context) error {
for {
// step01
p.schedule() // 調(diào)度
select {
case <-p.ready: // goroutine準(zhǔn)備好之后 這里就會有消息
case <-ctx.Done():
return nil
}
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
default:
// step02
task, err := p.fetch(ctx) // 獲取任務(wù)
if err != nil {
log.Println("fetch task error:", err.Error())
break
}
fmt.Println(task)
p.metric.IncBusyWorker() // 當(dāng)前正在運行的goroutine+1
// step03
p.routineGroup.Run(func() { // 執(zhí)行任務(wù)
if err := p.execute(ctx, task); err != nil {
log.Println("execute task error:", err.Error())
}
})
break LOOP
}
}
}
}
func (p *Poller) fetch(ctx context.Context) (string, error) {
time.Sleep(1000 * time.Millisecond)
return "task", nil
}
func (p *Poller) execute(ctx context.Context, task string) error {
defer func() {
p.metric.DecBusyWorker() // 執(zhí)行完成之后 goroutine數(shù)量-1
p.schedule() // 重新調(diào)度下一個goroutine去執(zhí)行任務(wù) 這一步是必須的
}()
return nil
}metric.go
package poller
import "sync/atomic"
type metric struct {
busyWorkers uint64
}
func newMetric() *metric {
return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}goroutine_group.go
package poller
import "sync"
type goroutineGroup struct {
waitGroup sync.WaitGroup
}
func newRoutineGroup() *goroutineGroup {
return new(goroutineGroup)
}
func (g *goroutineGroup) Run(fn func()) {
g.waitGroup.Add(1)
go func() {
defer g.waitGroup.Done()
fn()
}()
}
func (g *goroutineGroup) Wait() {
g.waitGroup.Wait()
}三、測試
package main
import (
"context"
"fmt"
"ta/poller"
"go.uber.org/goleak"
"testing"
)
func TestMain(m *testing.M) {
fmt.Println("start")
goleak.VerifyTestMain(m)
}
func TestPoller(t *testing.T) {
producer := poller.NewPoller(5)
producer.Poll(context.Background())
}結(jié)果:

四、總結(jié)
大家用別的方式也可以實現(xiàn),核心要點就是控制并發(fā)節(jié)奏,防止大量請求打到task service,在這里起到核心作用的就是schedule,它控制著整個任務(wù)系統(tǒng)的調(diào)度。同時還封裝了WaitGroup,這在大多數(shù)開源代碼中都比較常見,大家可以去嘗試。另外就是test case一定得跟上,防止goroutine泄漏。
以上就是Go實現(xiàn)后臺任務(wù)調(diào)度系統(tǒng)的實例代碼的詳細(xì)內(nèi)容,更多關(guān)于Go后臺任務(wù)調(diào)度系統(tǒng)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go?slice切片make生成append追加copy復(fù)制示例
這篇文章主要為大家介紹了Go使用make生成切片、使用append追加切片元素、使用copy復(fù)制切片使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06
一文帶你了解Golang中select的實現(xiàn)原理
select是go提供的一種跟并發(fā)相關(guān)的語法,非常有用。本文將介紹?Go?語言中的?select?的實現(xiàn)原理,包括?select?的結(jié)構(gòu)和常見問題、編譯期間的多種優(yōu)化以及運行時的執(zhí)行過程2023-02-02
Go語言實現(xiàn)一個Http?Server框架(一)?http庫的使用
本文主要介紹用Go語言實現(xiàn)一個Http?Server框架中對http庫的基本使用說明,文中有詳細(xì)的代碼示例,感興趣的同學(xué)可以借鑒一下2023-04-04
Go并發(fā)編程結(jié)構(gòu)體多字段原子操作示例詳解
這篇文章主要為大家介紹了Go并發(fā)編程結(jié)構(gòu)體多字段原子操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12

