springboot+dubbo實現(xiàn)時間輪算法
前言
時間輪(TimingWheel)是一種高效利用線程資源進(jìn)行批量化調(diào)度的算法,廣泛應(yīng)用于各種操作系統(tǒng)的定時任務(wù)調(diào)度中,如Linux的crontab,以及Java開發(fā)中常用的Dubbo、Netty、Kafka等框架。時間輪的核心思想是將時間劃分為若干個時間間隔(bucket),每個時間間隔代表一個時間段,并通過一個循環(huán)的數(shù)據(jù)結(jié)構(gòu)(類似于時鐘)來管理這些時間間隔。
文章基于3.1.0版本進(jìn)行分析
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>3.1.0</version> </dependency>
一、參數(shù)說明
- tickDuration:表示一個槽所代表的時間范圍 默認(rèn)100ms
- ticksPerWheel:表示該時間輪有多少個槽 默認(rèn)512
- startTime:表示該時間輪的開始時間
- interval:時間輪所能表示的時間跨度,也就是 tickDuration * ticksPerWheel
- currentTime:表示當(dāng)前時間,也就是時間輪指針指向的時間
- wheel:表示TimerTaskList的數(shù)組,即各個槽,每個bucket都是一個 HashedWheelBucket 。
- HashedWheelBucket:存儲TimerTaskEntry的雙向鏈表
- HashedWheelTimeout:延遲任務(wù),有兩個值 deadline 和 remainingRounds
- deadline:TimerTask 最后的執(zhí)行時間
- remainingRounds:剩余圈數(shù)
- timeouts:用來保存新增的HashedWheelTimeout,每次執(zhí)行會拿出10W個放入HashedWheelBucket
二、具體實現(xiàn)
1、HashedWheelTimer
時間輪實現(xiàn)類
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { // 檢查參數(shù) if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. // 創(chuàng)建時間輪 wheel = createWheel(ticksPerWheel); // 位運(yùn)算標(biāo)識 // 因為一圈的長度為2的n次方,mask = 2^n-1后低位將全部是1,然后deadline& mast == deadline % wheel.length // deadline = System.nanoTime() + unit.toNanos(delay) - startTime; TODO mask = wheel.length - 1; // Convert tickDuration to nanos. // 時間輪的基本時間跨度,轉(zhuǎn)成最小時間單位Nanos this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. // 時間跨度限制不能太大,計算會有問題 if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } // 創(chuàng)建時間輪工作線程 workerThread = threadFactory.newThread(worker); this.maxPendingTimeouts = maxPendingTimeouts; // 延遲任務(wù)太多的時間,警告日志 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
參數(shù)說明:
- threadFactory
線程工廠,創(chuàng)建時間輪線程 - tickDuration
每一tick的時間 - timeUnit
tickDuration的時間單位 - ticksPerWheel
就是輪子一共有多個格子,即要多少個tick才能走完這個wheel一圈。
2、createWheel
創(chuàng)建時間輪
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } // 如果不是2^n 則調(diào)整為2^n ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); // 初始化時間輪槽 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket(); } return wheel; }
3、newTimeout
添加新任務(wù)
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 如果任務(wù)大于最大量,則不允許繼續(xù)添加 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 啟動時間輪工作線程 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. // 如果為負(fù)數(shù),那么說明超過了long的最大值 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
4、start
啟動時間輪
public void start() { switch (WORKER_STATE_UPDATER.get(this)) { // 如果是初始化 啟動實踐論 case WORKER_STATE_INIT: // 保證只啟動一次,原子性 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; // 已經(jīng)啟動過了 case WORKER_STATE_STARTED: break; // 時間輪停止了 case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. // 線程啟動執(zhí)行任務(wù)是異步的,這里是等待workerThread.start(),線程已經(jīng)啟動了 while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
5、run
workerThread.start()啟動后,會執(zhí)行Worker的run方法
public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). // 喚醒被阻塞的start()方法。 startTimeInitialized.countDown(); do { // 等下一個槽的到達(dá)時間,開始執(zhí)行上一個槽的任務(wù) TODO 不明白這里的設(shè)計,哪位大佬知道可以指點一下 final long deadline = waitForNextTick(); if (deadline > 0) { // 計算時間輪的槽位 int idx = (int) (tick & mask); // 移除取消的了task processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // 將newTimeout()方法中加入到待處理定時任務(wù)隊列中的任務(wù)加入到指定的格子中 transferTimeoutsToBuckets(); // 運(yùn)行目前指針指向的槽中的bucket鏈表中的任務(wù),執(zhí)行到期的延時任務(wù) bucket.expireTimeouts(deadline); tick++; } } // 如果Worker_State一只是started狀態(tài),就一直循環(huán) while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { // 清除時間輪中超時未處理的任務(wù) bucket.clearTimeouts(unprocessedTimeouts); } for (; ; ) { // 遍歷任務(wù)隊列,發(fā)現(xiàn)如果有任務(wù)被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊列中。 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } // 再次移除取消的了task processCancelledTasks(); }
6、waitForNextTick
一個鐘表上的間隔是代表一個單位時間的間隔,那么waitForNextTick就是根據(jù)當(dāng)前時間計算出跳動到下個時間的時間間隔,然后進(jìn)行sleep,結(jié)束后進(jìn)入下一個時間間隔,下一個間隔到來的時候返回。
/** * 根據(jù)startTime和當(dāng)前槽位計算目標(biāo)nanoTime, * 等待時間到達(dá) * * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ private long waitForNextTick() { // tick槽位,tickDuration表示每個時間格的跨度,所以deadline返回的是下一次時間輪指針跳動的時間 long deadline = tickDuration * (tick + 1); for (; ; ) { // 計算當(dāng)前時間距離啟動時間的時間間隔,期間休眠 final long currentTime = System.nanoTime() - startTime; // 計算sleepTimeMs先加999999,應(yīng)該是不足1ms的,補(bǔ)足1ms long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; // sleepTimeMs小于零表示走到了下一個時間槽位置 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // Windows 時間換算 if (isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { // 當(dāng)前時間距離下一次tick時間還有一段距離,需要sleep Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } }
7、transferTimeoutsToBuckets
轉(zhuǎn)移任務(wù)到時間輪的具體槽中
// 將延時任務(wù)隊列中等待添加到時間輪中的延時任務(wù)轉(zhuǎn)移到時間輪的指定位置 private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 循環(huán)100000次,也就是每次轉(zhuǎn)移10w個任務(wù) // 為了防止這個操作銷毀太多時間,導(dǎo)致更多的任務(wù)時間不準(zhǔn),因此一次最多操作10w個 for (int i = 0; i < 100000; i++) { // 從阻塞隊列中獲得具體的任務(wù) HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } // 計算tick次數(shù),deadline表示當(dāng)前任務(wù)的延遲時間,tickDuration表示時間槽的間隔,兩者相除就可以計算當(dāng)前任務(wù)需要tick幾次才能被執(zhí)行 long calculated = timeout.deadline / tickDuration; // 計算剩余的輪數(shù), 只有 timer 走夠輪數(shù), 并且到達(dá)了 task 所在的 slot, task 才會過期.(被執(zhí)行) timeout.remainingRounds = (calculated - tick) / wheel.length; // Ensure we don't schedule for past. // 如果任務(wù)在timeouts隊列里面放久了, 以至于已經(jīng)過了執(zhí)行時間, 這個時候就使用當(dāng)前tick, 也就是放到當(dāng)前bucket, 此方法調(diào)用完后就會被執(zhí)行 final long ticks = Math.max(calculated, tick); // 算出任務(wù)應(yīng)該插入的 wheel 的 slot, stopIndex = tick 次數(shù) & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); // 把timeout任務(wù)插入到指定的bucket鏈中。 HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout); } }
8、expireTimeouts
當(dāng)指針跳動到某一個時間槽中時,會就觸發(fā)這個槽中的任務(wù)的執(zhí)行。該功能是通過expireTimeouts來實現(xiàn)
void expireTimeouts(long deadline) { // 雙向鏈表 HashedWheelTimeout timeout = head; // process all timeouts // 遍歷當(dāng)前時間槽中的所有任務(wù) while (timeout != null) { HashedWheelTimeout next = timeout.next; // 如果當(dāng)前任務(wù)要被執(zhí)行,那么remainingRounds應(yīng)該小于或者等于0 if (timeout.remainingRounds <= 0) { // 從bucket鏈表中移除當(dāng)前timeout,并返回鏈表中下一個timeout next = remove(timeout); // 如果timeout的時間小于當(dāng)前的時間,那么就調(diào)用expire執(zhí)行task if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. // 不可能發(fā)生的情況,就是說round已經(jīng)為0了,deadline卻 > 當(dāng)前槽的deadline throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { // 因為當(dāng)前的槽位已經(jīng)過了,說明已經(jīng)走了一圈了,把輪數(shù)減一 timeout.remainingRounds--; } // 把指針放置到下一個timeout timeout = next; } }
總結(jié)
時間輪(TimingWheel)在計算機(jī)科學(xué)中,特別是在任務(wù)調(diào)度和時間管理方面,具有重要的意義,我們可以結(jié)合業(yè)務(wù)進(jìn)行使用
- 節(jié)省cpu資源
- 易于實現(xiàn)和維護(hù)
- 批量化調(diào)度模型
- 高效處理大量定時任務(wù)
- 靈活適應(yīng)不同應(yīng)用場景
到此這篇關(guān)于springboot+dubbo實現(xiàn)時間輪算法的文章就介紹到這了,更多相關(guān)springboot dubbo時間輪內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Dubbo?LoadBalance基于權(quán)重的隨機(jī)負(fù)載均衡算法提高服務(wù)性能
- 解決dubbo啟動報服務(wù)注冊失敗Failed?to?register?dubbo
- Springboot整合Dubbo+Nacos實現(xiàn)RPC調(diào)用的示例代碼
- SpringCloud微服務(wù)集成Dubbo的詳細(xì)過程
- 解決Dubbo應(yīng)用啟動注冊ZK獲取IP慢的原因之一
- 解決dubbo注冊到zookeeper速度慢的問題
- dubbo?filter中有關(guān)bean注入和配置文件讀取方式
- dubbo如何設(shè)置連接zookeeper權(quán)限
- 將Dubbo服務(wù)打包成Jar包的操作步驟
- JAVA的Dubbo如何實現(xiàn)各種限流算法
相關(guān)文章
SpringBoot項目實現(xiàn)日志打印SQL的常用方法(包括SQL語句和參數(shù))
有時候遇到問題需要根據(jù)我們編寫的SQL進(jìn)行分析,但如果不進(jìn)行一些開發(fā)或者配置的話,這些SQL是不會打印到控制臺的,它們默認(rèn)是隱藏的。下面給大家介紹幾種常用的方法,感興趣的朋友跟隨小編一起看看吧2024-04-04java計算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法
這篇文章主要介紹了java計算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法,涉及java字符串的遍歷、轉(zhuǎn)換及運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02Java實現(xiàn)枚舉狀態(tài)轉(zhuǎn)換的方法詳解
在軟件開發(fā)中,我們經(jīng)常需要處理不同系統(tǒng)或模塊間的狀態(tài)轉(zhuǎn)換,今天,我將通過一個電商訂單與物流狀態(tài)的轉(zhuǎn)換案例,展示如何優(yōu)雅地實現(xiàn)枚舉間的互相轉(zhuǎn)換,需要的朋友可以參考下2025-04-04Java中LinkedList詳解和使用示例_動力節(jié)點Java學(xué)院整理
LinkedList 是一個繼承于AbstractSequentialList的雙向鏈表。它也可以被當(dāng)作堆棧、隊列或雙端隊列進(jìn)行操作。接下來通過示例代碼給大家詳細(xì)介紹java中l(wèi)inkedlist的使用,需要的朋友參考下吧2017-05-05EasyUi+Spring Data 實現(xiàn)按條件分頁查詢的實例代碼
這篇文章主要介紹了EasyUi+Spring Data 實現(xiàn)按條件分頁查詢的實例代碼,非常具有實用價值,需要的朋友可以參考下2017-07-07