Netty分布式編碼器及寫數(shù)據(jù)事件處理使用場景
概述
上一小章我們介紹了解碼器, 這一章我們介紹編碼器
其實編碼器和解碼器比較類似, 編碼器也是一個handler, 并且屬于outbounfHandle, 就是將準備發(fā)出去的數(shù)據(jù)進行攔截, 攔截之后進行相應的處理之后再次進發(fā)送處理, 如果理解了解碼器, 那么編碼器的相關內容理解起來也比較容易
編碼器
第一節(jié): writeAndFlush的事件傳播
我們之前在學習pipeline的時候, 講解了write事件的傳播過程, 但在實際使用的時候, 我們通常不會調用channel的write方法, 因為該方法只會寫入到發(fā)送數(shù)據(jù)的緩存中, 并不會直接寫入channel中, 如果想寫入到channel中, 還需要調用flush方法
實際使用過程中, 我們用的更多的是writeAndFlush方法, 這方法既能將數(shù)據(jù)寫到發(fā)送緩存中, 也能刷新到channel中
我們看一個最簡單的使用的場景
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.channel().writeAndFlush("test data"); }
學過netty的同學們對此肯定不陌生, 通過這種方式, 可以將數(shù)據(jù)發(fā)送到channel中, 對方可以收到響應
我們跟到writeAndFlush方法中
首先會走到AbstractChannel的writeAndFlush:
public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
繼續(xù)跟到DefualtChannelPipeline中的writeAndFlush方法中:
public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
這里我們看到, writeAndFlush是從tail節(jié)點進行傳播, 有關事件傳播, 我們再pipeline中進行過剖析, 相信這個不會陌生
繼續(xù)跟, 會跟到AbstractChannelHandlerContext中的writeAndFlush方法:
public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); }
繼續(xù)跟:
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } write(msg, true, promise); return promise; }
繼續(xù)跟write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) { //findContextOutbound()尋找前一個outbound節(jié)點 //最后到head節(jié)點結束 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { //沒有調flush next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
這里的邏輯我們也不陌生, 找到下一個節(jié)點, 因為writeAndFlush是從tail節(jié)點開始的, 并且是outBound的事件, 所以這里會找到tail節(jié)點的上一個outBoundHandler, 有可能是編碼器, 也有可能是我們業(yè)務處理的handler
if (executor.inEventLoop()) 判斷是否是eventLoop線程, 如果不是, 則封裝成task通過nioEventLoop異步執(zhí)行, 我們這里先按照是eventLoop線程分析
首先, 這里通過flush判斷是否調用了flush, 這里顯然是true, 因為我們調用的方法是writeAndFlush
我們跟到invokeWriteAndFlush中
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //寫入 invokeWrite0(msg, promise); //刷新 invokeFlush0(); } else { writeAndFlush(msg, promise); } }
這里就真相大白了, 其實在writeAndFlush中, 首先調用write, write完成之后再調用flush方法進行的刷新
首先跟到invokeWrite0方法中:
private void invokeWrite0(Object msg, ChannelPromise promise) { try { //調用當前handler的wirte()方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
該方法我們在pipeline中已經(jīng)進行過分析, 就是調用當前handler的write方法, 如果當前handler中write方法是繼續(xù)往下傳播, 在會繼續(xù)傳播寫事件, 直到傳播到head節(jié)點, 最后會走到HeadContext的write方法中
跟到HeadContext的write方法中:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
這里通過當前channel的unsafe對象對將當前消息寫到緩存中, 寫入的過程, 我們之后的小節(jié)進行分析
回到到invokeWriteAndFlush方法中:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { //寫入 invokeWrite0(msg, promise); //刷新 invokeFlush0(); } else { writeAndFlush(msg, promise); } }
我們再看invokeFlush0方法
private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } }
同樣, 這里會調用當前handler的flush方法, 如果當前handler的flush方法是繼續(xù)傳播flush事件, 則flush事件會繼續(xù)往下傳播, 直到最后會調用head節(jié)點的flush方法, 如果我們熟悉pipeline的話, 對這里的邏輯不會陌生
跟到HeadContext的flush方法中:
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
這里同樣, 會通過當前channel的unsafe對象通過調用flush方法將緩存的數(shù)據(jù)刷新到channel中, 有關刷新的邏輯, 我們會在以后的小節(jié)進行剖析
以上就是writeAndFlush的相關邏輯, 整體上比較簡單, 熟悉pipeline的同學應該很容易理解
更多關于Netty分布式編碼器及寫數(shù)據(jù)事件的資料請關注腳本之家其它相關文章!
相關文章
詳解JAVA中implement和extends的區(qū)別
這篇文章主要介紹了詳解JAVA中implement和extends的區(qū)別的相關資料,extends是繼承接口,implement是一個類實現(xiàn)一個接口的關鍵字,需要的朋友可以參考下2017-08-08springMVC實現(xiàn)前臺帶進度條文件上傳的示例代碼
本篇文章主要介紹了springMVC實現(xiàn)前臺帶進度條文件上傳的示例代碼,具有一定的參考價值,有興趣的可以了解一下。2017-01-01SpringBoot+OCR實現(xiàn)PDF內容識別的示例代碼
在SpringBoot中,您可以結合OCR庫來實現(xiàn)對PDF文件內容的識別和提取,本文就來介紹一下如何使用 Tesseract 和 pdf2image 對 PDF 文件進行OCR識別和提取,具有一定的參考價值,感興趣的可以了解一下2023-12-12