Netty分布式編碼器寫(xiě)buffer隊(duì)列邏輯剖析
前文傳送門(mén):抽象編碼器MessageToByteEncoder
寫(xiě)buffer隊(duì)列
之前的小節(jié)我們介紹過(guò), writeAndFlush方法其實(shí)最終會(huì)調(diào)用write和flush方法
write方法最終會(huì)傳遞到head節(jié)點(diǎn), 調(diào)用HeadContext的write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}這里通過(guò)unsafe對(duì)象的write方法, 將消息寫(xiě)入到緩存中, 具體的執(zhí)行邏輯, 我們?cè)谶@個(gè)小節(jié)進(jìn)行剖析
我們跟到AbstractUnsafe的write方法中
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//負(fù)責(zé)緩沖寫(xiě)進(jìn)來(lái)的byteBuf
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
//非堆外內(nèi)存轉(zhuǎn)化為堆外內(nèi)存
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//插入寫(xiě)隊(duì)列
outboundBuffer.addMessage(msg, size, promise);
}首先看 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer
ChannelOutboundBuffer的功能就是緩存寫(xiě)入的ByteBuf
我們繼續(xù)看try塊中的 msg = filterOutboundMessage(msg)
這步的意義就是將非對(duì)外內(nèi)存轉(zhuǎn)化為堆外內(nèi)存
filterOutboundMessage方法方法最終會(huì)調(diào)用AbstractNioByteChannel中的filterOutboundMessage方法:
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
//是堆外內(nèi)存, 直接返回
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}首先判斷msg是否byteBuf對(duì)象, 如果是, 判斷是否堆外內(nèi)存, 如果是堆外內(nèi)存, 則直接返回, 否則, 通過(guò)newDirectBuffer(buf)這種方式轉(zhuǎn)化為堆外內(nèi)存
回到write方法中
outboundBuffer.addMessage(msg, size, promise)將已經(jīng)轉(zhuǎn)化為堆外內(nèi)存的msg插入到寫(xiě)隊(duì)列
我們跟到addMessage方法當(dāng)中, 這是ChannelOutboundBuffer中的方法:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(size, false);
}首先通過(guò) Entry.newInstance(msg, size, total(msg), promise) 的方式將msg封裝成entry
然后通過(guò)調(diào)整tailEntry, flushedEntry, unflushedEntry三個(gè)指針, 完成entry的添加
這三個(gè)指針均是ChannelOutboundBuffer的成員變量
flushedEntry指向第一個(gè)被flush的entry
unflushedEntry指向第一個(gè)未被flush的entry
也就是說(shuō), 從flushedEntry到unflushedEntry之間的entry, 都是被已經(jīng)被flush的entry
tailEntry指向最后一個(gè)entry, 也就是從unflushedEntry到tailEntry之間的entry都是沒(méi)flush的entry
我們回到代碼中:
創(chuàng)建了entry之后首先判斷尾指針是否為空, 在第一次添加的時(shí)候, 均是空, 所以會(huì)將flushedEntry設(shè)置為null, 并且將尾指針設(shè)置為當(dāng)前創(chuàng)建的entry
最后判斷unflushedEntry是否為空, 如果第一次添加這里也是空, 所以這里將unflushedEntry設(shè)置為新創(chuàng)建的entry
第一次添加如下圖所示

7-3-1
如果不是第一次調(diào)用write方法, 則會(huì)進(jìn)入 if (tailEntry == null) 中else塊:
Entry tail = tailEntry 這里tail就是當(dāng)前尾節(jié)點(diǎn)
tail.next = entry 代表尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)指向新創(chuàng)建的entry
tailEntry = entry 將尾節(jié)點(diǎn)也指向entry
這樣就完成了添加操作, 其實(shí)就是將新創(chuàng)建的節(jié)點(diǎn)追加到原來(lái)尾節(jié)點(diǎn)之后
第二次添加 if (unflushedEntry == null) 會(huì)返回false, 所以不會(huì)進(jìn)入if塊
第二次添加之后指針的指向情況如下圖所示:

7-3-4
以后每次調(diào)用write, 如果沒(méi)有調(diào)用flush的話都會(huì)在尾節(jié)點(diǎn)之后進(jìn)行追加
回到代碼中, 看這一步incrementPendingOutboundBytes(size, false)
這步時(shí)統(tǒng)計(jì)當(dāng)前有多少字節(jié)需要被寫(xiě)出, 我們跟到這個(gè)方法中:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
//TOTAL_PENDING_SIZE_UPDATER當(dāng)前緩沖區(qū)里面有多少待寫(xiě)的字節(jié)
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
//getWriteBufferHighWaterMark() 最高不能超過(guò)64k
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}看這一步:
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)
TOTAL_PENDING_SIZE_UPDATER表示當(dāng)前緩沖區(qū)還有多少待寫(xiě)的字節(jié), addAndGet就是將當(dāng)前的ByteBuf的長(zhǎng)度進(jìn)行累加, 累加到newWriteBufferSize中
在繼續(xù)看判斷 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark())
channel.config().getWriteBufferHighWaterMark() 表示寫(xiě)buffer的高水位值, 默認(rèn)是64k, 也就是說(shuō)寫(xiě)buffer的最大長(zhǎng)度不能超過(guò)64k
如果超過(guò)了64k, 則會(huì)調(diào)用setUnwritable(invokeLater)方法設(shè)置寫(xiě)狀態(tài)
我們跟到setUnwritable(invokeLater)方法中
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}這里通過(guò)自旋和cas操作, 傳播一個(gè)ChannelWritabilityChanged事件, 最終會(huì)調(diào)用handler的channelWritabilityChanged方法進(jìn)行處理
以上就是寫(xiě)buffer的相關(guān)邏輯,更多關(guān)于Netty分布式編碼器寫(xiě)buffer隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot應(yīng)用能直接運(yùn)行java -jar的原因分析
這篇文章主要介紹了SpringBoot應(yīng)用為什么能直接運(yùn)行java -jar,首先明確一點(diǎn),普通jar包是不能直接運(yùn)行的,比如工具類jar,要能運(yùn)行,至少得要一個(gè)main函數(shù)作為入口吧?本文給大家介紹了詳細(xì)的原因分析,需要的朋友可以參考下2024-03-03
Java使用BigDecimal進(jìn)行運(yùn)算封裝的實(shí)際案例
今天小編就為大家分享一篇關(guān)于Java使用BigDecimal進(jìn)行運(yùn)算封裝的實(shí)際案例,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12
用Eclipse 創(chuàng)建一個(gè)簡(jiǎn)單的web項(xiàng)目(圖文教程)
下面小編就為大家?guī)?lái)一篇用Eclipse 創(chuàng)建一個(gè)簡(jiǎn)單的web項(xiàng)目(圖文教程)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-06-06
辨析Java中的String與StringBuffer及StringBuilder字符串類
這里將為大家來(lái)辨析Java中的String與StringBuffer及StringBuilder字符串類型,通常來(lái)說(shuō)StringBuilder的性能更加,需要的朋友可以參考下2016-05-05
詳解springBoot啟動(dòng)時(shí)找不到或無(wú)法加載主類解決辦法
這篇文章主要介紹了詳解springBoot啟動(dòng)時(shí)找不到或無(wú)法加載主類解決辦法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
mybatis plus自動(dòng)生成器解析(及遇到的坑)
這篇文章主要介紹了mybatis-plus自動(dòng)生成器及遇到的坑,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03

