" />

欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式pipeline管道傳播outBound事件源碼解析

 更新時間:2022年03月28日 11:16:28   作者:向南是個萬人迷  
這篇文章主要介紹了Netty分布式pipeline管道傳播outBound事件源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

了解了inbound事件的傳播過程, 對于學習outbound事件傳輸?shù)牧鞒? 也不會太困難

outbound事件傳輸流程

在我們業(yè)務(wù)代碼中, 有可能使用wirte方法往寫數(shù)據(jù):

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().write("test data");
}

當然, 直接調(diào)用write方法是不能往對方channel中寫入數(shù)據(jù)的, 因為這種方式只能寫入到緩沖區(qū), 還要調(diào)用flush方法才能將緩沖區(qū)數(shù)據(jù)刷到channel中, 或者直接調(diào)用writeAndFlush方法, 有關(guān)邏輯, 我們會在后面章節(jié)中詳細講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程

這里我們同樣給出兩種寫法

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫法1
    ctx.channel().write("test data");
    //寫法2
    ctx.write("test data");
}

這兩種寫法有什么區(qū)別, 我們首先跟到第一種寫法中去:

ctx.channel().write("test data");

這里獲取ctx所綁定的channel

我們跟到AbstractChannel的write方法中:

public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

這里pipeline是DefaultChannelPipeline

跟到其write方法中:

public final ChannelFuture write(Object msg) {
    //從tail節(jié)點開始(從最后的節(jié)點往前寫)
    return tail.write(msg);
}

這里調(diào)用tail節(jié)點write方法, 這里我們應(yīng)該能分析到, outbound事件, 是通過tail節(jié)點開始往上傳播的, 帶著這點猜想, 我們繼往下看

其實tail節(jié)點并沒有重寫write方法, 最終會調(diào)用其父類AbstractChannelHandlerContext的write方法

AbstractChannelHandlerContext的write方法:

public ChannelFuture write(Object msg) { 
    return write(msg, newPromise());
}

我們看到這里有個newPromise()這個方法, 這里是創(chuàng)建一個Promise對象, 有關(guān)Promise的相關(guān)知識我們會在以后的章節(jié)剖析

我們繼續(xù)跟write:

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    //代碼省略
    write(msg, false, promise);
    return promise;
}

繼續(xù)跟write:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調(diào)flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

這里跟我們上一小節(jié)剖析過channelRead方法有點類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()是獲取上一個標注outbound事件的HandlerContext

跟到findContextOutbound中

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

這里的邏輯我們似曾相識, 跟我們上一小節(jié)的findContextInbound()方法有點像, 只是過程是反過來的

在這里, 會找到當前context的上一個節(jié)點, 如果標注的事件不是outbound事件, 則繼續(xù)往上找, 意思就是找到上一個標注outbound事件的節(jié)點

回到write方法:

AbstractChannelHandlerContext next = findContextOutbound();

這里將找到節(jié)點賦值到next屬性中

因為我們之前分析的write事件是從tail節(jié)點傳播的, 所以上一個節(jié)點就有可能是用戶自定的handler所屬的context

然后判斷是否為當前eventLoop線程, 如果是不是, 則封裝成task異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush方法, 因為我們這里沒有調(diào)用, 所以會執(zhí)行到next.invokeWrite(m, promise),

我們繼續(xù)跟invokeWrite

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

這里會判斷當前handler的狀態(tài)是否是添加狀態(tài), 這里返回的是true, 將會走到invokeWrite0(msg, promise)這一步

繼續(xù)跟invokeWrite0

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //調(diào)用當前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

這里的邏輯也似曾相識, 調(diào)用了當前節(jié)點包裝的handler的write方法, 如果用戶沒有重寫write方法, 則會交給其父類處理

我們跟到ChannelOutboundHandlerAdapter的write方法中看:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

這里調(diào)用了當前ctx的write方法, 這種寫法和我們小節(jié)開始的寫法是相同的, 我們回顧一下:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫法1
    ctx.channel().write("test data");
    //寫法2
    ctx.write("test data");
}

我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒有調(diào)flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

又是我們所熟悉邏輯, 找到當前節(jié)點的上一個標注事件為outbound事件的節(jié)點, 繼續(xù)執(zhí)行invokeWrite方法, 根據(jù)之前的剖析, 我們知道最終會執(zhí)行到上一個handler的write方法中

走到這里已經(jīng)不難理解, ctx.channel().write("test data")其實是從tail節(jié)點開始傳播寫事件, 而ctx.write("test data")是從自身開始傳播寫事件

所以, 在handler中如果重寫了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因為會造成每次事件傳輸?shù)竭@里都會從tail節(jié)點重新傳輸, 導(dǎo)致不可預(yù)知的錯誤

如果用代碼中沒有重寫handler的write方法, 則事件會一直往上傳輸, 當傳輸完所有的outbound節(jié)點之后, 最后會走到head節(jié)點的wirte方法中

我們跟到HeadContext的write方法中

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

我們看到write事件最終會流向這里, 通過unsafe對象進行最終的寫操作

有關(guān)inbound事件和outbound事件的傳輸, 可通過下圖進行說明:

以上就是Netty分布式pipeline管道傳播outBound事件源碼解析的詳細內(nèi)容,更多關(guān)于Netty分布式pipeline管道傳播outBound的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringSceurity實現(xiàn)短信驗證碼功能的示例代碼

    SpringSceurity實現(xiàn)短信驗證碼功能的示例代碼

    這篇文章主要介紹了SpringSceurity實現(xiàn)短信驗證碼功能的示例代碼,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-06-06
  • 淺談一下Java為什么不能使用字符流讀取非文本的二進制文件

    淺談一下Java為什么不能使用字符流讀取非文本的二進制文件

    這篇文章主要介紹了淺談一下為什么不能使用字符流讀取非文本的二進制文件,剛學Java的IO流部分時,書上說只能使用字節(jié)流去讀取圖片、視頻等非文本二進制文件,不能使用字符流,否則文件會損壞,需要的朋友可以參考下
    2023-04-04
  • 關(guān)于maven打包時的報錯: Return code is: 501 , ReasonPhrase:HTTPS Required

    關(guān)于maven打包時的報錯: Return code is: 501 , ReasonPhrase:HTTPS Requ

    這篇文章主要介紹了關(guān)于maven打包時的報錯: Return code is: 501 , ReasonPhrase:HTTPS Required,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-09-09
  • java Unsafe詳細解析

    java Unsafe詳細解析

    Unsafe為我們提供了訪問底層的機制,這種機制僅供java核心類庫使用,而不應(yīng)該被普通用戶使用。但是,為了更好地了解java的生態(tài)體系,我們應(yīng)該去學習它,去了解它,不求深入到底層的C/C++代碼,但求能了解它的基本功能。下面小編來和大家一起學習
    2019-05-05
  • JAVA使用隨機數(shù)實現(xiàn)概率抽獎

    JAVA使用隨機數(shù)實現(xiàn)概率抽獎

    這篇文章主要為大家詳細介紹了JAVA使用隨機數(shù)實現(xiàn)概率抽獎,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-11-11
  • Java中關(guān)于int和Integer的區(qū)別詳解

    Java中關(guān)于int和Integer的區(qū)別詳解

    本篇文章小編為大家介紹,在Java中 關(guān)于int和Integer的區(qū)別詳解,需要的朋友參考下
    2013-04-04
  • mybatisplus邏輯刪除基本實現(xiàn)和坑點解決

    mybatisplus邏輯刪除基本實現(xiàn)和坑點解決

    這篇文章主要介紹了mybatisplus邏輯刪除基本實現(xiàn)和坑點解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • springboot 集成cas5.3 實現(xiàn)sso單點登錄詳細流程

    springboot 集成cas5.3 實現(xiàn)sso單點登錄詳細流程

    SSO的定義是在多個應(yīng)用系統(tǒng)中,用戶只需要登錄一次就可以訪問所有相互信任的應(yīng)用系統(tǒng)。單點登錄是目前比較流行的企業(yè)業(yè)務(wù)整合的解決方案之一,本文給大家介紹springboot 集成cas5.3 實現(xiàn)sso單點登錄功能,感興趣的朋友一起看看吧
    2021-10-10
  • 仿釘釘流程輕松實現(xiàn)JSON轉(zhuǎn)BPMN完整實現(xiàn)過程示例

    仿釘釘流程輕松實現(xiàn)JSON轉(zhuǎn)BPMN完整實現(xiàn)過程示例

    這篇文章主要為大家介紹了仿釘釘流程輕松實現(xiàn)JSON轉(zhuǎn)BPMN完整實現(xiàn)過程示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • springboot項目中idea的pom.xml文件的引用標簽全部爆紅問題解決

    springboot項目中idea的pom.xml文件的引用標簽全部爆紅問題解決

    這篇文章主要介紹了springboot項目中idea的pom.xml文件的引用標簽全部爆紅問題解決,本文通過圖文并茂的形式給大家介紹的非常詳細,需要的朋友參考下吧
    2023-12-12

最新評論