PowerJob的HashedWheelTimer工作流程源碼解讀
序
本文主要研究一下PowerJob的HashedWheelTimer
Timer
tech/powerjob/server/common/timewheel/Timer.java
public interface Timer {
/**
* 調度定時任務
*/
TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);
/**
* 停止所有調度任務
*/
Set<TimerTask> stop();
}Timer接口定義了schedule方法,用于在指定時間之后調度TimerTask,它返回TimerFuture;stop方法返回未處理的TimerTask
TimerTask
@FunctionalInterface
public interface TimerTask extends Runnable {
}TimerTask繼承了Runnable
TimerFuture
tech/powerjob/server/common/timewheel/TimerFuture.java
public interface TimerFuture {
TimerTask getTask();
boolean cancel();
boolean isCancelled();
boolean isDone();
}TimerFuture定于了getTask、cancel、isCancelled、isDone方法
HashedWheelTimer
tech/powerjob/server/common/timewheel/HashedWheelTimer.java
@Slf4j
public class HashedWheelTimer implements Timer {
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final Indicator indicator;
private final long startTime;
private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();
private final ExecutorService taskProcessPool;
public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
this(tickDuration, ticksPerWheel, 0);
}
/**
* 新建時間輪定時器
* @param tickDuration 時間間隔,單位毫秒(ms)
* @param ticksPerWheel 輪盤個數(shù)
* @param processThreadNum 處理任務的線程個數(shù),0代表不啟用新線程(如果定時任務需要耗時操作,請啟用線程池)
*/
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
// 初始化輪盤,大小格式化為2的N次,可以使用 & 代替取余
int ticksNum = CommonUtils.formatSize(ticksPerWheel);
wheel = new HashedWheelBucket[ticksNum];
for (int i = 0; i < ticksNum; i++) {
wheel[i] = new HashedWheelBucket();
}
mask = wheel.length - 1;
// 初始化執(zhí)行線程池
if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
// 這里需要調整一下隊列大小
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
// 基本都是 io 密集型任務
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}
startTime = System.currentTimeMillis();
// 啟動后臺線程
indicator = new Indicator();
new Thread(indicator, "HashedWheelTimer-Indicator").start();
}
//......
}HashedWheelTimer實現(xiàn)了Timer接口,其構造器要求輸入tickDuration、ticksPerWheel,它會將ticksPerWheel轉換為2的N次,然后創(chuàng)建對應的HashedWheelBucket,若processThreadNum大于0則同時創(chuàng)建ThreadPoolExecutor用于處理任務,最后啟動異步線程執(zhí)行Indicator
schedule
@Override
public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);
// 直接運行到期、過期任務
if (delay <= 0) {
runTask(timerFuture);
return timerFuture;
}
// 寫入阻塞隊列,保證并發(fā)安全(性能進一步優(yōu)化可以考慮 Netty 的 Multi-Producer-Single-Consumer隊列)
waitingTasks.add(timerFuture);
return timerFuture;
}schedule方法先計算目標時間,然后創(chuàng)建對應的HashedWheelTimerFuture,若delay小于等于0則執(zhí)行runTask,否則添加到waitingTasks
stop
@Override
public Set<TimerTask> stop() {
indicator.stop.set(true);
taskProcessPool.shutdown();
while (!taskProcessPool.isTerminated()) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
}
return indicator.getUnprocessedTasks();
}stop方法先設置indicator的stop為true,然后執(zhí)行taskProcessPool.shutdown(),等待關閉,最后返回indicator.getUnprocessedTasks()
HashedWheelBucket
private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {
public void expireTimerTasks(long currentTick) {
removeIf(timerFuture -> {
// processCanceledTasks 后外部操作取消任務會導致 BUCKET 中仍存在 CANCELED 任務的情況
if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
return true;
}
if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
log.warn("[HashedWheelTimer] impossible, please fix the bug");
return true;
}
// 本輪直接調度
if (timerFuture.totalTicks <= currentTick) {
if (timerFuture.totalTicks < currentTick) {
log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
}
try {
// 提交執(zhí)行
runTask(timerFuture);
}catch (Exception ignore) {
} finally {
timerFuture.status = HashedWheelTimerFuture.FINISHED;
}
return true;
}
return false;
});
}
}
private void runTask(HashedWheelTimerFuture timerFuture) {
timerFuture.status = HashedWheelTimerFuture.RUNNING;
if (taskProcessPool == null) {
timerFuture.timerTask.run();
}else {
taskProcessPool.submit(timerFuture.timerTask);
}
}HashedWheelBucket繼承了LinkedList,其泛型為HashedWheelTimerFuture,它提供了expireTimerTasks方法,通過removeIf刪除status為CANCELED、status不為WAITING,以及執(zhí)行runTask(注意這里忽略了異常)之后標記status為FINISHED的元素;runTask先標記為RUNNING,對于taskProcessPool為null則直接執(zhí)行,否則提交到taskProcessPool
HashedWheelTimerFuture
tech/powerjob/server/common/timewheel/HashedWheelTimer.java
private final class HashedWheelTimerFuture implements TimerFuture {
// 預期執(zhí)行時間
private final long targetTime;
private final TimerTask timerTask;
// 所屬的時間格,用于快速刪除該任務
private HashedWheelBucket bucket;
// 總圈數(shù)
private long totalTicks;
// 當前狀態(tài) 0 - 初始化等待中,1 - 運行中,2 - 完成,3 - 已取消
private int status;
// 狀態(tài)枚舉值
private static final int WAITING = 0;
private static final int RUNNING = 1;
private static final int FINISHED = 2;
private static final int CANCELED = 3;
public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {
this.targetTime = targetTime;
this.timerTask = timerTask;
this.status = WAITING;
}
@Override
public TimerTask getTask() {
return timerTask;
}
@Override
public boolean cancel() {
if (status == WAITING) {
status = CANCELED;
canceledTasks.add(this);
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return status == CANCELED;
}
@Override
public boolean isDone() {
return status == FINISHED;
}
}HashedWheelTimerFuture實現(xiàn)了TimerFuture接口,它定義了WAITING、RUNNING、FINISHED、CANCELED狀態(tài);初始狀態(tài)為WAITING,對于WAITING狀態(tài)的可以設置為CANCELED,并添加到canceledTasks;isCancelled判斷狀態(tài)是不是CANCELED,isDone判斷狀態(tài)是不是FINISHED
getUnprocessedTasks
public Set<TimerTask> getUnprocessedTasks() {
try {
latch.await();
}catch (Exception ignore) {
}
Set<TimerTask> tasks = Sets.newHashSet();
Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
tasks.add(timerFuture.timerTask);
}
};
waitingTasks.forEach(consumer);
for (HashedWheelBucket bucket : wheel) {
bucket.forEach(consumer);
}
return tasks;
}getUnprocessedTasks會等待Indicator的while循環(huán)結束,然后遍歷所有的HashedWheelBucket找出狀態(tài)還是WAITING的任務
Indicator
private class Indicator implements Runnable {
private long tick = 0;
private final AtomicBoolean stop = new AtomicBoolean(false);
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run() {
while (!stop.get()) {
// 1. 將任務從隊列推入時間輪
pushTaskToBucket();
// 2. 處理取消的任務
processCanceledTasks();
// 3. 等待指針跳向下一刻
tickTack();
// 4. 執(zhí)行定時任務
int currentIndex = (int) (tick & mask);
HashedWheelBucket bucket = wheel[currentIndex];
bucket.expireTimerTasks(tick);
tick ++;
}
latch.countDown();
}
/**
* 模擬指針轉動,當返回時指針已經轉到了下一個刻度
*/
private void tickTack() {
// 下一次調度的絕對時間
long nextTime = startTime + (tick + 1) * tickDuration;
long sleepTime = nextTime - System.currentTimeMillis();
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
}catch (Exception ignore) {
}
}
}
/**
* 處理被取消的任務
*/
private void processCanceledTasks() {
while (true) {
HashedWheelTimerFuture canceledTask = canceledTasks.poll();
if (canceledTask == null) {
return;
}
// 從鏈表中刪除該任務(bucket為null說明還沒被正式推入時間格中,不需要處理)
if (canceledTask.bucket != null) {
canceledTask.bucket.remove(canceledTask);
}
}
}
/**
* 將隊列中的任務推入時間輪中
*/
private void pushTaskToBucket() {
while (true) {
HashedWheelTimerFuture timerTask = waitingTasks.poll();
if (timerTask == null) {
return;
}
// 總共的偏移量
long offset = timerTask.targetTime - startTime;
// 總共需要走的指針步數(shù)
timerTask.totalTicks = offset / tickDuration;
// 取余計算 bucket index
int index = (int) (timerTask.totalTicks & mask);
HashedWheelBucket bucket = wheel[index];
// TimerTask 維護 Bucket 引用,用于刪除該任務
timerTask.bucket = bucket;
if (timerTask.status == HashedWheelTimerFuture.WAITING) {
bucket.add(timerTask);
}
}
}
public Set<TimerTask> getUnprocessedTasks() {
try {
latch.await();
}catch (Exception ignore) {
}
Set<TimerTask> tasks = Sets.newHashSet();
Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
tasks.add(timerFuture.timerTask);
}
};
waitingTasks.forEach(consumer);
for (HashedWheelBucket bucket : wheel) {
bucket.forEach(consumer);
}
return tasks;
}
}Indicator實現(xiàn)了Runnable接口,其run方法在stop為false的時候循環(huán)執(zhí)行,pushTaskToBucket、processCanceledTasks、tickTack、expireTimerTasks
pushTaskToBucket
private void pushTaskToBucket() {
while (true) {
HashedWheelTimerFuture timerTask = waitingTasks.poll();
if (timerTask == null) {
return;
}
// 總共的偏移量
long offset = timerTask.targetTime - startTime;
// 總共需要走的指針步數(shù)
timerTask.totalTicks = offset / tickDuration;
// 取余計算 bucket index
int index = (int) (timerTask.totalTicks & mask);
HashedWheelBucket bucket = wheel[index];
// TimerTask 維護 Bucket 引用,用于刪除該任務
timerTask.bucket = bucket;
if (timerTask.status == HashedWheelTimerFuture.WAITING) {
bucket.add(timerTask);
}
}
}pushTaskToBucket通過waitingTasks.poll()拉取任務,若為null直接返回,否則通過timerTask.targetTime與startTime計算offset,再根據(jù)tickDuration計算需要走的步數(shù),然后計算并獲取目標HashedWheelBucket,然后將timerTask添加到bucket中
processCanceledTasks
private void processCanceledTasks() {
while (true) {
HashedWheelTimerFuture canceledTask = canceledTasks.poll();
if (canceledTask == null) {
return;
}
// 從鏈表中刪除該任務(bucket為null說明還沒被正式推入時間格中,不需要處理)
if (canceledTask.bucket != null) {
canceledTask.bucket.remove(canceledTask);
}
}
}processCanceledTasks會執(zhí)行canceledTasks.poll()拉取canceledTask,若canceledTask.bucket不為null則將canceledTask從該bucket中移除
tickTack
private void tickTack() {
// 下一次調度的絕對時間
long nextTime = startTime + (tick + 1) * tickDuration;
long sleepTime = nextTime - System.currentTimeMillis();
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
}catch (Exception ignore) {
}
}
}tickTack模擬指針移動,它先計算nextTime,再計算需要sleep多久,然后執(zhí)行Thread.sleep(sleepTime)
小結
PowerJob定義了Timer接口,并提供了HashedWheelTimer的實現(xiàn),它定義了waitingTasks、canceledTasks兩個LinkedBlockingQueue(無界隊列),同時還支持定義任務處理線程池的core線程數(shù);它通過Indicator線程來處理時間輪的轉動及任務處理,Indicator循環(huán)將waitingTasks的任務放入到對應的bucket,然后模擬時間輪等待,然后通過bucket.expireTimerTasks(tick)處理到期任務,最后再遞增tick。
以上就是PowerJob的HashedWheelTimer工作流程源碼解讀的詳細內容,更多關于PowerJob HashedWheelTimer的資料請關注腳本之家其它相關文章!
相關文章
JSON各種轉換問題(json轉List,json轉對象等)
這篇文章主要介紹了JSON各種轉換問題(json轉List,json轉對象等),本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-03-03
詳解Java如何使用集合來實現(xiàn)一個客戶信息管理系統(tǒng)
讀萬卷書不如行萬里路,只學書上的理論是遠遠不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java 集合實現(xiàn)一個客戶信息管理系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11

