Netty分布式NioEventLoop任務隊列執(zhí)行源碼分析
前文傳送門:NioEventLoop處理IO事件
執(zhí)行任務隊列
繼續(xù)回到NioEventLoop的run()方法:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//輪詢io事件(1)
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
//默認是50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
//記錄下開始時間
final long ioStartTime = System.nanoTime();
try {
//處理輪詢到的key(2)
processSelectedKeys();
} finally {
//計算耗時
final long ioTime = System.nanoTime() - ioStartTime;
//執(zhí)行task(3)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//代碼省略
}
}我們看到處理完輪詢到的key之后, 首先記錄下耗時, 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執(zhí)行taskQueue中的任務
我們知道ioRatio默認是50, 所以執(zhí)行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執(zhí)行時間:
跟進runAllTasks方法:
protected boolean runAllTasks(long timeoutNanos) {
//定時任務隊列中聚合任務
fetchFromScheduledTaskQueue();
//從普通taskQ里面拿一個任務
Runnable task = pollTask();
//task為空, 則直接返回
if (task == null) {
//跑完所有的任務執(zhí)行收尾的操作
afterRunningAllTasks();
return false;
}
//如果隊列不為空
//首先算一個截止時間(+50毫秒, 因為執(zhí)行任務, 不要超過這個時間)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//執(zhí)行每一個任務
for (;;) {
safeExecute(task);
//標記當前跑完的任務
runTasks ++;
//當跑完64個任務的時候, 會計算一下當前時間
if ((runTasks & 0x3F) == 0) {
//定時任務初始化到當前的時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超過截止時間則不執(zhí)行(nanoTime()是耗時的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果沒有超過這個時間, 則繼續(xù)從普通任務隊列拿任務
task = pollTask();
//直到?jīng)]有任務執(zhí)行
if (task == null) {
//記錄下最后執(zhí)行時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}首先會執(zhí)行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務隊列中聚合任務, 也就是將定時任務中找到可以執(zhí)行的任務添加到taskQueue中
我們跟進fetchFromScheduledTaskQueue()方法
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//從定時任務隊列中抓取第一個定時任務
//尋找截止時間為nanoTime的任務
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果該定時任務隊列不為空, 則塞到普通任務隊列里面
while (scheduledTask != null) {
//如果添加到普通任務隊列過程中失敗
if (!taskQueue.offer(scheduledTask)) {
//則重新添加到定時任務隊列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//繼續(xù)從定時任務隊列中拉取任務
//方法執(zhí)行完成之后, 所有符合運行條件的定時任務隊列, 都添加到了普通任務隊列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}long nanoTime = AbstractScheduledEventExecutor.nanoTime()
代表從定時任務初始化到現(xiàn)在過去了多長時間
Runnable scheduledTask= pollScheduledTask(nanoTime)
代表從定時任務隊列中拿到小于nanoTime時間的任務, 因為小于初始化到現(xiàn)在的時間, 說明該任務需要執(zhí)行了
跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
//拿到定時任務隊列
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
//peek()方法拿到第一個任務
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
//從隊列中刪除
scheduledTaskQueue.remove();
//返回該任務
return scheduledTask;
}
return null;
}我們看到首先獲得當前類綁定的定時任務隊列的成員變量
如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務
如果當前任務小于傳來的時間, 說明該任務需要執(zhí)行, 則從定時任務隊列中刪除
我們繼續(xù)回到fetchFromScheduledTaskQueue()方法中:
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
//從定時任務隊列中抓取第一個定時任務
//尋找截止時間為nanoTime的任務
Runnable scheduledTask = pollScheduledTask(nanoTime);
//如果該定時任務隊列不為空, 則塞到普通任務隊列里面
while (scheduledTask != null) {
//如果添加到普通任務隊列過程中失敗
if (!taskQueue.offer(scheduledTask)) {
//則重新添加到定時任務隊列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
//繼續(xù)從定時任務隊列中拉取任務
//方法執(zhí)行完成之后, 所有符合運行條件的定時任務隊列, 都添加到了普通任務隊列中
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}彈出需要執(zhí)行的定時任務之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)
重新添加到定時任務隊列中
如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續(xù)添加, 直到?jīng)]有需要執(zhí)行的任務
這樣就將定時任務隊列需要執(zhí)行的任務添加到了taskQueue中
回到runAllTasks(long timeoutNanos)方法中
protected boolean runAllTasks(long timeoutNanos) {
//定時任務隊列中聚合任務
fetchFromScheduledTaskQueue();
//從普通taskQ里面拿一個任務
Runnable task = pollTask();
//task為空, 則直接返回
if (task == null) {
//跑完所有的任務執(zhí)行收尾的操作
afterRunningAllTasks();
return false;
}
//如果隊列不為空
//首先算一個截止時間(+50毫秒, 因為執(zhí)行任務, 不要超過這個時間)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//執(zhí)行每一個任務
for (;;) {
safeExecute(task);
//標記當前跑完的任務
runTasks ++;
//當跑完64個任務的時候, 會計算一下當前時間
if ((runTasks & 0x3F) == 0) {
//定時任務初始化到當前的時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超過截止時間則不執(zhí)行(nanoTime()是耗時的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果沒有超過這個時間, 則繼續(xù)從普通任務隊列拿任務
task = pollTask();
//直到?jīng)]有任務執(zhí)行
if (task == null) {
//記錄下最后執(zhí)行時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}首先通過 Runnable task = pollTask() 從taskQueue中拿一個任務
任務不為空, 則通過
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos
計算一個截止時間, 任務的執(zhí)行時間不能超過這個時間
然后在for循環(huán)中通過safeExecute(task)執(zhí)行task
我們跟到safeExecute(task)中:
protected static void safeExecute(Runnable task) {
try {
//直接調(diào)用run()方法執(zhí)行
task.run();
} catch (Throwable t) {
//發(fā)生異常不終止
logger.warn("A task raised an exception. Task: {}", task, t);
}
}這里直接調(diào)用task的run()方法進行執(zhí)行, 其中發(fā)生異常, 只打印一條日志, 代表發(fā)生異常不終止, 繼續(xù)往下執(zhí)行
回到runAllTasks(long timeoutNanos)方法
protected boolean runAllTasks(long timeoutNanos) {
//定時任務隊列中聚合任務
fetchFromScheduledTaskQueue();
//從普通taskQ里面拿一個任務
Runnable task = pollTask();
//task為空, 則直接返回
if (task == null) {
//跑完所有的任務執(zhí)行收尾的操作
afterRunningAllTasks();
return false;
}
//如果隊列不為空
//首先算一個截止時間(+50毫秒, 因為執(zhí)行任務, 不要超過這個時間)
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
//執(zhí)行每一個任務
for (;;) {
safeExecute(task);
//標記當前跑完的任務
runTasks ++;
//當跑完64個任務的時候, 會計算一下當前時間
if ((runTasks & 0x3F) == 0) {
//定時任務初始化到當前的時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
//如果超過截止時間則不執(zhí)行(nanoTime()是耗時的)
if (lastExecutionTime >= deadline) {
break;
}
}
//如果沒有超過這個時間, 則繼續(xù)從普通任務隊列拿任務
task = pollTask();
//直到?jīng)]有任務執(zhí)行
if (task == null) {
//記錄下最后執(zhí)行時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}每次執(zhí)行完task, runTasks自增
這里 if ((runTasks & 0x3F) == 0) 代表是否執(zhí)行了64個任務, 如果執(zhí)行了64個任務, 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時任務初始化到現(xiàn)在的時間, 如果這個時間超過了截止時間, 則退出循環(huán)
如果沒有超過截止時間, 則通過 task = pollTask() 繼續(xù)彈出任務執(zhí)行
這里執(zhí)行64個任務統(tǒng)計一次時間, 而不是每次執(zhí)行任務都統(tǒng)計, 主要原因是因為獲取系統(tǒng)時間是個比較耗時的操作, 這里是netty的一種優(yōu)化方式
如果沒有task需要執(zhí)行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執(zhí)行時間
以上就是有關執(zhí)行任務隊列的相關邏輯
章節(jié)小結
本章學習了有關NioEventLoopGroup的創(chuàng)建, NioEventLoop的創(chuàng)建和啟動, 以及多路復用器的輪詢處理和task執(zhí)行的相關邏輯, 通過本章學習, 我們應該掌握如下內(nèi)容:
1. NioEventLoopGroup如何選擇分配NioEventLoop
2. NioEventLoop如何開啟
3. NioEventLoop如何進行select操作
4. NioEventLoop如何執(zhí)行task
以上就是Netty分布式NioEventLoop任務隊列執(zhí)行源碼分析的詳細內(nèi)容,更多關于Netty分布式NioEventLoop執(zhí)行任務隊列的資料請關注腳本之家其它相關文章!
相關文章
AbstractQueuedSynchronizer內(nèi)部類Node使用講解
這篇文章主要為大家介紹了AbstractQueuedSynchronizer內(nèi)部類Node使用講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-07-07
詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務
本篇文章主要介紹了詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-03-03
Springboot打包為Docker鏡像并部署的實現(xiàn)
這篇文章主要介紹了Springboot打包為Docker鏡像并部署的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12
Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例
這篇文章主要介紹了Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-12-12

