netty?pipeline中的inbound和outbound事件傳播分析
傳播inbound事件
有關(guān)于inbound
事件, 在概述中做過(guò)簡(jiǎn)單的介紹, 就是以自己為基準(zhǔn), 流向自己的事件, 比如最常見(jiàn)的channelRead
事件, 就是對(duì)方發(fā)來(lái)數(shù)據(jù)流的所觸發(fā)的事件, 己方要對(duì)這些數(shù)據(jù)進(jìn)行處理, 這一小節(jié), 以激活channelRead
為例講解有關(guān)inbound
事件的處理流程。
在業(yè)務(wù)代碼中, 我們自己的handler
往往會(huì)通過(guò)重寫channelRead
方法來(lái)處理對(duì)方發(fā)來(lái)的數(shù)據(jù), 那么對(duì)方發(fā)來(lái)的數(shù)據(jù)是如何走到channelRead
方法中了呢, 也是我們這一小節(jié)要剖析的內(nèi)容。
在業(yè)務(wù)代碼中, 傳遞channelRead
事件方式是通過(guò)fireChannelRead
方法進(jìn)行傳播的。
兩種寫法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這里重寫了channelRead
方法, 并且方法體內(nèi)繼續(xù)通過(guò)fireChannelRead
方法進(jìn)行傳播channelRead
事件, 那么這兩種寫法有什么異同?
我們先以寫法2為例, 將這種寫法進(jìn)行剖析。
這里首先獲取當(dāng)前context
的pipeline
對(duì)象, 然后通過(guò)pipeline
對(duì)象調(diào)用自身的fireChannelRead
方法進(jìn)行傳播, 因?yàn)槟J(rèn)創(chuàng)建的DefaultChannelpipeline
。
DefaultChannelPipeline.fireChannelRead(msg)
public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
這里首先調(diào)用的是AbstractChannelHandlerContext
類的靜態(tài)方法invokeChannelRead
, 參數(shù)傳入head
節(jié)點(diǎn)和事件的消息
AbstractChannelHandlerContext.invokeChannelRead(head, msg)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這里的m
通常就是我們傳入的msg
, 而next
, 目前是head
節(jié)點(diǎn), 然后再判斷是否為當(dāng)前eventLoop
線程, 如果不是則將方法包裝成task
交給eventLoop
線程處理
AbstractChannelHandlerContext.invokeChannelRead(m)
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
首先通過(guò)invokeHandler()
判斷當(dāng)前handler
是否已添加, 如果添加, 則執(zhí)行當(dāng)前handler
的chanelRead
方法, 其實(shí)這里就明白了, 通過(guò)fireChannelRead
方法傳遞事件的過(guò)程中, 其實(shí)就是找到相關(guān)handler
執(zhí)行其channelRead
方法, 由于我們?cè)谶@里的handler
就是head
節(jié)點(diǎn), 所以我們跟到HeadContext
的channelRead
方法中
HeadContext的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //向下傳遞channelRead事件 ctx.fireChannelRead(msg); }
在這里我們看到, 這里通過(guò)fireChannelRead
方法繼續(xù)往下傳遞channelRead
事件, 而這種調(diào)用方式, 就是我們剛才分析用戶代碼的第一種調(diào)用方式
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
這里直接通過(guò)context
對(duì)象調(diào)用fireChannelRead
方法, 那么和使用pipeline
調(diào)用有什么區(qū)別的, 我會(huì)回到HeadConetx
的channelRead
方法, 我們來(lái)剖析ctx.fireChannelRead(msg)
這句, 大家就會(huì)對(duì)這個(gè)問(wèn)題有答案了, 跟到ctx
的fireChannelRead
方法中, 這里會(huì)走到AbstractChannelHandlerContext
類中的fireChannelRead
方法中
AbstractChannelHandlerContext.fireChannelRead(msg)
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
這里我們看到, invokeChannelRead
方法中傳入了一個(gè)findContextInbound()
參數(shù), 而這findContextInbound
方法其實(shí)就是找到當(dāng)前Context
的下一個(gè)節(jié)點(diǎn)
AbstractChannelHandlerContext.findContextInbound()
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
這里的邏輯也比較簡(jiǎn)單, 是通過(guò)一個(gè)doWhile
循環(huán), 找到當(dāng)前handlerContext
的下一個(gè)節(jié)點(diǎn), 這里要注意循環(huán)的終止條件, while (!ctx.inbound)
表示下一個(gè)context
標(biāo)志的事件不是inbound
的事件, 則循環(huán)繼續(xù)往下找, 言外之意就是要找到下一個(gè)標(biāo)注inbound
事件的節(jié)點(diǎn)
有關(guān)事件的標(biāo)注, 之前已經(jīng)進(jìn)行了分析, 如果是用戶定義的handler
, 是通過(guò)handler
繼承的接口而定的, 如果tail
或者head
, 那么是在初始化的時(shí)候就已經(jīng)定義好, 這里不再贅述
回到AbstractChannelHandlerContext.fireChannelRead(msg)
AbstractChannelHandlerContext.fireChannelRead(msg)
public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
找到下一個(gè)節(jié)點(diǎn)后, 繼續(xù)調(diào)用invokeChannelRead
方法, 傳入下一個(gè)和消息對(duì)象
AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
這里的邏輯我們又不陌生了, 因?yàn)槲覀儌魅氲氖钱?dāng)前context
的下一個(gè)節(jié)點(diǎn), 所以這里會(huì)調(diào)用下一個(gè)節(jié)點(diǎn)invokeChannelRead
方法, 因我們剛才剖析的是head
節(jié)點(diǎn), 所以下一個(gè)節(jié)點(diǎn)有可能是用戶添加的handler
的包裝類HandlerConext
的對(duì)象
AbstractChannelHandlerContext.invokeChannelRead(Object msg)
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { //發(fā)生異常的時(shí)候在這里捕獲異常 notifyHandlerException(t); } } else { fireChannelRead(msg); } }
又是我們熟悉的邏輯, 調(diào)用了自身handler
的channelRead
方法, 如果是用戶自定義的handler
, 則會(huì)走到用戶定義的channelRead()
方法中去, 所以這里就解釋了為什么通過(guò)傳遞channelRead
事件, 最終會(huì)走到用戶重寫的channelRead
方法中去
同樣, 也解釋了該小節(jié)最初提到過(guò)的兩種寫法的區(qū)別
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //寫法1: ctx.fireChannelRead(msg); //寫法2 ctx.pipeline().fireChannelRead(msg); }
- 寫法1是通過(guò)當(dāng)前節(jié)點(diǎn)往下傳播事件
- 寫法2是通過(guò)頭節(jié)點(diǎn)往下傳遞事件
- 所以, 在
handler
中如果要在channelRead
方法中傳遞channelRead
事件, 一定要采用寫法1的方式向下傳遞, 或者交給其父類處理, 如果采用2的寫法則每次事件傳輸?shù)竭@里都會(huì)繼續(xù)從head
節(jié)點(diǎn)傳輸, 從而陷入死循環(huán)或者發(fā)生異常 - 還有一點(diǎn)需要注意, 如果用戶代碼中
channelRead
方法, 如果沒(méi)有顯示的調(diào)用ctx.fireChannelRead(msg)
那么事件則不會(huì)再往下傳播, 則事件會(huì)在這里終止, 所以如果我們寫業(yè)務(wù)代碼的時(shí)候要考慮有關(guān)資源釋放的相關(guān)操作
如果ctx.fireChannelRead(msg)
則事件會(huì)繼續(xù)往下傳播, 如果每一個(gè)handler
都向下傳播事件, 當(dāng)然, 根據(jù)我們之前的分析channelRead
事件只會(huì)在標(biāo)識(shí)為inbound
事件的HandlerConetext
中傳播, 傳播到最后, 則最終會(huì)調(diào)用到tail
節(jié)點(diǎn)的channelRead
方法
tailConext的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
onUnhandledInboundMessage(msg)
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { //釋放資源 ReferenceCountUtil.release(msg); } }
這里做了釋放資源的相關(guān)的操作
到這里,對(duì)于inbound
事件的傳輸流程以及channelRead
方法的執(zhí)行流程已經(jīng)分析完畢。
傳播outBound事件
有關(guān)于outBound
事件, 和inbound
正好相反,以自己為基準(zhǔn), 流向?qū)Ψ降氖录? 比如最常見(jiàn)的wirte
事件
在業(yè)務(wù)代碼中, , 有可能使用wirte方法往寫數(shù)據(jù)
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().write("test data"); }
當(dāng)然, 直接調(diào)用write
方法是不能往對(duì)方channel
中寫入數(shù)據(jù)的, 因?yàn)檫@種方式只能寫入到緩沖區(qū), 還要調(diào)用flush
方法才能將緩沖區(qū)數(shù)據(jù)刷到channel
中, 或者直接調(diào)用writeAndFlush
方法, 有關(guān)邏輯, 我們會(huì)在后面章節(jié)中詳細(xì)講解, 這里只是以wirte
方法為例為了演示outbound
事件的傳播的流程
兩種寫法
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
這兩種寫法有什么區(qū)別, 首先分析第一種寫法
//這里獲取ctx所綁定的channel ctx.channel().write("test data");
AbstractChannel.write(Object msg)
public ChannelFuture write(Object msg) { //這里pipeline是DefaultChannelPipeline return pipeline.write(msg); }
繼續(xù)跟蹤DefaultChannelPipeline.write(msg)
DefaultChannelPipeline.write(msg)
public final ChannelFuture write(Object msg) { //從tail節(jié)點(diǎn)開(kāi)始(從最后的節(jié)點(diǎn)往前寫) return tail.write(msg); }
這里調(diào)用tail
節(jié)點(diǎn)write
方法, 這里我們應(yīng)該能分析到, outbound
事件, 是通過(guò)tail
節(jié)點(diǎn)開(kāi)始往上傳播的。
其實(shí)tail
節(jié)點(diǎn)并沒(méi)有重寫write
方法, 最終會(huì)調(diào)用其父類AbstractChannelHandlerContext.write方法
AbstractChannelHandlerContext.write(Object msg)
public ChannelFuture write(Object msg) { return write(msg, newPromise()); }
這里有個(gè)newPromise()
這個(gè)方法, 這里是創(chuàng)建一個(gè)Promise
對(duì)象, 有關(guān)Promise
的相關(guān)知識(shí)會(huì)在以后章節(jié)進(jìn)行分析,繼續(xù)分析write
AbstractChannelHandlerContext.write(final Object msg, final ChannelPromise promise)
public ChannelFuture write(final Object msg, final ChannelPromise promise) { /** * 省略 * */ write(msg, false, promise); return promise; }
AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒(méi)有調(diào)flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這里跟我們之前分析過(guò)channelRead
方法有點(diǎn)類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()
是獲取上一個(gè)標(biāo)注outbound
事件的HandlerContext
AbstractChannelHandlerContext.findContextOutbound()
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
這里的邏輯跟之前的findContextInbound()
方法有點(diǎn)像, 只是過(guò)程是反過(guò)來(lái)的
在這里, 會(huì)找到當(dāng)前context
的上一個(gè)節(jié)點(diǎn), 如果標(biāo)注的事件不是outbound
事件, 則繼續(xù)往上找, 意思就是找到上一個(gè)標(biāo)注outbound
事件的節(jié)點(diǎn)
回到AbstractChannelHandlerContext.write方法
AbstractChannelHandlerContext next = findContextOutbound();
這里將找到節(jié)點(diǎn)賦值到next
屬性中,因?yàn)槲覀冎胺治龅?code>write事件是從tail
節(jié)點(diǎn)傳播的, 所以上一個(gè)節(jié)點(diǎn)就有可能是用戶自定的handler
所屬的context
然后判斷是否為當(dāng)前eventLoop
線程, 如果是不是, 則封裝成task
異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush
方法, 因?yàn)槲覀冞@里沒(méi)有調(diào)用, 所以會(huì)執(zhí)行到next.invokeWrite(m, promise)
AbstractChannelHandlerContext.invokeWrite(Object msg, ChannelPromise promise)
private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } }
這里會(huì)判斷當(dāng)前handler
的狀態(tài)是否是添加狀態(tài), 這里返回的是true
, 將會(huì)走到invokeWrite0(msg, promise)
這一步
AbstractChannelHandlerContext.invokeWrite0(Object msg, ChannelPromise promise)
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調(diào)用當(dāng)前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
這里的邏輯也似曾相識(shí), 調(diào)用了當(dāng)前節(jié)點(diǎn)包裝的handler
的write
方法, 如果用戶沒(méi)有重寫write
方法, 則會(huì)交給其父類處理
ChannelOutboundHandlerAdapter.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
這里調(diào)用了當(dāng)前ctx
的write
方法, 這種寫法和我們小節(jié)開(kāi)始的寫法是相同的, 我們回顧一下
public void channelActive(ChannelHandlerContext ctx) throws Exception { //寫法1 ctx.channel().write("test data"); //寫法2 ctx.write("test data"); }
我們跟到其write
方法中, 這里走到的是AbstractChannelHandlerContext
類的write
方法
AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒(méi)有調(diào)flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
又是我們所熟悉邏輯, 找到當(dāng)前節(jié)點(diǎn)的上一個(gè)標(biāo)注事件為outbound
事件的節(jié)點(diǎn), 繼續(xù)執(zhí)行invokeWrite
方法, 根據(jù)之前的剖析, 我們知道最終會(huì)執(zhí)行到上一個(gè)handler
的write
方法中。
走到這里已經(jīng)不難理解, ctx.channel().write("test data")
其實(shí)是從tail
節(jié)點(diǎn)開(kāi)始傳播寫事件, 而ctx.write("test data")
是從自身開(kāi)始傳播寫事件。
所以, 在handler
中如果重寫了write
方法要傳遞write
事件, 一定采用ctx.write("test data")
這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")
這種方式, 因?yàn)闀?huì)造成每次事件傳輸?shù)竭@里都會(huì)從tail
節(jié)點(diǎn)重新傳輸, 導(dǎo)致不可預(yù)知的錯(cuò)誤。
如果用代碼中沒(méi)有重寫handler
的write
方法, 則事件會(huì)一直往上傳輸, 當(dāng)傳輸完所有的outbound
節(jié)點(diǎn)之后, 最后會(huì)走到head
節(jié)點(diǎn)的wirte
方法中。
HeadContext.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
我們看到write
事件最終會(huì)流向這里, 通過(guò)unsafe
對(duì)象進(jìn)行最終的寫操作
inbound事件和outbound事件的傳輸流程圖
以上就是netty中pipeline的inbound和outbound事件傳播分析的詳細(xì)內(nèi)容,更多關(guān)于netty pipeline事件傳播的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java如何使用ReentrantLock實(shí)現(xiàn)長(zhǎng)輪詢
這篇文章主要介紹了如何使用ReentrantLock實(shí)現(xiàn)長(zhǎng)輪詢,對(duì)ReentrantLock感興趣的同學(xué),可以參考下2021-04-04SpringBoot webSocket實(shí)現(xiàn)發(fā)送廣播、點(diǎn)對(duì)點(diǎn)消息和Android接收
這篇文章主要介紹了SpringBoot webSocket實(shí)現(xiàn)發(fā)送廣播、點(diǎn)對(duì)點(diǎn)消息和Android接收,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-03-03解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstr
這篇文章主要介紹了解讀@NoArgsConstructor,@AllArgsConstructor,@RequiredArgsConstructor的區(qū)別及在springboot常用地方,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Java數(shù)據(jù)結(jié)構(gòu)之散列表詳解
散列表(Hash table,也叫哈希表),是根據(jù)關(guān)鍵碼值(Key value)而直接進(jìn)行訪問(wèn)的數(shù)據(jù)結(jié)構(gòu)。本文將為大家具體介紹一下散列表的原理及其代碼實(shí)現(xiàn)2022-01-01java Iterator接口和LIstIterator接口分析
這篇文章主要介紹了java Iterator接口和LIstIterator接口分析的相關(guān)資料,需要的朋友可以參考下2017-05-05feign 如何獲取請(qǐng)求真實(shí)目的ip地址
這篇文章主要介紹了feign 獲取請(qǐng)求真實(shí)目的ip地址操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06