SpringBoot定時(shí)任務(wù)設(shè)計(jì)之時(shí)間輪案例原理詳解
知識(shí)準(zhǔn)備
Timer和ScheduledExecutorService是JDK內(nèi)置的定時(shí)任務(wù)方案,而業(yè)內(nèi)還有一個(gè)經(jīng)典的定時(shí)任務(wù)的設(shè)計(jì)叫時(shí)間輪(Timing Wheel), Netty內(nèi)部基于時(shí)間輪實(shí)現(xiàn)了一個(gè)HashedWheelTimer來(lái)優(yōu)化百萬(wàn)量級(jí)I/O超時(shí)的檢測(cè),它是一個(gè)高性能,低消耗的數(shù)據(jù)結(jié)構(gòu),它適合用非準(zhǔn)實(shí)時(shí),延遲的短平快任務(wù),例如心跳檢測(cè)。本文主要介紹時(shí)間輪(Timing Wheel)及其使用。@pdai
需要對(duì)時(shí)間輪(Timing Wheel),以及Netty的HashedWheelTimer要解決什么問(wèn)題有初步的認(rèn)識(shí)。
什么是時(shí)間輪(Timing Wheel)
時(shí)間輪(Timing Wheel)是George Varghese和Tony Lauck在1996年的論文' Hashed and Hierarchical Timing Wheels: data structures to efficiently implement a timer facility '實(shí)現(xiàn)的,它在Linux內(nèi)核中使用廣泛,是Linux內(nèi)核定時(shí)器的實(shí)現(xiàn)方法和基礎(chǔ)之一。
時(shí)間輪(Timing Wheel)是一種環(huán)形的數(shù)據(jù)結(jié)構(gòu),就像一個(gè)時(shí)鐘可以分成很多格子(Tick),每個(gè)格子代表時(shí)間的間隔,它指向存儲(chǔ)的具體任務(wù)(timerTask)的一個(gè)鏈表。
以上述在論文中的圖片例子,這里一個(gè)輪子包含8個(gè)格子(Tick), 每個(gè)tick是一秒鐘;
任務(wù)的添加:如果一個(gè)任務(wù)要在17秒后執(zhí)行,那么它需要轉(zhuǎn)2輪,最終加到Tick=1位置的鏈表中。
任務(wù)的執(zhí)行:在時(shí)鐘轉(zhuǎn)2Round到Tick=1的位置,開(kāi)始執(zhí)行這個(gè)位置指向的鏈表中的這個(gè)任務(wù)。(# 這里表示剩余需要轉(zhuǎn)幾輪再執(zhí)行這個(gè)任務(wù))
Netty的HashedWheelTimer要解決什么問(wèn)題
HashedWheelTimer是Netty根據(jù)時(shí)間輪(Timing Wheel)開(kāi)發(fā)的工具類(lèi),它要解決什么問(wèn)題呢?這里面有兩個(gè)要點(diǎn): 延遲任務(wù) + 低時(shí)效性 。@pdai
在Netty中的一個(gè)典型應(yīng)用場(chǎng)景是判斷某個(gè)連接是否idle,如果idle(如客戶(hù)端由于網(wǎng)絡(luò)原因?qū)е碌椒?wù)器的心跳無(wú)法送達(dá)),則服務(wù)器會(huì)主動(dòng)斷開(kāi)連接,釋放資源。判斷連接是否idle是通過(guò)定時(shí)任務(wù)完成的,但是Netty可能維持?jǐn)?shù)百萬(wàn)級(jí)別的長(zhǎng)連接,對(duì)每個(gè)連接去定義一個(gè)定時(shí)任務(wù)是不可行的,所以如何提升I/O超時(shí)調(diào)度的效率呢?
Netty根據(jù)時(shí)間輪(Timing Wheel)開(kāi)發(fā)了HashedWheelTimer工具類(lèi),用來(lái)優(yōu)化I/O超時(shí)調(diào)度(本質(zhì)上是延遲任務(wù));之所以采用時(shí)間輪(Timing Wheel)的結(jié)構(gòu)還有一個(gè)很重要的原因是I/O超時(shí)這種類(lèi)型的任務(wù)對(duì)時(shí)效性不需要非常精準(zhǔn)。
HashedWheelTimer的使用方式
在了解時(shí)間輪(Timing Wheel)和Netty的HashedWheelTimer要解決的問(wèn)題后,我們看下HashedWheelTimer的使用方式
通過(guò)構(gòu)造函數(shù)看主要參數(shù)
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { }
具體參數(shù)說(shuō)明如下:
threadFactory tickDuration unit ticksPerWheel leakDetection maxPendingTimeouts
實(shí)現(xiàn)案例
這里展示下HashedWheelTimer的基本使用案例。@pdai
Pom依賴(lài)
引入pom的依賴(lài)
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.77.Final</version> </dependency>
2個(gè)簡(jiǎn)單例子
例子1:5秒后執(zhí)行TimerTask
@SneakyThrows public static void simpleHashedWheelTimer() { log.info("init task 1..."); HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8); // add a new timeout timer.newTimeout(timeout -> { log.info("running task 1..."); }, 5, TimeUnit.SECONDS); }
執(zhí)行結(jié)果如下:
23:32:21.364 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 1...
...
23:32:27.454 [pool-1-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 1...
例子2:任務(wù)失效后cancel并讓它重新在3秒后執(zhí)行。
@SneakyThrows public static void reScheduleHashedWheelTimer() { log.info("init task 2..."); HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 8); Thread.sleep(5000); // add a new timeout Timeout tm = timer.newTimeout(timeout -> { log.info("running task 2..."); }, 5, TimeUnit.SECONDS); // cancel if (!tm.isExpired()) { log.info("cancel task 2..."); tm.cancel(); } // reschedule timer.newTimeout(tm.task(), 3, TimeUnit.SECONDS); }
23:28:36.408 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - init task 2...
23:28:41.412 [main] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - cancel task 2...
23:28:45.414 [pool-2-thread-1] INFO tech.pdai.springboot.schedule.timer.netty.HashedWheelTimerTester - running task 2...
我們通過(guò)如下問(wèn)題進(jìn)一步理解HashedWheelTimer。@pdai
HashedWheelTimer是如何實(shí)現(xiàn)的?
簡(jiǎn)單看下HashedWheelTimer是如何實(shí)現(xiàn)的
Worker HashedWheelBucket HashedWheelTimeout
構(gòu)造函數(shù)
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // Prevent overflow. if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller than {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
創(chuàng)建wheel
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { //ticksPerWheel may not be greater than 2^30 checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel"); ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = new HashedWheelBucket(); } return wheel; } private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = 1; while (normalizedTicksPerWheel < ticksPerWheel) { normalizedTicksPerWheel <<= 1; } return normalizedTicksPerWheel; }
任務(wù)的添加
@Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { checkNotNull(task, "task"); checkNotNull(unit, "unit"); long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
執(zhí)行方法
/** * Starts the background thread explicitly. The background thread will * start automatically on demand even if you did not call this method. * * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
停止方法
@Override public Set<Timeout> stop() { if (Thread.currentThread() == workerThread) { throw new IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return Collections.emptySet(); } try { boolean interrupted = false; while (workerThread.isAlive()) { workerThread.interrupt(); try { workerThread.join(100); } catch (InterruptedException ignored) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } finally { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } return worker.unprocessedTimeouts(); }
什么是多級(jí)Timing Wheel?
多級(jí)的時(shí)間輪是比較好理解的,時(shí)鐘是有小時(shí),分鐘,秒的,秒轉(zhuǎn)一圈(Round)分鐘就轉(zhuǎn)一個(gè)格(Tick), 分鐘轉(zhuǎn)一圈(Round)小時(shí)就轉(zhuǎn)一格(Tick)。
PS:顯然HashedWheelTimer是一層時(shí)間輪。
以上就是SpringBoot定時(shí)任務(wù)設(shè)計(jì)之時(shí)間輪案例原理詳解的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot定時(shí)任務(wù)時(shí)間輪的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot整合SpringTask實(shí)現(xiàn)定時(shí)任務(wù)的流程
- springboot實(shí)現(xiàn)定時(shí)任務(wù)的四種方式小結(jié)
- SpringBoot定時(shí)任務(wù)動(dòng)態(tài)擴(kuò)展ScheduledTaskRegistrar詳解
- SpringBoot實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)的示例代碼
- SpringBoot中定時(shí)任務(wù)@Scheduled注解的使用解讀
- Springboot實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)流程詳解
- SpringTask實(shí)現(xiàn)定時(shí)任務(wù)方法講解
相關(guān)文章
Java實(shí)現(xiàn)后臺(tái)發(fā)送及接收json數(shù)據(jù)的方法示例
這篇文章主要介紹了Java實(shí)現(xiàn)后臺(tái)發(fā)送及接收json數(shù)據(jù)的方法,結(jié)合實(shí)例形式分析了java針對(duì)json格式數(shù)據(jù)的傳輸與操作相關(guān)技巧,需要的朋友可以參考下2018-12-12Java中Map接口使用以及有關(guān)集合的面試知識(shí)點(diǎn)匯總
在java面試過(guò)程中,Map時(shí)常會(huì)被作為一個(gè)面試點(diǎn)來(lái)問(wèn),下面這篇文章主要給大家介紹了關(guān)于Java中Map接口使用以及有關(guān)集合的面試知識(shí)點(diǎn)匯總的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07Spring解決依賴(lài)版本不一致報(bào)錯(cuò)問(wèn)題
許多同學(xué)經(jīng)常會(huì)遇到依賴(lài)版本不一致導(dǎo)致代碼報(bào)錯(cuò),所以這篇文章就給大家詳細(xì)介紹一下Spring解決依賴(lài)版本不一致報(bào)錯(cuò)問(wèn)題,需要的朋友跟著小編一起來(lái)看看吧2023-07-07Java實(shí)現(xiàn)獲取小程序帶參二維碼并保存到本地
這篇文章主要介紹了Java實(shí)現(xiàn)獲取小程序帶參二維碼并保存到本地,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10