PowerJob的HashedWheelTimer工作流程源碼解讀
序
本文主要研究一下PowerJob的HashedWheelTimer
Timer
tech/powerjob/server/common/timewheel/Timer.java
public interface Timer { /** * 調(diào)度定時(shí)任務(wù) */ TimerFuture schedule(TimerTask task, long delay, TimeUnit unit); /** * 停止所有調(diào)度任務(wù) */ Set<TimerTask> stop(); }
Timer接口定義了schedule方法,用于在指定時(shí)間之后調(diào)度TimerTask,它返回TimerFuture;stop方法返回未處理的TimerTask
TimerTask
@FunctionalInterface public interface TimerTask extends Runnable { }
TimerTask繼承了Runnable
TimerFuture
tech/powerjob/server/common/timewheel/TimerFuture.java
public interface TimerFuture { TimerTask getTask(); boolean cancel(); boolean isCancelled(); boolean isDone(); }
TimerFuture定于了getTask、cancel、isCancelled、isDone方法
HashedWheelTimer
tech/powerjob/server/common/timewheel/HashedWheelTimer.java
@Slf4j public class HashedWheelTimer implements Timer { private final long tickDuration; private final HashedWheelBucket[] wheel; private final int mask; private final Indicator indicator; private final long startTime; private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue(); private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue(); private final ExecutorService taskProcessPool; public HashedWheelTimer(long tickDuration, int ticksPerWheel) { this(tickDuration, ticksPerWheel, 0); } /** * 新建時(shí)間輪定時(shí)器 * @param tickDuration 時(shí)間間隔,單位毫秒(ms) * @param ticksPerWheel 輪盤(pán)個(gè)數(shù) * @param processThreadNum 處理任務(wù)的線程個(gè)數(shù),0代表不啟用新線程(如果定時(shí)任務(wù)需要耗時(shí)操作,請(qǐng)啟用線程池) */ public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) { this.tickDuration = tickDuration; // 初始化輪盤(pán),大小格式化為2的N次,可以使用 & 代替取余 int ticksNum = CommonUtils.formatSize(ticksPerWheel); wheel = new HashedWheelBucket[ticksNum]; for (int i = 0; i < ticksNum; i++) { wheel[i] = new HashedWheelBucket(); } mask = wheel.length - 1; // 初始化執(zhí)行線程池 if (processThreadNum <= 0) { taskProcessPool = null; }else { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); // 這里需要調(diào)整一下隊(duì)列大小 BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192); int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum); // 基本都是 io 密集型任務(wù) taskProcessPool = new ThreadPoolExecutor(core, 2 * core, 60, TimeUnit.SECONDS, queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool")); } startTime = System.currentTimeMillis(); // 啟動(dòng)后臺(tái)線程 indicator = new Indicator(); new Thread(indicator, "HashedWheelTimer-Indicator").start(); } //...... }
HashedWheelTimer實(shí)現(xiàn)了Timer接口,其構(gòu)造器要求輸入tickDuration、ticksPerWheel,它會(huì)將ticksPerWheel轉(zhuǎn)換為2的N次,然后創(chuàng)建對(duì)應(yīng)的HashedWheelBucket,若processThreadNum大于0則同時(shí)創(chuàng)建ThreadPoolExecutor用于處理任務(wù),最后啟動(dòng)異步線程執(zhí)行Indicator
schedule
@Override public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) { long targetTime = System.currentTimeMillis() + unit.toMillis(delay); HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime); // 直接運(yùn)行到期、過(guò)期任務(wù) if (delay <= 0) { runTask(timerFuture); return timerFuture; } // 寫(xiě)入阻塞隊(duì)列,保證并發(fā)安全(性能進(jìn)一步優(yōu)化可以考慮 Netty 的 Multi-Producer-Single-Consumer隊(duì)列) waitingTasks.add(timerFuture); return timerFuture; }
schedule方法先計(jì)算目標(biāo)時(shí)間,然后創(chuàng)建對(duì)應(yīng)的HashedWheelTimerFuture,若delay小于等于0則執(zhí)行runTask,否則添加到waitingTasks
stop
@Override public Set<TimerTask> stop() { indicator.stop.set(true); taskProcessPool.shutdown(); while (!taskProcessPool.isTerminated()) { try { Thread.sleep(100); }catch (Exception ignore) { } } return indicator.getUnprocessedTasks(); }
stop方法先設(shè)置indicator的stop為true,然后執(zhí)行taskProcessPool.shutdown(),等待關(guān)閉,最后返回indicator.getUnprocessedTasks()
HashedWheelBucket
private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> { public void expireTimerTasks(long currentTick) { removeIf(timerFuture -> { // processCanceledTasks 后外部操作取消任務(wù)會(huì)導(dǎo)致 BUCKET 中仍存在 CANCELED 任務(wù)的情況 if (timerFuture.status == HashedWheelTimerFuture.CANCELED) { return true; } if (timerFuture.status != HashedWheelTimerFuture.WAITING) { log.warn("[HashedWheelTimer] impossible, please fix the bug"); return true; } // 本輪直接調(diào)度 if (timerFuture.totalTicks <= currentTick) { if (timerFuture.totalTicks < currentTick) { log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug"); } try { // 提交執(zhí)行 runTask(timerFuture); }catch (Exception ignore) { } finally { timerFuture.status = HashedWheelTimerFuture.FINISHED; } return true; } return false; }); } } private void runTask(HashedWheelTimerFuture timerFuture) { timerFuture.status = HashedWheelTimerFuture.RUNNING; if (taskProcessPool == null) { timerFuture.timerTask.run(); }else { taskProcessPool.submit(timerFuture.timerTask); } }
HashedWheelBucket繼承了LinkedList,其泛型為HashedWheelTimerFuture,它提供了expireTimerTasks方法,通過(guò)removeIf刪除status為CANCELED、status不為WAITING,以及執(zhí)行runTask(注意這里忽略了異常
)之后標(biāo)記status為FINISHED的元素;runTask先標(biāo)記為RUNNING,對(duì)于taskProcessPool為null則直接執(zhí)行,否則提交到taskProcessPool
HashedWheelTimerFuture
tech/powerjob/server/common/timewheel/HashedWheelTimer.java
private final class HashedWheelTimerFuture implements TimerFuture { // 預(yù)期執(zhí)行時(shí)間 private final long targetTime; private final TimerTask timerTask; // 所屬的時(shí)間格,用于快速刪除該任務(wù) private HashedWheelBucket bucket; // 總?cè)?shù) private long totalTicks; // 當(dāng)前狀態(tài) 0 - 初始化等待中,1 - 運(yùn)行中,2 - 完成,3 - 已取消 private int status; // 狀態(tài)枚舉值 private static final int WAITING = 0; private static final int RUNNING = 1; private static final int FINISHED = 2; private static final int CANCELED = 3; public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) { this.targetTime = targetTime; this.timerTask = timerTask; this.status = WAITING; } @Override public TimerTask getTask() { return timerTask; } @Override public boolean cancel() { if (status == WAITING) { status = CANCELED; canceledTasks.add(this); return true; } return false; } @Override public boolean isCancelled() { return status == CANCELED; } @Override public boolean isDone() { return status == FINISHED; } }
HashedWheelTimerFuture實(shí)現(xiàn)了TimerFuture接口,它定義了WAITING、RUNNING、FINISHED、CANCELED狀態(tài);初始狀態(tài)為WAITING,對(duì)于WAITING狀態(tài)的可以設(shè)置為CANCELED,并添加到canceledTasks;isCancelled判斷狀態(tài)是不是CANCELED,isDone判斷狀態(tài)是不是FINISHED
getUnprocessedTasks
public Set<TimerTask> getUnprocessedTasks() { try { latch.await(); }catch (Exception ignore) { } Set<TimerTask> tasks = Sets.newHashSet(); Consumer<HashedWheelTimerFuture> consumer = timerFuture -> { if (timerFuture.status == HashedWheelTimerFuture.WAITING) { tasks.add(timerFuture.timerTask); } }; waitingTasks.forEach(consumer); for (HashedWheelBucket bucket : wheel) { bucket.forEach(consumer); } return tasks; }
getUnprocessedTasks會(huì)等待Indicator的while循環(huán)結(jié)束,然后遍歷所有的HashedWheelBucket找出狀態(tài)還是WAITING的任務(wù)
Indicator
private class Indicator implements Runnable { private long tick = 0; private final AtomicBoolean stop = new AtomicBoolean(false); private final CountDownLatch latch = new CountDownLatch(1); @Override public void run() { while (!stop.get()) { // 1. 將任務(wù)從隊(duì)列推入時(shí)間輪 pushTaskToBucket(); // 2. 處理取消的任務(wù) processCanceledTasks(); // 3. 等待指針跳向下一刻 tickTack(); // 4. 執(zhí)行定時(shí)任務(wù) int currentIndex = (int) (tick & mask); HashedWheelBucket bucket = wheel[currentIndex]; bucket.expireTimerTasks(tick); tick ++; } latch.countDown(); } /** * 模擬指針轉(zhuǎn)動(dòng),當(dāng)返回時(shí)指針已經(jīng)轉(zhuǎn)到了下一個(gè)刻度 */ private void tickTack() { // 下一次調(diào)度的絕對(duì)時(shí)間 long nextTime = startTime + (tick + 1) * tickDuration; long sleepTime = nextTime - System.currentTimeMillis(); if (sleepTime > 0) { try { Thread.sleep(sleepTime); }catch (Exception ignore) { } } } /** * 處理被取消的任務(wù) */ private void processCanceledTasks() { while (true) { HashedWheelTimerFuture canceledTask = canceledTasks.poll(); if (canceledTask == null) { return; } // 從鏈表中刪除該任務(wù)(bucket為null說(shuō)明還沒(méi)被正式推入時(shí)間格中,不需要處理) if (canceledTask.bucket != null) { canceledTask.bucket.remove(canceledTask); } } } /** * 將隊(duì)列中的任務(wù)推入時(shí)間輪中 */ private void pushTaskToBucket() { while (true) { HashedWheelTimerFuture timerTask = waitingTasks.poll(); if (timerTask == null) { return; } // 總共的偏移量 long offset = timerTask.targetTime - startTime; // 總共需要走的指針步數(shù) timerTask.totalTicks = offset / tickDuration; // 取余計(jì)算 bucket index int index = (int) (timerTask.totalTicks & mask); HashedWheelBucket bucket = wheel[index]; // TimerTask 維護(hù) Bucket 引用,用于刪除該任務(wù) timerTask.bucket = bucket; if (timerTask.status == HashedWheelTimerFuture.WAITING) { bucket.add(timerTask); } } } public Set<TimerTask> getUnprocessedTasks() { try { latch.await(); }catch (Exception ignore) { } Set<TimerTask> tasks = Sets.newHashSet(); Consumer<HashedWheelTimerFuture> consumer = timerFuture -> { if (timerFuture.status == HashedWheelTimerFuture.WAITING) { tasks.add(timerFuture.timerTask); } }; waitingTasks.forEach(consumer); for (HashedWheelBucket bucket : wheel) { bucket.forEach(consumer); } return tasks; } }
Indicator實(shí)現(xiàn)了Runnable接口,其run方法在stop為false的時(shí)候循環(huán)執(zhí)行,pushTaskToBucket、processCanceledTasks、tickTack、expireTimerTasks
pushTaskToBucket
private void pushTaskToBucket() { while (true) { HashedWheelTimerFuture timerTask = waitingTasks.poll(); if (timerTask == null) { return; } // 總共的偏移量 long offset = timerTask.targetTime - startTime; // 總共需要走的指針步數(shù) timerTask.totalTicks = offset / tickDuration; // 取余計(jì)算 bucket index int index = (int) (timerTask.totalTicks & mask); HashedWheelBucket bucket = wheel[index]; // TimerTask 維護(hù) Bucket 引用,用于刪除該任務(wù) timerTask.bucket = bucket; if (timerTask.status == HashedWheelTimerFuture.WAITING) { bucket.add(timerTask); } } }
pushTaskToBucket通過(guò)waitingTasks.poll()拉取任務(wù),若為null直接返回,否則通過(guò)timerTask.targetTime與startTime計(jì)算offset,再根據(jù)tickDuration計(jì)算需要走的步數(shù),然后計(jì)算并獲取目標(biāo)HashedWheelBucket,然后將timerTask添加到bucket中
processCanceledTasks
private void processCanceledTasks() { while (true) { HashedWheelTimerFuture canceledTask = canceledTasks.poll(); if (canceledTask == null) { return; } // 從鏈表中刪除該任務(wù)(bucket為null說(shuō)明還沒(méi)被正式推入時(shí)間格中,不需要處理) if (canceledTask.bucket != null) { canceledTask.bucket.remove(canceledTask); } } }
processCanceledTasks會(huì)執(zhí)行canceledTasks.poll()拉取canceledTask,若canceledTask.bucket不為null則將canceledTask從該bucket中移除
tickTack
private void tickTack() { // 下一次調(diào)度的絕對(duì)時(shí)間 long nextTime = startTime + (tick + 1) * tickDuration; long sleepTime = nextTime - System.currentTimeMillis(); if (sleepTime > 0) { try { Thread.sleep(sleepTime); }catch (Exception ignore) { } } }
tickTack模擬指針移動(dòng),它先計(jì)算nextTime,再計(jì)算需要sleep多久,然后執(zhí)行Thread.sleep(sleepTime)
小結(jié)
PowerJob定義了Timer接口,并提供了HashedWheelTimer的實(shí)現(xiàn),它定義了waitingTasks、canceledTasks兩個(gè)LinkedBlockingQueue(無(wú)界隊(duì)列
),同時(shí)還支持定義任務(wù)處理線程池的core線程數(shù);它通過(guò)Indicator線程來(lái)處理時(shí)間輪的轉(zhuǎn)動(dòng)及任務(wù)處理,Indicator循環(huán)將waitingTasks的任務(wù)放入到對(duì)應(yīng)的bucket,然后模擬時(shí)間輪等待,然后通過(guò)bucket.expireTimerTasks(tick)處理到期任務(wù),最后再遞增tick。
以上就是PowerJob的HashedWheelTimer工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob HashedWheelTimer的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JAVA實(shí)現(xiàn)sm3加密簽名以及防止重復(fù)攻擊
這篇文章主要給大家介紹了關(guān)于JAVA實(shí)現(xiàn)sm3加密簽名以及防止重復(fù)攻擊的相關(guān)資料,SM3是簽名算法,和MD5一樣(對(duì)于應(yīng)用層來(lái)說(shuō)),SM4是對(duì)稱加密算法,和AES一樣(對(duì)于應(yīng)用層來(lái)說(shuō)),需要的朋友可以參考下2023-10-10JSON各種轉(zhuǎn)換問(wèn)題(json轉(zhuǎn)List,json轉(zhuǎn)對(duì)象等)
這篇文章主要介紹了JSON各種轉(zhuǎn)換問(wèn)題(json轉(zhuǎn)List,json轉(zhuǎn)對(duì)象等),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-03-03Java中使用內(nèi)存映射實(shí)現(xiàn)大文件上傳實(shí)例
這篇文章主要介紹了Java中使用內(nèi)存映射實(shí)現(xiàn)大文件上傳實(shí)例,本文對(duì)比測(cè)試了FileInputStream 或者FileOutputStream 抑或RandomAccessFile的頻繁讀寫(xiě)操作,最后總結(jié)出映射到內(nèi)存后進(jìn)行讀寫(xiě)以提高速度,需要的朋友可以參考下2015-01-01如何寫(xiě)好一個(gè)Spring組件的實(shí)現(xiàn)步驟
這篇文章主要介紹了如何寫(xiě)好一個(gè)Spring組件的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06Java安全之Filter權(quán)限繞過(guò)的實(shí)現(xiàn)
在一些需要挖掘一些無(wú)條件RCE中,大部分類(lèi)似于一些系統(tǒng)大部分地方都做了權(quán)限控制的,而這時(shí)候想要利用權(quán)限繞過(guò)就顯得格外重要,本文就介紹了如何實(shí)現(xiàn),一起來(lái)了解一下2021-05-05詳解Java如何使用集合來(lái)實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)
讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java 集合實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11Reactive Programming入門(mén)概念詳解
這篇文章主要為大家介紹了Reactive Programming入門(mén)概念詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09