springboot+dubbo實(shí)現(xiàn)時(shí)間輪算法
前言
時(shí)間輪(TimingWheel)是一種高效利用線程資源進(jìn)行批量化調(diào)度的算法,廣泛應(yīng)用于各種操作系統(tǒng)的定時(shí)任務(wù)調(diào)度中,如Linux的crontab,以及Java開發(fā)中常用的Dubbo、Netty、Kafka等框架。時(shí)間輪的核心思想是將時(shí)間劃分為若干個(gè)時(shí)間間隔(bucket),每個(gè)時(shí)間間隔代表一個(gè)時(shí)間段,并通過(guò)一個(gè)循環(huán)的數(shù)據(jù)結(jié)構(gòu)(類似于時(shí)鐘)來(lái)管理這些時(shí)間間隔。
文章基于3.1.0版本進(jìn)行分析
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>3.1.0</version>
</dependency>
一、參數(shù)說(shuō)明

- tickDuration:表示一個(gè)槽所代表的時(shí)間范圍 默認(rèn)100ms
- ticksPerWheel:表示該時(shí)間輪有多少個(gè)槽 默認(rèn)512
- startTime:表示該時(shí)間輪的開始時(shí)間
- interval:時(shí)間輪所能表示的時(shí)間跨度,也就是 tickDuration * ticksPerWheel
- currentTime:表示當(dāng)前時(shí)間,也就是時(shí)間輪指針指向的時(shí)間
- wheel:表示TimerTaskList的數(shù)組,即各個(gè)槽,每個(gè)bucket都是一個(gè) HashedWheelBucket 。
- HashedWheelBucket:存儲(chǔ)TimerTaskEntry的雙向鏈表
- HashedWheelTimeout:延遲任務(wù),有兩個(gè)值 deadline 和 remainingRounds
- deadline:TimerTask 最后的執(zhí)行時(shí)間
- remainingRounds:剩余圈數(shù)
- timeouts:用來(lái)保存新增的HashedWheelTimeout,每次執(zhí)行會(huì)拿出10W個(gè)放入HashedWheelBucket
二、具體實(shí)現(xiàn)
1、HashedWheelTimer
時(shí)間輪實(shí)現(xiàn)類
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
// 檢查參數(shù)
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
// 創(chuàng)建時(shí)間輪
wheel = createWheel(ticksPerWheel);
// 位運(yùn)算標(biāo)識(shí)
// 因?yàn)橐蝗Φ拈L(zhǎng)度為2的n次方,mask = 2^n-1后低位將全部是1,然后deadline& mast == deadline % wheel.length
// deadline = System.nanoTime() + unit.toNanos(delay) - startTime; TODO
mask = wheel.length - 1;
// Convert tickDuration to nanos.
// 時(shí)間輪的基本時(shí)間跨度,轉(zhuǎn)成最小時(shí)間單位Nanos
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
// 時(shí)間跨度限制不能太大,計(jì)算會(huì)有問題
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
// 創(chuàng)建時(shí)間輪工作線程
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
// 延遲任務(wù)太多的時(shí)間,警告日志
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
參數(shù)說(shuō)明:
- threadFactory
線程工廠,創(chuàng)建時(shí)間輪線程 - tickDuration
每一tick的時(shí)間 - timeUnit
tickDuration的時(shí)間單位 - ticksPerWheel
就是輪子一共有多個(gè)格子,即要多少個(gè)tick才能走完這個(gè)wheel一圈。
2、createWheel
創(chuàng)建時(shí)間輪
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
// 如果不是2^n 則調(diào)整為2^n
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 初始化時(shí)間輪槽
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
3、newTimeout
添加新任務(wù)
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 如果任務(wù)大于最大量,則不允許繼續(xù)添加
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 啟動(dòng)時(shí)間輪工作線程
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
// 如果為負(fù)數(shù),那么說(shuō)明超過(guò)了long的最大值
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
4、start
啟動(dòng)時(shí)間輪
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
// 如果是初始化 啟動(dòng)實(shí)踐論
case WORKER_STATE_INIT:
// 保證只啟動(dòng)一次,原子性
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
// 已經(jīng)啟動(dòng)過(guò)了
case WORKER_STATE_STARTED:
break;
// 時(shí)間輪停止了
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
// 線程啟動(dòng)執(zhí)行任務(wù)是異步的,這里是等待workerThread.start(),線程已經(jīng)啟動(dòng)了
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
5、run
workerThread.start()啟動(dòng)后,會(huì)執(zhí)行Worker的run方法
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
// 喚醒被阻塞的start()方法。
startTimeInitialized.countDown();
do {
// 等下一個(gè)槽的到達(dá)時(shí)間,開始執(zhí)行上一個(gè)槽的任務(wù) TODO 不明白這里的設(shè)計(jì),哪位大佬知道可以指點(diǎn)一下
final long deadline = waitForNextTick();
if (deadline > 0) {
// 計(jì)算時(shí)間輪的槽位
int idx = (int) (tick & mask);
// 移除取消的了task
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 將newTimeout()方法中加入到待處理定時(shí)任務(wù)隊(duì)列中的任務(wù)加入到指定的格子中
transferTimeoutsToBuckets();
// 運(yùn)行目前指針指向的槽中的bucket鏈表中的任務(wù),執(zhí)行到期的延時(shí)任務(wù)
bucket.expireTimeouts(deadline);
tick++;
}
}
// 如果Worker_State一只是started狀態(tài),就一直循環(huán)
while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
// 清除時(shí)間輪中超時(shí)未處理的任務(wù)
bucket.clearTimeouts(unprocessedTimeouts);
}
for (; ; ) {
// 遍歷任務(wù)隊(duì)列,發(fā)現(xiàn)如果有任務(wù)被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊(duì)列中。
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 再次移除取消的了task
processCancelledTasks();
}
6、waitForNextTick
一個(gè)鐘表上的間隔是代表一個(gè)單位時(shí)間的間隔,那么waitForNextTick就是根據(jù)當(dāng)前時(shí)間計(jì)算出跳動(dòng)到下個(gè)時(shí)間的時(shí)間間隔,然后進(jìn)行sleep,結(jié)束后進(jìn)入下一個(gè)時(shí)間間隔,下一個(gè)間隔到來(lái)的時(shí)候返回。
/**
* 根據(jù)startTime和當(dāng)前槽位計(jì)算目標(biāo)nanoTime,
* 等待時(shí)間到達(dá)
*
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
// tick槽位,tickDuration表示每個(gè)時(shí)間格的跨度,所以deadline返回的是下一次時(shí)間輪指針跳動(dòng)的時(shí)間
long deadline = tickDuration * (tick + 1);
for (; ; ) {
// 計(jì)算當(dāng)前時(shí)間距離啟動(dòng)時(shí)間的時(shí)間間隔,期間休眠
final long currentTime = System.nanoTime() - startTime;
// 計(jì)算sleepTimeMs先加999999,應(yīng)該是不足1ms的,補(bǔ)足1ms
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// sleepTimeMs小于零表示走到了下一個(gè)時(shí)間槽位置
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Windows 時(shí)間換算
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
// 當(dāng)前時(shí)間距離下一次tick時(shí)間還有一段距離,需要sleep
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
7、transferTimeoutsToBuckets
轉(zhuǎn)移任務(wù)到時(shí)間輪的具體槽中
// 將延時(shí)任務(wù)隊(duì)列中等待添加到時(shí)間輪中的延時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪的指定位置
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 循環(huán)100000次,也就是每次轉(zhuǎn)移10w個(gè)任務(wù)
// 為了防止這個(gè)操作銷毀太多時(shí)間,導(dǎo)致更多的任務(wù)時(shí)間不準(zhǔn),因此一次最多操作10w個(gè)
for (int i = 0; i < 100000; i++) {
// 從阻塞隊(duì)列中獲得具體的任務(wù)
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
// 計(jì)算tick次數(shù),deadline表示當(dāng)前任務(wù)的延遲時(shí)間,tickDuration表示時(shí)間槽的間隔,兩者相除就可以計(jì)算當(dāng)前任務(wù)需要tick幾次才能被執(zhí)行
long calculated = timeout.deadline / tickDuration;
// 計(jì)算剩余的輪數(shù), 只有 timer 走夠輪數(shù), 并且到達(dá)了 task 所在的 slot, task 才會(huì)過(guò)期.(被執(zhí)行)
timeout.remainingRounds = (calculated - tick) / wheel.length;
// Ensure we don't schedule for past.
// 如果任務(wù)在timeouts隊(duì)列里面放久了, 以至于已經(jīng)過(guò)了執(zhí)行時(shí)間, 這個(gè)時(shí)候就使用當(dāng)前tick, 也就是放到當(dāng)前bucket, 此方法調(diào)用完后就會(huì)被執(zhí)行
final long ticks = Math.max(calculated, tick);
// 算出任務(wù)應(yīng)該插入的 wheel 的 slot, stopIndex = tick 次數(shù) & mask, mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);
// 把timeout任務(wù)插入到指定的bucket鏈中。
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
8、expireTimeouts
當(dāng)指針跳動(dòng)到某一個(gè)時(shí)間槽中時(shí),會(huì)就觸發(fā)這個(gè)槽中的任務(wù)的執(zhí)行。該功能是通過(guò)expireTimeouts來(lái)實(shí)現(xiàn)
void expireTimeouts(long deadline) {
// 雙向鏈表
HashedWheelTimeout timeout = head;
// process all timeouts
// 遍歷當(dāng)前時(shí)間槽中的所有任務(wù)
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 如果當(dāng)前任務(wù)要被執(zhí)行,那么remainingRounds應(yīng)該小于或者等于0
if (timeout.remainingRounds <= 0) {
// 從bucket鏈表中移除當(dāng)前timeout,并返回鏈表中下一個(gè)timeout
next = remove(timeout);
// 如果timeout的時(shí)間小于當(dāng)前的時(shí)間,那么就調(diào)用expire執(zhí)行task
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
// 不可能發(fā)生的情況,就是說(shuō)round已經(jīng)為0了,deadline卻 > 當(dāng)前槽的deadline
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// 因?yàn)楫?dāng)前的槽位已經(jīng)過(guò)了,說(shuō)明已經(jīng)走了一圈了,把輪數(shù)減一
timeout.remainingRounds--;
}
// 把指針放置到下一個(gè)timeout
timeout = next;
}
}
總結(jié)
時(shí)間輪(TimingWheel)在計(jì)算機(jī)科學(xué)中,特別是在任務(wù)調(diào)度和時(shí)間管理方面,具有重要的意義,我們可以結(jié)合業(yè)務(wù)進(jìn)行使用
- 節(jié)省cpu資源
- 易于實(shí)現(xiàn)和維護(hù)
- 批量化調(diào)度模型
- 高效處理大量定時(shí)任務(wù)
- 靈活適應(yīng)不同應(yīng)用場(chǎng)景
到此這篇關(guān)于springboot+dubbo實(shí)現(xiàn)時(shí)間輪算法的文章就介紹到這了,更多相關(guān)springboot dubbo時(shí)間輪內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Dubbo?LoadBalance基于權(quán)重的隨機(jī)負(fù)載均衡算法提高服務(wù)性能
- 解決dubbo啟動(dòng)報(bào)服務(wù)注冊(cè)失敗Failed?to?register?dubbo
- Springboot整合Dubbo+Nacos實(shí)現(xiàn)RPC調(diào)用的示例代碼
- SpringCloud微服務(wù)集成Dubbo的詳細(xì)過(guò)程
- 解決Dubbo應(yīng)用啟動(dòng)注冊(cè)ZK獲取IP慢的原因之一
- 解決dubbo注冊(cè)到zookeeper速度慢的問題
- dubbo?filter中有關(guān)bean注入和配置文件讀取方式
- dubbo如何設(shè)置連接zookeeper權(quán)限
- 將Dubbo服務(wù)打包成Jar包的操作步驟
- JAVA的Dubbo如何實(shí)現(xiàn)各種限流算法
相關(guān)文章
SpringBoot項(xiàng)目實(shí)現(xiàn)日志打印SQL的常用方法(包括SQL語(yǔ)句和參數(shù))
有時(shí)候遇到問題需要根據(jù)我們編寫的SQL進(jìn)行分析,但如果不進(jìn)行一些開發(fā)或者配置的話,這些SQL是不會(huì)打印到控制臺(tái)的,它們默認(rèn)是隱藏的。下面給大家介紹幾種常用的方法,感興趣的朋友跟隨小編一起看看吧2024-04-04
java計(jì)算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法
這篇文章主要介紹了java計(jì)算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法,涉及java字符串的遍歷、轉(zhuǎn)換及運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02
Java實(shí)現(xiàn)枚舉狀態(tài)轉(zhuǎn)換的方法詳解
在軟件開發(fā)中,我們經(jīng)常需要處理不同系統(tǒng)或模塊間的狀態(tài)轉(zhuǎn)換,今天,我將通過(guò)一個(gè)電商訂單與物流狀態(tài)的轉(zhuǎn)換案例,展示如何優(yōu)雅地實(shí)現(xiàn)枚舉間的互相轉(zhuǎn)換,需要的朋友可以參考下2025-04-04
Java中LinkedList詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
LinkedList 是一個(gè)繼承于AbstractSequentialList的雙向鏈表。它也可以被當(dāng)作堆棧、隊(duì)列或雙端隊(duì)列進(jìn)行操作。接下來(lái)通過(guò)示例代碼給大家詳細(xì)介紹java中l(wèi)inkedlist的使用,需要的朋友參考下吧2017-05-05
Idea中mapper注入報(bào)錯(cuò)問題及解決
這篇文章主要介紹了Idea中mapper注入報(bào)錯(cuò)問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
EasyUi+Spring Data 實(shí)現(xiàn)按條件分頁(yè)查詢的實(shí)例代碼
這篇文章主要介紹了EasyUi+Spring Data 實(shí)現(xiàn)按條件分頁(yè)查詢的實(shí)例代碼,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2017-07-07
Spring Boot實(shí)現(xiàn)發(fā)送郵件
這篇文章主要為大家詳細(xì)介紹了Spring Boot實(shí)現(xiàn)發(fā)送郵件,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-06-06
Java實(shí)現(xiàn)簡(jiǎn)單酒店管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05

