欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PowerJob的HashedWheelTimer工作流程源碼解讀

 更新時(shí)間:2024年01月23日 09:06:26   作者:codecraft  
這篇文章主要為大家介紹了PowerJob的HashedWheelTimer工作流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

本文主要研究一下PowerJob的HashedWheelTimer

Timer

tech/powerjob/server/common/timewheel/Timer.java

public interface Timer {
    /**
     * 調(diào)度定時(shí)任務(wù)
     */
    TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);
    /**
     * 停止所有調(diào)度任務(wù)
     */
    Set<TimerTask> stop();
}
Timer接口定義了schedule方法,用于在指定時(shí)間之后調(diào)度TimerTask,它返回TimerFuture;stop方法返回未處理的TimerTask

TimerTask

@FunctionalInterface
public interface TimerTask extends Runnable {
}
TimerTask繼承了Runnable

TimerFuture

tech/powerjob/server/common/timewheel/TimerFuture.java

public interface TimerFuture {

    TimerTask getTask();

    boolean cancel();

    boolean isCancelled();

    boolean isDone();
}
TimerFuture定于了getTask、cancel、isCancelled、isDone方法

HashedWheelTimer

tech/powerjob/server/common/timewheel/HashedWheelTimer.java

@Slf4j
public class HashedWheelTimer implements Timer {
    private final long tickDuration;
    private final HashedWheelBucket[] wheel;
    private final int mask;
    private final Indicator indicator;
    private final long startTime;
    private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
    private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();
    private final ExecutorService taskProcessPool;
    public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
        this(tickDuration, ticksPerWheel, 0);
    }
    /**
     * 新建時(shí)間輪定時(shí)器
     * @param tickDuration 時(shí)間間隔,單位毫秒(ms)
     * @param ticksPerWheel 輪盤(pán)個(gè)數(shù)
     * @param processThreadNum 處理任務(wù)的線程個(gè)數(shù),0代表不啟用新線程(如果定時(shí)任務(wù)需要耗時(shí)操作,請(qǐng)啟用線程池)
     */
    public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
        this.tickDuration = tickDuration;
        // 初始化輪盤(pán),大小格式化為2的N次,可以使用 & 代替取余
        int ticksNum = CommonUtils.formatSize(ticksPerWheel);
        wheel = new HashedWheelBucket[ticksNum];
        for (int i = 0; i < ticksNum; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        mask = wheel.length - 1;
        // 初始化執(zhí)行線程池
        if (processThreadNum <= 0) {
            taskProcessPool = null;
        }else {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
            // 這里需要調(diào)整一下隊(duì)列大小
            BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
            int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
            // 基本都是 io 密集型任務(wù)
            taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
                    60, TimeUnit.SECONDS,
                    queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
        }
        startTime = System.currentTimeMillis();
        // 啟動(dòng)后臺(tái)線程
        indicator = new Indicator();
        new Thread(indicator, "HashedWheelTimer-Indicator").start();
    }
    //......
}
HashedWheelTimer實(shí)現(xiàn)了Timer接口,其構(gòu)造器要求輸入tickDuration、ticksPerWheel,它會(huì)將ticksPerWheel轉(zhuǎn)換為2的N次,然后創(chuàng)建對(duì)應(yīng)的HashedWheelBucket,若processThreadNum大于0則同時(shí)創(chuàng)建ThreadPoolExecutor用于處理任務(wù),最后啟動(dòng)異步線程執(zhí)行Indicator

schedule

@Override
    public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {
        long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
        HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);
        // 直接運(yùn)行到期、過(guò)期任務(wù)
        if (delay <= 0) {
            runTask(timerFuture);
            return timerFuture;
        }
        // 寫(xiě)入阻塞隊(duì)列,保證并發(fā)安全(性能進(jìn)一步優(yōu)化可以考慮 Netty 的 Multi-Producer-Single-Consumer隊(duì)列)
        waitingTasks.add(timerFuture);
        return timerFuture;
    }
schedule方法先計(jì)算目標(biāo)時(shí)間,然后創(chuàng)建對(duì)應(yīng)的HashedWheelTimerFuture,若delay小于等于0則執(zhí)行runTask,否則添加到waitingTasks

stop

@Override
    public Set<TimerTask> stop() {
        indicator.stop.set(true);
        taskProcessPool.shutdown();
        while (!taskProcessPool.isTerminated()) {
            try {
                Thread.sleep(100);
            }catch (Exception ignore) {
            }
        }
        return indicator.getUnprocessedTasks();
    }
stop方法先設(shè)置indicator的stop為true,然后執(zhí)行taskProcessPool.shutdown(),等待關(guān)閉,最后返回indicator.getUnprocessedTasks()

HashedWheelBucket

private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {
        public void expireTimerTasks(long currentTick) {
            removeIf(timerFuture -> {
                // processCanceledTasks 后外部操作取消任務(wù)會(huì)導(dǎo)致 BUCKET 中仍存在 CANCELED 任務(wù)的情況
                if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
                    return true;
                }
                if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
                    log.warn("[HashedWheelTimer] impossible, please fix the bug");
                    return true;
                }
                // 本輪直接調(diào)度
                if (timerFuture.totalTicks <= currentTick) {
                    if (timerFuture.totalTicks < currentTick) {
                        log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
                    }
                    try {
                        // 提交執(zhí)行
                        runTask(timerFuture);
                    }catch (Exception ignore) {
                    } finally {
                        timerFuture.status = HashedWheelTimerFuture.FINISHED;
                    }
                    return true;
                }
                return false;
            });
        }
    }
    private void runTask(HashedWheelTimerFuture timerFuture) {
        timerFuture.status = HashedWheelTimerFuture.RUNNING;
        if (taskProcessPool == null) {
            timerFuture.timerTask.run();
        }else {
            taskProcessPool.submit(timerFuture.timerTask);
        }
    }
HashedWheelBucket繼承了LinkedList,其泛型為HashedWheelTimerFuture,它提供了expireTimerTasks方法,通過(guò)removeIf刪除status為CANCELED、status不為WAITING,以及執(zhí)行runTask(注意這里忽略了異常)之后標(biāo)記status為FINISHED的元素;runTask先標(biāo)記為RUNNING,對(duì)于taskProcessPool為null則直接執(zhí)行,否則提交到taskProcessPool

HashedWheelTimerFuture

tech/powerjob/server/common/timewheel/HashedWheelTimer.java

private final class HashedWheelTimerFuture implements TimerFuture {
        // 預(yù)期執(zhí)行時(shí)間
        private final long targetTime;
        private final TimerTask timerTask;
        // 所屬的時(shí)間格,用于快速刪除該任務(wù)
        private HashedWheelBucket bucket;
        // 總?cè)?shù)
        private long totalTicks;
        // 當(dāng)前狀態(tài) 0 - 初始化等待中,1 - 運(yùn)行中,2 - 完成,3 - 已取消
        private int status;
        // 狀態(tài)枚舉值
        private static final int WAITING = 0;
        private static final int RUNNING = 1;
        private static final int FINISHED = 2;
        private static final int CANCELED = 3;
        public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {
            this.targetTime = targetTime;
            this.timerTask = timerTask;
            this.status = WAITING;
        }
        @Override
        public TimerTask getTask() {
            return timerTask;
        }
        @Override
        public boolean cancel() {
            if (status == WAITING) {
                status = CANCELED;
                canceledTasks.add(this);
                return true;
            }
            return false;
        }
        @Override
        public boolean isCancelled() {
            return status == CANCELED;
        }
        @Override
        public boolean isDone() {
            return status == FINISHED;
        }
    }
HashedWheelTimerFuture實(shí)現(xiàn)了TimerFuture接口,它定義了WAITING、RUNNING、FINISHED、CANCELED狀態(tài);初始狀態(tài)為WAITING,對(duì)于WAITING狀態(tài)的可以設(shè)置為CANCELED,并添加到canceledTasks;isCancelled判斷狀態(tài)是不是CANCELED,isDone判斷狀態(tài)是不是FINISHED

getUnprocessedTasks

public Set<TimerTask> getUnprocessedTasks() {
            try {
                latch.await();
            }catch (Exception ignore) {
            }
            Set<TimerTask> tasks = Sets.newHashSet();
            Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
                if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
                    tasks.add(timerFuture.timerTask);
                }
            };
            waitingTasks.forEach(consumer);
            for (HashedWheelBucket bucket : wheel) {
                bucket.forEach(consumer);
            }
            return tasks;
        }
getUnprocessedTasks會(huì)等待Indicator的while循環(huán)結(jié)束,然后遍歷所有的HashedWheelBucket找出狀態(tài)還是WAITING的任務(wù)

Indicator

private class Indicator implements Runnable {
        private long tick = 0;
        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final CountDownLatch latch = new CountDownLatch(1);
        @Override
        public void run() {
            while (!stop.get()) {
                // 1. 將任務(wù)從隊(duì)列推入時(shí)間輪
                pushTaskToBucket();
                // 2. 處理取消的任務(wù)
                processCanceledTasks();
                // 3. 等待指針跳向下一刻
                tickTack();
                // 4. 執(zhí)行定時(shí)任務(wù)
                int currentIndex = (int) (tick & mask);
                HashedWheelBucket bucket = wheel[currentIndex];
                bucket.expireTimerTasks(tick);
                tick ++;
            }
            latch.countDown();
        }
        /**
         * 模擬指針轉(zhuǎn)動(dòng),當(dāng)返回時(shí)指針已經(jīng)轉(zhuǎn)到了下一個(gè)刻度
         */
        private void tickTack() {
            // 下一次調(diào)度的絕對(duì)時(shí)間
            long nextTime = startTime + (tick + 1) * tickDuration;
            long sleepTime = nextTime - System.currentTimeMillis();
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                }catch (Exception ignore) {
                }
            }
        }
        /**
         * 處理被取消的任務(wù)
         */
        private void processCanceledTasks() {
            while (true) {
                HashedWheelTimerFuture canceledTask = canceledTasks.poll();
                if (canceledTask == null) {
                    return;
                }
                // 從鏈表中刪除該任務(wù)(bucket為null說(shuō)明還沒(méi)被正式推入時(shí)間格中,不需要處理)
                if (canceledTask.bucket != null) {
                    canceledTask.bucket.remove(canceledTask);
                }
            }
        }
        /**
         * 將隊(duì)列中的任務(wù)推入時(shí)間輪中
         */
        private void pushTaskToBucket() {
            while (true) {
                HashedWheelTimerFuture timerTask = waitingTasks.poll();
                if (timerTask == null) {
                    return;
                }
                // 總共的偏移量
                long offset = timerTask.targetTime - startTime;
                // 總共需要走的指針步數(shù)
                timerTask.totalTicks = offset / tickDuration;
                // 取余計(jì)算 bucket index
                int index = (int) (timerTask.totalTicks & mask);
                HashedWheelBucket bucket = wheel[index];
                // TimerTask 維護(hù) Bucket 引用,用于刪除該任務(wù)
                timerTask.bucket = bucket;
                if (timerTask.status == HashedWheelTimerFuture.WAITING) {
                    bucket.add(timerTask);
                }
            }
        }
        public Set<TimerTask> getUnprocessedTasks() {
            try {
                latch.await();
            }catch (Exception ignore) {
            }
            Set<TimerTask> tasks = Sets.newHashSet();
            Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
                if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
                    tasks.add(timerFuture.timerTask);
                }
            };
            waitingTasks.forEach(consumer);
            for (HashedWheelBucket bucket : wheel) {
                bucket.forEach(consumer);
            }
            return tasks;
        }
    }
Indicator實(shí)現(xiàn)了Runnable接口,其run方法在stop為false的時(shí)候循環(huán)執(zhí)行,pushTaskToBucket、processCanceledTasks、tickTack、expireTimerTasks

pushTaskToBucket

private void pushTaskToBucket() {
            while (true) {
                HashedWheelTimerFuture timerTask = waitingTasks.poll();
                if (timerTask == null) {
                    return;
                }
                // 總共的偏移量
                long offset = timerTask.targetTime - startTime;
                // 總共需要走的指針步數(shù)
                timerTask.totalTicks = offset / tickDuration;
                // 取余計(jì)算 bucket index
                int index = (int) (timerTask.totalTicks & mask);
                HashedWheelBucket bucket = wheel[index];
                // TimerTask 維護(hù) Bucket 引用,用于刪除該任務(wù)
                timerTask.bucket = bucket;
                if (timerTask.status == HashedWheelTimerFuture.WAITING) {
                    bucket.add(timerTask);
                }
            }
        }
pushTaskToBucket通過(guò)waitingTasks.poll()拉取任務(wù),若為null直接返回,否則通過(guò)timerTask.targetTime與startTime計(jì)算offset,再根據(jù)tickDuration計(jì)算需要走的步數(shù),然后計(jì)算并獲取目標(biāo)HashedWheelBucket,然后將timerTask添加到bucket中

processCanceledTasks

private void processCanceledTasks() {
            while (true) {
                HashedWheelTimerFuture canceledTask = canceledTasks.poll();
                if (canceledTask == null) {
                    return;
                }
                // 從鏈表中刪除該任務(wù)(bucket為null說(shuō)明還沒(méi)被正式推入時(shí)間格中,不需要處理)
                if (canceledTask.bucket != null) {
                    canceledTask.bucket.remove(canceledTask);
                }
            }
        }
processCanceledTasks會(huì)執(zhí)行canceledTasks.poll()拉取canceledTask,若canceledTask.bucket不為null則將canceledTask從該bucket中移除

tickTack

private void tickTack() {
            // 下一次調(diào)度的絕對(duì)時(shí)間
            long nextTime = startTime + (tick + 1) * tickDuration;
            long sleepTime = nextTime - System.currentTimeMillis();
            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                }catch (Exception ignore) {
                }
            }
        }
tickTack模擬指針移動(dòng),它先計(jì)算nextTime,再計(jì)算需要sleep多久,然后執(zhí)行Thread.sleep(sleepTime)

小結(jié)

PowerJob定義了Timer接口,并提供了HashedWheelTimer的實(shí)現(xiàn),它定義了waitingTasks、canceledTasks兩個(gè)LinkedBlockingQueue(無(wú)界隊(duì)列),同時(shí)還支持定義任務(wù)處理線程池的core線程數(shù);它通過(guò)Indicator線程來(lái)處理時(shí)間輪的轉(zhuǎn)動(dòng)及任務(wù)處理,Indicator循環(huán)將waitingTasks的任務(wù)放入到對(duì)應(yīng)的bucket,然后模擬時(shí)間輪等待,然后通過(guò)bucket.expireTimerTasks(tick)處理到期任務(wù),最后再遞增tick。

以上就是PowerJob的HashedWheelTimer工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob HashedWheelTimer的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • JAVA實(shí)現(xiàn)sm3加密簽名以及防止重復(fù)攻擊

    JAVA實(shí)現(xiàn)sm3加密簽名以及防止重復(fù)攻擊

    這篇文章主要給大家介紹了關(guān)于JAVA實(shí)現(xiàn)sm3加密簽名以及防止重復(fù)攻擊的相關(guān)資料,SM3是簽名算法,和MD5一樣(對(duì)于應(yīng)用層來(lái)說(shuō)),SM4是對(duì)稱加密算法,和AES一樣(對(duì)于應(yīng)用層來(lái)說(shuō)),需要的朋友可以參考下
    2023-10-10
  • JSON各種轉(zhuǎn)換問(wèn)題(json轉(zhuǎn)List,json轉(zhuǎn)對(duì)象等)

    JSON各種轉(zhuǎn)換問(wèn)題(json轉(zhuǎn)List,json轉(zhuǎn)對(duì)象等)

    這篇文章主要介紹了JSON各種轉(zhuǎn)換問(wèn)題(json轉(zhuǎn)List,json轉(zhuǎn)對(duì)象等),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-03-03
  • Java中使用內(nèi)存映射實(shí)現(xiàn)大文件上傳實(shí)例

    Java中使用內(nèi)存映射實(shí)現(xiàn)大文件上傳實(shí)例

    這篇文章主要介紹了Java中使用內(nèi)存映射實(shí)現(xiàn)大文件上傳實(shí)例,本文對(duì)比測(cè)試了FileInputStream 或者FileOutputStream 抑或RandomAccessFile的頻繁讀寫(xiě)操作,最后總結(jié)出映射到內(nèi)存后進(jìn)行讀寫(xiě)以提高速度,需要的朋友可以參考下
    2015-01-01
  • 如何寫(xiě)好一個(gè)Spring組件的實(shí)現(xiàn)步驟

    如何寫(xiě)好一個(gè)Spring組件的實(shí)現(xiàn)步驟

    這篇文章主要介紹了如何寫(xiě)好一個(gè)Spring組件的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • Java安全之Filter權(quán)限繞過(guò)的實(shí)現(xiàn)

    Java安全之Filter權(quán)限繞過(guò)的實(shí)現(xiàn)

    在一些需要挖掘一些無(wú)條件RCE中,大部分類(lèi)似于一些系統(tǒng)大部分地方都做了權(quán)限控制的,而這時(shí)候想要利用權(quán)限繞過(guò)就顯得格外重要,本文就介紹了如何實(shí)現(xiàn),一起來(lái)了解一下
    2021-05-05
  • Java源碼解析Integer方法解讀

    Java源碼解析Integer方法解讀

    這篇文章主要介紹了Java源碼解析Integer方法解讀,包括toString方法、toUnsignedString方法、highestOneBit方法等,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-12-12
  • 詳解Java如何使用集合來(lái)實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)

    詳解Java如何使用集合來(lái)實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng)

    讀萬(wàn)卷書(shū)不如行萬(wàn)里路,只學(xué)書(shū)上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用Java 集合實(shí)現(xiàn)一個(gè)客戶信息管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平
    2021-11-11
  • java PDF添加圖層的方法 支持多頁(yè)圖層添加

    java PDF添加圖層的方法 支持多頁(yè)圖層添加

    這篇文章主要為大家詳細(xì)介紹了java PDF添加圖層的方法,支持多頁(yè)圖層添加,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-02-02
  • Reactive Programming入門(mén)概念詳解

    Reactive Programming入門(mén)概念詳解

    這篇文章主要為大家介紹了Reactive Programming入門(mén)概念詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • Maven使用集成測(cè)試的示例代碼

    Maven使用集成測(cè)試的示例代碼

    本文介紹了在Maven項(xiàng)目中使用maven-failsafe-plugin插件進(jìn)行集成測(cè)試,步驟包括添加測(cè)試依賴、編寫(xiě)集成測(cè)試類(lèi)、配置插件、運(yùn)行測(cè)試以及查看和分析測(cè)試結(jié)果,感興趣的可以了解一下
    2024-11-11

最新評(píng)論