欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式編碼器寫buffer隊列邏輯剖析

 更新時間:2022年03月29日 14:37:11   作者:向南是個萬人迷  
這篇文章主要介紹了Netty分布式編碼器寫buffer隊列邏輯剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前文傳送門:抽象編碼器MessageToByteEncoder

寫buffer隊列

之前的小節(jié)我們介紹過, writeAndFlush方法其實最終會調(diào)用write和flush方法

write方法最終會傳遞到head節(jié)點, 調(diào)用HeadContext的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

這里通過unsafe對象的write方法, 將消息寫入到緩存中, 具體的執(zhí)行邏輯, 我們在這個小節(jié)進行剖析

我們跟到AbstractUnsafe的write方法中

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    //負責(zé)緩沖寫進來的byteBuf
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        //非堆外內(nèi)存轉(zhuǎn)化為堆外內(nèi)存
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //插入寫隊列
    outboundBuffer.addMessage(msg, size, promise);
}

首先看 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer 

ChannelOutboundBuffer的功能就是緩存寫入的ByteBuf

我們繼續(xù)看try塊中的 msg = filterOutboundMessage(msg) 

這步的意義就是將非對外內(nèi)存轉(zhuǎn)化為堆外內(nèi)存

filterOutboundMessage方法方法最終會調(diào)用AbstractNioByteChannel中的filterOutboundMessage方法:

protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        //是堆外內(nèi)存, 直接返回
        if (buf.isDirect()) {
            return msg;
        }
        return newDirectBuffer(buf);
    }
    if (msg instanceof FileRegion) {
        return msg;
    }
    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}

首先判斷msg是否byteBuf對象, 如果是, 判斷是否堆外內(nèi)存, 如果是堆外內(nèi)存, 則直接返回, 否則, 通過newDirectBuffer(buf)這種方式轉(zhuǎn)化為堆外內(nèi)存

回到write方法中

outboundBuffer.addMessage(msg, size, promise)將已經(jīng)轉(zhuǎn)化為堆外內(nèi)存的msg插入到寫隊列

我們跟到addMessage方法當(dāng)中, 這是ChannelOutboundBuffer中的方法:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise); 
    if (tailEntry == null) { 
        flushedEntry = null;
        tailEntry = entry;
    } else { 
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    } 
    if (unflushedEntry == null) { 
        unflushedEntry = entry;
    }
    incrementPendingOutboundBytes(size, false);
}

首先通過 Entry.newInstance(msg, size, total(msg), promise) 的方式將msg封裝成entry

然后通過調(diào)整tailEntry, flushedEntry, unflushedEntry三個指針, 完成entry的添加

這三個指針均是ChannelOutboundBuffer的成員變量

flushedEntry指向第一個被flush的entry

unflushedEntry指向第一個未被flush的entry

也就是說, 從flushedEntry到unflushedEntry之間的entry, 都是被已經(jīng)被flush的entry

tailEntry指向最后一個entry, 也就是從unflushedEntry到tailEntry之間的entry都是沒flush的entry

我們回到代碼中:

創(chuàng)建了entry之后首先判斷尾指針是否為空, 在第一次添加的時候, 均是空, 所以會將flushedEntry設(shè)置為null, 并且將尾指針設(shè)置為當(dāng)前創(chuàng)建的entry

最后判斷unflushedEntry是否為空, 如果第一次添加這里也是空, 所以這里將unflushedEntry設(shè)置為新創(chuàng)建的entry

第一次添加如下圖所示

7-3-1

如果不是第一次調(diào)用write方法, 則會進入 if (tailEntry == null) 中else塊:

 Entry tail = tailEntry  這里tail就是當(dāng)前尾節(jié)點

 tail.next = entry  代表尾節(jié)點的下一個節(jié)點指向新創(chuàng)建的entry

 tailEntry = entry  將尾節(jié)點也指向entry

這樣就完成了添加操作, 其實就是將新創(chuàng)建的節(jié)點追加到原來尾節(jié)點之后

第二次添加 if (unflushedEntry == null) 會返回false, 所以不會進入if塊

第二次添加之后指針的指向情況如下圖所示:

7-3-4

以后每次調(diào)用write, 如果沒有調(diào)用flush的話都會在尾節(jié)點之后進行追加

回到代碼中, 看這一步incrementPendingOutboundBytes(size, false)

這步時統(tǒng)計當(dāng)前有多少字節(jié)需要被寫出, 我們跟到這個方法中:

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    //TOTAL_PENDING_SIZE_UPDATER當(dāng)前緩沖區(qū)里面有多少待寫的字節(jié)
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    //getWriteBufferHighWaterMark() 最高不能超過64k
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

看這一步:

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size)

TOTAL_PENDING_SIZE_UPDATER表示當(dāng)前緩沖區(qū)還有多少待寫的字節(jié), addAndGet就是將當(dāng)前的ByteBuf的長度進行累加, 累加到newWriteBufferSize中

在繼續(xù)看判斷 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) 

 channel.config().getWriteBufferHighWaterMark() 表示寫buffer的高水位值, 默認是64k, 也就是說寫buffer的最大長度不能超過64k

如果超過了64k, 則會調(diào)用setUnwritable(invokeLater)方法設(shè)置寫狀態(tài)

我們跟到setUnwritable(invokeLater)方法中

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1; 
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) { 
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

這里通過自旋和cas操作, 傳播一個ChannelWritabilityChanged事件, 最終會調(diào)用handler的channelWritabilityChanged方法進行處理

以上就是寫buffer的相關(guān)邏輯,更多關(guān)于Netty分布式編碼器寫buffer隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot應(yīng)用能直接運行java -jar的原因分析

    SpringBoot應(yīng)用能直接運行java -jar的原因分析

    這篇文章主要介紹了SpringBoot應(yīng)用為什么能直接運行java -jar,首先明確一點,普通jar包是不能直接運行的,比如工具類jar,要能運行,至少得要一個main函數(shù)作為入口吧?本文給大家介紹了詳細的原因分析,需要的朋友可以參考下
    2024-03-03
  • Java使用BigDecimal進行運算封裝的實際案例

    Java使用BigDecimal進行運算封裝的實際案例

    今天小編就為大家分享一篇關(guān)于Java使用BigDecimal進行運算封裝的實際案例,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2018-12-12
  • 用Eclipse 創(chuàng)建一個簡單的web項目(圖文教程)

    用Eclipse 創(chuàng)建一個簡單的web項目(圖文教程)

    下面小編就為大家?guī)硪黄肊clipse 創(chuàng)建一個簡單的web項目(圖文教程)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06
  • Spring?Boot?優(yōu)雅停機原理詳解

    Spring?Boot?優(yōu)雅停機原理詳解

    這篇文章主要為大家介紹了Spring?Boot?優(yōu)雅停機原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-02-02
  • 辨析Java中的String與StringBuffer及StringBuilder字符串類

    辨析Java中的String與StringBuffer及StringBuilder字符串類

    這里將為大家來辨析Java中的String與StringBuffer及StringBuilder字符串類型,通常來說StringBuilder的性能更加,需要的朋友可以參考下
    2016-05-05
  • 詳解springBoot啟動時找不到或無法加載主類解決辦法

    詳解springBoot啟動時找不到或無法加載主類解決辦法

    這篇文章主要介紹了詳解springBoot啟動時找不到或無法加載主類解決辦法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-09-09
  • Java生成條形碼code128(親測有效)

    Java生成條形碼code128(親測有效)

    這篇文章主要介紹了Java生成條形碼code128,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • mybatis plus自動生成器解析(及遇到的坑)

    mybatis plus自動生成器解析(及遇到的坑)

    這篇文章主要介紹了mybatis-plus自動生成器及遇到的坑,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • JAVA內(nèi)存溢出解決方案圖解

    JAVA內(nèi)存溢出解決方案圖解

    這篇文章主要介紹了JAVA內(nèi)存溢出解決方案圖解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-10-10
  • Java讀取Properties文件幾種方法總結(jié)

    Java讀取Properties文件幾種方法總結(jié)

    這篇文章主要介紹了 Java讀取Properties文件幾種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下
    2017-04-04

最新評論