欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式flush方法刷新buffer隊(duì)列源碼剖析

 更新時間:2022年03月29日 15:22:45   作者:向南是個萬人迷  
這篇文章主要為大家介紹了Netty分布式flush方法刷新buffer隊(duì)列源碼剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

flush方法

上一小節(jié)學(xué)習(xí)了writeAndFlush的write方法, 這一小節(jié)我們剖析flush方法

通過前面的學(xué)習(xí)我們知道, flush方法通過事件傳遞, 最終會傳遞到HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

這里最終會調(diào)用AbstractUnsafe的flush方法

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}

這里首先也是拿到ChannelOutboundBuffer對象

然后我們看這一步:

outboundBuffer.addFlush();

這一步同樣也是調(diào)整ChannelOutboundBuffer的指針

跟進(jìn)addFlush方法

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) { 
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

首先聲明一個entry指向unflushedEntry, 也就是第一個未flush的entry

通常情況下unflushedEntry是不為空的, 所以進(jìn)入if

再未刷新前flushedEntry通常為空, 所以會執(zhí)行到flushedEntry = entry

也就是flushedEntry指向entry

經(jīng)過上述操作, 緩沖區(qū)的指針情況如圖所示:

7-4-1

然后通過do-while將, 不斷尋找unflushedEntry后面的節(jié)點(diǎn), 直到?jīng)]有節(jié)點(diǎn)為止

flushed自增代表需要刷新多少個節(jié)點(diǎn)

循環(huán)中我們關(guān)注這一步

decrementPendingOutboundBytes(pending, false, true);

這一步也是統(tǒng)計(jì)緩沖區(qū)中的字節(jié)數(shù), 但是是和上一小節(jié)的incrementPendingOutboundBytes正好是相反, 因?yàn)檫@里是刷新, 所以這里要減掉刷新后的字節(jié)數(shù),

我們跟到方法中:

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    //從總的大小減去
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    //直到減到小于某一個閾值32個字節(jié)
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        //設(shè)置寫狀態(tài)
        setWritable(invokeLater);
    }
}

同樣TOTAL_PENDING_SIZE_UPDATER代表緩沖區(qū)的字節(jié)數(shù), 這里的addAndGet中參數(shù)是-size, 也就是減掉size的長度

再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) 

getWriteBufferLowWaterMark()代表寫buffer的第水位值, 也就是32k, 如果寫buffer的長度小于這個數(shù), 就通過setWritable方法設(shè)置寫狀態(tài)

也就是通道由原來的不可寫改成可寫

回到addFlush方法

遍歷do-while循環(huán)結(jié)束之后, 將unflushedEntry指為空, 代表所有的entry都是可寫的

經(jīng)過上述操作, 緩沖區(qū)的指針情況如下圖所示:

7-4-2

回到AbstractUnsafe的flush方法

指針調(diào)整完之后, 我們跟到flush0()方法中:

protected void flush0() {
    if (inFlush0) {
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }
    inFlush0 = true;
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}

 if (inFlush0) 表示判斷當(dāng)前flush是否在進(jìn)行中, 如果在進(jìn)行中, 則返回, 避免重復(fù)進(jìn)入

我們重點(diǎn)關(guān)注doWrite方法

跟到AbstractNioByteChannel的doWrite方法中去:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;
    boolean setOpWrite = false;
    for (;;) {
        //每次拿到當(dāng)前節(jié)點(diǎn)
        Object msg = in.current();
        if (msg == null) {
            clearOpWrite();
            return;
        }
        if (msg instanceof ByteBuf) {
            //轉(zhuǎn)化成ByteBuf
            ByteBuf buf = (ByteBuf) msg;
            //如果沒有可寫的值
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
                //移除
                in.remove();
                continue;
            } 
            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                //將buf寫入到socket里面
                //localFlushedAmount代表向jdk底層寫了多少字節(jié)
                int localFlushedAmount = doWriteBytes(buf);
                //如果一個字節(jié)沒寫, 直接break
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }
                //統(tǒng)計(jì)總共寫了多少字節(jié)
                flushedAmount += localFlushedAmount;
                //如果buffer全部寫到j(luò)dk底層
                if (!buf.isReadable()) {
                    //標(biāo)記全寫道
                    done = true;
                    break;
                }
            }
            in.progress(flushedAmount);
            if (done) {
                //移除當(dāng)前對象
                in.remove();
            } else {
                break;
            }
        } else if (msg instanceof FileRegion) {
            //代碼省略
        } else {
            throw new Error();
        }
    }
    incompleteWrite(setOpWrite);
}

首先是一個無限for循環(huán)

 Object msg = in.current() 這一步是拿到flushedEntry指向的entry中的msg

跟到current()方法中

public Object current() { 
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }
    return entry.msg;
}

這里直接拿到flushedEntry指向的entry中關(guān)聯(lián)的msg, 也就是一個ByteBuf

回到doWrite方法:

如果msg為null, 說明沒有可以刷新的entry, 則調(diào)用clearOpWrite()方法清除寫標(biāo)識

如果msg不為null, 則會判斷是否是ByteBuf類型, 如果是ByteBuf, 就進(jìn)入if塊中的邏輯

if塊中首先將msg轉(zhuǎn)化為ByteBuf, 然后判斷ByteBuf是否可讀, 如果不可讀, 則通過in.remove()將當(dāng)前的byteBuf所關(guān)聯(lián)的entry移除, 然后跳過這次循環(huán)進(jìn)入下次循環(huán)

remove方法稍后分析, 這里我們先繼續(xù)往下看

 boolean done = false 這里設(shè)置一個標(biāo)識, 標(biāo)識刷新操作是否執(zhí)行完成, 這里默認(rèn)值為false代表走到這里沒有執(zhí)行完成

 writeSpinCount = config().getWriteSpinCount() 這里是獲得一個寫操作的循環(huán)次數(shù), 默認(rèn)是16

然后根據(jù)這個循環(huán)次數(shù), 進(jìn)行循環(huán)的寫操作

在循環(huán)中, 關(guān)注這一步:

int localFlushedAmount = doWriteBytes(buf);

這一步就是將buf的內(nèi)容寫到channel中, 并返回寫的字節(jié)數(shù), 這里會調(diào)用NioSocketChannel的doWriteBytes

我們跟到doWriteBytes方法中:

protected int doWriteBytes(ByteBuf buf) throws Exception { 
    final int expectedWrittenBytes = buf.readableBytes();
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
}

這里首先拿到buf的可讀字節(jié)數(shù), 然后通過readBytes將可讀字節(jié)寫入到j(luò)dk底層的channel中

回到doWrite方法:

將內(nèi)容寫的jdk底層的channel之后, 如果一個字節(jié)都沒寫, 說明現(xiàn)在channel可能不可寫, 將setOpWrite設(shè)置為true, 用于標(biāo)識寫操作位, 并退出循環(huán)

如果已經(jīng)寫出字節(jié), 則通過 flushedAmount += localFlushedAmount 累加寫出的字節(jié)數(shù)

然后根據(jù)是buf是否沒有可讀字節(jié)數(shù)判斷是否buf的數(shù)據(jù)已經(jīng)寫完, 如果寫完, 將done設(shè)置為true, 說明寫操作完成, 并退出循環(huán)

因?yàn)橛袝r候不一定一次就能將byteBuf所有的字節(jié)寫完, 所以這里會繼續(xù)通過循環(huán)進(jìn)行寫出, 直到循環(huán)到16次

如果ByteBuf內(nèi)容完全寫完, 會通過in.remove()將當(dāng)前entry移除掉

我們跟到remove方法中:

public boolean remove() {
    //拿到當(dāng)前第一個flush的entry
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    removeEntry(e);
    if (!e.cancelled) {
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }
    e.recycle();
    return true;
}

首先拿到當(dāng)前的flushedEntry

我們重點(diǎn)關(guān)注removeEntry這步, 跟進(jìn)去:

private void removeEntry(Entry e) { 
    if (-- flushed == 0) {
        //位置為空
        flushedEntry = null;
        //如果是最后一個節(jié)點(diǎn)
        if (e == tailEntry) {
            //全部設(shè)置為空
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        //移動到下一個節(jié)點(diǎn)
        flushedEntry = e.next;
    }
}

 if (-- flushed == 0) 表示當(dāng)前節(jié)點(diǎn)是否為需要刷新的最后一個節(jié)點(diǎn), 如果是, 則flushedEntry指針設(shè)置為空

如果當(dāng)前節(jié)點(diǎn)是tailEntry節(jié)點(diǎn), 說明當(dāng)前節(jié)點(diǎn)是最后一個節(jié)點(diǎn), 將tailEntry和unflushedEntry兩個指針全部設(shè)置為空

如果當(dāng)前節(jié)點(diǎn)不是需要刷新的最后的一個節(jié)點(diǎn), 則通過 flushedEntry = e.nex t這步將flushedEntry指針移動到下一個節(jié)點(diǎn)

以上就是flush操作的相關(guān)邏輯,更多關(guān)于Netty分布式flush方法刷新buffer隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java生成Jar包方法步驟

    Java生成Jar包方法步驟

    在Java開發(fā)中,打包成JAR文件是一種常見的方式,本文主要介紹了Java生成Jar包方法步驟,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-10-10
  • 如何利用Java?AWT?創(chuàng)建一個簡易計(jì)算器

    如何利用Java?AWT?創(chuàng)建一個簡易計(jì)算器

    這篇文章主要介紹了如何利用Java?AWT?創(chuàng)建一個簡易計(jì)算器,AWT?是一個有助于構(gòu)建?GUI?的?API?基于?java?應(yīng)用程序,下面關(guān)于其相關(guān)資料實(shí)現(xiàn)計(jì)算器的內(nèi)容詳細(xì),需要的朋友可以參考一下
    2022-03-03
  • java?Export大量數(shù)據(jù)導(dǎo)出和打包

    java?Export大量數(shù)據(jù)導(dǎo)出和打包

    這篇文章主要為大家介紹了java?Export大量數(shù)據(jù)的導(dǎo)出和打包實(shí)現(xiàn)過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-06-06
  • java壓縮多個文件并且返回流示例

    java壓縮多個文件并且返回流示例

    這篇文章主要介紹了java壓縮多個文件并且返回流示例,返回壓縮流主是為了在程序里再做其它操作,需要的朋友可以參考下
    2014-03-03
  • Java垃圾回收器的方法和原理總結(jié)

    Java垃圾回收器的方法和原理總結(jié)

    本篇文章主要介紹了Java垃圾回收器的方法和原理總結(jié),Java垃圾回收器是Java虛擬機(jī)的重要模塊,具有一定的參考價(jià)值,有興趣的可以了解一下。
    2016-12-12
  • 使用Feign配置請求頭以及支持Https協(xié)議

    使用Feign配置請求頭以及支持Https協(xié)議

    這篇文章主要介紹了使用Feign配置請求頭以及支持Https協(xié)議,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java封裝的實(shí)現(xiàn)訪問限定符、包

    Java封裝的實(shí)現(xiàn)訪問限定符、包

    封裝就是將數(shù)據(jù)和操作數(shù)據(jù)的方法進(jìn)行有機(jī)結(jié)合,隱藏對象的屬性(成員變量)和實(shí)現(xiàn)細(xì)節(jié),僅對外公開接口來和對象進(jìn)行交互,下面這篇文章主要給大家介紹了關(guān)于Java封裝實(shí)現(xiàn)訪問限定符、包的相關(guān)資料
    2022-08-08
  • Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能示例

    Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能示例

    這篇文章主要介紹了Java模擬計(jì)算機(jī)的整數(shù)乘積計(jì)算功能,簡單分析了計(jì)算機(jī)數(shù)值進(jìn)制轉(zhuǎn)換與通過位移進(jìn)行乘積計(jì)算的原理,并結(jié)合具體實(shí)例給出了java模擬計(jì)算機(jī)成績運(yùn)算的相關(guān)操作技巧,需要的朋友可以參考下
    2017-09-09
  • SpringBoot依賴管理的源碼解析

    SpringBoot依賴管理的源碼解析

    這篇文章主要介紹了SpringBoot依賴管理的源碼解析,maven提供了一套依賴管理機(jī)制,通過在pom.xml定義坐標(biāo),通過坐標(biāo)從互聯(lián)網(wǎng)的中央倉庫下載依賴的構(gòu)件(jar包),規(guī)范去管理依賴所有構(gòu)件,這就叫依賴管理,需要的朋友可以參考下
    2023-04-04
  • Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法

    Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法

    在本篇文章里小編給大家整理的是關(guān)于Spring Boot實(shí)現(xiàn)熱部署的實(shí)例方法和實(shí)例,需要的朋友們可以參考下。
    2020-02-02

最新評論