Netty分布式flush方法刷新buffer隊(duì)列源碼剖析
flush方法
上一小節(jié)學(xué)習(xí)了writeAndFlush的write方法, 這一小節(jié)我們剖析flush方法
通過前面的學(xué)習(xí)我們知道, flush方法通過事件傳遞, 最終會(huì)傳遞到HeadContext的flush方法:
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}這里最終會(huì)調(diào)用AbstractUnsafe的flush方法
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}這里首先也是拿到ChannelOutboundBuffer對(duì)象
然后我們看這一步:
outboundBuffer.addFlush();
這一步同樣也是調(diào)整ChannelOutboundBuffer的指針
跟進(jìn)addFlush方法
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}首先聲明一個(gè)entry指向unflushedEntry, 也就是第一個(gè)未flush的entry
通常情況下unflushedEntry是不為空的, 所以進(jìn)入if
再未刷新前flushedEntry通常為空, 所以會(huì)執(zhí)行到flushedEntry = entry
也就是flushedEntry指向entry
經(jīng)過上述操作, 緩沖區(qū)的指針情況如圖所示:

7-4-1
然后通過do-while將, 不斷尋找unflushedEntry后面的節(jié)點(diǎn), 直到?jīng)]有節(jié)點(diǎn)為止
flushed自增代表需要刷新多少個(gè)節(jié)點(diǎn)
循環(huán)中我們關(guān)注這一步
decrementPendingOutboundBytes(pending, false, true);
這一步也是統(tǒng)計(jì)緩沖區(qū)中的字節(jié)數(shù), 但是是和上一小節(jié)的incrementPendingOutboundBytes正好是相反, 因?yàn)檫@里是刷新, 所以這里要減掉刷新后的字節(jié)數(shù),
我們跟到方法中:
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
//從總的大小減去
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
//直到減到小于某一個(gè)閾值32個(gè)字節(jié)
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
//設(shè)置寫狀態(tài)
setWritable(invokeLater);
}
}同樣TOTAL_PENDING_SIZE_UPDATER代表緩沖區(qū)的字節(jié)數(shù), 這里的addAndGet中參數(shù)是-size, 也就是減掉size的長(zhǎng)度
再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark())
getWriteBufferLowWaterMark()代表寫buffer的第水位值, 也就是32k, 如果寫buffer的長(zhǎng)度小于這個(gè)數(shù), 就通過setWritable方法設(shè)置寫狀態(tài)
也就是通道由原來的不可寫改成可寫
回到addFlush方法
遍歷do-while循環(huán)結(jié)束之后, 將unflushedEntry指為空, 代表所有的entry都是可寫的
經(jīng)過上述操作, 緩沖區(qū)的指針情況如下圖所示:

7-4-2
回到AbstractUnsafe的flush方法
指針調(diào)整完之后, 我們跟到flush0()方法中:
protected void flush0() {
if (inFlush0) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}if (inFlush0) 表示判斷當(dāng)前flush是否在進(jìn)行中, 如果在進(jìn)行中, 則返回, 避免重復(fù)進(jìn)入
我們重點(diǎn)關(guān)注doWrite方法
跟到AbstractNioByteChannel的doWrite方法中去:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
//每次拿到當(dāng)前節(jié)點(diǎn)
Object msg = in.current();
if (msg == null) {
clearOpWrite();
return;
}
if (msg instanceof ByteBuf) {
//轉(zhuǎn)化成ByteBuf
ByteBuf buf = (ByteBuf) msg;
//如果沒有可寫的值
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
//移除
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
//將buf寫入到socket里面
//localFlushedAmount代表向jdk底層寫了多少字節(jié)
int localFlushedAmount = doWriteBytes(buf);
//如果一個(gè)字節(jié)沒寫, 直接break
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
//統(tǒng)計(jì)總共寫了多少字節(jié)
flushedAmount += localFlushedAmount;
//如果buffer全部寫到j(luò)dk底層
if (!buf.isReadable()) {
//標(biāo)記全寫道
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
//移除當(dāng)前對(duì)象
in.remove();
} else {
break;
}
} else if (msg instanceof FileRegion) {
//代碼省略
} else {
throw new Error();
}
}
incompleteWrite(setOpWrite);
}首先是一個(gè)無限for循環(huán)
Object msg = in.current() 這一步是拿到flushedEntry指向的entry中的msg
跟到current()方法中
public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}這里直接拿到flushedEntry指向的entry中關(guān)聯(lián)的msg, 也就是一個(gè)ByteBuf
回到doWrite方法:
如果msg為null, 說明沒有可以刷新的entry, 則調(diào)用clearOpWrite()方法清除寫標(biāo)識(shí)
如果msg不為null, 則會(huì)判斷是否是ByteBuf類型, 如果是ByteBuf, 就進(jìn)入if塊中的邏輯
if塊中首先將msg轉(zhuǎn)化為ByteBuf, 然后判斷ByteBuf是否可讀, 如果不可讀, 則通過in.remove()將當(dāng)前的byteBuf所關(guān)聯(lián)的entry移除, 然后跳過這次循環(huán)進(jìn)入下次循環(huán)
remove方法稍后分析, 這里我們先繼續(xù)往下看
boolean done = false 這里設(shè)置一個(gè)標(biāo)識(shí), 標(biāo)識(shí)刷新操作是否執(zhí)行完成, 這里默認(rèn)值為false代表走到這里沒有執(zhí)行完成
writeSpinCount = config().getWriteSpinCount() 這里是獲得一個(gè)寫操作的循環(huán)次數(shù), 默認(rèn)是16
然后根據(jù)這個(gè)循環(huán)次數(shù), 進(jìn)行循環(huán)的寫操作
在循環(huán)中, 關(guān)注這一步:
int localFlushedAmount = doWriteBytes(buf);
這一步就是將buf的內(nèi)容寫到channel中, 并返回寫的字節(jié)數(shù), 這里會(huì)調(diào)用NioSocketChannel的doWriteBytes
我們跟到doWriteBytes方法中:
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}這里首先拿到buf的可讀字節(jié)數(shù), 然后通過readBytes將可讀字節(jié)寫入到j(luò)dk底層的channel中
回到doWrite方法:
將內(nèi)容寫的jdk底層的channel之后, 如果一個(gè)字節(jié)都沒寫, 說明現(xiàn)在channel可能不可寫, 將setOpWrite設(shè)置為true, 用于標(biāo)識(shí)寫操作位, 并退出循環(huán)
如果已經(jīng)寫出字節(jié), 則通過 flushedAmount += localFlushedAmount 累加寫出的字節(jié)數(shù)
然后根據(jù)是buf是否沒有可讀字節(jié)數(shù)判斷是否buf的數(shù)據(jù)已經(jīng)寫完, 如果寫完, 將done設(shè)置為true, 說明寫操作完成, 并退出循環(huán)
因?yàn)橛袝r(shí)候不一定一次就能將byteBuf所有的字節(jié)寫完, 所以這里會(huì)繼續(xù)通過循環(huán)進(jìn)行寫出, 直到循環(huán)到16次
如果ByteBuf內(nèi)容完全寫完, 會(huì)通過in.remove()將當(dāng)前entry移除掉
我們跟到remove方法中:
public boolean remove() {
//拿到當(dāng)前第一個(gè)flush的entry
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
e.recycle();
return true;
}首先拿到當(dāng)前的flushedEntry
我們重點(diǎn)關(guān)注removeEntry這步, 跟進(jìn)去:
private void removeEntry(Entry e) {
if (-- flushed == 0) {
//位置為空
flushedEntry = null;
//如果是最后一個(gè)節(jié)點(diǎn)
if (e == tailEntry) {
//全部設(shè)置為空
tailEntry = null;
unflushedEntry = null;
}
} else {
//移動(dòng)到下一個(gè)節(jié)點(diǎn)
flushedEntry = e.next;
}
}if (-- flushed == 0) 表示當(dāng)前節(jié)點(diǎn)是否為需要刷新的最后一個(gè)節(jié)點(diǎn), 如果是, 則flushedEntry指針設(shè)置為空
如果當(dāng)前節(jié)點(diǎn)是tailEntry節(jié)點(diǎn), 說明當(dāng)前節(jié)點(diǎn)是最后一個(gè)節(jié)點(diǎn), 將tailEntry和unflushedEntry兩個(gè)指針全部設(shè)置為空
如果當(dāng)前節(jié)點(diǎn)不是需要刷新的最后的一個(gè)節(jié)點(diǎn), 則通過 flushedEntry = e.nex t這步將flushedEntry指針移動(dòng)到下一個(gè)節(jié)點(diǎn)
以上就是flush操作的相關(guān)邏輯,更多關(guān)于Netty分布式flush方法刷新buffer隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
如何利用Java?AWT?創(chuàng)建一個(gè)簡(jiǎn)易計(jì)算器
這篇文章主要介紹了如何利用Java?AWT?創(chuàng)建一個(gè)簡(jiǎn)易計(jì)算器,AWT?是一個(gè)有助于構(gòu)建?GUI?的?API?基于?java?應(yīng)用程序,下面關(guān)于其相關(guān)資料實(shí)現(xiàn)計(jì)算器的內(nèi)容詳細(xì),需要的朋友可以參考一下2022-03-03
java?Export大量數(shù)據(jù)導(dǎo)出和打包
這篇文章主要為大家介紹了java?Export大量數(shù)據(jù)的導(dǎo)出和打包實(shí)現(xiàn)過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
使用Feign配置請(qǐng)求頭以及支持Https協(xié)議
這篇文章主要介紹了使用Feign配置請(qǐng)求頭以及支持Https協(xié)議,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能示例
這篇文章主要介紹了Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能,簡(jiǎn)單分析了計(jì)算機(jī)數(shù)值進(jìn)制轉(zhuǎn)換與通過位移進(jìn)行乘積計(jì)算的原理,并結(jié)合具體實(shí)例給出了java模擬計(jì)算機(jī)成績(jī)運(yùn)算的相關(guān)操作技巧,需要的朋友可以參考下2017-09-09
Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法
在本篇文章里小編給大家整理的是關(guān)于Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法和實(shí)例,需要的朋友們可以參考下。2020-02-02

