Golang定時任務框架GoCron的源碼分析
背景說明
最近工作上有個開發(fā)定時任務的需求,調(diào)研一下后發(fā)現(xiàn)Golang并沒有十分完善的定時任務庫。
整理我這邊的需求如下:
- 支持啟動僅定時執(zhí)行一次的任務;
- 任務在執(zhí)行之前可以完成撤銷;
- 服務重啟之后,未完成的定時任務需要允許重新調(diào)度;
顯然,現(xiàn)成的cron庫無法滿足我的需求。限定于工期,最終自己實現(xiàn)了一個粗糙的事件驅(qū)動定時器。
但這個事件驅(qū)動定時器具有以下的缺點:
- 事件訂閱/通知機制不成熟
- 無法適用于更靈活的場景,例如多節(jié)點的分布式任務調(diào)度執(zhí)行
- 模塊之間的職責不清晰,例如其實Timer模塊是Scheduler調(diào)度器的一部分,Event定時器相關(guān)的部分也是Scheduler調(diào)度器的一部分,而Executor執(zhí)行模塊也存在任務調(diào)度的功能,實際上它只需要負責完成調(diào)度器交給它的任務就好
- 沒有設(shè)計任務調(diào)度池,也就是但凡新建計劃任務,就會在后臺啟動一個協(xié)程持續(xù)監(jiān)聽;一旦任務數(shù)量太多,后臺停留的協(xié)程會越來越多,進程總的消耗就會變得非??鋸?,非??膳?/li>
- 任務調(diào)度時不存在優(yōu)先級的概念,假如相同時間內(nèi)有多個任務同時執(zhí)行,哪個任務被優(yōu)先調(diào)度完全取決于GMP的系統(tǒng)調(diào)度
綜上,我需要著重考察現(xiàn)有的Golang任務調(diào)度框架,對任務定時器進行重新設(shè)計。
GoCron任務調(diào)度庫
調(diào)用實例
package main import ( "fmt" "time" "github.com/jasonlvhit/gocron" ) func task() { fmt.Println("I am running task.") } func taskWithParams(a int, b string) { fmt.Println(a, b) } func main() { // Do jobs without params gocron.Every(1).Second().Do(task) gocron.Every(2).Seconds().Do(task) gocron.Every(1).Minute().Do(task) gocron.Every(2).Minutes().Do(task) gocron.Every(1).Hour().Do(task) gocron.Every(2).Hours().Do(task) gocron.Every(1).Day().Do(task) gocron.Every(2).Days().Do(task) gocron.Every(1).Week().Do(task) gocron.Every(2).Weeks().Do(task) // Do jobs with params gocron.Every(1).Second().Do(taskWithParams, 1, "hello") // Do jobs on specific weekday gocron.Every(1).Monday().Do(task) gocron.Every(1).Thursday().Do(task) // Do a job at a specific time - 'hour:min:sec' - seconds optional gocron.Every(1).Day().At("10:30").Do(task) gocron.Every(1).Monday().At("18:30").Do(task) gocron.Every(1).Tuesday().At("18:30:59").Do(task) // Begin job immediately upon start gocron.Every(1).Hour().From(gocron.NextTick()).Do(task) // Begin job at a specific date/time t := time.Date(2019, time.November, 10, 15, 0, 0, 0, time.Local) gocron.Every(1).Hour().From(&t).Do(task) // NextRun gets the next running time _, time := gocron.NextRun() fmt.Println(time) // Remove a specific job gocron.Remove(task) // Clear all scheduled jobs gocron.Clear() // Start all the pending jobs <- gocron.Start() // also, you can create a new scheduler // to run two schedulers concurrently s := gocron.NewScheduler() s.Every(3).Seconds().Do(task) <- s.Start() }
項目分析
這個工具庫僅有三個文件:
代碼主要分為job和scheduler兩個文件,gocron僅放置了回調(diào)方法和公共方法。項目整體架構(gòu)如下:
gocron通過scheduler維護一個job列表,指定MAXJOBNUM最大工作隊列,限制可執(zhí)行的工作數(shù)大小。
// gocron/scheduler.go // Scheduler struct, the only data member is the list of jobs. // - implements the sort.Interface{} for sorting jobs, by the time nextRun type Scheduler struct { jobs [MAXJOBNUM]*Job // Array store jobs size int // Size of jobs which jobs holding. loc *time.Location // Location to use when scheduling jobs with specified times }
這里需要更正一下,并不是全局列表,僅僅只是跟隨調(diào)度器的生命周期。實際上,代碼確實存在全局的默認調(diào)度器:
var ( defaultScheduler = NewScheduler() )
因此,可以直接調(diào)用。當然也支持實例化自己的調(diào)度器:
s := gocron.NewScheduler() s.Every(3).Seconds().Do(task) <- s.Start()
gocron是典型的鏈式調(diào)用,scheduler對象通過返回job對象,完成job對象的封裝操作之后,加入調(diào)度器內(nèi)部的jobs列表,再通過Start方法啟動調(diào)度器監(jiān)控協(xié)程,輪詢列表中的jobs,一旦找到可執(zhí)行的任務,就會啟動協(xié)程運行job的Func對象。
// Job struct keeping information about job type Job struct { interval uint64 // pause interval * unit between runs jobFunc string // the job jobFunc to run, func[jobFunc] //...... funcs map[string]interface{} // Map for the function task store fparams map[string][]interface{} // Map for function and params of function //...... }
funcs維護一個map,緩存funcName到func的映射關(guān)系。具體封裝在Do方法:
// gocron/job.go // func (j *Job) Do(jobFun interface{}, params ...interface{}) error fname := getFunctionName(jobFun) j.funcs[fname] = jobFun j.fparams[fname] = params j.jobFunc = fname
在執(zhí)行任務時,通過反射回調(diào)func:
// gocron/job.go // func (j *Job) run() ([]reflect.Value, error) result, err := callJobFuncWithParams(j.funcs[j.jobFunc], j.fparams[j.jobFunc]) if err != nil { return nil, err } // gocron/gocron.go func callJobFuncWithParams(jobFunc interface{}, params []interface{}) ([]reflect.Value, error) { f := reflect.ValueOf(jobFunc) if len(params) != f.Type().NumIn() { return nil, ErrParamsNotAdapted } in := make([]reflect.Value, len(params)) for k, param := range params { in[k] = reflect.ValueOf(param) } return f.Call(in), nil }
啟動調(diào)度器時,啟動監(jiān)控協(xié)程:
// Start all the pending jobs // Add seconds ticker func (s *Scheduler) Start() chan bool { stopped := make(chan bool, 1) // ticker每秒產(chǎn)生一個信號 ticker := time.NewTicker(1 * time.Second) go func() { for { // select選擇器阻塞 // case接收到信號則執(zhí)行 // 同時接收到多個信號則隨機選擇一個執(zhí)行 select { // ticker每秒產(chǎn)生一次信號 // RunPending輪詢jobs列表,尋找到了時間可執(zhí)行的任務 case <-ticker.C: s.RunPending() // stopped接收到停止信號,退出調(diào)度器協(xié)程 case <-stopped: ticker.Stop() return } } }() return stopped }
一個調(diào)度器一個協(xié)程,通過統(tǒng)一的調(diào)度協(xié)程去監(jiān)控調(diào)度器任務列表內(nèi)的任務。
// RunPending runs all the jobs that are scheduled to run. func (s *Scheduler) RunPending() { // 輪詢jobs列表,找到到時間可執(zhí)行的任務,創(chuàng)建可執(zhí)行任務列表 runnableJobs, n := s.getRunnableJobs() if n != 0 { for i := 0; i < n; i++ { // 啟動協(xié)程運行 go runnableJobs[i].run() // 刷新job執(zhí)行信息,等待下一輪調(diào)度 runnableJobs[i].lastRun = time.Now() runnableJobs[i].scheduleNextRun() } } }
綜合分析
綜上,gocron有如下好處:
- 鏈式調(diào)用簡單易用
- scheduler和job職責清晰,項目架構(gòu)非常容易理解
- 調(diào)度器一鍵啟動協(xié)程監(jiān)控,只有到了時間可執(zhí)行的任務才會被加入到runablejobs列表,大大減少了進程中協(xié)程的數(shù)量,減少資源消耗
- 調(diào)度器維護的待執(zhí)行任務池,存在預設(shè)的容量大小,限定了同時可執(zhí)行的最大任務數(shù)量,不會導致超量
但它的缺陷也同樣明顯:
- 當不同的線程同時對同一個調(diào)度器進行操作,對任務列表產(chǎn)生的影響是不可預知的。因此這個框架下,最好是每個client維護自己的scheduler對象
- 雖然調(diào)度器維護一個jobs列表,但如果超過列表設(shè)定容量的任務便無法等待執(zhí)行了……這一點gocron并沒有理睬
- 幾乎每秒,為了找到可執(zhí)行的任務去構(gòu)建runablejobs列表,都會輪詢一次任務列表。為了追求結(jié)果的一致,它會對jobs進行排序,雖然Golang編譯器對內(nèi)置的sort方法進行了優(yōu)化,會選舉最快的方式對數(shù)據(jù)進行處理,但依然存在消耗
- 依然是內(nèi)存操作,服務重啟任務列表就不存在了。也沒有考慮到多節(jié)點的場景。
新的GoCron分析
https://github.com/go-co-op/gocron
原gocron的作者居然住進ICU了,管理員說截止至2020年3月依然無法聯(lián)系上他。愿他身體安康……gocron被fork后有了新的發(fā)展,趕緊扒下來學習一下
新的gocron新增了很多內(nèi)容,依然圍繞著Scheduler和Job進行鏈式操作,但新增了executor模塊。executor僅負責執(zhí)行Scheduler調(diào)度過來的任務。
項目架構(gòu)
下面是項目README文檔里公開的架構(gòu)圖:
新功能
新版gocron支持了cron格式的語法
// cron expressions supported s.Cron("*/1 * * * *").Do(task) // every minute
新增了異步和阻塞模式的兩種調(diào)度方式
// you can start running the scheduler in two different ways: // starts the scheduler asynchronously s.StartAsync() // starts the scheduler and blocks current execution path s.StartBlocking()
通過設(shè)置信號量限制可同時運行的任務數(shù)量
// gocron/scheduler.go // SetMaxConcurrentJobs limits how many jobs can be running at the same time. // This is useful when running resource intensive jobs and a precise start time is not critical. func (s *Scheduler) SetMaxConcurrentJobs(n int, mode limitMode) { // 通過對n的配置修改并發(fā)任務數(shù)的大小 s.executor.maxRunningJobs = semaphore.NewWeighted(int64(n)) // limitMode即當可執(zhí)行任務達到最大并發(fā)量時,應該如何處理的邏輯 // RescheduleMode:跳過本次執(zhí)行,等待下一次調(diào)度 // WaitMode:持續(xù)等待,知道可執(zhí)行隊列空出。但,由于等待的任務數(shù)積累,可能導致不可預知的后果,某些任務可能一直等不到執(zhí)行 s.executor.limitMode = mode } // gocron/executor.go // 通過信號量的方式從最大數(shù)量中取一位 // 若通過,下一步可以執(zhí)行函數(shù) if e.maxRunningJobs != nil { if !e.maxRunningJobs.TryAcquire(1) { switch e.limitMode { case RescheduleMode: return case WaitMode: select { case <-stopCtx.Done(): return case <-f.ctx.Done(): return default: } if err := e.maxRunningJobs.Acquire(f.ctx, 1); err != nil { break } } } defer e.maxRunningJobs.Release(1) }
gocron支持指定Job以單例模式運行。通過siglefilght工具庫保證當前僅有一個可運行的Job
// gocron/job.go // SingletonMode prevents a new job from starting if the prior job has not yet // completed it's run // Note: If a job is added to a running scheduler and this method is then used // you may see the job run overrun itself as job is scheduled immediately // by default upon being added to the scheduler. It is recommended to use the // SingletonMode() func on the scheduler chain when scheduling the job. func (j *Job) SingletonMode() { j.mu.Lock() defer j.mu.Unlock() j.runConfig.mode = singletonMode j.jobFunction.limiter = &singleflight.Group{} } // gocron/executor.go switch f.runConfig.mode { case defaultMode: runJob() case singletonMode: // limiter是singlefilght對象,Do方法內(nèi)僅會執(zhí)行一次,保證一次只運行一個任務 _, _, _ = f.limiter.Do("main", func() (interface{}, error) { select { case <-stopCtx.Done(): return nil, nil case <-f.ctx.Done(): return nil, nil default: } runJob() return nil, nil }) }
gocron主要數(shù)據(jù)結(jié)構(gòu)
主要分為schduler調(diào)度器,job任務,以及executor執(zhí)行器對象
追蹤一下調(diào)用鏈的工作流程:
- 初始化一個Scheduler;新版gocron似乎更鼓勵用戶使用自己的scheduler,而不是如同老版一樣維護一個默認的全局調(diào)度器
func NewScheduler(loc *time.Location) *Scheduler { // 這時已經(jīng)將executor同步初始化完畢 // scheduler和executor是一對一的關(guān)系 executor := newExecutor() return &Scheduler{ jobs: make([]*Job, 0), location: loc, running: false, time: &trueTime{}, executor: &executor, tagsUnique: false, timer: afterFunc, } }
- Every方法初始化一個Job,如果scheduler已經(jīng)啟動,即任務列表中已經(jīng)存在一個等待封裝的Job,那么直接取出相應的Job
if s.updateJob || s.jobCreated { job = s.getCurrentJob() }
接下來確定Job的運行周期,并加入到任務列表
s.setJobs(append(s.Jobs(), job))
Every方法返回了新增Job的scheduler,此時scheduler的任務隊列中存在一個Job就緒,等待下一步調(diào)度。
- Do方法帶著回調(diào)的函數(shù)和對應的參數(shù)開始執(zhí)行,它從當前的scheduler中取出一個就緒的Job,進行最后的判斷,如果Job不合格,那么將它從任務隊列中移除,并返回報錯
if job.error != nil { // delete the job from the scheduler as this job // cannot be executed s.RemoveByReference(job) return nil, job.error } // 還有很多判斷條件,這里不一一列舉
將Do方法將要執(zhí)行的函數(shù)封裝進Job。接下來判斷schduler是否啟動:如之前gocron一樣,scheduler也是通過協(xié)程監(jiān)聽并執(zhí)行啟動任務協(xié)程的工作。
之前的scheduler,默認啟動一個ticker,每秒去排序并輪詢?nèi)蝿贞犃?,從中取出滿足條件的任務開始執(zhí)行,效率非常低。而現(xiàn)在的改進是:scheduler啟動監(jiān)聽協(xié)程后;不是以輪詢而是以通知的方式,從channel中獲取Job的Function,再啟動協(xié)程去執(zhí)行。
在這樣的前提下,scheduler監(jiān)聽協(xié)程什么時候啟動是位置的。此處添加一個判斷,當scheduler啟動時,同時啟動runContinuous去完成Job的最后一步操作。若是scheduler沒有啟動,那么直接返回,等待scheduler啟動后再完成操作。
// we should not schedule if not running since we can't foresee how long it will take for the scheduler to start if s.IsRunning() { s.runContinuous(job) }
通過這樣的設(shè)計,在最終啟動scheduler前后,都可以以動態(tài)的方式添加/移除任務。
- scheduler提供了兩種啟動schduler的模式:異步和阻塞(也就是同步啦)
// StartAsync starts all jobs without blocking the current thread func (s *Scheduler) StartAsync() { if !s.IsRunning() { s.start() } } // StartBlocking starts all jobs and blocks the current thread. // This blocking method can be stopped with Stop() from a separate goroutine. func (s *Scheduler) StartBlocking() { s.StartAsync() s.startBlockingStopChanMutex.Lock() s.startBlockingStopChan = make(chan struct{}, 1) s.startBlockingStopChanMutex.Unlock() <-s.startBlockingStopChan }
一般情況下,我們通過異步模式,啟動對所有任務的監(jiān)控
// start starts the scheduler, scheduling and running jobs func (s *Scheduler) start() { // 啟動監(jiān)聽協(xié)程,select選擇器配合channel阻塞 // 直到Job準備執(zhí)行發(fā)送通知 go s.executor.start() // 將scheduler置位為running s.setRunning(true) // 遍歷所有任務,以遞歸的方式監(jiān)控起來 s.runJobs(s.Jobs()) }
比較有意思的是這個部分:
func (s *Scheduler) runJobs(jobs []*Job) { for _, job := range jobs { // 這個函數(shù)是一個遞歸調(diào)用 // 這里對所有Job都以遞歸的方式監(jiān)聽著 s.runContinuous(job) } } // 這是runContinuous的部分代碼 job.setTimer(s.timer(nextRun, func() { if !next.dateTime.IsZero() { for { n := s.now().UnixNano() - next.dateTime.UnixNano() // 某個任務滿足執(zhí)行條件了,退出循環(huán) if n >= 0 { break } s.time.Sleep(time.Duration(n)) } } // 遞歸執(zhí)行本方法 // runContinuous會判斷當前Job是否可執(zhí)行 // 若不則退出,若可以則將Job設(shè)置為立即執(zhí)行,并刷新執(zhí)行時間 // 若Job“立即執(zhí)行”的標志已經(jīng)置位,直接調(diào)用run發(fā)送通知給監(jiān)聽協(xié)程 s.runContinuous(job) }))
這樣的設(shè)計太優(yōu)雅了,大佬們的奇思妙想啊~
- 最后是executor的執(zhí)行,前面已經(jīng)提到過。通過select接收channel通知的形式執(zhí)行下去,核心方法是這個:
runJob := func() { f.incrementRunState() callJobFunc(f.eventListeners.onBeforeJobExecution) callJobFuncWithParams(f.function, f.parameters) callJobFunc(f.eventListeners.onAfterJobExecution) f.decrementRunState() }
eventListeners封裝了兩個接口,用以在執(zhí)行任務和完成任務后發(fā)送給用戶事件通知。
綜合分析
gocron進行了不少方面的優(yōu)化:
- 在任務列表的維護上,可加入調(diào)度的任務數(shù)不再限定為某個值,而是以切片的方式自動增長。但最終能夠并行執(zhí)行的任務數(shù)卻通過信號量多方式加以控制;
- 不再周期性地輪詢?nèi)蝿樟斜?,以期待獲得可運行的任務;而是通過更巧妙的方式,任務遞歸監(jiān)聽,一旦發(fā)現(xiàn)可執(zhí)行的任務,就自行通知scheduler,完成調(diào)度;
- 具備更豐富的語法和模式,用戶可以根據(jù)場景自行選擇;調(diào)度器同時支持異步及同步調(diào)用,而Job也支持周期性輪詢和單點任務;
- scheduler內(nèi)加鎖了,對Jobs列表的操作都會加上讀寫鎖,一些其它的參數(shù)也擁有自己的鎖。這使得scheduler具備線程安全性,但某種程度上影響了對Jobs隊列的操作??紤]到gocron不再鼓勵使用全局Scheduler,而是每個client維護自己的Scheduler,那么被鎖影響的場景會進一步減少,與最終優(yōu)化獲得的性能提升相比,都是值得的。
最后
最后的最后,gocron依然無法滿足我當前的需求,但已經(jīng)不妨礙我對源碼進行下一步的改造:
- 我需要對Job進行上層的封裝,并將要調(diào)用的方法和參數(shù)序列化后存入數(shù)據(jù)庫,直到服務重啟時,能夠找到未完成的任務加載進scheduler重新執(zhí)行
- 我的計劃任務只需要執(zhí)行一次,而無須重復執(zhí)行,這一點已經(jīng)有SingletonMode保證
- 我需要改造gocron,讓它能夠支持在某個時間范圍內(nèi)調(diào)度任務
到此這篇關(guān)于Golang定時任務框架GoCron的源碼分析的文章就介紹到這了,更多相關(guān)Golang定時任務框架GoCron內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go通過SJSON實現(xiàn)動態(tài)修改JSON
在Go語言 json 處理領(lǐng)域,在 json 數(shù)據(jù)處理中,讀取與修改是兩個核心需求,本文我們就來看看如何使用SJSON進行動態(tài)修改JSON吧,有需要的小伙伴可以了解下2025-03-03golang調(diào)用shell命令(實時輸出,終止)
本文主要介紹了golang調(diào)用shell命令(實時輸出,終止),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-02-02掌握Golang中的select語句實現(xiàn)并發(fā)編程
Golang中的select語句用于在多個通道間選擇可讀或可寫的操作,并阻塞等待其中一個通道進行操作??梢杂糜趯崿F(xiàn)超時控制、取消和中斷操作等。同時,select語句支持default分支,用于在沒有任何通道可操作時執(zhí)行默認操作2023-04-04