詳解時間輪TimeWheel的工作原理
一.時間輪介紹
1.時間輪的簡單介紹
時間輪(TimeWheel)作為一種高效率的計時器實現(xiàn)方案,在1987年發(fā)表的論文Hashed and Hierarchical Timing Wheels中被首次提出。
其被發(fā)明的主要目的在于解決當(dāng)時操作系統(tǒng)的計時器功能實現(xiàn)中,維護一個定時器的開銷隨著所維護定時器數(shù)量的增多而逐漸變大的問題(時間復(fù)雜度為:O(n)、O(log n))。
這導(dǎo)致操作系統(tǒng)無法同時高效的維護大量計時器,進一步導(dǎo)致一些優(yōu)秀的、需要使用到大量定時器的的網(wǎng)絡(luò)協(xié)議、實時控制系統(tǒng)等程序的實際表現(xiàn)不盡人意。
2.傳統(tǒng)的計時器功能實現(xiàn)方式
計時器作為一種普遍的需求,理解起來是很簡單的。計時器主要由兩部分組成,即用戶指定一個任務(wù)(task),并在等待指定的時間(delayTime)后task將會被回調(diào)執(zhí)行。
在時間輪算法被發(fā)明出來之前,操作系統(tǒng)計時器功能的實現(xiàn)方式主要可以分為兩種:基于無序隊列和基于有序隊列。
基于無序隊列實現(xiàn)的計時器
1.新創(chuàng)建的計時器直接放在隊列的末尾,時間復(fù)雜度為O(1)。
2.在每次硬件時鐘tick中斷時(per tick),遍歷當(dāng)前隊列中所有的計時器,將當(dāng)前時間下過期的計時器移出隊列并調(diào)度執(zhí)行task,時間復(fù)雜度O(n)。
基于無序隊列的計時器中,所維護的計時器總數(shù)量越多,則每次硬件時鐘中斷時的處理流程開銷越大,最壞情況下甚至無法在一次時鐘tick的間隔內(nèi)完成計時器隊列的遍歷。
基于有序隊列實現(xiàn)的計時器
1.有序隊列下,所有計時器按照過期時間進行排序,新創(chuàng)建的計時器加入隊列時的時間復(fù)雜度為O(log n)(通常使用完全二叉堆來實現(xiàn)有序隊列)。
2.在每次硬件時鐘tick中斷時,僅檢查隊列的頭部元素(最早過期的任務(wù))是否過期。如果未過期則直接結(jié)束,如果已過期則將隊首元素出隊調(diào)度task,并再次重復(fù)上述過程,直至最新的隊首元素不過期或隊列為空。平均時間復(fù)雜度為O(1)。
基于有序隊列的計時器中,所維護的計時器總數(shù)量越多,則每次用戶創(chuàng)建新的計時器時的延遲越高,在需要反復(fù)創(chuàng)建大量計時器的場合下,性能不佳
可以看到,在基于隊列的計時器模塊運行時,最關(guān)鍵的兩個功能(創(chuàng)建新計時器/處理每次tick)至少有一個會隨著總計時器數(shù)量的增大,而引起性能大幅度的下降。
juc中自帶的ScheduledThreadPoolExecutor調(diào)度線程池就是基于有序列表(二叉堆)的計時器。因此netty等需要大量使用計時器的框架需要另辟蹊徑,采用時間輪來實現(xiàn)更高效的計時器功能。
不同計時器實現(xiàn)與排序算法的關(guān)聯(lián)
對基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)有一定了解的讀者會知道,常用的快速排序、歸并排序等基于比較的高效排序算法其時間復(fù)雜度為O(n*log n)。
而基數(shù)排序(桶排序)的時間復(fù)雜度則是O(n),其性能比上述基于比較的排序算法高出一個數(shù)量級。
但基排序最大的缺陷則是對所要排序的數(shù)據(jù)集的排布有很高的要求,如果要排序的數(shù)據(jù)集的范圍非常廣,則所需要的桶(bucket)會非常多,空間復(fù)雜度會高到不可忍受。
舉個例子,如果是對1萬副撲克(不算大小王,52張牌)進行排序,由于撲克牌只有13種可能(A-K),即使1萬副撲克中牌的總數(shù)為52萬張,基排序只需要13個桶就能在線性時間復(fù)雜度O(n)內(nèi)完成排序。
但如果是對數(shù)據(jù)范圍為0-1億范圍內(nèi)的1萬個隨機數(shù)進行一次基排序,則基排序需要多達1億個桶,其空間效率非常低,遠遜于快速排序等基于比較的排序。
截止目前,我們已經(jīng)明確了兩個關(guān)鍵點:
- 基于有序列表的計時器,由于其基于比較的特征,所以插入時的時間復(fù)雜度O(log n)會隨著計時器總量的增大而增加,在計時器總量成千上萬時效率會急劇降低。
- 對于一個較小的數(shù)據(jù)集范圍,基排序的效率遠高于快速排序等基于比較的排序算法。
一般來說,一次時鐘硬件的tick間隔非常小(納秒級別),如果想要用類似基排序的思想,使用一個巨大的數(shù)組來存儲不同過期時間的計時器,在理論上是可行的,但空間效率卻低到無法在現(xiàn)有的內(nèi)存硬件上實現(xiàn)(1納秒對應(yīng)1個bucket)。
但如果能容忍時鐘調(diào)度的時間不是那么精確,則可以極大減少所需要的bucket桶的數(shù)量。舉個例子,1毫秒等于1百萬納秒,如果時鐘調(diào)度的精度不需要是納秒級別,而是毫秒級別,則同一毫秒內(nèi)的所有計時器(第100納秒和第999999納秒超時的計時器)都可以放在同一個桶中,所需要的數(shù)組空間減少了100萬倍!
時間輪算法就是基于這一特點產(chǎn)生的,即一定程度上舍棄調(diào)度時間的精確性,參考基排序的思路,實現(xiàn)在常數(shù)時間內(nèi)創(chuàng)建新計時器,并同時在常數(shù)時間內(nèi)完成時鐘tick的處理。
3.時間輪計時器實現(xiàn)思路的簡單介紹
下面我們簡單的介紹一個基于時間輪的計時器的基本實現(xiàn)思路(還有很多可以優(yōu)化的地方):
- 時間輪在創(chuàng)建時需要指定調(diào)度精度,即時間輪內(nèi)部邏輯上1次tick的間隔。
在上述例子中,調(diào)度精度為1毫秒,則時間輪實際上1次tick的間隔也就是1毫秒(類似的,我們平常見到的鐘表中1次tick的間隔則是1秒鐘)。 - 維護一個桶數(shù)組,由于不同超時時間的任務(wù)可能會被映射到同一個桶中,因此數(shù)組桶中維護一個指向某一列表的指針(引用)。
- 創(chuàng)建新計時器時,對于任意超時時間的任務(wù)基于tick間隔進行哈希,計算出需要存入的對應(yīng)數(shù)組桶的下標(biāo)(第100納秒和第999999納秒超時的計時器,都放入第0個桶)并插入對應(yīng)桶的列表中。
- 維護一個當(dāng)前時間指針,指向某一個數(shù)組桶。每1次tick處理時,推動該指針,令其指向下一個tick對應(yīng)的桶,并將桶指向的列表中的全部任務(wù)取出,丟到一個線程池中異步處理。
- 為了節(jié)約空間,桶數(shù)組通常以環(huán)形數(shù)組的形式存儲以重復(fù)利用bucket槽,這也是時間輪名字中輪(wheel)的來源。
二.不同實現(xiàn)方式的時間輪的介紹
上面介紹的時間輪實現(xiàn)思路中繞過了一個很重要的問題,即在時間輪tick間隔確定的情況下,
雖然環(huán)形數(shù)組能夠復(fù)用之前使用過的bucket槽,但bucket桶的數(shù)量似乎限制了時間輪所能支持的最大超時時間。
舉個例子,假設(shè)tick間隔為1毫秒,那么僅僅是存儲距離當(dāng)前時間1天(86400秒)后超時的任務(wù)就至少需要86400*1000個bucket,所占用的空間無疑是巨大的。
而一般的定時器模塊所要支持的最大超時時間一般也不止1天這么短。
雖然進一步的減少精度(比如tick間隔改為100毫秒,或者1秒)似乎能解決這個問題,但事實上時間輪的論文中還提到了一些更優(yōu)秀的實現(xiàn)方案,使得能同時兼顧精度和減少空間占用。
單層多輪次時間輪
第一種方式是引入輪次(round)的概念(論文中提到的方案6),即每一個bucket中的列表元素帶上一個round屬性。
假設(shè)一個時間輪的tick間隔為1秒,并且環(huán)形數(shù)組有86400個bucket桶,那么這個時間輪明面上可以支持的最大超時時間只有1天。而引入了輪次的概念后,則理論上可以支持的最大超時時間是沒有限制的。
單層多輪次時間輪創(chuàng)建新任務(wù)
舉個例子,假設(shè)有一個定時器任務(wù)的超時時間為2天10小時20分鐘30秒,那么在創(chuàng)建新計時器任務(wù)時基于當(dāng)前時間輪單輪次可以支持的最大超時時間(即一天)進行求余,
可以得到10小時20分鐘30秒,根據(jù)余數(shù)我們可以計算出當(dāng)前任務(wù)應(yīng)該插入到哪個bucket槽的列表中。而超時時間/最大超時時間(1天)得到除法的結(jié)果就是round輪次,即round=2。
單層多輪次時間輪tick處理
同時在每次tick處理當(dāng)前時間指針?biāo)赶虻牧斜頃r,不再簡單的將列表中的所有任務(wù)一并取出執(zhí)行,而是對其進行遍歷。
- 只有round為0的任務(wù)才會被撈出來執(zhí)行
- 而round大于0的任務(wù)其邏輯上并沒有真的超時,而只是將round自減1,等到后面的輪次處理并最終自減為0后才代表著其真的超時而需要出隊執(zhí)行。
可以看到,引入了round概念后,多輪次的時間輪兼顧了精度的同時,也能夠在有限、可控的空間內(nèi)支持足夠大的超時時間。
多層時間輪
論文中提到的另一種實現(xiàn)方案便是多層次時間輪(如論文題目所指Hashed and Hierarchical Timing Wheels)。
多層時間輪的靈感來自于我們?nèi)粘I钪须S處可見的機械鐘表。通常機械鐘表有一個秒針(60秒),一個分針(60分鐘)和一個時針(12小時),其本質(zhì)上相當(dāng)于一個tick間隔為1秒,支持的最大超時時間為12小時的多層時間輪。
12小時有60 * 60 * 12=43200秒,但是鐘表中實際上并沒有這么多的bucket,卻也能準(zhǔn)確的表達12小時中的任何一秒。
這是因為鐘表中的秒針、分針和時針本質(zhì)上相當(dāng)于三個不同層次的時間輪:
- 秒針對應(yīng)的時間輪是最底層的,共60個bucket,tick間隔為1秒鐘
- 分針對應(yīng)的時間輪是第二層的,也是60個bucket,tick間隔為1分鐘
- 時針對應(yīng)的時間輪是最上層的,共12個bucket,tick間隔為1小時
在多層時間輪的實現(xiàn)中,可以建立N個不同層次的時間輪,其中上一層時間輪的tick間隔等于下一層時間輪走完一周的時間(類似1分鐘等于60秒,1小時等于60分鐘)。
如果時間輪的層次足夠多,理論上也能支持足夠大范圍的超時時間。
舉個例子,精度為秒的的時間輪,只需要5層共(60+60+24+365+100)=609個bucket就能支持最大100年的超時時間(假設(shè)一年都是365天)。
多層時間輪創(chuàng)建新任務(wù)
創(chuàng)建新計時器時,根據(jù)超時時間,先嘗試著放入最底層的時間輪,如果最底層的時間輪能放的下(比如第0分鐘58秒過期的),就根據(jù)當(dāng)前時間輪的tick間隔做除法來計算出需要放入的具體bucket。
如果當(dāng)前時間輪放不下(比如距離當(dāng)前時間10分鐘20秒過期的,無法直接放入最大60秒的秒級時間輪,但能放到最大支持60分鐘的分鐘時間輪中),則嘗試著放到上一層的時間輪中,但是是基于上一層的時間輪的tick間隔來做除法來計算出具體要放入的bucket槽。
如果還是放不下(比如距離當(dāng)前時間3小時20分鐘18秒過期的,只能放到最大12小時的小時級時間輪中)。
循環(huán)往復(fù)這一過程,直到放到合適層次的時間輪中。
多層時間輪tick處理
多層次的時間輪中的基礎(chǔ)tick間隔是由最底層的時間輪決定的。
每次tick時會推動當(dāng)前時間,首先將最底層的時間輪中新指向的插槽中的任務(wù)全部取出進行調(diào)度;
接著判斷當(dāng)前時間輪是否走完了一整圈,如果是的話則推動上一層級的時間輪推進而指向新的bucket槽(比如秒級時間輪走完了60秒,則推進分針前進1格)。
被推動的上層時間輪需要將新指向的bucket槽中的任務(wù)全部取出,嘗試著放到下層時間輪中
(下一層或者下N層都有可能,比如超時時間為1小時10分鐘30秒的任務(wù)會在小時時間輪從0推進到1時放到分鐘時間輪里,而超時時間為1小時0分鐘30秒的任務(wù)則會被直接放到最下層的秒鐘時間輪里)。
層級時間輪的tick推動是從下層蔓延到上層的,每次tick可能都會推動1至N層時間輪(比如第0小時第59分鐘59秒->第1小時第0分鐘第0秒就推動了2層)。
三.時間輪實現(xiàn)的源碼級分析
上面介紹的時間輪實現(xiàn)方式是很粗略的,連偽代碼都不算。要想真正理解時間輪的工作原理,最好的辦法還是通過參考已有實現(xiàn),并自己親手實現(xiàn)一遍才會印象深刻。
在本篇博客中將會結(jié)合源碼介紹三種實現(xiàn)方式略有不同的時間輪,分別是:
- 單層多輪次時間輪(參考netty的HashedWheelTimer實現(xiàn))
- 多層次時間輪(存在空轉(zhuǎn)問題)
- 解決了空轉(zhuǎn)問題的多層次時間輪(參考kafka的Timer實現(xiàn))
為了便于讀者理解和閱讀源碼,相比netty或kafka中的工程化的實現(xiàn),博客中實現(xiàn)的版本是簡化過的,其只聚焦于時間輪本身的工作原理,而舍棄掉了關(guān)于取消定時任務(wù)、優(yōu)雅啟動/停止等相關(guān)的邏輯。
為了便于測試,所有的時間輪實現(xiàn)都實現(xiàn)了一個自定義的Timer接口
public interface Timer { /** * 啟動時間輪 * */ void startTimeWheel(); /** * 創(chuàng)建新的超時任務(wù)(必須先startTimeWheel完成后,才能創(chuàng)建新任務(wù)) * @param task 超時時需要調(diào)度的自定義任務(wù) * @param delayTime 延遲時間 * @param timeUnit 延遲時間delayTime的單位 * */ void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit); }
1.單層/多輪次時間輪(參考netty的實現(xiàn))
- MyHashedTimeWheel是參考netty實現(xiàn)的單層多輪時間輪,其包含有一個環(huán)形數(shù)組ringBucketArray,數(shù)組中的每個槽(MyHashedTimeWheelBucket)都對應(yīng)著一個存儲任務(wù)節(jié)點的鏈表。
- 為了支持多線程并發(fā)的創(chuàng)建新任務(wù),在創(chuàng)建新任務(wù)時,不是直接將其放入時間輪的環(huán)形數(shù)組中,而是先暫時存儲在一個阻塞隊列unProcessTaskQueue中。
而由模擬tick,推動當(dāng)前時間的Worker線程來將其轉(zhuǎn)移到環(huán)形數(shù)組中的(一個時間輪計時器只有一個Worker線程,所以是單線程操作無需考慮并發(fā))。
Worker線程會在時間輪啟動后開始運行,其主要完成以下幾個任務(wù)
1.最初啟動時,設(shè)置時間輪的當(dāng)前時間(System.nanoTime()區(qū)別于System.currentTimeMillis()不是獲取現(xiàn)實中的絕對時間)。
2.隨后執(zhí)行一個無限循環(huán),主要用于推進時間輪的當(dāng)前時間。
3.因為java無法直接訪問硬件時鐘,本質(zhì)上需要依賴操作系統(tǒng)層面的計時器來感知硬件時鐘的變化。
所以無限循環(huán)中waitForNextTick方法中,基于Thread.sleep來模擬每次tick的間隔,以避免浪費CPU資源。
4.隨后在waitForNextTick返回后,代表著當(dāng)前時間輪推進了1tick,接著通過transferTaskToBuckets將當(dāng)前unProcessTaskQueue隊列中的新任務(wù)單線程挨個的加入時間輪中。
計算的過程如第二章中所描述的那樣,基于實際需要等待的超時時間與當(dāng)前時間輪最大間隔的余數(shù)獲得應(yīng)該插入的bucket槽的下標(biāo);基于除數(shù)獲得剩余的rounds。
5.再然后處理當(dāng)前時間指向的bucket槽中的所有任務(wù)(bucket.expireTimeoutTask),如果任務(wù)的round<=0,則代表已經(jīng)超時了,將其丟入指定的線程池中異步處理。
如果round>0,則將其自減1,等待后續(xù)的expireTimeoutTask最終將其減至0。
/** * 參考netty實現(xiàn)的單層時間輪 * */ public class MyHashedTimeWheel implements Timer{ /** * 環(huán)形數(shù)組 * */ private final MyHashedTimeWheelBucket[] ringBucketArray; /** * 世間輪啟動時的具體時間戳(單位:納秒nanos) * */ private long startTime; /** * 是否已啟動 * */ private final AtomicBoolean started = new AtomicBoolean(false); /** * 時間輪每次轉(zhuǎn)動的時間(單位:納秒nanos) * (perTickTime越短,調(diào)度會更精確,但cpu開銷也會越大) * */ private final long perTickTime; /** * 總tick數(shù) * */ private long totalTick = 0; /** * 待處理任務(wù)的隊列 * (多外部生產(chǎn)者寫入,時間輪內(nèi)的單worker消費者讀取,所以netty的實現(xiàn)里使用了效率更高的MpscQueue,Mpsc即MultiProducerSingleConsumer) * */ private final Queue<MyTimeoutTaskNode> unProcessTaskQueue = new LinkedBlockingDeque<>(); /** * 用于實際執(zhí)行到期任務(wù)的線程池 * */ private final Executor taskExecutor; private Thread workerThread; /** * 構(gòu)造函數(shù) * */ public MyHashedTimeWheel(int ringArraySize, long perTickTime, Executor taskExecutor) { this.ringBucketArray = new MyHashedTimeWheelBucket[ringArraySize]; for(int i=0; i<ringArraySize; i++){ // 初始化,填充滿時間輪喚醒數(shù)組 this.ringBucketArray[i] = new MyHashedTimeWheelBucket(); } this.perTickTime = perTickTime; this.taskExecutor = taskExecutor; } /** * 啟動worker線程等初始化操作,必須執(zhí)行完成后才能正常工作 * (簡單起見,和netty不一樣不是等任務(wù)被創(chuàng)建時才懶加載的,必須提前啟動) * */ @Override public void startTimeWheel(){ // 啟動worker線程 this.workerThread = new Thread(new Worker()); this.workerThread.start(); while (!this.started.get()){ // 自旋循環(huán),等待一會 } System.out.println("startTimeWheel 啟動完成:" + this.getClass().getSimpleName()); } @Override public void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit){ long deadline = System.nanoTime() + timeUnit.toNanos(delayTime); // Guard against overflow. if (delayTime > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } MyTimeoutTaskNode newTimeoutTaskNode = new MyTimeoutTaskNode(); newTimeoutTaskNode.setTargetTask(task); newTimeoutTaskNode.setDeadline(deadline); unProcessTaskQueue.add(newTimeoutTaskNode); } private final class Worker implements Runnable{ @Override public void run() { MyHashedTimeWheel.this.startTime = System.nanoTime(); // 啟動 MyHashedTimeWheel.this.started.set(true); // 簡單起見,不考慮優(yōu)雅啟動和暫停的邏輯 while (true){ // 等待perTick waitForNextTick(); // 在撈取當(dāng)前tick下需要處理的bucket前,先將加入到隊列中的任務(wù)轉(zhuǎn)移到環(huán)形數(shù)組中(可能包含在當(dāng)前tick下就要處理的任務(wù)) transferTaskToBuckets(); // 基于總tick數(shù),對環(huán)形數(shù)組的長度取模,計算出當(dāng)前tick下需要處理的bucket桶的下標(biāo) int idx = (int) (MyHashedTimeWheel.this.totalTick % MyHashedTimeWheel.this.ringBucketArray.length); MyHashedTimeWheelBucket bucket = MyHashedTimeWheel.this.ringBucketArray[idx]; // 處理當(dāng)前插槽內(nèi)的任務(wù)(遍歷鏈表中的所有任務(wù),round全部減一,如果減為負數(shù)了則說明這個任務(wù)超時到期了,將其從鏈表中移除后并交給線程池執(zhí)行指定的任務(wù)) bucket.expireTimeoutTask(MyHashedTimeWheel.this.taskExecutor); // 循環(huán)tick一次,總tick數(shù)自增1 MyHashedTimeWheel.this.totalTick++; } } /** * per tick時鐘跳動,基于Thread.sleep * */ private void waitForNextTick(){ // 由于Thread.sleep并不是絕對精確的被喚醒,所以只能通過(('總的tick數(shù)+1' * '每次tick的間隔') + '時間輪啟動時間')來計算精確的下一次tick時間 // 而不能簡單的Thread.sleep(每次tick的間隔) long nextTickTime = (MyHashedTimeWheel.this.totalTick + 1) * MyHashedTimeWheel.this.perTickTime + MyHashedTimeWheel.this.startTime; // 因為nextTickTime是納秒,sleep需要的是毫秒,需要保證納秒數(shù)過小時,導(dǎo)致直接計算出來的毫秒數(shù)為0 // 因此(‘實際休眠的納秒數(shù)'+999999)/1000000,保證了納秒轉(zhuǎn)毫秒時,至少會是1毫秒,而不會出現(xiàn)sleep(0毫秒)令cpu空轉(zhuǎn) long needSleepTime = (nextTickTime - System.nanoTime() + 999999) / 1000000; try { // 比起netty,忽略了一些處理特殊場景bug的邏輯 Thread.sleep(needSleepTime); } catch (InterruptedException ignored) { } } private void transferTaskToBuckets() { // 為了避免worker線程在一次循環(huán)中處理太多的任務(wù),所以直接限制了一個最大值100000 // 如果真的有這么多,就等到下次tick循環(huán)的時候再去做。 // 因為這個操作是cpu密集型的,處理太多的話,可能導(dǎo)致無法在一個短的tick周期內(nèi)完成一次循環(huán) for (int i = 0; i < 100000; i++) { MyTimeoutTaskNode timeoutTaskNode = MyHashedTimeWheel.this.unProcessTaskQueue.poll(); if (timeoutTaskNode == null) { // 隊列為空了,直接結(jié)束 return; } // 計算到任務(wù)超時時,應(yīng)該執(zhí)行多少次tick // (和netty里的不一樣,這里的deadline是超時時間的絕對時間,所以需要先減去時間輪的startTime) // (netty中是生產(chǎn)者線程在add時事先減去了startTime,比起由worker線程統(tǒng)一處理效率更高,但個人覺得這里的寫法會更直觀) long totalTickWhenTimeout = (timeoutTaskNode.getDeadline() - MyHashedTimeWheel.this.startTime) / MyHashedTimeWheel.this.perTickTime; // 減去當(dāng)前時間輪已經(jīng)進行過的tick數(shù)量 long remainingTickWhenTimeout = (totalTickWhenTimeout - MyHashedTimeWheel.this.totalTick); // 因為一次時間輪旋轉(zhuǎn)會經(jīng)過ringBucketArray.length次tick,所以求個余數(shù) long remainingRounds = remainingTickWhenTimeout / MyHashedTimeWheel.this.ringBucketArray.length; // 計算出當(dāng)前任務(wù)需要轉(zhuǎn)多少圈之后才會超時 timeoutTaskNode.setRounds(remainingRounds); // 如果傳入的deadline早于當(dāng)前系統(tǒng)時間,則totalTickWhenTimeout可能會小于當(dāng)前的totalTick // 這種情況下,讓這個任務(wù)在當(dāng)前tick下就立即超時而被調(diào)度是最合理的,而不能在求余后放到一個錯誤的位置而等一段時間才調(diào)度(所以必須取兩者的最大值) final long ticks = Math.max(totalTickWhenTimeout, MyHashedTimeWheel.this.totalTick); // Ensure we don't schedule for past. // 如果能限制環(huán)形數(shù)組的長度為2的冪,則可以改為ticks & mask,位運算效率更高 int stopIndex = (int) (ticks % MyHashedTimeWheel.this.ringBucketArray.length); MyHashedTimeWheelBucket bucket = MyHashedTimeWheel.this.ringBucketArray[stopIndex]; // 計算并找到應(yīng)該被放置的那個bucket后,將其插入當(dāng)前bucket指向的鏈表中 bucket.addTimeout(timeoutTaskNode); } } } }
/** * 時間輪環(huán)形數(shù)組下標(biāo)對應(yīng)的桶(保存一個超時任務(wù)MyTimeoutTaskNode的鏈表) * */ public class MyHashedTimeWheelBucket { private final LinkedList<MyTimeoutTaskNode> linkedList = new LinkedList<>(); public void addTimeout(MyTimeoutTaskNode timeout) { linkedList.add(timeout); } /** * 遍歷鏈表中的所有任務(wù),round全部減一,如果減為負數(shù)了則說明這個任務(wù)超時到期了,將其從鏈表中移除后并交給線程池執(zhí)行指定的任務(wù) * */ public void expireTimeoutTask(Executor executor){ Iterator<MyTimeoutTaskNode> iterator = linkedList.iterator(); while(iterator.hasNext()){ MyTimeoutTaskNode currentNode = iterator.next(); long currentNodeRound = currentNode.getRounds(); if(currentNodeRound <= 0){ // 將其從鏈表中移除 iterator.remove(); // count小于等于0,說明超時了,交給線程池去異步執(zhí)行 executor.execute(currentNode.getTargetTask()); }else{ // 當(dāng)前節(jié)點還未超時,round自減1 currentNode.setRounds(currentNodeRound-1); } // 簡單起見,不考慮任務(wù)被外部自己取消的case(netty里的timeout.isCancelled()) } } }
public class MyTimeoutTaskNode { /** * 任務(wù)具體的到期時間(絕對時間) * */ private long deadline; /** * 存儲在時間輪中,需要等待的輪次 * (rounds在初始化后,每次時間輪轉(zhuǎn)動一周便自減1,當(dāng)減為0時便代表當(dāng)前任務(wù)需要被調(diào)度) * */ private long rounds; /** * 創(chuàng)建任務(wù)時,用戶指定的到期時進行調(diào)度的任務(wù) * */ private Runnable targetTask; public long getDeadline() { return deadline; } public void setDeadline(long deadline) { this.deadline = deadline; } public long getRounds() { return rounds; } public void setRounds(long rounds) { this.rounds = rounds; } public Runnable getTargetTask() { return targetTask; } public void setTargetTask(Runnable targetTask) { this.targetTask = targetTask; } }
2.層次時間輪(存在空轉(zhuǎn)問題)
層次時間輪MyHierarchicalHashedTimerV1的主體邏輯與單層多輪次時間輪MyHashedTimeWheel基本保持一致,主要的區(qū)別有幾點:
1.由于是多層次的時間輪,所以單獨抽象出了Timer(MyHierarchicalHashedTimerV1)和TimerWheel(MyHierarchicalHashedTimeWheelV1)這兩個類。
Timer類中只持有最底層的時間輪lowestTimeWheel,而單獨的時間輪類MyHierarchicalHashedTimeWheelV1中也存儲了更上層時間輪的引用overFlowWheel。
不同層次的時間輪之間按照層級構(gòu)成了一個單向鏈表。
2.從unProcessTaskQueue中轉(zhuǎn)移計時器任務(wù)到環(huán)形數(shù)組時(MyHierarchicalHashedTimeWheelV1.addTimeoutTask),
如果當(dāng)前時間輪的最大間隔內(nèi)也放不下任務(wù),則會嘗試著將其放入上層的時間輪中;如果上層時間輪不存在則創(chuàng)建之(lazy加載)。
考慮到超時時間可能會很大,所以addTimeoutTask方法可能會遞歸調(diào)用多次,直到找到一個間隔足夠大的時間輪來存儲任務(wù)。
3.在推動tick時(advanceClockByTick),先推動最底層的時間輪(level為0),將指向的bucket列表中的任務(wù)全部交給指定的線程池執(zhí)行。
同時,如果當(dāng)前時間輪已經(jīng)走完一圈后,則去推動上一層的時間輪(可能遞歸多次)。
上層的時間輪(level>0)在推動時,通過重新執(zhí)行advanceClockByTick,將對應(yīng)bucket列表中的任務(wù)轉(zhuǎn)移到更下層的時間輪中。
/** * 層次時間輪,會存在空轉(zhuǎn)問題 * */ public class MyHierarchicalHashedTimerV1 implements Timer { /** * 是否已啟動 * */ private AtomicBoolean started = new AtomicBoolean(false); /** * 世間輪啟動時的具體時間戳(單位:納秒nanos) * */ private long startTime; /** * 時間輪每次轉(zhuǎn)動的時間(單位:納秒nanos) * (perTickTime越短,調(diào)度會更精確,但cpu開銷也會越大) * */ private final long perTickTime; /** * 總tick數(shù) * */ private long totalTick = 0; /** * 待處理任務(wù)的隊列 * (多外部生產(chǎn)者寫入,時間輪內(nèi)的單worker消費者讀取,所以netty的實現(xiàn)里使用了效率更高的MpscQueue,Mpsc即MultiProducerSingleConsumer) * */ private final Queue<MyTimeoutTaskNode> unProcessTaskQueue = new LinkedBlockingDeque<>(); /** * timer持有的最低層的時間輪 * */ private final MyHierarchicalHashedTimeWheelV1 lowestTimeWheel; /** * 構(gòu)造函數(shù) * */ public MyHierarchicalHashedTimerV1(int ringArraySize, long perTickTime, Executor taskExecutor) { this.perTickTime = perTickTime; // 初始化最底層的時間輪 this.lowestTimeWheel = new MyHierarchicalHashedTimeWheelV1(ringArraySize,perTickTime,taskExecutor,0); } /** * 啟動worker線程等初始化操作,必須執(zhí)行完成后才能正常工作 * (簡單起見,和netty不一樣不是等任務(wù)被創(chuàng)建時才懶加載的,必須提前啟動) * */ @Override public void startTimeWheel(){ // 啟動worker線程 new Thread(new Worker()).start(); while (!this.started.get()){ // 自旋循環(huán),等待一會 } System.out.println("startTimeWheel 啟動完成:" + this.getClass().getSimpleName()); } @Override public void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit){ long deadline = System.nanoTime() + timeUnit.toNanos(delayTime); // Guard against overflow. if (delayTime > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } MyTimeoutTaskNode newTimeoutTaskNode = new MyTimeoutTaskNode(); newTimeoutTaskNode.setTargetTask(task); newTimeoutTaskNode.setDeadline(deadline); this.unProcessTaskQueue.add(newTimeoutTaskNode); } private final class Worker implements Runnable{ @Override public void run() { MyHierarchicalHashedTimerV1.this.startTime = System.nanoTime(); // 啟動 MyHierarchicalHashedTimerV1.this.started.set(true); // 簡單起見,不考慮優(yōu)雅啟動和暫停的邏輯 while (true){ // 等待perTick waitForNextTick(); // 在撈取當(dāng)前tick下需要處理的bucket前,先將加入到隊列中的任務(wù)轉(zhuǎn)移到時間輪中(可能包含在當(dāng)前tick下就要處理的任務(wù)) // 層級時間輪內(nèi)部會做進一步的分配(放不下的話就溢出到更上一層的時間輪) transferTaskToTimeWheel(); // 推進時間輪(層級時間輪內(nèi)部滿了一圈就會進一步的推進更上一層的時間輪) MyHierarchicalHashedTimerV1.this.lowestTimeWheel.advanceClockByTick( (taskNode)-> // 參考kafka的寫法,避免Timer里的一些屬性被傳到各個bucket里面 MyHierarchicalHashedTimerV1.this.lowestTimeWheel .addTimeoutTask(MyHierarchicalHashedTimerV1.this.startTime, taskNode) ); // 循環(huán)tick一次,總tick數(shù)自增1 MyHierarchicalHashedTimerV1.this.totalTick++; } } /** * per tick時鐘跳動,基于Thread.sleep * */ private void waitForNextTick(){ // 由于Thread.sleep并不是絕對精確的被喚醒,所以只能通過(('總的tick數(shù)+1' * '每次tick的間隔') + '時間輪啟動時間')來計算精確的下一次tick時間 // 而不能簡單的Thread.sleep(每次tick的間隔) long nextTickTime = (MyHierarchicalHashedTimerV1.this.totalTick + 1) * MyHierarchicalHashedTimerV1.this.perTickTime + MyHierarchicalHashedTimerV1.this.startTime; // 因為nextTickTime是納秒,sleep需要的是毫秒,需要保證納秒數(shù)過小時,導(dǎo)致直接計算出來的毫秒數(shù)為0 // 因此(‘實際休眠的納秒數(shù)'+999999)/1000000,保證了納秒轉(zhuǎn)毫秒時,至少會是1毫秒,而不會出現(xiàn)sleep(0毫秒)令cpu空轉(zhuǎn) long needSleepTime = (nextTickTime - System.nanoTime() + 999999) / 1000000; try { // 比起netty,忽略了一些處理特殊場景bug的邏輯 Thread.sleep(needSleepTime); } catch (InterruptedException ignored) { } } /** * 加入到隊列中的任務(wù)轉(zhuǎn)移到時間輪中 * */ private void transferTaskToTimeWheel() { // 為了避免worker線程在一次循環(huán)中處理太多的任務(wù),所以直接限制了一個最大值100000 // 如果真的有這么多,就等到下次tick循環(huán)的時候再去做。 // 因為這個操作是cpu密集型的,處理太多的話,可能導(dǎo)致無法在一個短的tick周期內(nèi)完成一次循環(huán) for (int i = 0; i < 100000; i++) { MyTimeoutTaskNode timeoutTaskNode = MyHierarchicalHashedTimerV1.this.unProcessTaskQueue.poll(); if (timeoutTaskNode == null) { // 隊列為空了,直接結(jié)束 return; } // 層級時間輪內(nèi)部會做進一步的分配(放不下的話就溢出到更上一層的時間輪) MyHierarchicalHashedTimerV1.this.lowestTimeWheel.addTimeoutTask( MyHierarchicalHashedTimerV1.this.startTime, timeoutTaskNode); } } } }
public class MyHierarchicalHashedTimeWheelV1 { private final MyHierarchyHashedTimeWheelBucketV1[] ringBucketArray; /** * 總tick數(shù) * */ private long totalTick = 0; /** * 當(dāng)前時間輪所能承載的時間間隔 * */ private final long interval; /** * 時間輪每次轉(zhuǎn)動的時間(單位:納秒nanos) * (perTickTime越短,調(diào)度會更精確,但cpu開銷也會越大) * */ private final long perTickTime; /** * 上一層時間跨度更大的時間輪 * */ private MyHierarchicalHashedTimeWheelV1 overFlowWheel; /** * 用于實際執(zhí)行到期任務(wù)的線程池 * */ private final Executor taskExecutor; /** * 是否是最底層的時間輪(只有最底層的時間輪才真正的對任務(wù)進行調(diào)度) * */ private final int level; public MyHierarchicalHashedTimeWheelV1(int ringArraySize,long perTickTime, Executor taskExecutor,int level) { this.ringBucketArray = new MyHierarchyHashedTimeWheelBucketV1[ringArraySize]; for(int i=0; i<ringArraySize; i++){ // 初始化,填充滿時間輪喚醒數(shù)組 this.ringBucketArray[i] = new MyHierarchyHashedTimeWheelBucketV1(); } this.perTickTime = perTickTime; this.taskExecutor = taskExecutor; this.interval = perTickTime * ringArraySize; this.level = level; if(level > 0){ this.totalTick = 1; } } /** * 當(dāng)前時間輪加入任務(wù)(溢出的話,則需要放到上一層的時間輪中) * */ public void addTimeoutTask(long startTime, MyTimeoutTaskNode timeoutTaskNode){ long deadline = timeoutTaskNode.getDeadline(); // 當(dāng)前時間輪所能承載的最大絕對時間為:每個tick的間隔 * 插槽數(shù) + (基于startTime的當(dāng)前絕對時間) long currentWheelMaxRange = this.interval + (startTime + this.perTickTime * this.totalTick); if(deadline < currentWheelMaxRange){ // 當(dāng)前時間輪能夠承載這個任務(wù),無需放到上一層時間輪中 // 計算到任務(wù)超時時,應(yīng)該執(zhí)行多少次tick // (和netty里的不一樣,這里的deadline是超時時間的絕對時間,所以需要先減去時間輪的startTime) // (netty中是生產(chǎn)者線程在add時事先減去了startTime,比起由worker線程統(tǒng)一處理效率更高,但個人覺得這里的寫法會更直觀) long totalTickWhenTimeout = (deadline - startTime) / this.perTickTime; // 如果傳入的deadline早于當(dāng)前系統(tǒng)時間,則totalTickWhenTimeout可能會小于當(dāng)前的totalTick // 這種情況下,讓這個任務(wù)在當(dāng)前tick下就立即超時而被調(diào)度是最合理的,而不能在求余后放到一個錯誤的位置而等一段時間才調(diào)度(所以必須取兩者的最大值) final long ticks = Math.max(totalTickWhenTimeout, this.totalTick); // Ensure we don't schedule for past. // 如果能限制環(huán)形數(shù)組的長度為2的冪,則可以改為ticks & mask,位運算效率更高 int stopIndex = (int) (ticks % this.ringBucketArray.length); MyHierarchyHashedTimeWheelBucketV1 bucket = this.ringBucketArray[stopIndex]; // 計算并找到應(yīng)該被放置的那個bucket后,將其插入當(dāng)前bucket指向的鏈表中 bucket.addTimeout(timeoutTaskNode); }else{ // 當(dāng)前時間輪無法承載這個任務(wù),需要放到上一層時間輪中 // 上層時間輪不存在,創(chuàng)建之 if(this.overFlowWheel == null){ // 上層時間輪的環(huán)形數(shù)組大小保持不變,perTick是當(dāng)前時間輪的整個間隔(類似低層的60秒等于上一層的1分鐘) this.overFlowWheel = new MyHierarchicalHashedTimeWheelV1( this.ringBucketArray.length, this.interval, taskExecutor,this.level+1); } // 加入到上一層的時間輪中(對于較大的deadline,addTimeoutTask操作可能會遞歸數(shù)次,放到第N層的時間輪中) this.overFlowWheel.addTimeoutTask(startTime,timeoutTaskNode); } } public void advanceClockByTick(Consumer<MyTimeoutTaskNode> flushInLowerWheelFn){ // 基于總tick數(shù),對環(huán)形數(shù)組的長度取模,計算出當(dāng)前tick下需要處理的bucket桶的下標(biāo) int idx = (int) (this.totalTick % this.ringBucketArray.length); MyHierarchyHashedTimeWheelBucketV1 bucket = this.ringBucketArray[idx]; if(this.level == 0){ // 如果是最底層的時間輪,將當(dāng)前tick下命中的bucket中的任務(wù)丟到taskExecutor中執(zhí)行 bucket.expireTimeoutTask(this.taskExecutor); }else{ // 如果不是最底層的時間輪,將當(dāng)前tick下命中的bucket中的任務(wù)交給下一層的時間輪 // 這里轉(zhuǎn)交到下一層有兩種方式:第一種是從上到下的轉(zhuǎn)交,另一種是當(dāng)做新任務(wù)一樣還是從最下層的時間輪開始放,放不下再往上溢出 // 選用后一種邏輯,最大的復(fù)用已有的創(chuàng)建新任務(wù)的邏輯,會好理解一點 bucket.flush(flushInLowerWheelFn); } // 當(dāng)前時間輪的總tick自增1 this.totalTick++; // 當(dāng)前時間輪的總tick數(shù)滿了一圈之后,推進上一層時間輪進行一次tick(如果上一層時間輪存在的話) if(this.totalTick % this.ringBucketArray.length == 0 && this.overFlowWheel != null){ this.overFlowWheel.advanceClockByTick(flushInLowerWheelFn); } } }
/** * 時間輪環(huán)形數(shù)組下標(biāo)對應(yīng)的桶(保存一個超時任務(wù)MyTimeoutTaskNode的鏈表) * */ public class MyHierarchyHashedTimeWheelBucketV1 { private final LinkedList<MyTimeoutTaskNode> linkedList = new LinkedList<>(); public void addTimeout(MyTimeoutTaskNode timeout) { linkedList.add(timeout); } /** * 遍歷鏈表中的所有任務(wù),round全部減一,如果減為負數(shù)了則說明這個任務(wù)超時到期了,將其從鏈表中移除后并交給線程池執(zhí)行指定的任務(wù) * */ public void expireTimeoutTask(Executor executor){ Iterator<MyTimeoutTaskNode> iterator = linkedList.iterator(); while(iterator.hasNext()){ MyTimeoutTaskNode currentNode = iterator.next(); long currentNodeRound = currentNode.getRounds(); if(currentNodeRound <= 0){ // 將其從鏈表中移除 iterator.remove(); // count小于等于0,說明超時了,交給線程池去異步執(zhí)行 executor.execute(currentNode.getTargetTask()); }else{ // 當(dāng)前節(jié)點還未超時,round自減1 currentNode.setRounds(currentNodeRound-1); } // 簡單起見,不考慮任務(wù)被外部自己取消的case(netty里的timeout.isCancelled()) } } /** * 將當(dāng)前bucket中的數(shù)據(jù),通過flushInLowerWheelFn,全部轉(zhuǎn)移到更底層的時間輪中 * */ public void flush(Consumer<MyTimeoutTaskNode> flushInLowerWheelFn){ Iterator<MyTimeoutTaskNode> iterator = linkedList.iterator(); while(iterator.hasNext()){ MyTimeoutTaskNode currentNode = iterator.next(); // 先從鏈表中移除 iterator.remove(); // 通過flushInLowerWheelFn,轉(zhuǎn)移到更底層的時間輪中 flushInLowerWheelFn.accept(currentNode); // 簡單起見,不考慮任務(wù)被外部自己取消的case(netty里的timeout.isCancelled()) } } }
3.解決了空轉(zhuǎn)問題的層次時間輪(參考kafka的實現(xiàn))
上面實現(xiàn)的單層多輪時間輪以及層次時間輪都存在一個問題,即時間輪論文中提到的空轉(zhuǎn)問題(step through an empty bucket)。
舉個例子,假設(shè)時間輪的tick間隔被設(shè)置為1秒,用戶創(chuàng)建了一個10秒后過期的任務(wù)和一個10小時后過期的任務(wù)。在處理完了第一個10秒后過期的任務(wù)后,剩下的幾萬次tick都由于每個時間輪當(dāng)前時間指向的bucket是一個空列表而在做無用功。
生產(chǎn)環(huán)境中為了保證一定的調(diào)度精度,tick間隔一般會設(shè)置為毫秒級別甚至更低,那么時間輪空轉(zhuǎn)對CPU的浪費就不是一個可以忽視的問題了。
在著名的消息隊列kafka中就實現(xiàn)了一個能解決空轉(zhuǎn)問題的層次時間輪(Timer/TimingWheel),其解決時間輪空轉(zhuǎn)的方式是引入延遲隊列。
請注意:這里的延遲隊列不是用于存儲計時器任務(wù)的,而是用來存儲bucket槽的(MyHierarchyHashedTimeWheelBucketV2)。
前面提到,時間輪插槽的數(shù)量是相對固定的,其遠遠少于計時器任務(wù)的數(shù)量,所以不會出現(xiàn)性能瓶頸。
MyHierarchicalHashedTimerV2由于引入了延遲隊列,所以在實現(xiàn)上相對復(fù)雜了一些。
1.在每次bucket槽中插入第一個新元素時(兩種情況:一是時間輪剛剛初始化從未插入過元素,二是當(dāng)前bucket槽中的元素已經(jīng)在之前的一次tick中被全部處理完了),將當(dāng)前bucket插槽插入延遲隊列(DelayQueue)中。
2.bucket插槽中維護了一個expiration超時時間屬性,其代表著當(dāng)前插槽距離下一次被當(dāng)前時間指針推動而被指到的絕對時間。
假設(shè)有一個時分秒三層的時間輪,當(dāng)前時間為1小時5分0秒,如果一個超時時間為2分10秒的任務(wù)創(chuàng)建時,其將會被放入分鐘時間輪的第6個插槽中(下標(biāo)從0開始),
由于對應(yīng)插槽將會在2分鐘后被當(dāng)前時間指針指到,所以其expiration的值當(dāng)前時間1小時5分0秒+2分。
3.bucket是實現(xiàn)了Delayed接口的,其實際返回的是expiration減去當(dāng)前時間的值(之所以減去當(dāng)前時間,是因為延遲隊列中只有g(shù)etDelay小于等于0才可以出隊)。
bucket在被加入延遲隊列時,會實際上會按照getDelayed計算的值來進行排序,因此時間輪中理論上越早會被調(diào)度的bucket槽,越先出隊。
4.與v1版本不同,Worker線程不再是基于固定的tick間隔來休眠并推進時間,而是監(jiān)聽延遲隊列(bucketDelayQueue.take)。
當(dāng)延遲隊列中的bucket到了超時時間時,便會被Worker取出,并進行同樣的推動操作;而那些空的bucket則不會被感知到,從而解決了空轉(zhuǎn)問題。
5.同樣的例子,如果1秒的tick間隔下,1個10秒過期和1個10小時過期的任務(wù)創(chuàng)建并最終處理。
MyHierarchicalHashedTimerV2中的Worker線程總共只會在當(dāng)前時間指向的bucket不為空時才會被喚醒(個位數(shù)級別的tick處理),而不會一直空轉(zhuǎn)。
public class MyHierarchicalHashedTimerV2 implements Timer { /** * 是否已啟動 * */ private AtomicBoolean started = new AtomicBoolean(false); /** * 關(guān)聯(lián)的最底層時間輪 * */ private volatile MyHierarchicalHashedTimeWheelV2 lowestTimeWheel; /** * 時間輪的啟動時間(單位:納秒) * */ private long startTime; /** * 每次tick的間隔(單位:納秒) * */ private final long perTickTime; /** * 時間輪的大小 * */ private final int timeWheelSize; /** * 用于實際執(zhí)行到期任務(wù)的線程池 * */ private final Executor taskExecutor; /** * 用于存儲bucket元素的延遲隊列,用于解決時間輪空轉(zhuǎn)的問題 * */ private final DelayQueue<MyHierarchyHashedTimeWheelBucketV2> bucketDelayQueue = new DelayQueue<>(); public MyHierarchicalHashedTimerV2(int timeWheelSize,long perTickTime, Executor taskExecutor) { this.timeWheelSize = timeWheelSize; this.perTickTime = perTickTime; this.taskExecutor = taskExecutor; } /** * 啟動worker線程等初始化操作,必須執(zhí)行完成后才能正常工作 * (簡單起見,和netty不一樣不是等任務(wù)被創(chuàng)建時才懶加載的,必須提前啟動) * */ @Override public void startTimeWheel(){ // 啟動worker線程 new Thread(new Worker()).start(); while (!this.started.get()){ // 自旋循環(huán),等待一會 } System.out.println("startTimeWheel 啟動完成:" + this.getClass().getSimpleName()); } @Override public void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit){ long deadline = System.nanoTime() + timeUnit.toNanos(delayTime); // Guard against overflow. if (delayTime > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } MyTimeoutTaskNode newTimeoutTaskNode = new MyTimeoutTaskNode(); newTimeoutTaskNode.setTargetTask(task); newTimeoutTaskNode.setDeadline(deadline); // 加入到最底層的時間輪中,當(dāng)前時間輪放不下的會溢出都上一層時間輪 this.lowestTimeWheel.addTimeoutTask(newTimeoutTaskNode); } private void advanceClock(){ try { MyHierarchyHashedTimeWheelBucketV2 bucket = this.bucketDelayQueue.take(); lowestTimeWheel.advanceClockByTick(bucket.getExpiration()); bucket.flush((node)->{ // 當(dāng)前選中的bucket中的任務(wù),重新插入到時間輪中 // 1 原本處于高層的bucket中的任務(wù)會被放到更底層 // 2 原本就處于最低一層的bucket中的任務(wù)會被直接執(zhí)行 this.lowestTimeWheel.addTimeoutTask(node); }); // 將當(dāng)前時間輪的數(shù)據(jù) } catch (Exception e) { // 忽略掉異常 e.printStackTrace(); } } private final class Worker implements Runnable { @Override public void run() { MyHierarchicalHashedTimerV2.this.startTime = System.nanoTime(); // 初始化最底層的時間輪 MyHierarchicalHashedTimerV2.this.lowestTimeWheel = new MyHierarchicalHashedTimeWheelV2( MyHierarchicalHashedTimerV2.this.startTime, MyHierarchicalHashedTimerV2.this.perTickTime, MyHierarchicalHashedTimerV2.this.timeWheelSize, MyHierarchicalHashedTimerV2.this.taskExecutor, MyHierarchicalHashedTimerV2.this.bucketDelayQueue ); // 啟動 MyHierarchicalHashedTimerV2.this.started.set(true); while (true){ // 一直無限循環(huán),不斷推進時間 advanceClock(); } } } }
public class MyHierarchicalHashedTimeWheelV2 { /** * 上層時間輪(生產(chǎn)者/消費者都會訪問到,volatile修飾) * */ private volatile MyHierarchicalHashedTimeWheelV2 overflowTimeWheel; /** * 每次tick的間隔(單位:納秒) * */ private final long perTickTime; /** * 時間輪環(huán)形數(shù)組 * */ private final MyHierarchyHashedTimeWheelBucketV2[] ringBucketArray; /** * 用于實際執(zhí)行到期任務(wù)的線程池 * */ private final Executor taskExecutor; /** * 時間輪的當(dāng)前時間 * */ private long currentTime; /** * 當(dāng)前時間輪的間隔(每次tick的時間 * 時間輪的大小) * */ private final long interval; private final DelayQueue<MyHierarchyHashedTimeWheelBucketV2> bucketDelayQueue; public MyHierarchicalHashedTimeWheelV2(long startTime, long perTickTime, int wheelSize, Executor taskExecutor, DelayQueue<MyHierarchyHashedTimeWheelBucketV2> bucketDelayQueue) { // 初始化環(huán)形數(shù)組 this.ringBucketArray = new MyHierarchyHashedTimeWheelBucketV2[wheelSize]; for(int i=0; i<wheelSize; i++){ this.ringBucketArray[i] = new MyHierarchyHashedTimeWheelBucketV2(); } // 初始化時,當(dāng)前時間為startTime this.currentTime = startTime - (startTime % perTickTime); this.perTickTime = perTickTime; this.taskExecutor = taskExecutor; this.interval = perTickTime * wheelSize; this.bucketDelayQueue = bucketDelayQueue; } public void addTimeoutTask(MyTimeoutTaskNode timeoutTaskNode) { long deadline = timeoutTaskNode.getDeadline(); if(deadline < this.currentTime + this.perTickTime){ // 超時時間小于1tick,直接執(zhí)行 this.taskExecutor.execute(timeoutTaskNode.getTargetTask()); }else if(deadline < this.currentTime + this.interval){ // 當(dāng)前時間輪放的下 // 在超時時,理論上總共需要的tick數(shù) long totalTick = deadline / this.perTickTime; // 如果傳入的deadline早于當(dāng)前系統(tǒng)時間,則totalTickWhenTimeout可能會小于當(dāng)前的totalTick // 這種情況下,讓這個任務(wù)在當(dāng)前tick下就立即超時而被調(diào)度是最合理的,而不能在求余后放到一個錯誤的位置而等一段時間才調(diào)度(所以必須取兩者的最大值) // 如果能限制環(huán)形數(shù)組的長度為2的冪,則可以改為ticks & mask,位運算效率更高 int stopIndex = (int) (totalTick % this.ringBucketArray.length); MyHierarchyHashedTimeWheelBucketV2 bucket = this.ringBucketArray[stopIndex]; // 計算并找到應(yīng)該被放置的那個bucket后,將其插入當(dāng)前bucket指向的鏈表中 bucket.addTimeout(timeoutTaskNode); // deadline先除以this.perTickTime再乘以this.perTickTime,可以保證放在同一個插槽下的任務(wù),expiration都是一樣的 long expiration = totalTick * this.perTickTime; boolean isNewRound = bucket.setExpiration(expiration); if(isNewRound){ this.bucketDelayQueue.offer(bucket); } }else{ // 當(dāng)前時間輪放不下 if(this.overflowTimeWheel == null){ createOverflowWheel(); } // 加入到上層的時間輪中(較大的deadline會遞歸多次) this.overflowTimeWheel.addTimeoutTask(timeoutTaskNode); } } /** * 推進當(dāng)前時間輪的時鐘 * 舉個例子:假設(shè)當(dāng)前時間輪的當(dāng)前時間是第10分鐘,perTickTime是1分鐘, * 1.如果expiration是第10分鐘第1秒,則不用推動當(dāng)前時間 * 2.如果expiration是第11分鐘第0秒,則需要推動當(dāng)前時間 * */ public void advanceClockByTick(long expiration){ // 只會在tick推進時才會被調(diào)用,參數(shù)expiration可以認為是當(dāng)前時間輪的系統(tǒng)時間 if(expiration >= this.currentTime + this.perTickTime){ // 超過了1tick,則需要推進當(dāng)前時間輪 (始終保持當(dāng)前時間是perTickTime的整數(shù)倍,邏輯上的totalTick) this.currentTime = expiration - (expiration % this.perTickTime); if(this.overflowTimeWheel != null){ // 如果上層時間輪存在,則遞歸的繼續(xù)推進 this.overflowTimeWheel.advanceClockByTick(expiration); } } } private synchronized void createOverflowWheel(){ if(this.overflowTimeWheel == null){ // 創(chuàng)建上層時間輪,上層時間輪的perTickTime = 當(dāng)前時間輪的interval this.overflowTimeWheel = new MyHierarchicalHashedTimeWheelV2( this.currentTime, this.interval, this.ringBucketArray.length, this.taskExecutor, this.bucketDelayQueue); } } }
public class MyHierarchyHashedTimeWheelBucketV2 implements Delayed { private final LinkedList<MyTimeoutTaskNode> taskList = new LinkedList<>(); private final AtomicLong expiration = new AtomicLong(-1); public synchronized void addTimeout(MyTimeoutTaskNode timeout) { taskList.add(timeout); } public synchronized void flush(Consumer<MyTimeoutTaskNode> flush) { Iterator<MyTimeoutTaskNode> iterator = taskList.iterator(); while (iterator.hasNext()){ MyTimeoutTaskNode node = iterator.next(); // 從當(dāng)前bucket中移除,轉(zhuǎn)移到更下層的時間輪中 iterator.remove(); flush.accept(node); // 簡單起見,不考慮任務(wù)被外部自己取消的case(netty里的timeout.isCancelled()) } this.expiration.set(-1L); } /** * 設(shè)置當(dāng)前bucket的超時時間 * @return 是否是一個新的bucket true:是 * */ public boolean setExpiration(long expiration){ long oldValue = this.expiration.getAndSet(expiration); // 如果不一樣,說明當(dāng)前的expiration已經(jīng)超過了原來的expiration一圈了,邏輯上不再是同一個bucket return oldValue != expiration; } public long getExpiration(){ return this.expiration.get(); } @Override public long getDelay(TimeUnit unit) { // 還剩余多少時間過期 long delayNanos = Math.max(this.expiration.get() - System.nanoTime(), 0); // 將納秒單位基于unit轉(zhuǎn)換 return unit.convert(delayNanos,TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { if(o instanceof MyHierarchyHashedTimeWheelBucketV2){ return Long.compare(this.expiration.get(),((MyHierarchyHashedTimeWheelBucketV2) o).expiration.get()); } return 0; } }
為什么netty的時間輪不解決空轉(zhuǎn)問題?(個人理解)
netty作為一個網(wǎng)絡(luò)框架,大量的計時器任務(wù)的超時時間都是相對較短的(最大一般是秒級),時間上的排布相對密集,時間輪空轉(zhuǎn)的問題不是特別大(rounds的值也會很小,從創(chuàng)建到被調(diào)度的開銷很低)。
而kafka的計時器模塊所要處理的任務(wù)其超時時間的跨度就相對大很多,時間上的排布很稀疏,所以引入延遲隊列來解決空轉(zhuǎn)問題收益就會大很多。
總結(jié)
雖然很早就了解過時間輪的概念,但直到自己造RPC框架輪子玩的時候才發(fā)現(xiàn)自己對時間輪的工作原理了解的并不深。
說來慚愧,當(dāng)時的我甚至無法很好的回答為什么netty、dubbo等框架要用到計時器的地方不去使用jdk現(xiàn)成的ScheduledThreadPoolExecutor而要自己寫一個時間輪。
基于費曼學(xué)習(xí)法,我仔細的研究了時間輪的論文并參考已有的開源實現(xiàn),重新實現(xiàn)了幾種簡化版的時間輪,并以技術(shù)博客的形式分享出來,希望能幫助到對時間輪工作原理感興趣的人。
以上就是詳解時間輪TimeWheel的工作原理的詳細內(nèi)容,更多關(guān)于時間輪TimeWheel的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springMVC向Controller傳值出現(xiàn)中文亂碼的解決方案
這篇文章主要介紹了springMVC向Controller傳值出現(xiàn)中文亂碼的解決方案,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02MyBatis標(biāo)簽之Select?resultType和resultMap詳解
這篇文章主要介紹了MyBatis標(biāo)簽之Select?resultType和resultMap,在MyBatis中有一個ResultMap標(biāo)簽,它是為了映射select標(biāo)簽查詢出來的結(jié)果集,下面使用一個簡單的例子,來介紹 resultMap 的使用方法,需要的朋友可以參考下2022-09-09EditPlus運行java時從鍵盤輸入數(shù)據(jù)的操作方法
這篇文章主要介紹了EditPlus運行java時從鍵盤輸入數(shù)據(jù)的操作方法,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03Springboot ApplicationRunner的使用解讀
這篇文章主要介紹了Springboot ApplicationRunner的使用解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05java基礎(chǔ)知識之FileInputStream流的使用
這篇文章主要介紹了java基礎(chǔ)知識之FileInputStream流的使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12