通過源碼分析Golang?cron的實現(xiàn)原理
前言
golang實現(xiàn)定時任務(wù)很簡單,只須要簡單幾步代碼即可以完成,最近在做了幾個定時任務(wù),想研究一下它內(nèi)部是怎么實現(xiàn)的,所以將源碼過了一遍,記錄和分享在此。需要的朋友可以參考以下內(nèi)容,希望對大家有幫助。
關(guān)于go cron是如何使用的可以參考之前的文章:一文帶你入門Go語言中定時任務(wù)庫Cron的使用
Demo示例
package main import ( "fmt" "github.com/robfig/cron/v3" ) func main() { // 創(chuàng)建一個默認的cron對象 c := cron.New() //添加執(zhí)行任務(wù) c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("@hourly", func() { fmt.Println("Every hour, starting an hour from now") }) c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty, starting an hour thirty from now") }) //開始執(zhí)行任務(wù) c.Start() select {} //阻塞 }
通過上面的示例,可以發(fā)現(xiàn), cron 最常用的幾個函數(shù):
- New(): 實例化一個 cron 對象。
- Cron.AddFunc(): 向 Cron 對象中添加一個作業(yè),接受兩個參數(shù),第一個是 cron 表達式,第二個是一個無參無返回值的函數(shù)(作業(yè))。
- Cron.Stop(): 停止調(diào)度,Stop 之后不會再有未執(zhí)行的作業(yè)被喚醒,但已經(jīng)開始執(zhí)行的作業(yè)不會受影響。
源碼實現(xiàn)
在了解其整體邏輯的實現(xiàn)過程前,先了解兩個重要的結(jié)構(gòu)體Entry
和Cron
:
位置在/robfig/cron/cron.go
。
結(jié)構(gòu)體 Cron 和 Entry
Cron
主要負責維護所有的任務(wù)數(shù)據(jù),調(diào)用相關(guān)的func時間指定,可以啟動、停止任務(wù)等;Entry是對添加到 Cron
中的任務(wù)的封裝,每個 Entry
有一個 ID
,除此之外,Entry
里保存了這個任務(wù)上次運行的時間和下次運行的時間。具體代碼實現(xiàn)如下:
// Entry 數(shù)據(jù)結(jié)構(gòu),每一個被調(diào)度實體一個 type Entry struct { // 唯一id,用于查詢和刪除 ID EntryID // 本Entry的調(diào)度時間,不是絕對時間,在生成entry時會計算出來 Schedule Schedule // 本entry下次需要執(zhí)行的絕對時間,會一直被更新 // 被封裝的含義是Job可以多層嵌套,可以實現(xiàn)基于需要執(zhí)行Job的額外處理 // 比如抓取Job異常、如果Job沒有返回下一個時間點的Job是還是繼續(xù)執(zhí)行還是delay Next time.Time // 上一次被執(zhí)行時間,主要用來查詢 Prev time.Time // WrappedJob 是真實執(zhí)行的Job實體 WrappedJob Job // Job 主要給用戶查詢 Job Job } // Cron保持任意數(shù)量的任務(wù)的軌道,調(diào)用相關(guān)的func時間表指定。它可以被啟動,停止,可運行的同時進行檢查。 type Cron struct { entries []*Entry // 保存了所有加入到 Cron 的任務(wù) // chain 用來定義entry里的warppedJob使用什么邏輯(e.g. skipIfLastRunning) // 即一個cron里所有entry只有一個封裝邏輯 chain Chain stop chan struct{} // 停止整個cron的channel add chan *Entry // 增加一個entry的channel remove chan EntryID // 移除一個entry的channel snapshot chan chan []Entry // 獲取entry整體快照的channel running bool // 代表是否已經(jīng)在執(zhí)行,是cron為使用者提供的動態(tài)修改entry的接口準備的 logger Logger // 封裝golang的log包 runningMu sync.Mutex // 用來修改運行中的cron數(shù)據(jù),比如增加entry,移除entry location *time.Location // 地理位置 parser ScheduleParser // 對時間格式的解析,為interface, 可以定制自己的時間規(guī)則。 nextID EntryID // entry的全局ID,新增一個entry就加1 jobWaiter sync.WaitGroup // run job時會進行add(1), job 結(jié)束會done(),stop整個cron,以此保證所有job都能退出 }
New()實現(xiàn)
cron.go
中的New()
方法用來創(chuàng)建并返回一個Cron
對象指針,其實現(xiàn)如下:
func New(opts ...Option) *Cron { c := &Cron{ entries: nil, chain: NewChain(), add: make(chan *Entry), stop: make(chan struct{}), snapshot: make(chan chan []Entry), remove: make(chan EntryID), running: false, runningMu: sync.Mutex{}, logger: DefaultLogger, location: time.Local, parser: standardParser, } for _, opt := range opts { opt(c) } return c }
AddFunc()實現(xiàn)
AddFunc()
用于向Corn
中添加一個任務(wù),AddFunc()中將func
包裝成 Job
類型然后調(diào)用AddJob()
,AddFunc()
相較于 AddJob()
幫用戶省去了包裝成 Job 類型的一步,在 AddJob()
中,調(diào)用了 standardParser.Parse()
將 cron
表達式解釋成了 schedule
類型,最終,他們調(diào)用了 Schedule()
方法;其代碼實現(xiàn)如下:
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { return c.AddJob(spec, FuncJob(cmd)) //包裝成job類型然后調(diào)用AddJob()方法 } func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { schedule, err := c.parser.Parse(spec) //將cron表達式解析成schedule類型 if err != nil { return 0, err } return c.Schedule(schedule, cmd), nil //調(diào)用Schedule() } func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { c.runningMu.Lock() //為了保證線程安全,加鎖 defer c.runningMu.Unlock() c.nextID++ //下一EntryID entry := &Entry{ ID: c.nextID, Schedule: schedule, WrappedJob: c.chain.Then(cmd), Job: cmd, } // Cron是否處于運行狀態(tài) if !c.running { c.entries = append(c.entries, entry) // 追加到entries列表中 } else { c.add <- entry // 發(fā)送到Cron的add chan } return entry.ID }
Schedule()
這個方法負責創(chuàng)建 Entry
結(jié)構(gòu)體,并把它追加到 Cron
的 entries
列表中,如果 Cron
已經(jīng)處于運行狀態(tài),會將這個創(chuàng)建好的 entry
發(fā)送到 Cron
的 add chan
中,在run()
中會處理這種情況。
Start()實現(xiàn)
Start()
用于開始執(zhí)行 Cron
,其代碼實現(xiàn)如下:
func (c *Cron) Start() { c.runningMu.Lock() // 獲取鎖 defer c.runningMu.Unlock() if c.running { return } c.running = true // 將 c.running 置為 true 表示 cron 已經(jīng)在運行中了 go c.run() //開啟一個 goroutine 執(zhí)行 c.run() }
通過上面的代碼,可以看到主要干了這么幾件事:
- 獲取鎖,保證線程安全。
- 判斷
cron
是否已經(jīng)在運行中,如果是則直接返回,否則將c.running
置為true
表示cron
已經(jīng)在運行中了。 - 開啟一個
goroutine
執(zhí)行c.run()
。
Run()實現(xiàn)
Run()
是整個cron
的一個核心,它負責處理cron
開始執(zhí)行后的大部分事情, run
中會一直輪循c.entries
中的entry
, 如果一個entry
允許執(zhí)行了,就會開啟單獨的goroutine
去執(zhí)行這個任務(wù)。
// run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { c.logger.Info("start") // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { entry.Next = entry.Schedule.Next(now) c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) } for { // Determine the next entry to run. // 將定時任務(wù)執(zhí)行時間進行排序,最近最早執(zhí)行的放在前面 sort.Sort(byTime(c.entries)) var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.NewTimer(100000 * time.Hour) } else { // 生成一個定時器,距離最近的任務(wù)時間到時 觸發(fā)定時器的channel,發(fā)送通知 timer = time.NewTimer(c.entries[0].Next.Sub(now)) } for { select { // 定時時間到了,執(zhí)行定時任務(wù),并設(shè)置下次執(zhí)行的時刻 case now = <-timer.C: now = now.In(c.location) c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now //對每個定時任務(wù)嘗試執(zhí)行 for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } //新增的定時任務(wù)添加到 任務(wù)列表中 case newEntry := <-c.add: timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) //獲取 當前所有定時任務(wù)(快照) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue //停止定時任務(wù),timer停止即可完成此功能 case <-c.stop: timer.Stop() c.logger.Info("stop") return //刪除某個定時任務(wù) case id := <-c.remove: timer.Stop() now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) } break } } }
Stop()實現(xiàn)
Stop()
用來停止Cron
的運行,但已經(jīng)在執(zhí)行中的作業(yè)是不會被打斷的,也就是從執(zhí)行 Stop()
之后,不會再有新的任務(wù)被調(diào)度:
func (c *Cron) Stop() context.Context { c.runningMu.Lock() defer c.runningMu.Unlock() if c.running { c.stop <- struct{}{} // 會發(fā)出一個 stop 信號 c.running = false } ctx, cancel := context.WithCancel(context.Background()) go func() { // 等待所有已經(jīng)在執(zhí)行的任務(wù)執(zhí)行完畢 c.jobWaiter.Wait() // 會發(fā)出一個 cancelCtx.Done() 信號 cancel() }() return ctx }
Remove()實現(xiàn)
Remove() 用于移除一個任務(wù):
func (c *Cron) Remove(id EntryID) { c.runningMu.Lock() defer c.runningMu.Unlock() if c.running { c.remove <- id // 會發(fā)出一個 remove 信號 } else { c.removeEntry(id) } } func (c *Cron) removeEntry(id EntryID) { var entries []*Entry for _, e := range c.entries { if e.ID != id { entries = append(entries, e) } } c.entries = entries }
小結(jié)
到此這篇關(guān)于Golang Cron 定時任務(wù)的內(nèi)部實現(xiàn)的文章就介紹到這了, 其中重點如下:
在Go Cron
內(nèi)部維護了兩個結(jié)構(gòu)體Cron
和Entry
,用于維護任務(wù)數(shù)據(jù),cron.Start()
執(zhí)行后,cron
的后臺程序c.Run()
就開始執(zhí)行了,Run()
是整個cron
的一個核心,它負責處理cron
開始執(zhí)行后的大部分事情, run
中會一直輪循c.entries
中的entry
, 每個entry都包含自己下一次執(zhí)行的絕對時間,如果一個entry
允許執(zhí)行了,就會開啟單獨的goroutine
去執(zhí)行這個任務(wù)。
以上就是通過源碼分析Golang cron的實現(xiàn)原理的詳細內(nèi)容,更多關(guān)于Golang cron的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解Go語言中如何通過Goroutine實現(xiàn)高并發(fā)
在Go語言中,并發(fā)編程是一個核心且強大的特性,Go語言通過goroutine和channel等機制,使得并發(fā)編程變得更加簡單和直觀,本文給大家介紹了Go語言中如何通過Goroutine快速實現(xiàn)高并發(fā),感興趣的小伙伴跟著小編一起來看看吧2024-10-10GoFrame框架gredis優(yōu)雅的取值和類型轉(zhuǎn)換
這篇文章主要為大家介紹了GoFrame框架gredis優(yōu)雅的取值和類型轉(zhuǎn)換,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06