Netty分布式NioEventLoop任務(wù)隊列執(zhí)行源碼分析
前文傳送門:NioEventLoop處理IO事件
執(zhí)行任務(wù)隊列
繼續(xù)回到NioEventLoop的run()方法:
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //輪詢io事件(1) select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默認(rèn)是50 final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //記錄下開始時間 final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2) processSelectedKeys(); } finally { //計算耗時 final long ioTime = System.nanoTime() - ioStartTime; //執(zhí)行task(3) runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代碼省略 } }
我們看到處理完輪詢到的key之后, 首先記錄下耗時, 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執(zhí)行taskQueue中的任務(wù)
我們知道ioRatio默認(rèn)是50, 所以執(zhí)行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執(zhí)行時間:
跟進(jìn)runAllTasks方法:
protected boolean runAllTasks(long timeoutNanos) { //定時任務(wù)隊列中聚合任務(wù) fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個任務(wù) Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務(wù)執(zhí)行收尾的操作 afterRunningAllTasks(); return false; } //如果隊列不為空 //首先算一個截止時間(+50毫秒, 因為執(zhí)行任務(wù), 不要超過這個時間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執(zhí)行每一個任務(wù) for (;;) { safeExecute(task); //標(biāo)記當(dāng)前跑完的任務(wù) runTasks ++; //當(dāng)跑完64個任務(wù)的時候, 會計算一下當(dāng)前時間 if ((runTasks & 0x3F) == 0) { //定時任務(wù)初始化到當(dāng)前的時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時間則不執(zhí)行(nanoTime()是耗時的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個時間, 則繼續(xù)從普通任務(wù)隊列拿任務(wù) task = pollTask(); //直到?jīng)]有任務(wù)執(zhí)行 if (task == null) { //記錄下最后執(zhí)行時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先會執(zhí)行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務(wù)隊列中聚合任務(wù), 也就是將定時任務(wù)中找到可以執(zhí)行的任務(wù)添加到taskQueue中
我們跟進(jìn)fetchFromScheduledTaskQueue()方法
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時任務(wù)隊列中抓取第一個定時任務(wù) //尋找截止時間為nanoTime的任務(wù) Runnable scheduledTask = pollScheduledTask(nanoTime); //如果該定時任務(wù)隊列不為空, 則塞到普通任務(wù)隊列里面 while (scheduledTask != null) { //如果添加到普通任務(wù)隊列過程中失敗 if (!taskQueue.offer(scheduledTask)) { //則重新添加到定時任務(wù)隊列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續(xù)從定時任務(wù)隊列中拉取任務(wù) //方法執(zhí)行完成之后, 所有符合運行條件的定時任務(wù)隊列, 都添加到了普通任務(wù)隊列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
long nanoTime = AbstractScheduledEventExecutor.nanoTime()
代表從定時任務(wù)初始化到現(xiàn)在過去了多長時間
Runnable scheduledTask= pollScheduledTask(nanoTime)
代表從定時任務(wù)隊列中拿到小于nanoTime時間的任務(wù), 因為小于初始化到現(xiàn)在的時間, 說明該任務(wù)需要執(zhí)行了
跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定時任務(wù)隊列 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一個任務(wù) ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //從隊列中刪除 scheduledTaskQueue.remove(); //返回該任務(wù) return scheduledTask; } return null; }
我們看到首先獲得當(dāng)前類綁定的定時任務(wù)隊列的成員變量
如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務(wù)
如果當(dāng)前任務(wù)小于傳來的時間, 說明該任務(wù)需要執(zhí)行, 則從定時任務(wù)隊列中刪除
我們繼續(xù)回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時任務(wù)隊列中抓取第一個定時任務(wù) //尋找截止時間為nanoTime的任務(wù) Runnable scheduledTask = pollScheduledTask(nanoTime); //如果該定時任務(wù)隊列不為空, 則塞到普通任務(wù)隊列里面 while (scheduledTask != null) { //如果添加到普通任務(wù)隊列過程中失敗 if (!taskQueue.offer(scheduledTask)) { //則重新添加到定時任務(wù)隊列中 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續(xù)從定時任務(wù)隊列中拉取任務(wù) //方法執(zhí)行完成之后, 所有符合運行條件的定時任務(wù)隊列, 都添加到了普通任務(wù)隊列中 scheduledTask = pollScheduledTask(nanoTime); } return true; }
彈出需要執(zhí)行的定時任務(wù)之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)
重新添加到定時任務(wù)隊列中
如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續(xù)添加, 直到?jīng)]有需要執(zhí)行的任務(wù)
這樣就將定時任務(wù)隊列需要執(zhí)行的任務(wù)添加到了taskQueue中
回到runAllTasks(long timeoutNanos)方法中
protected boolean runAllTasks(long timeoutNanos) { //定時任務(wù)隊列中聚合任務(wù) fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個任務(wù) Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務(wù)執(zhí)行收尾的操作 afterRunningAllTasks(); return false; } //如果隊列不為空 //首先算一個截止時間(+50毫秒, 因為執(zhí)行任務(wù), 不要超過這個時間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執(zhí)行每一個任務(wù) for (;;) { safeExecute(task); //標(biāo)記當(dāng)前跑完的任務(wù) runTasks ++; //當(dāng)跑完64個任務(wù)的時候, 會計算一下當(dāng)前時間 if ((runTasks & 0x3F) == 0) { //定時任務(wù)初始化到當(dāng)前的時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時間則不執(zhí)行(nanoTime()是耗時的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個時間, 則繼續(xù)從普通任務(wù)隊列拿任務(wù) task = pollTask(); //直到?jīng)]有任務(wù)執(zhí)行 if (task == null) { //記錄下最后執(zhí)行時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
首先通過 Runnable task = pollTask() 從taskQueue中拿一個任務(wù)
任務(wù)不為空, 則通過
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos
計算一個截止時間, 任務(wù)的執(zhí)行時間不能超過這個時間
然后在for循環(huán)中通過safeExecute(task)執(zhí)行task
我們跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) { try { //直接調(diào)用run()方法執(zhí)行 task.run(); } catch (Throwable t) { //發(fā)生異常不終止 logger.warn("A task raised an exception. Task: {}", task, t); } }
這里直接調(diào)用task的run()方法進(jìn)行執(zhí)行, 其中發(fā)生異常, 只打印一條日志, 代表發(fā)生異常不終止, 繼續(xù)往下執(zhí)行
回到runAllTasks(long timeoutNanos)方法
protected boolean runAllTasks(long timeoutNanos) { //定時任務(wù)隊列中聚合任務(wù) fetchFromScheduledTaskQueue(); //從普通taskQ里面拿一個任務(wù) Runnable task = pollTask(); //task為空, 則直接返回 if (task == null) { //跑完所有的任務(wù)執(zhí)行收尾的操作 afterRunningAllTasks(); return false; } //如果隊列不為空 //首先算一個截止時間(+50毫秒, 因為執(zhí)行任務(wù), 不要超過這個時間) final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執(zhí)行每一個任務(wù) for (;;) { safeExecute(task); //標(biāo)記當(dāng)前跑完的任務(wù) runTasks ++; //當(dāng)跑完64個任務(wù)的時候, 會計算一下當(dāng)前時間 if ((runTasks & 0x3F) == 0) { //定時任務(wù)初始化到當(dāng)前的時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); //如果超過截止時間則不執(zhí)行(nanoTime()是耗時的) if (lastExecutionTime >= deadline) { break; } } //如果沒有超過這個時間, 則繼續(xù)從普通任務(wù)隊列拿任務(wù) task = pollTask(); //直到?jīng)]有任務(wù)執(zhí)行 if (task == null) { //記錄下最后執(zhí)行時間 lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工作 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
每次執(zhí)行完task, runTasks自增
這里 if ((runTasks & 0x3F) == 0) 代表是否執(zhí)行了64個任務(wù), 如果執(zhí)行了64個任務(wù), 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時任務(wù)初始化到現(xiàn)在的時間, 如果這個時間超過了截止時間, 則退出循環(huán)
如果沒有超過截止時間, 則通過 task = pollTask() 繼續(xù)彈出任務(wù)執(zhí)行
這里執(zhí)行64個任務(wù)統(tǒng)計一次時間, 而不是每次執(zhí)行任務(wù)都統(tǒng)計, 主要原因是因為獲取系統(tǒng)時間是個比較耗時的操作, 這里是netty的一種優(yōu)化方式
如果沒有task需要執(zhí)行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執(zhí)行時間
以上就是有關(guān)執(zhí)行任務(wù)隊列的相關(guān)邏輯
章節(jié)小結(jié)
本章學(xué)習(xí)了有關(guān)NioEventLoopGroup的創(chuàng)建, NioEventLoop的創(chuàng)建和啟動, 以及多路復(fù)用器的輪詢處理和task執(zhí)行的相關(guān)邏輯, 通過本章學(xué)習(xí), 我們應(yīng)該掌握如下內(nèi)容:
1. NioEventLoopGroup如何選擇分配NioEventLoop
2. NioEventLoop如何開啟
3. NioEventLoop如何進(jìn)行select操作
4. NioEventLoop如何執(zhí)行task
以上就是Netty分布式NioEventLoop任務(wù)隊列執(zhí)行源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式NioEventLoop執(zhí)行任務(wù)隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
AbstractQueuedSynchronizer內(nèi)部類Node使用講解
這篇文章主要為大家介紹了AbstractQueuedSynchronizer內(nèi)部類Node使用講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務(wù)
本篇文章主要介紹了詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務(wù),具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-03-03Springboot打包為Docker鏡像并部署的實現(xiàn)
這篇文章主要介紹了Springboot打包為Docker鏡像并部署的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例
這篇文章主要介紹了Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-12-12