Netty分布式pipeline傳播inbound事件源碼分析
前一小結(jié)回顧:pipeline管道Handler刪除
傳播inbound事件
有關(guān)于inbound事件, 在概述中做過(guò)簡(jiǎn)單的介紹, 就是以自己為基準(zhǔn), 流向自己的事件, 比如最常見(jiàn)的channelRead事件, 就是對(duì)方發(fā)來(lái)數(shù)據(jù)流的所觸發(fā)的事件, 己方要對(duì)這些數(shù)據(jù)進(jìn)行處理, 這一小節(jié), 以激活channelRead為例講解有關(guān)inbound事件的處理流程
在業(yè)務(wù)代碼中, 我們自己的handler往往會(huì)通過(guò)重寫(xiě)channelRead方法來(lái)處理對(duì)方發(fā)來(lái)的數(shù)據(jù), 那么對(duì)方發(fā)來(lái)的數(shù)據(jù)是如何走到channelRead方法中了呢, 也是我們這一小節(jié)要剖析的內(nèi)容
在業(yè)務(wù)代碼中, 傳遞channelRead事件方式是通過(guò)fireChannelRead方法進(jìn)行傳播的
這里給大家看兩種寫(xiě)法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//寫(xiě)法1:
ctx.fireChannelRead(msg);
//寫(xiě)法2
ctx.pipeline().fireChannelRead(msg);
}這里重寫(xiě)了channelRead方法, 并且方法體內(nèi)繼續(xù)通過(guò)fireChannelRead方法進(jìn)行傳播channelRead事件, 那么這兩種寫(xiě)法有什么異同?
我們先以寫(xiě)法2為例, 將這種寫(xiě)法進(jìn)行剖析
這里首先獲取當(dāng)前context的pipeline對(duì)象, 然后通過(guò)pipeline對(duì)象調(diào)用自身的fireChannelRead方法進(jìn)行傳播, 因?yàn)槟J(rèn)創(chuàng)建的DefaultChannelpipeline
我們跟到DefaultChannelpipeline的fireChannelRead方法中:
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}這里首先調(diào)用的是AbstractChannelHandlerContext類(lèi)的靜態(tài)方法invokeChannelRead, 參數(shù)傳入head節(jié)點(diǎn)和事件的消息
我們跟進(jìn)invokeChannelRead方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}這里的Object m m通常就是我們傳入的msg, 而next, 目前是head節(jié)點(diǎn), 然后再判斷是否為當(dāng)前eventLoop線(xiàn)程, 如果不是則將方法包裝成task交給eventLoop線(xiàn)程處理
我們跟到invokeChannelRead方法中
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}首先通過(guò)invokeHandler()判斷當(dāng)前handler是否已添加, 如果添加, 則執(zhí)行當(dāng)前handler的chanelRead方法, 其實(shí)這里我們基本上就明白了, 通過(guò)fireChannelRead方法傳遞事件的過(guò)程中, 其實(shí)就是找到相關(guān)handler執(zhí)行其channelRead方法, 由于我們?cè)谶@里的handler就是head節(jié)點(diǎn), 所以我們跟到HeadContext的channelRead方法中:
HeadContext的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//向下傳遞channelRead事件
ctx.fireChannelRead(msg);
}在這里我們看到, 這里通過(guò)fireChannelRead方法繼續(xù)往下傳遞channelRead事件, 而這種調(diào)用方式, 就是我們剛才分析用戶(hù)代碼的第一種調(diào)用方式:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//寫(xiě)法1:
ctx.fireChannelRead(msg);
//寫(xiě)法2
ctx.pipeline().fireChannelRead(msg);
}這里直接通過(guò)context對(duì)象調(diào)用fireChannelRead方法, 那么和使用pipeline調(diào)用有什么區(qū)別的, 我會(huì)回到HeadConetx的channelRead方法, 我們來(lái)剖析ctx.fireChannelRead(msg)這句, 大家就會(huì)對(duì)這個(gè)問(wèn)題有答案了, 跟到ctx的fireChannelRead方法中, 這里會(huì)走到AbstractChannelHandlerContext類(lèi)中的fireChannelRead方法中
跟到AbstractChannelHandlerContext類(lèi)中的fireChannelRead方法:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}這里我們看到, invokeChannelRead方法中傳入了一個(gè)findContextInbound()參數(shù), 而這findContextInbound方法其實(shí)就是找到當(dāng)前Context的下一個(gè)節(jié)點(diǎn)
跟到findContextInbound方法
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}這里的邏輯也比較簡(jiǎn)單, 是通過(guò)一個(gè)doWhile循環(huán), 找到當(dāng)前handlerContext的下一個(gè)節(jié)點(diǎn), 這里要注意循環(huán)的終止條件, while (!ctx.inbound)表示下一個(gè)context標(biāo)志的事件不是inbound的事件, 則循環(huán)繼續(xù)往下找, 言外之意就是要找到下一個(gè)標(biāo)注inbound事件的節(jié)點(diǎn)
有關(guān)事件的標(biāo)注, 之前的小節(jié)已經(jīng)剖析過(guò)了, 如果是用戶(hù)定義的handler, 是通過(guò)handler繼承的接口而定的, 如果tail或者h(yuǎn)ead, 那么是在初始化的時(shí)候就已經(jīng)定義好, 這里不再贅述
回到AbstractChannelHandlerContext類(lèi)的fireChannelRead方法中:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}找到下一個(gè)節(jié)點(diǎn)后, 繼續(xù)調(diào)用invokeChannelRead方法, 傳入下一個(gè)和消息對(duì)象:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
//第一次執(zhí)行next其實(shí)就是head
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}這里的邏輯我們又不陌生了, 因?yàn)槲覀儌魅氲氖钱?dāng)前context的下一個(gè)節(jié)點(diǎn), 所以這里會(huì)調(diào)用下一個(gè)節(jié)點(diǎn)invokeChannelRead方法, 因我們剛才剖析的是head節(jié)點(diǎn), 所以下一個(gè)節(jié)點(diǎn)有可能是用戶(hù)添加的handler的包裝類(lèi)HandlerConext的對(duì)象
這里我們跟進(jìn)invokeChannelRead方法中去:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//發(fā)生異常的時(shí)候在這里捕獲異常
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}又是我們熟悉的邏輯, 調(diào)用了自身handler的channelRead方法, 如果是用戶(hù)自定義的handler, 則會(huì)走到用戶(hù)定義的channelRead()方法中去, 所以這里就解釋了為什么通過(guò)傳遞channelRead事件, 最終會(huì)走到用戶(hù)重寫(xiě)的channelRead方法中去
同樣, 也解釋了該小節(jié)最初提到過(guò)的兩種寫(xiě)法的區(qū)別:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//寫(xiě)法1:
ctx.fireChannelRead(msg);
//寫(xiě)法2
ctx.pipeline().fireChannelRead(msg);
}寫(xiě)法1是通過(guò)當(dāng)前節(jié)點(diǎn)往下傳播事件
寫(xiě)法2是通過(guò)頭節(jié)點(diǎn)往下傳遞事件
所以, 在handler中如果如果要在channelRead方法中傳遞channelRead事件, 一定要采用寫(xiě)法2的方式向下傳遞, 或者交給其父類(lèi)處理, 如果采用1的寫(xiě)法則每次事件傳輸?shù)竭@里都會(huì)繼續(xù)從head節(jié)點(diǎn)傳輸, 從而陷入死循環(huán)或者發(fā)生異常
這里有一點(diǎn)需要注意, 如果用戶(hù)代碼中channelRead方法, 如果沒(méi)有顯示的調(diào)用ctx.fireChannelRead(msg)那么事件則不會(huì)再往下傳播, 則事件會(huì)在這里終止, 所以如果我們寫(xiě)業(yè)務(wù)代碼的時(shí)候要考慮有關(guān)資源釋放的相關(guān)操作
如果ctx.fireChannelRead(msg)則事件會(huì)繼續(xù)往下傳播, 如果每一個(gè)handler都向下傳播事件, 當(dāng)然, 根據(jù)我們之前的分析channelRead事件只會(huì)在標(biāo)識(shí)為inbound事件的HandlerConetext中傳播, 傳播到最后, 則最終會(huì)調(diào)用到tail節(jié)點(diǎn)的channelRead方法
我們跟到tailConext的channelRead方法中
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}我們跟進(jìn)到onUnhandledInboundMessage方法中:
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
//釋放資源
ReferenceCountUtil.release(msg);
}
}這里做了釋放資源的相關(guān)的操作
至此, channelRead事件傳輸相關(guān)羅輯剖析完整, 其實(shí)對(duì)于inbound事件的傳輸流程都會(huì)遵循這一邏輯, 小伙伴們可以自行剖析其他inbound事件的傳輸流程,更多關(guān)于Netty分布式pipeline傳播inbound事件的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java模擬棧和隊(duì)列數(shù)據(jù)結(jié)構(gòu)的基本示例講解
這篇文章主要介紹了Java模擬棧和隊(duì)列數(shù)據(jù)結(jié)構(gòu)的基本示例,棧的后進(jìn)先出和隊(duì)列的先進(jìn)先出是數(shù)據(jù)結(jié)構(gòu)中最基礎(chǔ)的知識(shí),本文則又對(duì)Java實(shí)現(xiàn)棧和隊(duì)列結(jié)構(gòu)的方法進(jìn)行了細(xì)分,需要的朋友可以參考下2016-04-04
java源碼解析之String類(lèi)的compareTo(String otherString)方法
這篇文章主要給大家介紹了關(guān)于java源碼解析之String類(lèi)的compareTo(String otherString)方法的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-09-09
SpringBoot接入釘釘自定義機(jī)器人預(yù)警通知
本文主要介紹了SpringBoot接入釘釘自定義機(jī)器人預(yù)警通知,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Spring MVC簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
Spring MVC屬于SpringFrameWork的后續(xù)產(chǎn)品,已經(jīng)融合在Spring Web Flow里面。今天先從寫(xiě)一個(gè)Spring MVC的HelloWorld開(kāi)始,讓我們看看如何搭建起一個(gè)Spring mvc的環(huán)境并運(yùn)行程序,感興趣的朋友一起學(xué)習(xí)吧2017-08-08

