Netty分布式pipeline管道Handler的添加代碼跟蹤解析
前文傳送門:Netty分布式pipeline管道創(chuàng)建
添加handler
我們以用戶代碼為例進(jìn)行剖析:
.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()[0])); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new SimpleHandler()); } });
用過netty的小伙伴們肯定對(duì)這段代碼不會(huì)陌生, 通過addLast, 可以添加編解碼器和我們自定義的handler, 某一個(gè)事件完成之后可以自動(dòng)調(diào)用我們handler預(yù)先定義的方法, 具體添加和調(diào)用是怎么個(gè)執(zhí)行邏輯, 在我們之后的內(nèi)容會(huì)全部學(xué)習(xí)到, 以后再使用這類的功能會(huì)得心應(yīng)手
在這里, 我們主要剖析 ch.pipeline().addLast(new SimpleHandler()) 這部分代碼的addLast()方法
首先通過channel拿到當(dāng)前的pipline, 這個(gè)上一小節(jié)進(jìn)行剖析過相信不會(huì)陌生
拿到pipeline之后再為其添加handler, 因?yàn)閏hannel初始化默認(rèn)創(chuàng)建的是DefualtChannelPipeline
我們跟到其addLast()方法中
public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); }
首先看到這里的參數(shù)其實(shí)是一個(gè)可變對(duì)象, 也就是可以傳遞多個(gè)handler, 這里我們只傳遞了一個(gè)
我們繼續(xù)跟addLast:
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } //傳多個(gè)參數(shù)的時(shí)候通過for循環(huán)添加 for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; }
這里如果傳入多個(gè)handler則會(huì)循環(huán)添加, 我們通常只添加一個(gè)
再繼續(xù)跟到addLast()方法中去
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判斷handler是否被重復(fù)添加(1) checkMultiplicity(handler); //創(chuàng)建一個(gè)HandlerContext并添加到列表(2) newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3) addLast0(newCtx); //是否已注冊(cè) if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回調(diào)用戶事件 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回調(diào)添加事件(4) callHandlerAdded0(newCtx); return this; }
這部分代碼比較長(zhǎng), 我們拆解為4個(gè)步驟:
1.重復(fù)添加驗(yàn)證
2.創(chuàng)建一個(gè)HandlerContext并添加到列表
3. 添加context
4. 回調(diào)添加事件
首先我們看第一步, 重復(fù)添加驗(yàn)證
我們跟到checkMultiplicity(handler)中
private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } //滿足條件設(shè)置為true, 代表已添加 h.added = true; } }
首先判斷是不是ChannelHandlerAdapter類型, 因?yàn)槲覀冏远x的handler通常會(huì)直接或者間接的繼承該接口, 所以這里為true
拿到handler之后轉(zhuǎn)換成ChannelHandlerAdapter類型, 然后進(jìn)行條件判斷
if (!h.isSharable() && h.added) 代表如果不是共享的handler, 并且是未添加狀態(tài), 則拋出異常:
我們可以跟到isSharable()方法中去:
public boolean isSharable() { Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { //如果這個(gè)類注解了Sharable.class, 說明這個(gè)類會(huì)被多個(gè)channel共享 sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; }
首先拿到當(dāng)前handler的class對(duì)象
然后再從netty自定義的一個(gè)ThreadLocalMap對(duì)象中獲取一個(gè)盛放handler的class對(duì)象的map, 并獲取其value
如果value值為空, 則會(huì)判斷是否被Sharable注解, 并將自身handler的class對(duì)象和判斷結(jié)果存入map對(duì)象中, 最后返回判斷結(jié)果
這說明了被Sharable注解的handler是一個(gè)共享handler
從這個(gè)邏輯我們可以判斷, 共享對(duì)象是可以重復(fù)添加的
我們回到checkMultiplicity(handler)方法中:
如果是共享對(duì)象或者沒有被添加, 則將ChannelHandlerAdapter的added設(shè)置為true, 代表已添加
剖析完了重復(fù)添加驗(yàn)證, 回到addLast方法中, 我們看第二步, 創(chuàng)建一個(gè)HandlerContext并添加到列表:
newCtx = newContext(group, filterName(name, handler), handler);
首先看filterName(name, handler)方法, 這個(gè)方法是判斷添加handler的name是否重復(fù)
跟到filterName方法中
private String filterName(String name, ChannelHandler handler) { if (name == null) { //沒有名字創(chuàng)建默認(rèn)名字 return generateName(handler); } //檢查名字是否重復(fù) checkDuplicateName(name); return name; }
因?yàn)槲覀兲砑觝andler時(shí)候, 不一定會(huì)會(huì)給handler命名, 所以這一步name有可能是null, 如果是null, 則創(chuàng)建一個(gè)默認(rèn)的名字, 這里創(chuàng)建名字的方法我們就不往里跟了, 有興趣的同學(xué)可以自己跟進(jìn)去看
然后再檢查名字是否重復(fù)
我們跟到checkDuplicateName(name)這個(gè)方法中:
private void checkDuplicateName(String name) { //不為空 if (context0(name) != null) { throw new IllegalArgumentException("Duplicate handler name: " + name); } }
這里有個(gè)context0(name)方法, 我們跟進(jìn)去:
private AbstractChannelHandlerContext context0(String name) { //遍歷pipeline AbstractChannelHandlerContext context = head.next; while (context != tail) { //發(fā)現(xiàn)name相同, 說明存在handler if (context.name().equals(name)) { //返回 return context; } context = context.next; } return null; }
這里做的操作非常簡(jiǎn)單, 就是將pipeline中, 從head節(jié)點(diǎn)往下遍歷HandlerContext, 一直遍歷到tail, 如果發(fā)現(xiàn)名字相同則會(huì)認(rèn)為重復(fù)并返回HandlerContext對(duì)象
我們回到addLast()方法中并繼續(xù)看添加創(chuàng)建相關(guān)的邏輯:
newCtx = newContext(group, filterName(name, handler), handler)
filterName(name, handler)這步如果并沒有重復(fù)則會(huì)返回handler的name
我們繼續(xù)跟到newContext(group, filterName(name, handler), handler)方法中:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); }
這里我們看到創(chuàng)建了一個(gè)DefaultChannelHandlerContext對(duì)象, 構(gòu)造方法的參數(shù)中, 第一個(gè)this代表當(dāng)前的pipeline對(duì)象, group為null, 所以childExecutor(group)也會(huì)返回null, name為handler的名字, handler為新添加的handler對(duì)象
我們繼續(xù)跟到DefaultChannelHandlerContext的構(gòu)造方法中:
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; }
我們看到首先調(diào)用了父類的構(gòu)造方法, 之后將handler賦值為自身handler的成員變量, HandlerConext和handler關(guān)系在此也展現(xiàn)了出來, 是一種組合關(guān)系
我們首先看父類的構(gòu)造方法, 有這么兩個(gè)參數(shù):isInbound(handler), isOutbound(handler), 這兩個(gè)參數(shù)意思是判斷需要添加的handler是inboundHandler還是outBoundHandler
跟到isInbound(handler)方法中
private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; }
這里通過是否實(shí)現(xiàn)ChannelInboundHandler接口來判斷是否為inboundhandler
同樣我們看isOutbound(handler)方法:
private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }
通過判斷是否實(shí)現(xiàn)ChannelOutboundHandler接口判斷是否為outboundhandler
在跟到其父類AbstractChannelHandlerContext的構(gòu)造方法中:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; }
一切都不陌生了, 因?yàn)槲覀僼ail節(jié)點(diǎn)和head節(jié)點(diǎn)創(chuàng)建的時(shí)候同樣走到了這里
這里初始化了name, pipeline, 以及標(biāo)識(shí)添加的handler是inboundhanlder還是outboundhandler
我們回到最初的addLast()方法中
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { //判斷handler是否被重復(fù)添加(1) checkMultiplicity(handler); //創(chuàng)建一個(gè)HandlerContext并添加到列表(2) newCtx = newContext(group, filterName(name, handler), handler); //添加HandlerContext(3) addLast0(newCtx); //是否已注冊(cè) if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); //回調(diào)用戶事件 executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } //回調(diào)添加事件(4) callHandlerAdded0(newCtx); return this; }
我們跟完了創(chuàng)建HandlerContext的相關(guān)邏輯, 我們繼續(xù)跟第三步, 添加HandlerContext
我們跟進(jìn)addLast0(newCtx)中
private void addLast0(AbstractChannelHandlerContext newCtx) { //拿到tail節(jié)點(diǎn)的前置節(jié)點(diǎn) AbstractChannelHandlerContext prev = tail.prev; //當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)賦值為tail節(jié)點(diǎn)的前置節(jié)點(diǎn) newCtx.prev = prev; //當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)賦值為tail節(jié)點(diǎn) newCtx.next = tail; //tail前置節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)賦值為當(dāng)前節(jié)點(diǎn) prev.next = newCtx; //tail節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)賦值為當(dāng)前節(jié)點(diǎn) tail.prev = newCtx; }
這一部分也非常簡(jiǎn)單, 做了一個(gè)指針的指向操作, 將新添加的handlerConext放在tail節(jié)點(diǎn)之前, 之前tail節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)之后, 熟悉雙向鏈表的同學(xué)對(duì)此邏輯應(yīng)該不會(huì)陌生, 如果是第一次添加handler, 那么添加后的結(jié)構(gòu)入下圖所示:
添加完handler之后, 這里會(huì)判斷當(dāng)前channel是否已經(jīng)注冊(cè), 這部分邏輯我們之后再進(jìn)行剖析, 我們繼續(xù)往下走
之后會(huì)判斷當(dāng)前線程線程是否為eventLoop線程, 如果不是eventLoop線程, 就將添加回調(diào)事件封裝成task交給eventLoop線程執(zhí)行, 否則, 直接執(zhí)行添加回調(diào)事件callHandlerAdded0(newCtx)
跟進(jìn)callHandlerAdded0(newCtx)
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { //忽略代碼 } }
我們重點(diǎn)關(guān)注這句
ctx.handler().handlerAdded(ctx);
其中ctx是我們新創(chuàng)建的HandlerContext, 通過handler()方法拿到綁定的handler, 也就是新添加的handler, 然后執(zhí)行handlerAdded(ctx)方法, 如果我們沒有重寫這個(gè)方法, 則會(huì)執(zhí)行父類的該方法
在ChannelHandlerAdapter類中定義了該方法的實(shí)現(xiàn):
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
我們看到?jīng)]做任何操作, 也就是如果我們沒有重寫該方法時(shí), 如果添加handler之后將不會(huì)做任何操作, 這里如果我們需要做一些業(yè)務(wù)邏輯, 可以通過重寫該方法進(jìn)行實(shí)現(xiàn)
以上就是添加handler的有關(guān)的業(yè)務(wù)邏輯,更多關(guān)于Netty分布式pipeline管道添加Handler的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java線程優(yōu)先級(jí)和守護(hù)線程原理解析
這篇文章主要介紹了Java線程優(yōu)先級(jí)和守護(hù)線程原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03java多線程Synchronized實(shí)現(xiàn)可見性原理解析
這篇文章主要介紹了java多線程Synchronized實(shí)現(xiàn)可見性原理,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12JAVA基礎(chǔ)類庫之String類,StringBuffer類和StringBuilder類
這篇文章主要介紹了Java中基礎(chǔ)類庫的String類,StringBuffer類和StringBuilder類,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2021-09-09SpringBoot+SpringSecurity+JWT實(shí)現(xiàn)系統(tǒng)認(rèn)證與授權(quán)示例
本文主要介紹了SpringBoot+SpringSecurity+JWT實(shí)現(xiàn)系統(tǒng)認(rèn)證與授權(quán)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08Java如果通過jdbc操作連接oracle數(shù)據(jù)庫
這篇文章主要介紹了Java如果通過jdbc操作連接oracle數(shù)據(jù)庫,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09hibernate-validator改進(jìn)校驗(yàn)框架validator?v0.4使用
這篇文章主要為大家介紹了改進(jìn)?hibernate-validator,新一代校驗(yàn)框架?validator?使用介紹?v0.4,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪<BR>2023-03-03Java中的Vector和ArrayList區(qū)別及比較
這篇文章主要介紹了Java中的Vector和ArrayList區(qū)別及比較,本文從API、同步、數(shù)據(jù)增長(zhǎng)、使用模式4個(gè)方面總結(jié)了它們之間的不同之處,需要的朋友可以參考下2015-03-03基于Java實(shí)現(xiàn)簡(jiǎn)單的郵件群發(fā)功能
這篇文章主要為大家詳細(xì)介紹了如何利用Java語言編寫一個(gè)簡(jiǎn)單的工具類,可以實(shí)現(xiàn)郵件群發(fā)功能。文中的示例代碼講解詳細(xì),需要的可以參考一下2022-05-05