netty服務(wù)端處理請求聯(lián)合pipeline分析
兩個問題
- 在客戶端接入的時候,
NioMessageUnsafe
的read
方法中pipeline.fireChannelRead(readBuf.get(i))
為什么會調(diào)用到ServerBootstrap
的內(nèi)部類ServerBootstrapAcceptor
中的channelRead()
方法。 - 客戶端
handler
是什么時候被添加的?
先分析第一個問題?;氐絥etty處理客戶端請求分析_1中服務(wù)端接收到accpet
事件后,進(jìn)行讀取的方法NioMessageUnsafe.read()
NioMessageUnsafe.read()
public void read() { //必須是NioEventLoop方法調(diào)用的, 不能通過外部線程調(diào)用 assert eventLoop().inEventLoop(); //服務(wù)端channel的config final ChannelConfig config = config(); //服務(wù)端channel的pipeline final ChannelPipeline pipeline = pipeline(); //處理服務(wù)端接入的速率 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //設(shè)置配置 allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //創(chuàng)建jdk底層的channel //readBuf用于臨時承載讀到鏈接 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //分配器將讀到的鏈接進(jìn)行計數(shù) allocHandle.incMessagesRead(localRead); //連接數(shù)是否超過最大值 } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍歷每一條客戶端連接 for (int i = 0; i < size; i ++) { readPending = false; //傳遞事件, 將創(chuàng)建NioSokectChannel進(jìn)行傳遞 //最終會調(diào)用ServerBootstrap的內(nèi)部類ServerBootstrapAcceptor的channelRead()方法 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //代碼省略 } finally { //代碼省略 } }
重點(diǎn)看pipeline.fireChannelRead(readBuf.get(i))
首先, 這里pipeline
是服務(wù)端channel
的pipeline
, 也就是NioServerSocketChannel
的pipeline
我們學(xué)習(xí)過pipeline
之后, 對這種寫法并不陌生, 就是傳遞channelRead
事件, 這里通過傳遞channelRead
事件走到了ServerBootstrapAcceptor
的channelRead()
方法, 說明在這步之前, ServerBootstrapAcceptor
作為一個handler
添加到了服務(wù)端channel
的pipeline
中, 那么這個handler
什么時候添加的呢?
我們回顧下第一章, 初始化NioServerSocketChannel的時候, 調(diào)用了ServerBootstrap的init方法 回顧下ServerBootstrap.init
的調(diào)用鏈路:
ServerBootstrap.bind(8899)
---> AbstractBootstrap.doBind(final SocketAddress localAddress)
---> AbstractBootstrap.initAndRegister()
---> ServerBootstrap.init(Channel channel)
ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception { //獲取用戶定義的選項(xiàng)(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //獲取用戶定義的屬性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //獲取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work線程組(4) final EventLoopGroup currentChildGroup = childGroup; //用戶設(shè)置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //選項(xiàng)轉(zhuǎn)化為Entry對象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //屬性轉(zhuǎn)化為Entry對象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服務(wù)端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我們重點(diǎn)關(guān)注第8步, 添加服務(wù)端channel
, 這里的pipeline
, 是服務(wù)服務(wù)端channel
的pipeline
, 也就是NioServerSocketChannel
綁定的pipeline
, 這里添加了一個ChannelInitializer
類型的handler
ChannelInitializer的繼承關(guān)系
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { //省略類體 }
我們看到其繼承了ChannelInboundHandlerAdapter
, 說明是一個inbound
類型的handler
這里我們可能會想到, 添加完handler
會執(zhí)行handlerAdded
, 然后在handlerAdded
方法中做了添加ServerBootstrapAcceptor
這個handler
但是, 實(shí)際上并不是這樣的, 當(dāng)程序執(zhí)行到這里, 并沒有馬上執(zhí)行handlerAdded
, 我們緊跟addLast
方法
最后執(zhí)行到DefualtChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler)
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判斷handler是否被重復(fù)添加(1) checkMultiplicity(handler); //創(chuàng)建一個HandlerContext并添加到列表(2) newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3) addLast0(newCtx); //是否已注冊 if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回調(diào)用戶事件 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回調(diào)添加事件(4) callHandlerAdded0(newCtx); return this; }
首先完成了handler
的添加, 但是并沒有馬上執(zhí)行回調(diào)
這里我們重點(diǎn)關(guān)注if (!registered)
這個條件判斷, 其實(shí)在注冊完成, registered
會變成true
, 但是走到這一步的時候NioServerSockeChannel
并沒有完成注冊(可以回顧第一章看注冊在哪一步), 所以會進(jìn)到if里并返回自身
DefualtChannelPipeline.callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; //判斷是否已添加, 未添加, 進(jìn)行添加, 已添加進(jìn)行刪除 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); //獲取第一個Callback任務(wù) PendingHandlerCallback pending = pendingHandlerCallbackHead; //如果第一個Callback任務(wù)為空 if (pending == null) { //將第一個任務(wù)設(shè)置為剛創(chuàng)建的任務(wù) pendingHandlerCallbackHead = task; } else { while (pending.next != null) { pending = pending.next; } pending.next = task; } }
因我們調(diào)用這個方法的時候added
傳的true
, 所以PendingHandlerCallback task
賦值為new PendingHandlerAddedTask(ctx)
PendingHandlerAddedTask
這個類, 我們從名字可以看出, 這是一個handler
添加的延遲任務(wù), 用于執(zhí)行handler
延遲添加的操作, 同樣也對應(yīng)一個名字為PendingHandlerRemovedTask
的類, 用于執(zhí)行延遲刪除handler
的操作, 這兩個類都繼承抽象類PendingHandlerCallback
PendingHandlerAddedTask構(gòu)造方法
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); }
進(jìn)入super(ctx)
PendingHandlerCallback構(gòu)造方法
PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx; }
在父類中, 保存了要添加的context
, 也就是ChannelInitializer
類型的包裝類
回到callHandlerCallbackLater方法
PendingHandlerCallback pending = pendingHandlerCallbackHead;
這表示獲取第一個PendingHandlerCallback
的任務(wù), 其實(shí)PendingHandlerCallback
是一個單向鏈表, 自身維護(hù)一個PendingHandlerCallback
類型的next
, 指向下一個任務(wù), 在DefaultChannelPipeline
這個類中, 定義了個PendingHandlerCallback
類型的引用pendingHandlerCallbackHead
, 用來指向延遲回調(diào)任務(wù)的中的第一個任務(wù)。
之后判斷這個任務(wù)是為空, 如果是第一次添加handler
, 那么這里就是空, 所以將第一個任務(wù)賦值為我們剛創(chuàng)建的添加任務(wù)。
如果不是第一次添加handler
, 則將我們新創(chuàng)建的任務(wù)添加到鏈表的尾部, 因?yàn)檫@里我們是第一次添加, 所以第一個回調(diào)任務(wù)就指向了我們創(chuàng)建的添加handler
的任務(wù)。
完成這一系列操作之后, addLast
方法返歸, 此時并沒有完成添加操作。
而什么時候完成添加操作的呢?
回到在服務(wù)端channel注冊時候的會走到AbstractChannel.register0方法 回顧下AbstractChannel.register0
的調(diào)用鏈路:
ServerBootstrap.bind(8899)
---> AbstractBootstrap.doBind(final SocketAddress localAddress)
---> AbstractBootstrap.initAndRegister()
---> config().group().register(channel)
---> SingleThreadEventLoop.register(final ChannelPromise promise)
---> AbstractChannel.register(EventLoop eventLoop, final ChannelPromise promise)
---> AbstractChannel.register0(ChannelPromise promise)
AbstractChannel.register0(ChannelPromise promise)
private void register0(ChannelPromise promise) { try { //做實(shí)際的注冊(1) doRegister(); neverRegistered = false; registered = true; //觸發(fā)事件(2) pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //觸發(fā)注冊成功事件(3) pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //傳播active事件(4) pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //省略代碼 } }
重點(diǎn)關(guān)注第二步pipeline.invokeHandlerAddedIfNeeded(), 這里已經(jīng)通過doRegister()方法完成了實(shí)際的注冊, 我們跟到該方法中
pipeline.invokeHandlerAddedIfNeeded()
final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false; callHandlerAddedForAllHandlers(); } }
這里會判斷是否第一次注冊, 這里返回true
, 然后會執(zhí)行callHandlerAddedForAllHandlers()
方法, 我們跟進(jìn)去
DefaultChannelPipeline.callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; this.pendingHandlerCallbackHead = null; } //獲取task PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { //執(zhí)行添加handler方法 task.execute(); task = task.next; } }
這里拿到第一個延遲執(zhí)行handler
添加的task
其實(shí)就是我們之前剖析過的, 延遲執(zhí)行handler
添加的task
, 就是PendingHandlerAddedTask
對象
在while
循環(huán)中, 通過執(zhí)行execute()
方法將handler
添加
進(jìn)入PendingHandlerAddedTask.execute()
void execute() { //獲取當(dāng)前eventLoop線程 EventExecutor executor = ctx.executor(); //是當(dāng)前執(zhí)行的線程 if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { //添加到隊(duì)列 executor.execute(this); } catch (RejectedExecutionException e) { //代碼省略 } } }
再進(jìn)入callHandlerAdded0
方法
callHandlerAdded0(final AbstractChannelHandlerContext ctx)
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { //省略... } }
終于在這里, 我們看到了執(zhí)行回調(diào)的方法
再回到ServerBootstrap.init(Channel channel)
void init(Channel channel) throws Exception { //獲取用戶定義的選項(xiàng)(1) final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } //獲取用戶定義的屬性(2) final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //獲取channel的pipline(3) ChannelPipeline p = channel.pipeline(); //work線程組(4) final EventLoopGroup currentChildGroup = childGroup; //用戶設(shè)置的Handler(5) final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; //選項(xiàng)轉(zhuǎn)化為Entry對象(6) synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } //屬性轉(zhuǎn)化為Entry對象(7) synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //添加服務(wù)端handler(8) p.addLast(new ChannelInitializer<Channel>() { //初始化channel @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
我們繼續(xù)看第8步添加服務(wù)端handler
因?yàn)檫@里的handler
是ChannelInitializer
, 所以完成添加之后會調(diào)用ChannelInitializer
的handlerAdded
方法
跟到handlerAdded
方法
ChannelInitializer.handlerAdded(ChannelHandlerContext ctx)
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //默認(rèn)情況下, 會返回true if (ctx.channel().isRegistered()) { initChannel(ctx); } }
因?yàn)閳?zhí)行到這步服務(wù)端channel
已經(jīng)完成注冊, 所以會執(zhí)行到initChannel
方法
ChannelInitializer.initChannel(ChannelHandlerContext ctx)
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { //這段代碼是否被執(zhí)行過 if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { //調(diào)用之后會刪除當(dāng)前節(jié)點(diǎn) remove(ctx); } return true; } return false; }
我們關(guān)注initChannel
這個方法, 這個方法是在ChannelInitializer
的匿名內(nèi)部來實(shí)現(xiàn)的, 這里我們注意, 在initChannel
方法執(zhí)行完畢之后會調(diào)用remove(ctx)
刪除當(dāng)前節(jié)點(diǎn)
繼續(xù)跟進(jìn)initChannel方法
public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
這里首先添加用戶自定義的handler
, 這里如果用戶沒有定義, 則添加不成功, 然后, 會調(diào)用addLast
將ServerBootstrapAcceptor
這個handler
添加了進(jìn)去, 同樣這個handler
也繼承了ChannelInboundHandlerAdapter
, 在這個handler
中, 重寫了channelRead
方法, 所以, 這就是第一個問題的答案
緊接著我們看第二個問題:客戶端handler是什么時候被添加的?
看ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //添加channelHadler, 這個channelHandler, 就是用戶代碼添加的ChannelInitializer child.pipeline().addLast(childHandler); //代碼省略 try { //work線程注冊channel childGroup.register(child).addListener(new ChannelFutureListener() { //代碼省略 }); } catch (Throwable t) { forceClose(child, t); } }
這里真相可以大白了, 服務(wù)端再創(chuàng)建完客戶端channel
之后, 將新創(chuàng)建的NioSocketChannel
作為參數(shù)觸發(fā)channelRead
事件(可以回顧NioMessageUnsafe.read
方法, 代碼這里就不貼了), 所以這里的參數(shù)msg
就是NioSocketChannel
拿到channel
時候再將客戶端的handler
添加進(jìn)去, 我們回顧客戶端handler
的添加過程:
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new ServerHandler()); } });
和服務(wù)端channel
的邏輯一樣, 首先會添加ChannelInitializer
這個handler
但是沒有注冊所以沒有執(zhí)行添加handler
的回調(diào), 將任務(wù)保存到一個延遲回調(diào)的task
中
等客戶端channel
注冊完畢, 會將執(zhí)行添加handler
的回調(diào), 也就是handlerAdded
方法, 在回調(diào)中執(zhí)行initChannel
方法將客戶端handler
添加進(jìn)去, 然后刪除ChannelInitializer
這個handler
因?yàn)樵诜?wù)端channel
中這塊邏輯已經(jīng)進(jìn)行了詳細(xì)的剖析, 所以這邊就不在贅述, 同學(xué)們可以自己跟進(jìn)去走一遍流程
這里注意, 因?yàn)槊縿?chuàng)建一個NioSoeketChannel
都會調(diào)用服務(wù)端ServerBootstrapAcceptor
的channelRead
方法, 所以這里會將每一個NioSocketChannel
的handler
進(jìn)行添加
總結(jié)
本文章分析了事件傳輸?shù)南嚓P(guān)邏輯, 包括handler
的添加, 刪除, inbound
和outbound
以及異常事件的傳輸, 最后結(jié)合第一章和第三章, 剖析了服務(wù)端channel
和客戶端channel
的添加過程,更多關(guān)于netty服務(wù)端請求聯(lián)合pipeline的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java程序圖形用戶界面設(shè)計之標(biāo)簽組件
圖形界面(簡稱GUI)是指采用圖形方式顯示的計算機(jī)操作用戶界面。與早期計算機(jī)使用的命令行界面相比,圖形界面對于用戶來說在視覺上更易于接受,本篇精講Java語言中關(guān)于圖形用戶界面的標(biāo)簽組件部分2022-02-02java使用TimerTask定時器獲取指定網(wǎng)絡(luò)數(shù)據(jù)
java.util.Timer定時器,實(shí)際上是個線程,定時調(diào)度所擁有的TimerTasks。一個TimerTask實(shí)際上就是一個擁有run方法的類,需要定時執(zhí)行的代碼放到run方法體內(nèi),TimerTask一般是以匿名類的方式創(chuàng)建,下面的就用示例來學(xué)習(xí)他的使用方法2014-01-01Spring注解@EnableWebMvc使用的坑點(diǎn)及解析
這篇文章主要介紹了Spring注解@EnableWebMvc使用的坑點(diǎn)及解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09Java Code Cache滿導(dǎo)致應(yīng)用性能降低問題解決
這篇文章主要介紹了Java Code Cache滿導(dǎo)致應(yīng)用性能降低問題解決,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08SpringBoot后臺實(shí)現(xiàn)文件上傳下載
這篇文章主要為大家詳細(xì)介紹了SpringBoot后臺實(shí)現(xiàn)文件上傳下載,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02Deepin系統(tǒng)安裝eclipse2021-03及CDT插件的安裝教程
本教程教大家deepin20.1操作系統(tǒng)上安裝eclipse_2021-03版的詳細(xì)步驟及CDT插件的安裝方法,通過圖文展示的非常明了,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-06-06