Netty分布式pipeline管道傳播事件的邏輯總結(jié)分析
我們在第一章和第三章中, 遺留了很多有關(guān)事件傳輸?shù)南嚓P(guān)邏輯, 這里帶大家一一回顧
問題分析
首先看兩個問題:
1.在客戶端接入的時候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法
2.客戶端handler是什么時候被添加的?
首先看第一個問題
1.在客戶端接入的時候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))為什么會調(diào)用到ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor中的channelRead()方法?
我們首先看這段代碼:
public void read() { //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用 assert eventLoop().inEventLoop(); //服務(wù)端channel的config final ChannelConfig config = config(); //服務(wù)端channel的pipeline final ChannelPipeline pipeline = pipeline(); //處理服務(wù)端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設(shè)置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //創(chuàng)建jdk底層的channel //readBuf用于臨時承載讀到鏈接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的鏈接進(jìn)行計數(shù) allocHandle.incMessagesRead(localRead); //連接數(shù)是否超過最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條客戶端連接 for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將創(chuàng)建NioSokectChannel進(jìn)行傳遞 //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代碼省略 } finally { //代碼省略 } }
重點看pipeline.fireChannelRead(readBuf.get(i))
首先, 這里pipeline是服務(wù)端channel的pipeline, 也就是NioServerSocketChannel的pipeline
我們學(xué)習(xí)過pipeline之后, 對這種寫法并不陌生, 就是傳遞channelRead事件, 這里通過傳遞channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 說明在這步之前, ServerBootstrapAcceptor作為一個handler添加到了服務(wù)端channel的pipeline中, 那么這個handler什么時候添加的呢?
我們回顧下第一章, 初始化NioServerSocketChannel的時候, 調(diào)用了ServerBootstrap的init方法:
void init(Channel channel) throws Exception { //獲取用戶定義的選項(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //獲取用戶定義的屬性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //獲取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work線程組(4) final EventLoopGroup currentChildGroup = childGroup; //用戶設(shè)置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //選項轉(zhuǎn)化為Entry對象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //屬性轉(zhuǎn)化為Entry對象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服務(wù)端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
這個方法比較長, 我們重點關(guān)注第8步, 添加服務(wù)端channel, 這里的pipeline, 是服務(wù)服務(wù)端channel的pipeline, 也就是NioServerSocketChannel綁定的pipeline, 這里添加了一個ChannelInitializer類型的handler
我們看一下ChannelInitializer這個類的繼承關(guān)系
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { //省略類體 }
我們看到其繼承了ChannelInboundHandlerAdapter, 說明是一個inbound類型的handler
這里我們可能會想到, 添加完handler會執(zhí)行handlerAdded, 然后再handlerAdded方法中做了添加ServerBootstrapAcceptor這個handler
但是, 實際上并不是這樣的, 當(dāng)程序執(zhí)行到這里, 并沒有馬上執(zhí)行handlerAdded, 我們緊跟addLast方法
最后會跟到DefualtChannelPipeline的一個addLast方法中去:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判斷handler是否被重復(fù)添加(1) checkMultiplicity(handler); //創(chuàng)建一個HandlerContext并添加到列表(2) newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3) addLast0(newCtx); //是否已注冊 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回調(diào)用戶事件 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回調(diào)添加事件(4) callHandlerAdded0(newCtx); return this; }
首先完成了handler的添加, 但是并沒有馬上執(zhí)行回調(diào)
這里我們重點關(guān)注if (!registered)這個條件判斷, 其實在注冊完成, registered會變成true, 但是走到這一步的時候NioServerSockeChannel并沒有完成注冊(可以回顧第一章看注冊在哪一步), 所以會進(jìn)到if里并返回自身
我們重點關(guān)注callHandlerCallbackLater這個方法, 我們跟進(jìn)去:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; //判斷是否已添加, 未添加, 進(jìn)行添加, 已添加進(jìn)行刪除 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); //獲取第一個Callback任務(wù) PendingHandlerCallback pending = pendingHandlerCallbackHead; //如果第一個Callback任務(wù)為空 if (pending == null) { //將第一個任務(wù)設(shè)置為剛創(chuàng)建的任務(wù) pendingHandlerCallbackHead = task; } else { while (pending.next != null) { pending = pending.next; } pending.next = task; } }
因我們調(diào)用這個方法的時候added傳的true, 所以PendingHandlerCallback task賦值為new PendingHandlerAddedTask(ctx)
PendingHandlerAddedTask這個類, 我們從名字可以看出, 這是一個handler添加的延遲任務(wù), 用于執(zhí)行handler延遲添加的操作, 同樣也對應(yīng)一個名字為PendingHandlerRemovedTask的類, 用于執(zhí)行延遲刪除handler的操作, 這兩個類都繼承抽象類PendingHandlerCallback
我們看PendingHandlerAddedTask類構(gòu)造方法:
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); }
這里調(diào)用了父類的構(gòu)造方法, 再跟進(jìn)去:
PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; }
在父類中, 保存了要添加的context, 也就是ChannelInitializer類型的包裝類
回到callHandlerCallbackLater方法中
PendingHandlerCallback pending = pendingHandlerCallbackHead;
這表示獲取第一個PendingHandlerCallback的任務(wù), 其實PendingHandlerCallback是一個單向鏈表, 自身維護(hù)一個PendingHandlerCallback類型的next, 指向下一個任務(wù), 在DefaultChannelPipeline這個類中, 定義了個PendingHandlerCallback類型的引用pendingHandlerCallbackHead, 用來指向延遲回調(diào)任務(wù)的中的第一個任務(wù)
之后判斷這個任務(wù)是為空, 如果是第一次添加handler, 那么這里就是空, 所以將第一個任務(wù)賦值為我們剛創(chuàng)建的添加任務(wù)
如果不是第一次添加handler, 則將我們新創(chuàng)建的任務(wù)添加到鏈表的尾部, 因為這里我們是第一次添加, 所以第一個回調(diào)任務(wù)就指向了我們創(chuàng)建的添加handler的任務(wù)
完成這一系列操作之后, addLast方法返歸, 此時并沒有完成添加操作
而什么時候完成添加操作的呢?
在服務(wù)端channel注冊時候的會走到AbstractChannel的register0方法:
private void register0(ChannelPromise promise) { try { //做實際的注冊(1) doRegister(); neverRegistered = false; registered = true; //觸發(fā)事件(2) pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //觸發(fā)注冊成功事件(3) pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //傳播active事件(4) pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //省略代碼 } }
重點關(guān)注第二步pipeline.invokeHandlerAddedIfNeeded(), 這里已經(jīng)通過doRegister()方法完成了實際的注冊, 我們跟到該方法中:
final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }
這里會判斷是否第一次注冊, 這里返回true, 然后會執(zhí)行callHandlerAddedForAllHandlers()方法, 我們跟進(jìn)去:
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } //獲取task PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { //執(zhí)行添加handler方法 task.execute(); task = task.next; } }
這里拿到第一個延遲執(zhí)行handler添加的task其實就是我們之前剖析過的, 延遲執(zhí)行handler添加的task, 就是PendingHandlerAddedTask對象
在while循環(huán)中, 通過執(zhí)行execute()方法將handler添加
我們跟到PendingHandlerAddedTask的execute()方法中:
void execute() { //獲取當(dāng)前eventLoop線程 EventExecutor executor = ctx.executor(); //是當(dāng)前執(zhí)行的線程 if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { //添加到隊列 executor.execute(this); } catch (RejectedExecutionException e) { //代碼省略 } } }
終于在這里, 我們看到了執(zhí)行回調(diào)的方法
再回到init方法中:
void init(Channel channel) throws Exception { //獲取用戶定義的選項(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //獲取用戶定義的屬性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //獲取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work線程組(4) final EventLoopGroup currentChildGroup = childGroup; //用戶設(shè)置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //選項轉(zhuǎn)化為Entry對象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //屬性轉(zhuǎn)化為Entry對象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服務(wù)端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我們繼續(xù)看第8步添加服務(wù)端handler
因為這里的handler是ChannelInitializer, 所以完成添加之后會調(diào)用ChannelInitializer的handlerAdded方法
跟到handlerAdded方法:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //默認(rèn)情況下, 會返回true if (ctx.channel().isRegistered()) { initChannel(ctx); } }
因為執(zhí)行到這步服務(wù)端channel已經(jīng)完成注冊, 所以會執(zhí)行到initChannel方法
跟到initChannel方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { //這段代碼是否被執(zhí)行過 if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { //調(diào)用之后會刪除當(dāng)前節(jié)點 remove(ctx); } return true; } return false; }
我們關(guān)注initChannel這個方法, 這個方法是在ChannelInitializer的匿名內(nèi)部來實現(xiàn)的, 這里我們注意, 在initChannel方法執(zhí)行完畢之后會調(diào)用remove(ctx)刪除當(dāng)前節(jié)點
我們繼續(xù)跟進(jìn)initChannel方法:
@Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
這里首先添加用戶自定義的handler, 這里如果用戶沒有定義, 則添加不成功, 然后, 會調(diào)用addLast將ServerBootstrapAcceptor這個handler添加了進(jìn)去, 同樣這個handler也繼承了ChannelInboundHandlerAdapter, 在這個handler中, 重寫了channelRead方法, 所以, 這就是第一個問題的答案
緊接著我們看第二個問題
2.客戶端handler是什么時候被添加的?
我們這里看ServerBootstrapAcceptor的channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //添加channelHadler, 這個channelHandler, 就是用戶代碼添加的ChannelInitializer child.pipeline().addLast(childHandler); //代碼省略 try { //work線程注冊channel childGroup.register(child).addListener(new ChannelFutureListener() { //代碼省略 }); } catch (Throwable t) { forceClose(child, t); } }
這里真相可以大白了, 服務(wù)端再創(chuàng)建完客戶端channel之后, 將新創(chuàng)建的NioSocketChannel作為參數(shù)觸發(fā)channelRead事件(可以回顧NioMessageUnsafe的read方法, 代碼這里就不貼了), 所以這里的參數(shù)msg就是NioSocketChannel
拿到channel時候再將客戶端的handler添加進(jìn)去, 我們回顧客戶端handler的添加過程:
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });
和服務(wù)端channel的邏輯一樣, 首先會添加ChannelInitializer這個handler但是沒有注冊所以沒有執(zhí)行添加handler的回調(diào), 將任務(wù)保存到一個延遲回調(diào)的task中
等客戶端channel注冊完畢, 會將執(zhí)行添加handler的回調(diào), 也就是handlerAdded方法, 在回調(diào)中執(zhí)行initChannel方法將客戶端handler添加進(jìn)去, 然后刪除ChannelInitializer這個handler
因為在服務(wù)端channel中這塊邏輯已經(jīng)進(jìn)行了詳細(xì)的剖析, 所以這邊就不在贅述, 同學(xué)們可以自己跟進(jìn)去走一遍流程
這里注意, 因為每創(chuàng)建一個NioSoeketChannel都會調(diào)用服務(wù)端ServerBootstrapAcceptor的channelRead方法, 所以這里會將每一個NioSocketChannel的handler進(jìn)行添加
章節(jié)總結(jié)
本章剖析了事件傳輸?shù)南嚓P(guān)邏輯, 包括handler的添加, 刪除, inbound和outbound以及異常事件的傳輸, 最后結(jié)合第一章和第三章, 剖析了服務(wù)端channel和客戶端channel的添加過程, 同學(xué)們可以課后跟進(jìn)源碼, 將這些功能自己再走一遍以加深印象.其他的有關(guān)事件傳輸?shù)倪壿? 可以結(jié)合這一章的知識點進(jìn)行自行剖析
更多關(guān)于Netty分布式pipeline管道傳播事件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java日常練習(xí)題,每天進(jìn)步一點點(25)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07Java 使用openoffice進(jìn)行word轉(zhuǎn)換為pdf的方法步驟
這篇文章主要介紹了Java 使用openoffice進(jìn)行word轉(zhuǎn)換為pdf的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04windows java.exe內(nèi)存暴漲解決、idea跑java\ tomcat內(nèi)存無限增長
這篇文章主要介紹了windows java.exe內(nèi)存暴漲解決、idea跑 java\ tomcat內(nèi)存無限增長,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01