從源碼解析golang Timer定時器體系
Timer、Ticker使用及其注意事項(xiàng)
在剛開始學(xué)習(xí)golang語言的時候就聽說Timer、Ticker的使用要尤其注意,很容易出現(xiàn)問題,這次就來一探究竟。
本文主要脈絡(luò):
- 介紹定時器體系,并介紹常用使用方式和錯誤使用方式
- 源碼解讀
timer、ticker是什么?
timer和ticker都是定時器,不同的是:
- timer是一次性的定時器
- ticker是循環(huán)定時器,在底層實(shí)現(xiàn)上是timer觸發(fā)后又重新設(shè)置下一次觸發(fā)時間來實(shí)現(xiàn)的
正確的使用姿勢
Timer
對于Timer,可以通過三種函數(shù)創(chuàng)建:time.NewTimer?、time.AfterFunc?、time.After?。
其使用范例如下:
// FnTimer1 Timer的使用用法
func FnTimer1() {
timer := time.NewTimer(time.Second * 5) //返回生成的timer
fmt.Printf("timer 的啟動時間為:%v\n", time.Now())
expire := <-timer.C //注意這個channel是一個容量為1的channel!
fmt.Printf("timer 的觸發(fā)時間為:%v\n", expire)
}
func FnTimer2() {
ch1 := make(chan int, 1)
select {
case e1 := <-ch1:
//如果ch1通道成功讀取數(shù)據(jù),則執(zhí)行該case處理語句
fmt.Printf("1th case is selected. e1=%v",e1)
case <- time.After(2 * time.Second): //time.After直接返回生成的timer的channel,所以一般用于超時控制
fmt.Println("Timed out")
}
}
func FnTimer3() {
_ = time.AfterFunc(time.Second*5, func() { //返回的也是timer,不過可以自己傳入函數(shù)進(jìn)行執(zhí)行
fmt.Printf("定時器觸發(fā)了,觸發(fā)時間為%v\n", time.Now())
})
fmt.Printf("timer 的啟動時間為:%v\n", time.Now())
time.Sleep(10 * time.Second) // 確保定時器觸發(fā)
}
在底層原理上,三種不同的創(chuàng)建方式都是調(diào)用的time.NewTimer?,不同的是:
- ?
time.After?直接返回的是生成的timer的channel,而time.NewTimer?是返回生成的timer。 - ?
time.NewTimer?定時觸發(fā)channel的原理就是定時調(diào)用一個sendTime?函數(shù),這個函數(shù)負(fù)責(zé)向channel中發(fā)送觸發(fā)時間;time.AfterFunc?就是將定時調(diào)用的sendTime?函數(shù)換成了一個自定義的函數(shù)。
補(bǔ)充:
返回的channel的容量為1,因此是一個asynchronous的channel,即在定時器fired(觸發(fā))、stop(停止)、reset(重啟)之后,仍然有可能能收到數(shù)據(jù)~
垃圾回收:在go1.23之前,對于處于active(沒有觸發(fā)且沒有顯式調(diào)用Stop)的Timer,gc是無法回收的,Ticket也是同樣的道理。因此在高并發(fā)的場景下需要顯式搭配??
defer time.Stop()??來解決暫時的“內(nèi)存泄露”的問題。上述兩點(diǎn)在Golang 1.23得到了解決,且可能在未來的版本中channel的容量會改為0(由asynchronous改成sync),在Go 1.23相關(guān)源碼的注釋部分有對應(yīng)的說明。
Ticket
ticket相比于timer,其會循環(huán)觸發(fā),因此通常用于循環(huán)的干一些事情,like:
// FnTicket2 Ticker的正確的使用方法??!
func FnTicket2() {
ticker := time.NewTicker(1 * time.Second)
stopTicker := make(chan struct{})
defer ticker.Stop()
go func() {
for {
select {
case now := <-ticker.C:
// do something
fmt.Printf("ticker 的觸發(fā)時間為:%v\n", now)
case <-stopTicker:
fmt.Println("ticker 結(jié)束")
return
}
}
}()
time.Sleep(4 * time.Second)
stopTicker <- struct{}{}
}
注意:代碼塊中使用
stopTicker? +select?的方式是必須的,由于Stop函數(shù)只是修改定時器的狀態(tài),并不會主動關(guān)閉channel,因此如果直接使用循環(huán)(見?我的github倉庫? )會直接導(dǎo)致Ticker永遠(yuǎn)不退出而導(dǎo)致內(nèi)存泄露!
補(bǔ)充:TIcket = 觸發(fā)時自動設(shè)置下一次時間的Timer,因此上面提到的Go1.23之前的Timer存在的問題依然存在。
- 返回的channel容量為1
- 垃圾回收:由于Ticket會一直循環(huán)觸發(fā),因此如果不顯式調(diào)用??
time.Stop??方法的話,會永久內(nèi)存泄露。此外,對于Ticket,還有一些額外的注意事項(xiàng):
- 需要使用一個額外的channel+select的方式來正確停止Ticket。
- Reset函數(shù)的問題,由于比較長,單獨(dú)整理在golang1.23版本之前 Timer Reset無法正確使用的問題,這里說個結(jié)論 :?
Reset??函數(shù)由于內(nèi)部非原子,因此無法完美的使用,建議使用goroutine+Timer的方式替代!
使用的注意事項(xiàng)
由于Golang 1.23版本對定時器timer、ticker做了很大的改進(jìn),因此要分成1.23之前和1.23及其之后的版本分開考慮:
以下是1.23版本關(guān)于timer、ticker部分的修改:
未停止的定時器和不再被引用的計(jì)時器可以進(jìn)行垃圾回收。在 Go 1.23 之前,未停止的定時器無法被垃圾回收,直到定時器超時,而未停止的計(jì)時器永遠(yuǎn)無法被垃圾回收。Go 1.23 的實(shí)現(xiàn)避免了在不使用
t.Stop? 的程序中出現(xiàn)資源泄漏。定時器通道現(xiàn)在是同步的(無緩沖的),這使
t.Reset? 和t.Stop? 方法具有更強(qiáng)的保證:在其中一個方法返回后,將來從定時器通道接收到的任何值都不會觀察到與舊定時器配置相對應(yīng)的陳舊時間值。在 Go 1.23 之前,無法使用t.Reset? 避免陳舊值,而使用t.Stop? 避免陳舊值需要仔細(xì)使用t.Stop? 的返回值。Go 1.23 的實(shí)現(xiàn)完全消除了這種擔(dān)憂。
總結(jié)一下:1.23版本改進(jìn)了timer、ticker的垃圾回收和 停止、重置的相關(guān)方法(Reset、Stop)。
這也就意味著在1.23版本之前,我們在使用的時候要注意:垃圾回收和停止、重置相關(guān)方法的使用。
由于Reset、Stop方法的外在表現(xiàn)本質(zhì)上上是跟緩沖區(qū)由 有緩沖 改為 無緩沖 相關(guān),因此如果有涉及讀取緩沖區(qū)我們也需要注意相關(guān)特性。
具體來說,對于1.23之前:
- 垃圾回收:TImer的回收只會在定時器觸發(fā)(expired)或者
Stop?之后;Ticker只顯式觸發(fā)Stop?之后才會回收; - ?
Reset?、Stop?使用:對于Timer,沒有完美的做法,無論怎么樣Reset?和Stop?都可能存在一些問題;對于Ticker,記得使用完之后顯式的Stop?;
源碼解讀
源碼解讀版本:release-branch.go1.8?
運(yùn)作原理
timer(ticket)的運(yùn)作依賴于struct p?,源碼位置:src/runtime/runtime2.go:603。
所有的計(jì)時器timer都以最小四叉堆的形式存儲在struct p?的timers?字段中。并且也是交給了計(jì)時器都交由處理器的網(wǎng)絡(luò)輪詢器和調(diào)度器觸發(fā),這種方式能夠充分利用本地性、減少上下文的切換開銷,也是目前性能最好的實(shí)現(xiàn)方式。
一般來說管理定時器的結(jié)構(gòu)有3種:雙向鏈表、最小堆、時間輪(很少)。
雙向鏈表:插入|修改時間:需要遍歷去尋找插入|修改的位置,時間復(fù)雜度O(N);觸發(fā):鏈表頭觸發(fā)即可,時間復(fù)雜度O(1)。
最小堆:插入|修改時間:O(logN)的時間復(fù)雜度;觸發(fā):隊(duì)頭觸發(fā),但是觸發(fā)之后需要調(diào)整堆以保證堆的特性,O(logN)的時間復(fù)雜度。
時間輪:插入|修改時間|觸發(fā)的時間復(fù)雜度都是O(1)。但是需要額外維護(hù)時間輪的結(jié)構(gòu),其占據(jù)空間隨著需要維護(hù)的未來時間長度、時間精度增加。
涉及結(jié)構(gòu)體
定時器體系有兩種timer:一次性觸發(fā)的Timer?和循環(huán)觸發(fā)的Ticker?,這兩種的底層結(jié)構(gòu)是相同的,觸發(fā)邏輯是類似的。
畢竟
Ticker?的功能包含Timer?的功能。
定時器在源碼中使用的結(jié)構(gòu)體是timer?,其定義為:
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
type timer struct {
// If this timer is on a heap, which P's heap it is on.
// puintptr rather than *p to match uintptr in the versions
// of this struct defined in other packages.
pp puintptr //指針,指向持有當(dāng)前timer的p
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(arg, now) in the timer goroutine, so f must be
// a well-behaved function and not block.
//
// when must be positive on an active timer.
when int64 //當(dāng)前計(jì)時器被喚醒的時間;
period int64 //兩次被喚醒的間隔;
f func(any, uintptr) //喚醒時要執(zhí)行的函數(shù)
arg any // 計(jì)時器被喚醒時調(diào)用 f 傳入的參數(shù);
seq uintptr
// What to set the when field to in timerModifiedXX status.
nextwhen int64 //當(dāng)定時器狀態(tài)變?yōu)閠imerModifiedXX的時,when 字段下一次要設(shè)置的值
// The status field holds one of the values below.
status uint32 //計(jì)時器的狀態(tài),最重要的字段之一
}
?timer?在p?中的相關(guān)字段為:
type p struct {
// Lock for timers. We normally access the timers while running
// on this P, but the scheduler can also do it from a different P.
timersLock mutex //操作timers的時候加鎖
// Actions to take at some time. This is used to implement the
// standard library's time package.
// Must hold timersLock to access.
timers []*timer //timers數(shù)組
// Number of timers in P's heap.
// Modified using atomic instructions.
numTimers uint32 //總timers的數(shù)量
// Number of timerDeleted timers in P's heap.
// Modified using atomic instructions.
deletedTimers uint32 //處于deleted狀態(tài)的timers的數(shù)量
// Race context used while executing timer functions.
timerRaceCtx uintptr //競態(tài)檢測相關(guān)
}
狀態(tài)機(jī)
go內(nèi)部的定時器的操作是并發(fā)安全的(新建定時器、停止定時器)等,為了支持并發(fā)安全和定時器的高效調(diào)度,在源碼中設(shè)計(jì)了一套關(guān)于定時器的狀態(tài)機(jī),全部的狀態(tài)為:
| 狀態(tài) | 解釋 |
|---|---|
| timerNoStatus | 還沒有設(shè)置狀態(tài) |
| timerWaiting | 定時器等待觸發(fā) |
| timerRunning | timer正在運(yùn)行 |
| timerDeleted | 定時器被標(biāo)記刪除 |
| timerRemoving | 定時器從標(biāo)記刪除到真正刪除的中間態(tài) |
| timerRemoved | 定時器真正被刪除 |
| timerModifying | 正在被修改的中間狀態(tài) |
| timerModifiedEarlier | 定時器被修改到了更早的觸發(fā)時間 |
| timerModifiedLater | 定時器被修改到了更晚的觸發(fā)時間 |
| timerMoving | 已經(jīng)被修改正在被移動 |
修改定時器的狀態(tài)機(jī)涉及如下所示的 7 種不同操作,它們分別承擔(dān)了不同的職責(zé):
- ?
runtime.addtimer? — 向當(dāng)前處理器增加新的計(jì)時器 - ?
runtime.deltimer? — 將計(jì)時器標(biāo)記成timerDeleted? 刪除處理器中的計(jì)時器 - ?
runtime.modtimer? — 網(wǎng)絡(luò)輪詢器會調(diào)用該函數(shù)修改計(jì)時器 - ?
runtime.cleantimers? — 清除隊(duì)列頭中的計(jì)時器,能夠提升程序創(chuàng)建和刪除計(jì)時器的性能 - ?
runtime.adjusttimers? — 調(diào)整處理器持有的計(jì)時器堆,包括移動會稍后觸發(fā)的計(jì)時器、刪除標(biāo)記為timerDeleted? 的計(jì)時器 - ?
runtime.runtimer? — 檢查隊(duì)列頭中的計(jì)時器,在其準(zhǔn)備就緒時運(yùn)行該計(jì)時器
狀態(tài)機(jī)的變化流程圖 可以大概幫我們看出timer?的不同狀態(tài)的流轉(zhuǎn)情況:
@startuml
[*] --> timerNoStatus : 運(yùn)行時創(chuàng)建Timer
timerNoStatus -->timerWaiting : addtimer
timerWaiting -->timerModifying : deltimer、modtimer
timerModifying -->timerDeleted : deltimer
timerModifiedLater -->timerDeleted : deltimer
timerModifiedEarlier -->timerModifying : deltimer
timerDeleted -->timerRemoving : cleantimers、adjusttimers、runtimer
timerRemoving -->timerRemoved : cleantimers、adjusttimers、runtimer
timerModifiedEarlier --> timerMoving : cleantimers、adjusttimers
timerModifiedLater --> timerMoving : cleantimers、adjusttimers
timerMoving --> timerWaiting : cleantimers
timerWaiting --> timerRunning : runtimer
timerRunning --> timerWaiting : runtimer
timerRunning --> timerNoStatus : runtimer
state timerModifiedXX {
state timerModifiedEarlier {
}
state timerModifiedLater {
}
}
timerModifying --> timerModifiedXX : modtimer
timerModifiedXX --> timerModifying : modtimer
timerNoStatus --> timerModifying : modtimer
timerModifying --> timerWaiting : modtimer
timerRemoved --> timerModifying : modtimer
timerDeleted --> timerModifying : modtimer
timerWaiting : 定時器等待觸發(fā)
timerModifying : 定時器狀態(tài)修改的中間態(tài)
timerDeleted : 定時器被標(biāo)記刪除的狀態(tài)
timerRemoving: 定時器從標(biāo)記刪除到真正被刪除的中間態(tài)
timerRemoved: 定時器真正被刪除
timerModifiedEarlier: 定時器被修改到了更早的觸發(fā)時間
timerModifiedLater : 定時器被修改到了更晚的觸發(fā)時間
timerMoving: 定時器在堆上的位置正在重新排序
timerRunning: timer正在運(yùn)行
timerModifiedXX: 定時器在堆上的位置等待重新排序
@enduml
實(shí)際上這些狀態(tài)的流轉(zhuǎn)都被完整的寫在了golang的源碼中,在后面逐個函數(shù)的講解中也會涉及到:
// addtimer: // timerNoStatus -> timerWaiting // anything else -> panic: invalid value // deltimer: // timerWaiting -> timerModifying -> timerDeleted // timerModifiedEarlier -> timerModifying -> timerDeleted // timerModifiedLater -> timerModifying -> timerDeleted // timerNoStatus -> do nothing // timerDeleted -> do nothing // timerRemoving -> do nothing // timerRemoved -> do nothing // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerModifying -> wait until status changes // modtimer: // timerWaiting -> timerModifying -> timerModifiedXX // timerModifiedXX -> timerModifying -> timerModifiedYY // timerNoStatus -> timerModifying -> timerWaiting // timerRemoved -> timerModifying -> timerWaiting // timerDeleted -> timerModifying -> timerModifiedXX // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerRemoving -> wait until status changes // timerModifying -> wait until status changes // cleantimers (looks in P's timer heap): // timerDeleted -> timerRemoving -> timerRemoved // timerModifiedXX -> timerMoving -> timerWaiting // adjusttimers (looks in P's timer heap): // timerDeleted -> timerRemoving -> timerRemoved // timerModifiedXX -> timerMoving -> timerWaiting // runtimer (looks in P's timer heap): // timerNoStatus -> panic: uninitialized timer // timerWaiting -> timerWaiting or // timerWaiting -> timerRunning -> timerNoStatus or // timerWaiting -> timerRunning -> timerWaiting // timerModifying -> wait until status changes // timerModifiedXX -> timerMoving -> timerWaiting // timerDeleted -> timerRemoving -> timerRemoved // timerRunning -> panic: concurrent runtimer calls // timerRemoved -> panic: inconsistent timer heap // timerRemoving -> panic: inconsistent timer heap // timerMoving -> panic: inconsistent timer heap
addtimer源碼
addtimer對于狀態(tài)的操作:
- timerNoStatus -> timerWaiting
- anything else -> panic: invalid value
addtimer的主要功能:向當(dāng)前p的定時器堆中添加當(dāng)前定時器,并嘗試喚醒網(wǎng)絡(luò)輪訓(xùn)器(定時器的執(zhí)行依賴于網(wǎng)絡(luò)輪訓(xùn)器處理)。
這里提到的網(wǎng)絡(luò)輪訓(xùn)器可能讓人有點(diǎn)疑惑,定時器和網(wǎng)絡(luò)有什么關(guān)系?實(shí)際上確實(shí)也沒什么關(guān)系,這里提到的網(wǎng)絡(luò)輪訓(xùn)器重點(diǎn)在于輪訓(xùn)器,指的更多的是select、poll、epoll那套東西。
func addtimer(t *timer) {
// xxx
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
t.status = timerWaiting
when := t.when
// Disable preemption while using pp to avoid changing another P's heap.
mp := acquirem()
pp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp) //嘗試清除timers堆中堆頭的元素,以加速定時器添加
doaddtimer(pp, t)
unlock(&pp.timersLock)
wakeNetPoller(when)
releasem(mp)
}
- cleantimers真的能加速嗎?為什么?
deltimer源碼
?time.stopTimer?的底層實(shí)際調(diào)用就是deltimer?。
?deltimer?對于狀態(tài)的操作:
// timerWaiting -> timerModifying -> timerDeleted
// timerModifiedEarlier -> timerModifying -> timerDeleted
// timerModifiedLater -> timerModifying -> timerDeleted
// timerNoStatus -> do nothing
// timerDeleted -> do nothing
// timerRemoving -> do nothing
// timerRemoved -> do nothing
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerModifying -> wait until status changes
?deltimer?的主要功能:對于傳入的定時器進(jìn)行標(biāo)記刪除(狀態(tài)status設(shè)置為timerDeleted?)。
// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
// 必須要先拿到tpp,因?yàn)楫?dāng)狀態(tài)設(shè)置為timerDeleted之后,timer就有可能被清除(cleantimers函數(shù)),就拿不到tpp了
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
return true
} else {
releasem(mp)
}
case timerModifiedEarlier:
// 這里和上面case的代碼在源碼中除了注釋少一點(diǎn)其他一模一樣,暫不清楚為什么
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
tpp := t.pp.ptr() //先拿到tpp,原理同上
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerNoStatus:
// Removing timer that was never added or
// has already been run. Also see issue 21874.
return false
case timerModifying:
// Simultaneous calls to deltimer and modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}
}
modtimer源碼
?time.reset?方法底層實(shí)際上調(diào)用就是modtimer?方法。
?modtimer?對于狀態(tài)的修改,可以簡單的歸納為:
- 當(dāng)前timer還在heap中:修改為
timerModifiedXX?狀態(tài)(等待重新排序觸發(fā)) - 當(dāng)前timer不在heap中:修改為等待調(diào)度
timerWaiting?狀態(tài) - 當(dāng)前timer在修改的中間態(tài)(XXing狀態(tài)):XXing相當(dāng)于是被鎖定的狀態(tài),因此等待狀態(tài)發(fā)生變動
// timerWaiting -> timerModifying -> timerModifiedXX // timerModifiedXX -> timerModifying -> timerModifiedYY // timerNoStatus -> timerModifying -> timerWaiting // timerRemoved -> timerModifying -> timerWaiting // timerDeleted -> timerModifying -> timerModifiedXX // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerRemoving -> wait until status changes // timerModifying -> wait until status changes
modtimer的主要功能:重置定時器。具體來說,首先判斷timer還在不在heap中
- 還在:修改狀態(tài)
timerModifiedXX?,等待重新觸發(fā) - 不在:重新添加到heap中,等待重新觸發(fā)
resettimer源碼
底層調(diào)用的就是modtimer,這里不多贅述了。
// resettimer resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// This should be called instead of addtimer if the timer value has been,
// or may have been, used previously.
// Reports whether the timer was modified before it was run.
func resettimer(t *timer, when int64) bool {
return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
cleantimers源碼
在addtimer中有調(diào)用,具體會在添加新定時器之前調(diào)用(addtimer源碼)。
函數(shù)作用為:嘗試清理heap頭(第一個元素)的定時器:移除或者調(diào)整到正確的位置,可以加速addtimer?添加定時器。
// cleantimers cleans up the head of the timer queue. This speeds up
// programs that create and delete timers; leaving them in the heap
// slows down addtimer. Reports whether no timer problems were found.
// The caller must have locked the timers for pp.
func cleantimers(pp *p) {
gp := getg()
for {
if len(pp.timers) == 0 {
return
}
t := pp.timers[0]
switch s := atomic.Load(&t.status); s {
case timerDeleted: //被標(biāo)記刪除,現(xiàn)在正式移除
if !atomic.Cas(&t.status, s, timerRemoving) {
continue
}
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
badTimer()
}
atomic.Xadd(&pp.deletedTimers, -1)
case timerModifiedEarlier, timerModifiedLater: //定時器被調(diào)整,移動其到正確的位置
if !atomic.Cas(&t.status, s, timerMoving) {
continue
}
// Now we can change the when field.
t.when = t.nextwhen
// Move t to the right position.
dodeltimer0(pp)
doaddtimer(pp, t)
if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
badTimer()
}
default:
// Head of timers does not need adjustment.
return
}
}
}
adjusttimers源碼
?adjusttimers?對狀態(tài)的修改:
// adjusttimers (looks in P's timer heap): // timerDeleted -> timerRemoving -> timerRemoved // timerModifiedXX -> timerMoving -> timerWaiting
?adjusttimers?的主要作用與cleantimers?相同:嘗試清理heap的定時器:移除或者調(diào)整到正確的位置。
不同的是:??cleantimers??只會對堆頭的元素進(jìn)行處理,而??adjusttimers??是遍歷堆中所有的元素進(jìn)行處理。
很有意思的一點(diǎn)是:對于
timerModifiedXX?狀態(tài)的定時器,由于是觸發(fā)時間修改了,因此需要調(diào)整其在堆中的位置,golang這邊選擇的做法是先刪除(dodeltimer?)再添加(doaddtimer?)的方法調(diào)整位置。
runtimer
對狀態(tài)的修改:
// runtimer (looks in P's timer heap): // timerNoStatus -> panic: uninitialized timer // timerWaiting -> timerWaiting or // timerWaiting -> timerRunning -> timerNoStatus or // timerWaiting -> timerRunning -> timerWaiting // timerModifying -> wait until status changes // timerModifiedXX -> timerMoving -> timerWaiting // timerDeleted -> timerRemoving -> timerRemoved // timerRunning -> panic: concurrent runtimer calls // timerRemoved -> panic: inconsistent timer heap // timerRemoving -> panic: inconsistent timer heap // timerMoving -> panic: inconsistent timer heap
主要作用:循環(huán)遍歷堆中第一個定時器并操作:
- 如果第一個定時器為
timerWaiting?狀態(tài):已經(jīng)到達(dá)觸發(fā)時間久運(yùn)行并調(diào)整時間,然后返回;未到觸發(fā)時間久直接返回。 - 其它狀態(tài):進(jìn)行對應(yīng)的操作并再次循環(huán)。對應(yīng)的操作舉例:
timerModifiedXX?-》調(diào)整時間;timerDeleted?-》從堆中移除定時器。
為了保證正確性,runtimer?肯定會在adjusttimers?之后運(yùn)行:
if len(pp.timers) > 0 {
adjusttimers(pp, now)
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
狀態(tài)機(jī)規(guī)律總結(jié)
active?和inactive?:如果閱讀了源碼,會發(fā)現(xiàn)對于定時器的狀態(tài),還有active、inactive的分類,實(shí)際上active狀態(tài)的定時器是等待未來觸發(fā)的定時器(包括但不限與timeWaiting?狀態(tài)),而正常不會再觸發(fā)的定時器則為inactive(timeRemoved?、timerDeleted?等)。在heap中和不在heap中:下方會解釋狀態(tài)機(jī)的管理和堆有什么關(guān)系?
XXing狀態(tài)相當(dāng)于是一個鎖定狀態(tài),不允許其他goroutine并發(fā)操作,可以理解成鎖。
定時器的觸發(fā)
在上面的部分中,講解了timers體系中不同函數(shù)對于不同狀態(tài)的流轉(zhuǎn)。
這里將分析器的觸發(fā)過程,Go 語言會在兩個模塊觸發(fā)計(jì)時器,運(yùn)行計(jì)時器中保存的函數(shù):
- 調(diào)度器調(diào)度時會檢查處理器中的計(jì)時器是否準(zhǔn)備就緒;
- 系統(tǒng)監(jiān)控會檢查是否有未執(zhí)行的到期計(jì)時器;
我們將依次分析上述這兩個觸發(fā)過程。
調(diào)度器調(diào)度
?runtime.checkTimers? 是調(diào)度器用來運(yùn)行處理器中計(jì)時器的函數(shù),它會在發(fā)生以下情況時被調(diào)用:
- 調(diào)度器調(diào)用
runtime.schedule? 執(zhí)行調(diào)度時; - 調(diào)度器調(diào)用
runtime.findrunnable? 獲取可執(zhí)行的 Goroutine 時; - 調(diào)度器調(diào)用
runtime.findrunnable? 從其他處理器竊取計(jì)時器時;
這里不展開介紹 runtime.schedule? 和 runtime.findrunnable? 的實(shí)現(xiàn)了,重點(diǎn)分析用于執(zhí)行計(jì)時器的runtime.checkTimers?,我們將該函數(shù)的實(shí)現(xiàn)分成調(diào)整計(jì)時器、運(yùn)行計(jì)時器和刪除計(jì)時器三個部分:
// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
// It returns the passed time or the current time if now was passed as 0.
// and the time when the next timer should run or 0 if there is no next timer,
// and reports whether it ran any timers.
// If the time when the next timer should run is not 0,
// it is always larger than the returned time.
// We pass now in and out to avoid extra calls of nanotime.
//go:yeswritebarrierrec
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
// If it's not yet time for the first timer, or the first adjusted
// timer, then there is nothing to do.
next := int64(atomic.Load64(&pp.timer0When))
nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
if next == 0 || (nextAdj != 0 && nextAdj < next) {
next = nextAdj
}
if next == 0 {
// No timers to run or adjust.
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
// Next timer is not ready to run, but keep going
// if we would clear deleted timers.
// This corresponds to the condition below where
// we decide whether to call clearDeletedTimers.
// 當(dāng)前并沒有到觸發(fā)時間,這個檢查的目的就是為了查看位于deletedTimers狀態(tài)的 定時器 的比例,如果比例過大,就要清理
// 清理就是調(diào)用clearDeletedTimers函數(shù)。
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
lock(&pp.timersLock)
if len(pp.timers) > 0 {
adjusttimers(pp, now) //上面生成的now會傳下來,因此不用擔(dān)心函數(shù)執(zhí)行導(dǎo)致的時間流逝
for len(pp.timers) > 0 {
// Note that runtimer may temporarily unlock
// pp.timersLock.
if tw := runtimer(pp, now); tw != 0 { //上面生成的now會傳下來,因此不用擔(dān)心函數(shù)執(zhí)行導(dǎo)致的時間流逝
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
// If this is the local P, and there are a lot of deleted timers,
// clear them out. We only do this for the local P to reduce
// lock contention on timersLock.
//如果運(yùn)行當(dāng)前goroutine的p是持有timers數(shù)組的p 且 處于deletedTimers狀態(tài)的定時器 比例超過1/4,就清理掉這部分的定時器。
if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}
unlock(&pp.timersLock)
return now, pollUntil, ran
}
?runtime.clearDeletedTimers? 能夠避免堆中出現(xiàn)大量長時間運(yùn)行的計(jì)時器,該函數(shù)和 runtime.moveTimers? 也是唯二會遍歷計(jì)時器堆的函數(shù)(moveTimers? which only runs when the world is stopped)。
具體可見clearDeletedTimers?的注釋:
// This is the only function that walks through the entire timer heap,
// other than moveTimers which only runs when the world is stopped.
func clearDeletedTimers(pp *p) {
系統(tǒng)監(jiān)控
系統(tǒng)監(jiān)控中也會觸發(fā)調(diào)度器的執(zhí)行,大概率是因?yàn)橛袝r候m中可能存在不能被強(qiáng)占的情況,就有可能會導(dǎo)致timer的觸發(fā)時間滯后。需要注意的是,雖然有系統(tǒng)監(jiān)控,可以幫助timers及時觸發(fā),但是timers的觸發(fā)并不能達(dá)到嚴(yán)格的實(shí)時性(系統(tǒng)監(jiān)控檢查timers調(diào)度延遲的閾值是10ms)。
這里我也對這個過程理解的不是很深刻,這里推薦去draveness大佬的在線圖書中搜索【系統(tǒng)監(jiān)控】關(guān)鍵詞進(jìn)行全面學(xué)習(xí)
補(bǔ)充問題
狀態(tài)機(jī)的管理和堆有什么關(guān)系?
首先需要明確這里的堆指的是struct p?管理定時器所用的四叉堆,而不是 內(nèi)存管理涉及的棧和堆。
一個定時器創(chuàng)建之后是timerNoStatus?狀態(tài),其并不在堆上,需要放在p?的堆上之后才能進(jìn)行調(diào)度和觸發(fā)!
比如如下語句中的堆字眼:
- ?
timerNoStatus? 和timerRemoved? — 計(jì)時器不在堆上; - ?
timerModifiedEarlier? 和timerModifiedLater? — 計(jì)時器雖然在堆上,但是可能位于錯誤的位置上,需要重新排序;
// Active timers live in heaps attached to P, in the timers field. // Inactive timers live there too temporarily, until they are removed.
衍生問題:哪些狀態(tài)的timer在堆中?哪些狀態(tài)在?
回答:可從源碼中adjusttimers?函數(shù)中一窺究竟,timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving?狀態(tài)的定時器是不在堆上的。
此外,有些同學(xué)可能有疑惑,定時器從堆中移除的過程(可以參考cleantimers?),是先標(biāo)記成timerMoving?然后再從堆中移除,這兩步不是原子的,如果狀態(tài)已經(jīng)是timerMoving?但是還沒從堆中移除,遇上adjusttimers,豈不是會出現(xiàn)panic。
實(shí)際上這個問題并不會出現(xiàn),因?yàn)橹灰婕皩?code>p?的timers?數(shù)組操作(更改持續(xù)、加減元素)的地方都會加上鎖(lock(&pp.timersLock)?),而且每個p?也只會修改自己的timers?數(shù)組,不會修改其它p?持有的timers?數(shù)組,但是同樣如果不涉及數(shù)組更改,只設(shè)計(jì)狀態(tài)變更的話就不需要加上鎖(比如標(biāo)記刪除元素)。
為什么time.go和sleep.go中有接近相同的結(jié)構(gòu)體?
相同的結(jié)構(gòu)體示例:
time.go中:
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
// For GOOS=nacl, package syscall knows the layout of this structure.
// If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
type timer struct {
i int // heap index
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(arg, now) in the timer goroutine, so f must be
// a well-behaved function and not block.
when int64 //當(dāng)前計(jì)時器被喚醒的時間
period int64 //兩次被喚醒的間隔;
f func(interface{}, uintptr) //每當(dāng)計(jì)時器被喚醒時都會調(diào)用的函數(shù);
arg interface{} //計(jì)時器被喚醒時調(diào)用 f 傳入的參數(shù);
seq uintptr
}
sleep.go中:
// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
i int
when int64
period int64
f func(interface{}, uintptr) // NOTE: must not be closure
arg interface{}
seq uintptr
}
回答:實(shí)際上這兩個結(jié)構(gòu)體一個是運(yùn)行時一個是編譯時,不過作者目前對這塊也不是特別清楚,也歡迎大家指點(diǎn)指點(diǎn)。
為什么deltimer?函數(shù)只是標(biāo)記刪除,并不直接刪除timer?
回答:因?yàn)槎〞r器體系中,對于timers數(shù)組的更改需要加鎖,如果沒有更改的話就不需要加鎖,為了能快速的StopTimer,因此標(biāo)記刪除并不需要拿鎖,效率很高。什么時候加鎖可參考:struct p中的定時器加鎖。
acquirem()?和releasem()?的作用
這個問題和Go的調(diào)度模型GMP息息相關(guān),這里就做一個不嚴(yán)謹(jǐn)?shù)慕忉專?code>acquirem? 的作用之一是 為了保證當(dāng)前 ?P?? 不會被切換,粗暴理解就是對P而言,其相當(dāng)于不會被“打斷”,從而可以保證此時修改的p是當(dāng)前goroutine所屬的p。
// Disable preemption while using pp to avoid changing another P's heap. mp := acquirem() pp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) doaddtimer(pp, t) unlock(&pp.timersLock) wakeNetPoller(when) releasem(mp)
定時器的狀態(tài)機(jī)中為什么有這么多中間狀態(tài)?
相信這篇文章讀完之后,這個已經(jīng)不是問題了。
nextwhen?字段的作用,其存在的必要性
首先我們要明白nextwhen?字段的作用:用于記錄下一次要設(shè)置when?字段為什么值
那么既然其用于標(biāo)識下一次when?字段的值,那為什么不直接修改when?字段呢?
這是因?yàn)樵诋?dāng)前的設(shè)計(jì)中,p只會修改自己的timers數(shù)組,如果當(dāng)前p?修改了其他p?的when?字段,timers數(shù)組就無法正常排序了。所以需要使用nextwhen?來記錄when?需要修改的值,等timers?數(shù)組對應(yīng)的p?來修改when?的值。
這里涉及跨p操作定時器的問題。
每個p都會存放在其上創(chuàng)建的timer?,但是不同的goroutine?可能會在不同的p?上面,因此可能操作timer?的goroutine?所在的p?與存放timer?所在的p?并不是同一個p?。
// The timer is in some other P's heap, so we can't change // the when field. If we did, the other P's heap would // be out of order. So we put the new when value in the // nextwhen field, and let the other P set the when field // when it is prepared to resort the heap.
為什么要用atomic相關(guān)變量,而不直接使用鎖
猜測主要原因還是性能,鎖可能還是太重了。而且實(shí)際上對于有重試的代碼,atomic相關(guān)的設(shè)置可能更加優(yōu)雅,比如下面代碼:
if !atomic.Cas(&t.status, s, timerMoving) {
continue //continue用于等待重試
}
如果使用鎖的話,那么偽代碼如下,就算使用雙重校驗(yàn),可能還是很重
for {
if t.status == s{ //雙重校驗(yàn),延遲加鎖
t.mu.Lock() // 鎖定
if t.status == s {
t.status = timerMoving // 修改狀態(tài)
t.mu.Unlock() // 釋放鎖
break // 修改成功,退出
}
t.mu.Unlock() // 如果沒有修改成功,解鎖
}
// 繼續(xù)等待重試
}
編程風(fēng)格的學(xué)習(xí)
什么時候校驗(yàn)值:在每一次調(diào)用的入口。雖然函數(shù)值之前已經(jīng)校驗(yàn)過。取之:src/runtime/time.go:255
func addtimer(t *timer) {
// when must be positive. A negative value will cause runtimer to
// overflow during its delta calculation and never expire other runtime
// timers. Zero will cause checkTimers to fail to notice the timer.
if t.when <= 0 {
throw("timer when must be positive")
}
if t.period < 0 {
throw("timer period must be non-negative")
}
if t.status != timerNoStatus {
throw("addtimer called with initialized timer")
}
xxx
}
函數(shù)的分層設(shè)計(jì):
// startTimer adds t to the timer heap.
//
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
函數(shù)與方法:
可以思考下下面這里為什么是方法而不是p的函數(shù)。
個人認(rèn)為因?yàn)檫@里并不設(shè)計(jì)直接修改結(jié)構(gòu)體p?的值,所以設(shè)計(jì)成方法可讀性更強(qiáng)。換言之,如果要修改對應(yīng)的結(jié)構(gòu)體的值,才創(chuàng)建函數(shù),否則優(yōu)先使用方法。
// updateTimerModifiedEarliest updates the recorded nextwhen field of the
// earlier timerModifiedEarier value.
// The timers for pp will not be locked.
func updateTimerModifiedEarliest(pp *p, nextwhen int64) {
for {
old := atomic.Load64(&pp.timerModifiedEarliest)
if old != 0 && int64(old) < nextwhen {
return
}
if atomic.Cas64(&pp.timerModifiedEarliest, old, uint64(nextwhen)) {
return
}
}
}
可以用注釋表明當(dāng)前函數(shù)必須被什么鎖給鎖定住:
// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp. //注釋說明,這個函數(shù)必須鎖定之后才能進(jìn)入
func doaddtimer(pp *p, t *timer) {
xxx
}
總結(jié)
本文解開了Timer體系相關(guān)的狀態(tài)流轉(zhuǎn),但是對于現(xiàn)在Timer中存在的問題(reset、垃圾回收)在golang1.23怎么得到解決的機(jī)制還沒有探究,這個可以等待后續(xù)研究研究。
參考資料:
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-timer/
Go中定時器實(shí)現(xiàn)原理及源碼解析-騰訊云開發(fā)者社區(qū)-騰訊云
Golang 定時器(Timer 和 Ticker ),這篇文章就夠了定時器是什么Golang原生time包下可以用來執(zhí) - 掘金
golang源碼
到此這篇關(guān)于從源碼解析golang Timer定時器體系的文章就介紹到這了,更多相關(guān)golang Timer定時器體系解析內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言開源庫實(shí)現(xiàn)Onvif協(xié)議客戶端設(shè)備搜索
這篇文章主要為大家介紹了Go語言O(shè)nvif協(xié)議客戶端設(shè)備搜索示例實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04
Go語言繼承功能使用結(jié)構(gòu)體實(shí)現(xiàn)代碼重用
今天我來給大家介紹一下在?Go?語言中如何實(shí)現(xiàn)類似于繼承的功能,讓我們的代碼更加簡潔和可重用,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Golang?中判斷兩個結(jié)構(gòu)體相等的方法
這篇文章主要介紹了Golang?中如何判斷兩個結(jié)構(gòu)體相等,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08
Go語言調(diào)用ffmpeg-api實(shí)現(xiàn)音頻重采樣
最近對golang處理音視頻很感興趣,對golang音視頻常用庫goav進(jìn)行了一番研究。自己寫了一個wav轉(zhuǎn)采樣率的功能。給大家分享一下,中間遇到了不少坑,解決的過程中還是蠻有意思的,希望大家能喜歡2022-12-12
Golang實(shí)現(xiàn)自己的Redis數(shù)據(jù)庫內(nèi)存實(shí)例探究
這篇文章主要為大家介紹了Golang實(shí)現(xiàn)自己的Redis數(shù)據(jù)庫內(nèi)存實(shí)例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01

