Netty分布式flush方法刷新buffer隊(duì)列源碼剖析
flush方法
上一小節(jié)學(xué)習(xí)了writeAndFlush的write方法, 這一小節(jié)我們剖析flush方法
通過前面的學(xué)習(xí)我們知道, flush方法通過事件傳遞, 最終會傳遞到HeadContext的flush方法:
public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); }
這里最終會調(diào)用AbstractUnsafe的flush方法
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
這里首先也是拿到ChannelOutboundBuffer對象
然后我們看這一步:
outboundBuffer.addFlush();
這一步同樣也是調(diào)整ChannelOutboundBuffer的指針
跟進(jìn)addFlush方法
public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; } }
首先聲明一個entry指向unflushedEntry, 也就是第一個未flush的entry
通常情況下unflushedEntry是不為空的, 所以進(jìn)入if
再未刷新前flushedEntry通常為空, 所以會執(zhí)行到flushedEntry = entry
也就是flushedEntry指向entry
經(jīng)過上述操作, 緩沖區(qū)的指針情況如圖所示:
7-4-1
然后通過do-while將, 不斷尋找unflushedEntry后面的節(jié)點(diǎn), 直到?jīng)]有節(jié)點(diǎn)為止
flushed自增代表需要刷新多少個節(jié)點(diǎn)
循環(huán)中我們關(guān)注這一步
decrementPendingOutboundBytes(pending, false, true);
這一步也是統(tǒng)計(jì)緩沖區(qū)中的字節(jié)數(shù), 但是是和上一小節(jié)的incrementPendingOutboundBytes正好是相反, 因?yàn)檫@里是刷新, 所以這里要減掉刷新后的字節(jié)數(shù),
我們跟到方法中:
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } //從總的大小減去 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //直到減到小于某一個閾值32個字節(jié) if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { //設(shè)置寫狀態(tài) setWritable(invokeLater); } }
同樣TOTAL_PENDING_SIZE_UPDATER代表緩沖區(qū)的字節(jié)數(shù), 這里的addAndGet中參數(shù)是-size, 也就是減掉size的長度
再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark())
getWriteBufferLowWaterMark()代表寫buffer的第水位值, 也就是32k, 如果寫buffer的長度小于這個數(shù), 就通過setWritable方法設(shè)置寫狀態(tài)
也就是通道由原來的不可寫改成可寫
回到addFlush方法
遍歷do-while循環(huán)結(jié)束之后, 將unflushedEntry指為空, 代表所有的entry都是可寫的
經(jīng)過上述操作, 緩沖區(qū)的指針情況如下圖所示:
7-4-2
回到AbstractUnsafe的flush方法
指針調(diào)整完之后, 我們跟到flush0()方法中:
protected void flush0() { if (inFlush0) { return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
if (inFlush0) 表示判斷當(dāng)前flush是否在進(jìn)行中, 如果在進(jìn)行中, 則返回, 避免重復(fù)進(jìn)入
我們重點(diǎn)關(guān)注doWrite方法
跟到AbstractNioByteChannel的doWrite方法中去:
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { //每次拿到當(dāng)前節(jié)點(diǎn) Object msg = in.current(); if (msg == null) { clearOpWrite(); return; } if (msg instanceof ByteBuf) { //轉(zhuǎn)化成ByteBuf ByteBuf buf = (ByteBuf) msg; //如果沒有可寫的值 int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //移除 in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { //將buf寫入到socket里面 //localFlushedAmount代表向jdk底層寫了多少字節(jié) int localFlushedAmount = doWriteBytes(buf); //如果一個字節(jié)沒寫, 直接break if (localFlushedAmount == 0) { setOpWrite = true; break; } //統(tǒng)計(jì)總共寫了多少字節(jié) flushedAmount += localFlushedAmount; //如果buffer全部寫到j(luò)dk底層 if (!buf.isReadable()) { //標(biāo)記全寫道 done = true; break; } } in.progress(flushedAmount); if (done) { //移除當(dāng)前對象 in.remove(); } else { break; } } else if (msg instanceof FileRegion) { //代碼省略 } else { throw new Error(); } } incompleteWrite(setOpWrite); }
首先是一個無限for循環(huán)
Object msg = in.current() 這一步是拿到flushedEntry指向的entry中的msg
跟到current()方法中
public Object current() { Entry entry = flushedEntry; if (entry == null) { return null; } return entry.msg; }
這里直接拿到flushedEntry指向的entry中關(guān)聯(lián)的msg, 也就是一個ByteBuf
回到doWrite方法:
如果msg為null, 說明沒有可以刷新的entry, 則調(diào)用clearOpWrite()方法清除寫標(biāo)識
如果msg不為null, 則會判斷是否是ByteBuf類型, 如果是ByteBuf, 就進(jìn)入if塊中的邏輯
if塊中首先將msg轉(zhuǎn)化為ByteBuf, 然后判斷ByteBuf是否可讀, 如果不可讀, 則通過in.remove()將當(dāng)前的byteBuf所關(guān)聯(lián)的entry移除, 然后跳過這次循環(huán)進(jìn)入下次循環(huán)
remove方法稍后分析, 這里我們先繼續(xù)往下看
boolean done = false 這里設(shè)置一個標(biāo)識, 標(biāo)識刷新操作是否執(zhí)行完成, 這里默認(rèn)值為false代表走到這里沒有執(zhí)行完成
writeSpinCount = config().getWriteSpinCount() 這里是獲得一個寫操作的循環(huán)次數(shù), 默認(rèn)是16
然后根據(jù)這個循環(huán)次數(shù), 進(jìn)行循環(huán)的寫操作
在循環(huán)中, 關(guān)注這一步:
int localFlushedAmount = doWriteBytes(buf);
這一步就是將buf的內(nèi)容寫到channel中, 并返回寫的字節(jié)數(shù), 這里會調(diào)用NioSocketChannel的doWriteBytes
我們跟到doWriteBytes方法中:
protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
這里首先拿到buf的可讀字節(jié)數(shù), 然后通過readBytes將可讀字節(jié)寫入到j(luò)dk底層的channel中
回到doWrite方法:
將內(nèi)容寫的jdk底層的channel之后, 如果一個字節(jié)都沒寫, 說明現(xiàn)在channel可能不可寫, 將setOpWrite設(shè)置為true, 用于標(biāo)識寫操作位, 并退出循環(huán)
如果已經(jīng)寫出字節(jié), 則通過 flushedAmount += localFlushedAmount 累加寫出的字節(jié)數(shù)
然后根據(jù)是buf是否沒有可讀字節(jié)數(shù)判斷是否buf的數(shù)據(jù)已經(jīng)寫完, 如果寫完, 將done設(shè)置為true, 說明寫操作完成, 并退出循環(huán)
因?yàn)橛袝r候不一定一次就能將byteBuf所有的字節(jié)寫完, 所以這里會繼續(xù)通過循環(huán)進(jìn)行寫出, 直到循環(huán)到16次
如果ByteBuf內(nèi)容完全寫完, 會通過in.remove()將當(dāng)前entry移除掉
我們跟到remove方法中:
public boolean remove() { //拿到當(dāng)前第一個flush的entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) { ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } e.recycle(); return true; }
首先拿到當(dāng)前的flushedEntry
我們重點(diǎn)關(guān)注removeEntry這步, 跟進(jìn)去:
private void removeEntry(Entry e) { if (-- flushed == 0) { //位置為空 flushedEntry = null; //如果是最后一個節(jié)點(diǎn) if (e == tailEntry) { //全部設(shè)置為空 tailEntry = null; unflushedEntry = null; } } else { //移動到下一個節(jié)點(diǎn) flushedEntry = e.next; } }
if (-- flushed == 0) 表示當(dāng)前節(jié)點(diǎn)是否為需要刷新的最后一個節(jié)點(diǎn), 如果是, 則flushedEntry指針設(shè)置為空
如果當(dāng)前節(jié)點(diǎn)是tailEntry節(jié)點(diǎn), 說明當(dāng)前節(jié)點(diǎn)是最后一個節(jié)點(diǎn), 將tailEntry和unflushedEntry兩個指針全部設(shè)置為空
如果當(dāng)前節(jié)點(diǎn)不是需要刷新的最后的一個節(jié)點(diǎn), 則通過 flushedEntry = e.nex t這步將flushedEntry指針移動到下一個節(jié)點(diǎn)
以上就是flush操作的相關(guān)邏輯,更多關(guān)于Netty分布式flush方法刷新buffer隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
如何利用Java?AWT?創(chuàng)建一個簡易計(jì)算器
這篇文章主要介紹了如何利用Java?AWT?創(chuàng)建一個簡易計(jì)算器,AWT?是一個有助于構(gòu)建?GUI?的?API?基于?java?應(yīng)用程序,下面關(guān)于其相關(guān)資料實(shí)現(xiàn)計(jì)算器的內(nèi)容詳細(xì),需要的朋友可以參考一下2022-03-03java?Export大量數(shù)據(jù)導(dǎo)出和打包
這篇文章主要為大家介紹了java?Export大量數(shù)據(jù)的導(dǎo)出和打包實(shí)現(xiàn)過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能示例
這篇文章主要介紹了Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能,簡單分析了計(jì)算機(jī)數(shù)值進(jìn)制轉(zhuǎn)換與通過位移進(jìn)行乘積計(jì)算的原理,并結(jié)合具體實(shí)例給出了java模擬計(jì)算機(jī)成績運(yùn)算的相關(guān)操作技巧,需要的朋友可以參考下2017-09-09Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法
在本篇文章里小編給大家整理的是關(guān)于Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法和實(shí)例,需要的朋友們可以參考下。2020-02-02