Netty分布式pipeline管道傳播outBound事件源碼解析
了解了inbound事件的傳播過(guò)程, 對(duì)于學(xué)習(xí)outbound事件傳輸?shù)牧鞒? 也不會(huì)太困難
outbound事件傳輸流程
在我們業(yè)務(wù)代碼中, 有可能使用wirte方法往寫(xiě)數(shù)據(jù):
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("test data"); }
當(dāng)然, 直接調(diào)用write方法是不能往對(duì)方channel中寫(xiě)入數(shù)據(jù)的, 因?yàn)檫@種方式只能寫(xiě)入到緩沖區(qū), 還要調(diào)用flush方法才能將緩沖區(qū)數(shù)據(jù)刷到channel中, 或者直接調(diào)用writeAndFlush方法, 有關(guān)邏輯, 我們會(huì)在后面章節(jié)中詳細(xì)講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程
這里我們同樣給出兩種寫(xiě)法
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫(xiě)法1 ctx.channel().write("test data"); //寫(xiě)法2 ctx.write("test data"); }
這兩種寫(xiě)法有什么區(qū)別, 我們首先跟到第一種寫(xiě)法中去:
ctx.channel().write("test data");
這里獲取ctx所綁定的channel
我們跟到AbstractChannel的write方法中:
public ChannelFuture write(Object msg) { return pipeline.write(msg); }
這里pipeline是DefaultChannelPipeline
跟到其write方法中:
public final ChannelFuture write(Object msg) { //從tail節(jié)點(diǎn)開(kāi)始(從最后的節(jié)點(diǎn)往前寫(xiě)) return tail.write(msg); }
這里調(diào)用tail節(jié)點(diǎn)write方法, 這里我們應(yīng)該能分析到, outbound事件, 是通過(guò)tail節(jié)點(diǎn)開(kāi)始往上傳播的, 帶著這點(diǎn)猜想, 我們繼往下看
其實(shí)tail節(jié)點(diǎn)并沒(méi)有重寫(xiě)write方法, 最終會(huì)調(diào)用其父類AbstractChannelHandlerContext的write方法
AbstractChannelHandlerContext的write方法:
public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
我們看到這里有個(gè)newPromise()這個(gè)方法, 這里是創(chuàng)建一個(gè)Promise對(duì)象, 有關(guān)Promise的相關(guān)知識(shí)我們會(huì)在以后的章節(jié)剖析
我們繼續(xù)跟write:
public ChannelFuture write(final Object msg, final ChannelPromise promise) { //代碼省略 write(msg, false, promise); return promise; }
繼續(xù)跟write:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒(méi)有調(diào)flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這里跟我們上一小節(jié)剖析過(guò)channelRead方法有點(diǎn)類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()是獲取上一個(gè)標(biāo)注outbound事件的HandlerContext
跟到findContextOutbound中
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
這里的邏輯我們似曾相識(shí), 跟我們上一小節(jié)的findContextInbound()方法有點(diǎn)像, 只是過(guò)程是反過(guò)來(lái)的
在這里, 會(huì)找到當(dāng)前context的上一個(gè)節(jié)點(diǎn), 如果標(biāo)注的事件不是outbound事件, 則繼續(xù)往上找, 意思就是找到上一個(gè)標(biāo)注outbound事件的節(jié)點(diǎn)
回到write方法:
AbstractChannelHandlerContext next = findContextOutbound();
這里將找到節(jié)點(diǎn)賦值到next屬性中
因?yàn)槲覀冎胺治龅膚rite事件是從tail節(jié)點(diǎn)傳播的, 所以上一個(gè)節(jié)點(diǎn)就有可能是用戶自定的handler所屬的context
然后判斷是否為當(dāng)前eventLoop線程, 如果是不是, 則封裝成task異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush方法, 因?yàn)槲覀冞@里沒(méi)有調(diào)用, 所以會(huì)執(zhí)行到next.invokeWrite(m, promise),
我們繼續(xù)跟invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
這里會(huì)判斷當(dāng)前handler的狀態(tài)是否是添加狀態(tài), 這里返回的是true, 將會(huì)走到invokeWrite0(msg, promise)這一步
繼續(xù)跟invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調(diào)用當(dāng)前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
這里的邏輯也似曾相識(shí), 調(diào)用了當(dāng)前節(jié)點(diǎn)包裝的handler的write方法, 如果用戶沒(méi)有重寫(xiě)write方法, 則會(huì)交給其父類處理
我們跟到ChannelOutboundHandlerAdapter的write方法中看:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
這里調(diào)用了當(dāng)前ctx的write方法, 這種寫(xiě)法和我們小節(jié)開(kāi)始的寫(xiě)法是相同的, 我們回顧一下:
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫(xiě)法1 ctx.channel().write("test data"); //寫(xiě)法2 ctx.write("test data"); }
我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒(méi)有調(diào)flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
又是我們所熟悉邏輯, 找到當(dāng)前節(jié)點(diǎn)的上一個(gè)標(biāo)注事件為outbound事件的節(jié)點(diǎn), 繼續(xù)執(zhí)行invokeWrite方法, 根據(jù)之前的剖析, 我們知道最終會(huì)執(zhí)行到上一個(gè)handler的write方法中
走到這里已經(jīng)不難理解, ctx.channel().write("test data")其實(shí)是從tail節(jié)點(diǎn)開(kāi)始傳播寫(xiě)事件, 而ctx.write("test data")是從自身開(kāi)始傳播寫(xiě)事件
所以, 在handler中如果重寫(xiě)了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因?yàn)闀?huì)造成每次事件傳輸?shù)竭@里都會(huì)從tail節(jié)點(diǎn)重新傳輸, 導(dǎo)致不可預(yù)知的錯(cuò)誤
如果用代碼中沒(méi)有重寫(xiě)handler的write方法, 則事件會(huì)一直往上傳輸, 當(dāng)傳輸完所有的outbound節(jié)點(diǎn)之后, 最后會(huì)走到head節(jié)點(diǎn)的wirte方法中
我們跟到HeadContext的write方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
我們看到write事件最終會(huì)流向這里, 通過(guò)unsafe對(duì)象進(jìn)行最終的寫(xiě)操作
有關(guān)inbound事件和outbound事件的傳輸, 可通過(guò)下圖進(jìn)行說(shuō)明:
以上就是Netty分布式pipeline管道傳播outBound事件源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式pipeline管道傳播outBound的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringSceurity實(shí)現(xiàn)短信驗(yàn)證碼功能的示例代碼
這篇文章主要介紹了SpringSceurity實(shí)現(xiàn)短信驗(yàn)證碼功能的示例代碼,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06淺談一下Java為什么不能使用字符流讀取非文本的二進(jìn)制文件
這篇文章主要介紹了淺談一下為什么不能使用字符流讀取非文本的二進(jìn)制文件,剛學(xué)Java的IO流部分時(shí),書(shū)上說(shuō)只能使用字節(jié)流去讀取圖片、視頻等非文本二進(jìn)制文件,不能使用字符流,否則文件會(huì)損壞,需要的朋友可以參考下2023-04-04關(guān)于maven打包時(shí)的報(bào)錯(cuò): Return code is: 501 , ReasonPhrase:HTTPS Requ
這篇文章主要介紹了關(guān)于maven打包時(shí)的報(bào)錯(cuò): Return code is: 501 , ReasonPhrase:HTTPS Required,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09JAVA使用隨機(jī)數(shù)實(shí)現(xiàn)概率抽獎(jiǎng)
這篇文章主要為大家詳細(xì)介紹了JAVA使用隨機(jī)數(shù)實(shí)現(xiàn)概率抽獎(jiǎng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-11-11Java中關(guān)于int和Integer的區(qū)別詳解
本篇文章小編為大家介紹,在Java中 關(guān)于int和Integer的區(qū)別詳解,需要的朋友參考下2013-04-04mybatisplus邏輯刪除基本實(shí)現(xiàn)和坑點(diǎn)解決
這篇文章主要介紹了mybatisplus邏輯刪除基本實(shí)現(xiàn)和坑點(diǎn)解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03springboot 集成cas5.3 實(shí)現(xiàn)sso單點(diǎn)登錄詳細(xì)流程
SSO的定義是在多個(gè)應(yīng)用系統(tǒng)中,用戶只需要登錄一次就可以訪問(wèn)所有相互信任的應(yīng)用系統(tǒng)。單點(diǎn)登錄是目前比較流行的企業(yè)業(yè)務(wù)整合的解決方案之一,本文給大家介紹springboot 集成cas5.3 實(shí)現(xiàn)sso單點(diǎn)登錄功能,感興趣的朋友一起看看吧2021-10-10仿釘釘流程輕松實(shí)現(xiàn)JSON轉(zhuǎn)BPMN完整實(shí)現(xiàn)過(guò)程示例
這篇文章主要為大家介紹了仿釘釘流程輕松實(shí)現(xiàn)JSON轉(zhuǎn)BPMN完整實(shí)現(xiàn)過(guò)程示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08springboot項(xiàng)目中idea的pom.xml文件的引用標(biāo)簽全部爆紅問(wèn)題解決
這篇文章主要介紹了springboot項(xiàng)目中idea的pom.xml文件的引用標(biāo)簽全部爆紅問(wèn)題解決,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友參考下吧2023-12-12