Golang分布式應(yīng)用定時任務(wù)示例詳解
正文
在系統(tǒng)開發(fā)中,有一類任務(wù)不是立即執(zhí)行,而是在未來某個時間點或者按照一定間隔去執(zhí)行,比如日志定期壓縮、報表制作、過期數(shù)據(jù)清理等,這就是定時任務(wù)。
在單機中,定時任務(wù)通常需要實現(xiàn)一個類似crontab的系統(tǒng),一般有兩種方式:
- 最小堆,按照任務(wù)執(zhí)行時間建堆,每次取最近的任務(wù)執(zhí)行
- 時間輪,將任務(wù)放到時間輪列表中,每次轉(zhuǎn)動取對應(yīng)的任務(wù)列表執(zhí)行
最小堆
最小堆是一種特殊的完全二叉樹,任意非葉子節(jié)點的值不大于其子節(jié)點,如圖

通過最小堆,根據(jù)任務(wù)最近執(zhí)行時間鍵堆,每次取堆頂元素即最近需要執(zhí)行的任務(wù),設(shè)置timer定時器,到期后觸發(fā)任務(wù)執(zhí)行。由于堆的特性每次調(diào)整的時間復(fù)雜度為O(lgN),相較于普通隊列性能更快。
在container/heap中已經(jīng)實現(xiàn)操作堆的相關(guān)函數(shù),我們只需要實現(xiàn)定期任務(wù)核心邏輯即可。
// 運行
func (c *Cron) Run() error {
// 設(shè)置cron已啟動,atomic.Bool來保證并發(fā)安全
c.started.Store(true)
// 主循環(huán)
for {
// 如果停止則退出
if !c.started.Load() {
break
}
c.runTask()
}
return nil
}
// 核心邏輯
func (c *Cron) runTask() {
now := time.Now()
duration := infTime
// 獲取堆頂元素
task, ok := c.tasks.Peek()
if ok {
// 如果已刪除則彈出
if !c.set.Has(task.Name()) {
c.tasks.Pop()
return
}
// 計算于當(dāng)前時間查找,設(shè)置定時器
if task.next.After(now) {
duration = task.next.Sub(now)
} else {
duration = 0
}
}
timer := time.NewTimer(duration)
defer timer.Stop()
// 當(dāng)有新元素插入直接返回,防止新元素執(zhí)行時間小于當(dāng)前堆頂元素
select {
case <-c.new:
return
case <-timer.C:
}
// 彈出任務(wù),執(zhí)行
go task.Exec()
// 計算下次執(zhí)行時間,如果為0說明任務(wù)已結(jié)束,否則重新入堆
task.next = task.Next(time.Now())
if task.next.IsZero() {
c.set.Delete(task.Name())
} else {
c.tasks.Push(task)
}
}
主要邏輯可總結(jié)為:
- 將任務(wù)按照下次執(zhí)行時間建最小堆
- 每次取堆頂任務(wù),設(shè)置定時器
- 如果中間有新加入任務(wù),轉(zhuǎn)入步驟2
- 定時器到期后執(zhí)行任務(wù)
- 再次取下個任務(wù),轉(zhuǎn)入步驟2,依次執(zhí)行
時間輪
另一種實現(xiàn)Cron的方式是時間輪,時間輪通過一個環(huán)形隊列,每個插槽放入需要到期執(zhí)行的任務(wù),按照固定間隔轉(zhuǎn)動時間輪,取插槽中任務(wù)列表執(zhí)行,如圖所示:

時間輪可看作一個表盤,如圖中時間間隔為1秒,總共60個格子,如果任務(wù)在3秒后執(zhí)行則放為插槽3,每秒轉(zhuǎn)動次取插槽上所有任務(wù)執(zhí)行。
如果執(zhí)行時間超過最大插槽,比如有個任務(wù)需要63秒后執(zhí)行(超過了最大格子刻度),一般可以通過多層時間輪,或者設(shè)置一個額外變量圈數(shù),只執(zhí)行圈數(shù)為0的任務(wù)。
時間輪插入的時間復(fù)雜度為O(1),獲取任務(wù)列表復(fù)雜度為O(1),執(zhí)行列表最差為O(n)。對比最小堆,時間輪插入刪除元素更快。
核心代碼如下:
// 定義
type TimeWheel struct {
interval time.Duration // 觸發(fā)間隔
slots int // 總插槽數(shù)
currentSlot int // 當(dāng)前插槽數(shù)
tasks []*list.List // 環(huán)形列表,每個元素為對應(yīng)插槽的任務(wù)列表
set containerx.Set[string] // 記錄所有任務(wù)key值,用來檢查任務(wù)是否被刪除
tricker *time.Ticker // 定時觸發(fā)器
logger logr.Logger
}
func (tw *TimeWheel) Run() error {
tw.tricker = time.NewTicker(tw.interval)
for {
// 通過定時器模擬時間輪轉(zhuǎn)動
now, ok := <-tw.tricker.C
if !ok {
break
}
// 轉(zhuǎn)動一次,執(zhí)行任務(wù)列表
tw.RunTask(now, tw.currentSlot)
tw.currentSlot = (tw.currentSlot + 1) % tw.slots
}
return nil
}
func (tw *TimeWheel) RunTask(now time.Time, slot int) {
// 一次執(zhí)行任務(wù)列表
for item := taskList.Front(); item != nil; {
task, ok := item.Value.(*TimeWheelTask)
// 任務(wù)圈數(shù)大于0,不需要執(zhí)行,將圈數(shù)減一
if task.circle > 0 {
task.circle--
item = item.Next()
continue
}
// 運行任務(wù)
go task.Exec()
// 計算任務(wù)下次運行時間
next := item.Next()
taskList.Remove(item)
item = next
task.next = task.Next(now)
if !task.next.IsZero() {
tw.add(now, task)
} else {
tw.Remove(task.Name())
}
}
}
// 添加任務(wù),計算下一次任務(wù)執(zhí)行的插槽與圈數(shù)
func (tw *TimeWheel) add(now time.Time, task *TimeWheelTask) {
if !task.initialized {
task.next = task.Next(now)
task.initialized = true
}
duration := task.next.Sub(now)
if duration <= 0 {
task.slot = tw.currentSlot + 1
task.circle = 0
} else {
mult := int(duration / tw.interval)
task.slot = (tw.currentSlot + mult) % tw.slots
task.circle = mult / tw.slots
}
tw.tasks[task.slot].PushBack(task)
tw.set.Insert(task.Name())
}
時間輪的主要邏輯如下:
- 將任務(wù)存在對應(yīng)插槽的時間
- 通過定時間模擬時間輪轉(zhuǎn)動
- 每次到期后遍歷當(dāng)前插槽的任務(wù)列表,若任務(wù)圈數(shù)為0則執(zhí)行
- 如果任務(wù)未結(jié)束,計算下次執(zhí)行的插槽與圈數(shù)
- 轉(zhuǎn)入步驟2,依次執(zhí)行
總結(jié)
本文主要總結(jié)了定時任務(wù)的兩種實現(xiàn)方式,最小堆與時間輪,并分析其核心實現(xiàn)邏輯。
對于執(zhí)行分布式定時任務(wù),可以借助延時消息隊列或者直接使用Kubernetes的CronJob。
自己開發(fā)的話可以借助Etcd:
- 中心節(jié)點Coordinator將任務(wù)按照一定算法(Hash、輪詢、或者更復(fù)雜的分配算法)將任務(wù)與工作節(jié)點Worker綁定
- 每個Worker添加到有綁定到自己的任務(wù)則取出放到本地的Cron中
- 如果Worker掛掉,執(zhí)行將其上任務(wù)重新綁定即可
本文所有代碼見github.com/qingwave/go…
以上就是Golang分布式應(yīng)用定時任務(wù)示例詳解的詳細內(nèi)容,更多關(guān)于Golang分布式定時的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang限流器time/rate設(shè)計與實現(xiàn)詳解
在?Golang?庫中官方給我們提供了限流器的實現(xiàn)golang.org/x/time/rate,它是基于令牌桶算法(Token?Bucket)設(shè)計實現(xiàn)的,下面我們就來看看他的具體使用吧2024-03-03
go?micro微服務(wù)proto開發(fā)安裝及使用規(guī)則
這篇文章主要為大家介紹了go?micro微服務(wù)proto開發(fā)中安裝Protobuf及基本規(guī)范字段的規(guī)則詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01
Golang?Redis連接池實現(xiàn)原理及示例探究
這篇文章主要為大家介紹了Golang?Redis連接池實現(xiàn)示例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01
一文詳解Golang?定時任務(wù)庫?gron?設(shè)計和原理
這篇文章主要介紹了一文詳解Golang?定時任務(wù)庫?gron?設(shè)計和原理,gron是一個比較小巧、靈活的定時任務(wù)庫,可以執(zhí)行定時的、周期性的任務(wù)。gron提供簡潔的、并發(fā)安全的接口2022-08-08
Golang并發(fā)繞不開的重要組件之Channel詳解
Channel是一個提供可接收和發(fā)送特定類型值的用于并發(fā)函數(shù)通信的數(shù)據(jù)類型,也是Golang并發(fā)繞不開的重要組件之一,本文就來和大家深入聊聊Channel的相關(guān)知識吧2023-06-06
Go基本數(shù)據(jù)類型與string類型互轉(zhuǎn)
本文主要介紹了Go基本數(shù)據(jù)類型與string類型互轉(zhuǎn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03

