欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式NioEventLoop任務(wù)隊列執(zhí)行源碼分析

 更新時間:2022年03月25日 15:23:02   作者:向南是個萬人迷  
這篇文章主要為大家介紹了Netty分布式NioEventLoop任務(wù)隊列執(zhí)行源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前文傳送門:NioEventLoop處理IO事件

執(zhí)行任務(wù)隊列

繼續(xù)回到NioEventLoop的run()方法:

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //輪詢io事件(1)
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            //默認(rèn)是50
            final int ioRatio = this.ioRatio; 
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                //記錄下開始時間
                final long ioStartTime = System.nanoTime();
                try {
                    //處理輪詢到的key(2)
                    processSelectedKeys();
                } finally {
                    //計算耗時
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //執(zhí)行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        //代碼省略
    }
}

我們看到處理完輪詢到的key之后, 首先記錄下耗時, 然后通過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執(zhí)行taskQueue中的任務(wù)

我們知道ioRatio默認(rèn)是50, 所以執(zhí)行完ioTime * (100 - ioRatio) / ioRatio后, 方法傳入的值為ioTime, 也就是processSelectedKeys()的執(zhí)行時間:

跟進(jìn)runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務(wù)隊列中聚合任務(wù)
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個任務(wù)
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務(wù)執(zhí)行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊列不為空
    //首先算一個截止時間(+50毫秒, 因為執(zhí)行任務(wù), 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執(zhí)行每一個任務(wù)
    for (;;) {
        safeExecute(task);
        //標(biāo)記當(dāng)前跑完的任務(wù)
        runTasks ++;
        //當(dāng)跑完64個任務(wù)的時候, 會計算一下當(dāng)前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務(wù)初始化到當(dāng)前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執(zhí)行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續(xù)從普通任務(wù)隊列拿任務(wù)
        task = pollTask();
        //直到?jīng)]有任務(wù)執(zhí)行
        if (task == null) {
            //記錄下最后執(zhí)行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先會執(zhí)行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務(wù)隊列中聚合任務(wù), 也就是將定時任務(wù)中找到可以執(zhí)行的任務(wù)添加到taskQueue中

我們跟進(jìn)fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時任務(wù)隊列中抓取第一個定時任務(wù)
    //尋找截止時間為nanoTime的任務(wù)
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時任務(wù)隊列不為空, 則塞到普通任務(wù)隊列里面
    while (scheduledTask != null) {
        //如果添加到普通任務(wù)隊列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新添加到定時任務(wù)隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續(xù)從定時任務(wù)隊列中拉取任務(wù)
        //方法執(zhí)行完成之后, 所有符合運行條件的定時任務(wù)隊列, 都添加到了普通任務(wù)隊列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

 long nanoTime = AbstractScheduledEventExecutor.nanoTime()

 代表從定時任務(wù)初始化到現(xiàn)在過去了多長時間

 Runnable scheduledTask= pollScheduledTask(nanoTime) 

代表從定時任務(wù)隊列中拿到小于nanoTime時間的任務(wù), 因為小于初始化到現(xiàn)在的時間, 說明該任務(wù)需要執(zhí)行了

跟到其父類AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();
    //拿到定時任務(wù)隊列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    //peek()方法拿到第一個任務(wù)
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    if (scheduledTask == null) {
        return null;
    }
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        //從隊列中刪除
        scheduledTaskQueue.remove();
        //返回該任務(wù)
        return scheduledTask;
    }
    return null;
}

我們看到首先獲得當(dāng)前類綁定的定時任務(wù)隊列的成員變量

如果不為空, 則通過scheduledTaskQueue.peek()彈出第一個任務(wù)

如果當(dāng)前任務(wù)小于傳來的時間, 說明該任務(wù)需要執(zhí)行, 則從定時任務(wù)隊列中刪除

我們繼續(xù)回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    //從定時任務(wù)隊列中抓取第一個定時任務(wù)
    //尋找截止時間為nanoTime的任務(wù)
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    //如果該定時任務(wù)隊列不為空, 則塞到普通任務(wù)隊列里面
    while (scheduledTask != null) {
        //如果添加到普通任務(wù)隊列過程中失敗
        if (!taskQueue.offer(scheduledTask)) {
            //則重新添加到定時任務(wù)隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        //繼續(xù)從定時任務(wù)隊列中拉取任務(wù)
        //方法執(zhí)行完成之后, 所有符合運行條件的定時任務(wù)隊列, 都添加到了普通任務(wù)隊列中
        scheduledTask = pollScheduledTask(nanoTime);
    }
    return true;
}

彈出需要執(zhí)行的定時任務(wù)之后, 我們通過taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失敗, 則通過

scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)

重新添加到定時任務(wù)隊列中

如果添加成功, 則通過pollScheduledTask(nanoTime)方法繼續(xù)添加, 直到?jīng)]有需要執(zhí)行的任務(wù)

這樣就將定時任務(wù)隊列需要執(zhí)行的任務(wù)添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務(wù)隊列中聚合任務(wù)
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個任務(wù)
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務(wù)執(zhí)行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊列不為空
    //首先算一個截止時間(+50毫秒, 因為執(zhí)行任務(wù), 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執(zhí)行每一個任務(wù)
    for (;;) {
        safeExecute(task);
        //標(biāo)記當(dāng)前跑完的任務(wù)
        runTasks ++;
        //當(dāng)跑完64個任務(wù)的時候, 會計算一下當(dāng)前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務(wù)初始化到當(dāng)前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執(zhí)行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續(xù)從普通任務(wù)隊列拿任務(wù)
        task = pollTask();
        //直到?jīng)]有任務(wù)執(zhí)行
        if (task == null) {
            //記錄下最后執(zhí)行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

首先通過 Runnable task = pollTask() 從taskQueue中拿一個任務(wù)

任務(wù)不為空, 則通過

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 

計算一個截止時間, 任務(wù)的執(zhí)行時間不能超過這個時間

然后在for循環(huán)中通過safeExecute(task)執(zhí)行task

我們跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {
    try {
        //直接調(diào)用run()方法執(zhí)行
        task.run();
    } catch (Throwable t) {
        //發(fā)生異常不終止
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

這里直接調(diào)用task的run()方法進(jìn)行執(zhí)行, 其中發(fā)生異常, 只打印一條日志, 代表發(fā)生異常不終止, 繼續(xù)往下執(zhí)行

回到runAllTasks(long timeoutNanos)方法

protected boolean runAllTasks(long timeoutNanos) {
    //定時任務(wù)隊列中聚合任務(wù)
    fetchFromScheduledTaskQueue();
    //從普通taskQ里面拿一個任務(wù)
    Runnable task = pollTask();
    //task為空, 則直接返回
    if (task == null) {
        //跑完所有的任務(wù)執(zhí)行收尾的操作
        afterRunningAllTasks();
        return false;
    }
    //如果隊列不為空
    //首先算一個截止時間(+50毫秒, 因為執(zhí)行任務(wù), 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    //執(zhí)行每一個任務(wù)
    for (;;) {
        safeExecute(task);
        //標(biāo)記當(dāng)前跑完的任務(wù)
        runTasks ++;
        //當(dāng)跑完64個任務(wù)的時候, 會計算一下當(dāng)前時間
        if ((runTasks & 0x3F) == 0) {
            //定時任務(wù)初始化到當(dāng)前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            //如果超過截止時間則不執(zhí)行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        //如果沒有超過這個時間, 則繼續(xù)從普通任務(wù)隊列拿任務(wù)
        task = pollTask();
        //直到?jīng)]有任務(wù)執(zhí)行
        if (task == null) {
            //記錄下最后執(zhí)行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    //收尾工作
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

每次執(zhí)行完task, runTasks自增

這里 if ((runTasks & 0x3F) == 0) 代表是否執(zhí)行了64個任務(wù), 如果執(zhí)行了64個任務(wù), 則會通過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時任務(wù)初始化到現(xiàn)在的時間, 如果這個時間超過了截止時間, 則退出循環(huán)

如果沒有超過截止時間, 則通過 task = pollTask() 繼續(xù)彈出任務(wù)執(zhí)行

這里執(zhí)行64個任務(wù)統(tǒng)計一次時間, 而不是每次執(zhí)行任務(wù)都統(tǒng)計, 主要原因是因為獲取系統(tǒng)時間是個比較耗時的操作, 這里是netty的一種優(yōu)化方式

如果沒有task需要執(zhí)行, 則通過afterRunningAllTasks()做收尾工作, 最后記錄下最后的執(zhí)行時間

以上就是有關(guān)執(zhí)行任務(wù)隊列的相關(guān)邏輯

章節(jié)小結(jié)

本章學(xué)習(xí)了有關(guān)NioEventLoopGroup的創(chuàng)建, NioEventLoop的創(chuàng)建和啟動, 以及多路復(fù)用器的輪詢處理和task執(zhí)行的相關(guān)邏輯, 通過本章學(xué)習(xí), 我們應(yīng)該掌握如下內(nèi)容:

        1.  NioEventLoopGroup如何選擇分配NioEventLoop

        2.  NioEventLoop如何開啟

        3.  NioEventLoop如何進(jìn)行select操作

        4.  NioEventLoop如何執(zhí)行task

以上就是Netty分布式NioEventLoop任務(wù)隊列執(zhí)行源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式NioEventLoop執(zhí)行任務(wù)隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • AbstractQueuedSynchronizer內(nèi)部類Node使用講解

    AbstractQueuedSynchronizer內(nèi)部類Node使用講解

    這篇文章主要為大家介紹了AbstractQueuedSynchronizer內(nèi)部類Node使用講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-07-07
  • 詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務(wù)

    詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務(wù)

    本篇文章主要介紹了詳解在Spring3中使用注解(@Scheduled)創(chuàng)建計劃任務(wù),具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-03-03
  • 關(guān)于SpringSecurity的基本使用示例

    關(guān)于SpringSecurity的基本使用示例

    這篇文章主要介紹了關(guān)于SpringSecurity的基本使用示例,SpringSecurity 本質(zhì)是一個過濾器鏈SpringSecurity 采用的是責(zé)任鏈的設(shè)計模式,它有一條很長的過濾器鏈,需要的朋友可以參考下
    2023-05-05
  • MyBatis Plus復(fù)合主鍵問題的解決

    MyBatis Plus復(fù)合主鍵問題的解決

    在數(shù)據(jù)庫設(shè)計中,有時候需要使用復(fù)合主鍵來唯一標(biāo)識表中的一行數(shù)據(jù),本文將為您詳細(xì)介紹MyBatis Plus中復(fù)合主鍵的問題以及解決方案,具有一定的參考價值,感興趣的可以了解一下
    2023-09-09
  • Springboot打包為Docker鏡像并部署的實現(xiàn)

    Springboot打包為Docker鏡像并部署的實現(xiàn)

    這篇文章主要介紹了Springboot打包為Docker鏡像并部署的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • java基于UDP實現(xiàn)在線聊天功能

    java基于UDP實現(xiàn)在線聊天功能

    這篇文章主要為大家詳細(xì)介紹了java基于UDP實現(xiàn)在線聊天功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-09-09
  • 緩存工具類ACache使用方法詳解

    緩存工具類ACache使用方法詳解

    這篇文章主要為大家詳細(xì)介紹了緩存工具類ACache的使用方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-08-08
  • Java調(diào)用CMD命令的方法與使用技巧

    Java調(diào)用CMD命令的方法與使用技巧

    在實際的開發(fā)中我們有可能會遇到?java調(diào)用?cmd命令的情況,這篇文章主要給大家介紹了關(guān)于Java調(diào)用CMD命令的方法與使用的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-09-09
  • java配置文件取值的多種方式總結(jié)

    java配置文件取值的多種方式總結(jié)

    這篇文章主要為大家詳細(xì)介紹了java配置文件取值的多種方式,包括一般項目,國際化項目,springboot項目,文中的示例代碼講解詳細(xì),需要的可以參考下
    2023-11-11
  • Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例

    Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例

    這篇文章主要介紹了Json轉(zhuǎn)list二層解析轉(zhuǎn)換代碼實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-12-12

最新評論