欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式ByteBuf使用SocketChannel讀取數(shù)據(jù)過程剖析

 更新時(shí)間:2022年03月29日 09:07:31   作者:向南是個(gè)萬人迷  
這篇文章主要為大家介紹了Netty源碼分析ByteBuf使用SocketChannel讀取數(shù)據(jù)過程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

我們第三章分析過客戶端接入的流程, 這一小節(jié)帶大家剖析客戶端發(fā)送數(shù)據(jù), Server讀取數(shù)據(jù)的流程:

首先溫馨提示, 這一小節(jié)高度耦合第三章的第1, 2節(jié)的內(nèi)容, 很多知識(shí)這里并不會(huì)重復(fù)講解, 如果對(duì)之前的知識(shí)印象不深刻建議惡補(bǔ)第三章的第1, 2節(jié)的內(nèi)容之后再學(xué)習(xí)這一小節(jié)

傳送門:

初始化NioSockectChannelConfig

 處理接入事件之handle的創(chuàng)建

Server讀取數(shù)據(jù)的流程

我們首先看NioEventLoop的processSelectedKey方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //獲取到channel中的unsafe
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //如果這個(gè)key不是合法的, 說明這個(gè)channel可能有問題
    if (!k.isValid()) {
        //代碼省略
    }
    try {
        //如果是合法的, 拿到key的io事件
        int readyOps = k.readyOps();
        //鏈接事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        //寫事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        //讀事件和接受鏈接事件
        //如果當(dāng)前NioEventLoop是work線程的話, 這里就是op_read事件
        //如果是當(dāng)前NioEventLoop是boss線程的話, 這里就是op_accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 

這里的判斷表示輪詢到大事件是op_read或者op_accept事件

之前的章節(jié)分析過, 如果當(dāng)前NioEventLoop是work線程的話, 那么這里就是op_read事件, 也就是讀事件, 表示客戶端發(fā)來了數(shù)據(jù)流

這里會(huì)調(diào)用unsafe的redis()方法進(jìn)行讀取

如果是work線程, 那么這里的channel是NioServerSocketChannel, 其綁定的unsafe是NioByteUnsafe, 這里會(huì)走進(jìn)NioByteUnsafe的read()方法中:

public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);
        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

首先獲取SocketChannel的config, pipeline等相關(guān)屬性

 final ByteBufAllocator allocator = config.getAllocator(); 這一步是獲取一個(gè)ByteBuf的內(nèi)存分配器, 用于分配ByteBuf

這里會(huì)走到DefaultChannelConfig的getAllocator方法中

public ByteBufAllocator getAllocator() {
    return allocator;
}

這里返回的DefualtChannelConfig的成員變量, 我們看這個(gè)成員變量:

private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

這里調(diào)用ByteBufAllocator的屬性DEFAULT, 跟進(jìn)去:

ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

我們看到這里又調(diào)用了ByteBufUtil的靜態(tài)屬性DEFAULT_ALLOCATOR, 再跟進(jìn)去:

static final ByteBufAllocator DEFAULT_ALLOCATOR;

DEFAULT_ALLOCATOR這個(gè)屬性是在static塊中初始化的

我們跟到static塊中

static {
    String allocType = SystemPropertyUtil.get(
            "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
    }
    DEFAULT_ALLOCATOR = alloc;
    //代碼省略
}

首先判斷運(yùn)行環(huán)境是不是安卓, 如果不是安卓, 在返回"pooled"字符串保存在allocType中

然后通過if判斷, 最后局部變量alloc = PooledByteBufAllocator.DEFAULT, 最后將alloc賦值到成員變量DEFAULT_ALLOCATOR

我們跟到PooledByteBufAllocator的DEFAULT屬性中:

public static final PooledByteBufAllocator DEFAULT =
        new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

我們看到這里直接通過new的方式, 創(chuàng)建了一個(gè)PooledByteBufAllocator對(duì)象, 也就是基于申請(qǐng)一塊連續(xù)內(nèi)存進(jìn)行緩沖區(qū)分配的緩沖區(qū)分配器

緩沖區(qū)分配器的知識(shí), 我們之前小節(jié)進(jìn)行了詳細(xì)的剖析, 這里就不再贅述

回到NioByteUnsafe的read()方法中

public final void read() {
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

這里 ByteBufAllocator allocator = config.getAllocator()中的allocator , 就是PooledByteBufAllocator

 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle()  是創(chuàng)建一個(gè)handle, 我們之前的章節(jié)講過, handle是對(duì)RecvByteBufAllocator進(jìn)行實(shí)際操作的對(duì)象

我們跟進(jìn)recvBufAllocHandle

public RecvByteBufAllocator.Handle recvBufAllocHandle() {
    //如果不存在, 則創(chuàng)建一個(gè)handle的實(shí)例
    if (recvHandle == null) {
        recvHandle = config().getRecvByteBufAllocator().newHandle();
    }
    return recvHandle;
}

這里是我們之前剖析過的邏輯, 如果不存在, 則創(chuàng)建handle的實(shí)例, 具體創(chuàng)建過程我們可以回顧第三章的第二小節(jié), 這里就不再贅述

同樣allocHandle.reset(config)是將配置重置, 第三章的第二小節(jié)也對(duì)其進(jìn)行過剖析

重置完配置之后, 進(jìn)行do-while循環(huán), 有關(guān)循環(huán)終止條件allocHandle.continueReading(), 之前小節(jié)也有過詳細(xì)剖析, 這里也不再贅述

在do-while循環(huán)中, 首先看 byteBuf = allocHandle.allocate(allocator) 這一步, 這里傳入了剛才創(chuàng)建的allocate對(duì)象, 也就是PooledByteBufAllocator:

這里會(huì)跑到DefaultMaxMessagesRecvByteBufAllocator類的allocate方法中:

public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

這里的guess方法, 會(huì)調(diào)用AdaptiveRecvByteBufAllocator的guess方法:

public int guess() {
    return nextReceiveBufferSize;
}

這里會(huì)返回AdaptiveRecvByteBufAllocator的成員變量nextReceiveBufferSize, 也就是下次所分配緩沖區(qū)的大小, 根據(jù)我們之前學(xué)習(xí)的內(nèi)容, 第一次分配的時(shí)候會(huì)分配初始大小, 也就是1024字節(jié)

回到DefaultMaxMessagesRecvByteBufAllocator類的allocate方法中:

這樣, alloc.ioBuffer(guess())就會(huì)分配一個(gè)PooledByteBuf

我們跟到AbstractByteBufAllocator的ioBuffer方法中:

public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

這里首先判斷是否能獲取jdk的unsafe對(duì)象, 默認(rèn)為true, 所以會(huì)走到directBuffer(initialCapacity)中, 這里最終會(huì)分配一個(gè)PooledUnsafeDirectByteBuf對(duì)象, 具體分配流程我們?cè)僦靶」?jié)做過詳細(xì)剖析

回到NioByteUnsafe的read()方法中:

分配完了ByteBuf之后, 再看這一步allocHandle.lastBytesRead(doReadBytes(byteBuf)):

首先看參數(shù)doReadBytes(byteBuf)方法, 這步是將channel中的數(shù)據(jù)讀取到我們剛分配的ByteBuf中, 并返回讀取到的字節(jié)數(shù)

這里會(huì)調(diào)用到NioSocketChannel的doReadBytes方法:

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

首先拿到綁定在channel中的handler, 因?yàn)槲覀円呀?jīng)創(chuàng)建了handle, 所以這里會(huì)直接拿到

再看allocHandle.attemptedBytesRead(byteBuf.writableBytes())這步, byteBuf.writableBytes()返回byteBuf的可寫字節(jié)數(shù), 也就是最多能從channel中讀取多少字節(jié)寫到ByteBuf, allocate的attemptedBytesRead會(huì)把可寫字節(jié)數(shù)設(shè)置到DefaultMaxMessagesRecvByteBufAllocator 類的attemptedBytesRead屬性中

跟到DefaultMaxMessagesRecvByteBufAllocator中的attemptedBytesRead我們會(huì)看到:

public void attemptedBytesRead(int bytes) {
    attemptedBytesRead = bytes;
}

繼續(xù)看doReadBytes方法

最后, 通過byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead())將jdk底層的channel中的數(shù)據(jù)寫入到我們創(chuàng)建的ByteBuf中, 并返回實(shí)際寫入的字節(jié)數(shù)

回到NioByteUnsafe的read()方法中:

繼續(xù)看allocHandle.lastBytesRead(doReadBytes(byteBuf))這步

剛才我們剖析過doReadBytes(byteBuf)返回的是世界寫入ByteBuf的字節(jié)數(shù)

再看lastBytesRead方法, 跟到DefaultMaxMessagesRecvByteBufAllocator的lastBytesRead方法中:

public final void lastBytesRead(int bytes) {
    lastBytesRead = bytes;
    totalBytesRead += bytes;
    if (totalBytesRead < 0) {
        totalBytesRead = Integer.MAX_VALUE;
    }
}

這里會(huì)賦值兩個(gè)屬性, lastBytesRead代表最后讀取的字節(jié)數(shù), 這里賦值為我們剛才寫入ByteBuf的字節(jié)數(shù), totalBytesRead表示總共讀取的字節(jié)數(shù), 這里將寫入的字節(jié)數(shù)追加

繼續(xù)看NioByteUnsafe的read()方法:

如果最后一次讀取數(shù)據(jù)為0, 說明已經(jīng)將channel中的數(shù)據(jù)全部讀取完畢, 將新創(chuàng)建的ByteBuf釋放循環(huán)利用, 并跳出循環(huán)

allocHandle.incMessagesRead(1)這步是增加消息的讀取次數(shù), 因?yàn)槲覀冄h(huán)最多16次, 所以當(dāng)增加消息次數(shù)增加到16會(huì)結(jié)束循環(huán)

讀取完畢之后, 會(huì)通過pipeline.fireChannelRead(byteBuf)將傳遞channelRead事件, 有關(guān)channelRead事件, 我們?cè)诘谒恼乱策M(jìn)行了詳細(xì)的剖析

這里讀者會(huì)有疑問, 如果一次讀取不完, 就傳遞channelRead事件, 那么server接收到的數(shù)據(jù)有可能就是不完整的, 其實(shí)關(guān)于這點(diǎn), netty也做了相應(yīng)的處理, 我們會(huì)在之后的章節(jié)詳細(xì)剖析netty的半包處理機(jī)制

循環(huán)結(jié)束后, 會(huì)執(zhí)行到allocHandle.readComplete()這一步

我們知道第一次分配ByteBuf的初始容量是1024, 但是初始容量不一定一定滿足所有的業(yè)務(wù)場景, netty中, 將每次讀取數(shù)據(jù)的字節(jié)數(shù)進(jìn)行記錄, 然后之后次分配ByteBuf的時(shí)候, 容量會(huì)盡可能的符合業(yè)務(wù)場景所需要大小, 具體實(shí)現(xiàn)方式, 就是在readComplete()這一步體現(xiàn)的

我們跟到AdaptiveRecvByteBufAllocator的readComplete()方法中:

public void readComplete() {
    record(totalBytesRead());
}

這里調(diào)用了record方法, 并且傳入了這一次所讀取的字節(jié)總數(shù)

跟到record方法中

private void record(int actualReadBytes) { 
    if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { 
        if (decreaseNow) { 
            index = Math.max(index - INDEX_DECREMENT, minIndex); 
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) { 
        index = Math.min(index + INDEX_INCREMENT, maxIndex); 
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

首先看判斷條件 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) 

這里index是當(dāng)前分配的緩沖區(qū)大小所在的SIZE_TABLE中的索引, 將這個(gè)索引進(jìn)行縮進(jìn), 然后根據(jù)縮進(jìn)后的所以找出SIZE_TABLE中所存儲(chǔ)的內(nèi)存值, 再判斷是否大于等于這次讀取的最大字節(jié)數(shù), 如果條件成立, 說明分配的內(nèi)存過大, 需要縮容操作, 我們看if塊中縮容相關(guān)的邏輯

首先 if (decreaseNow) 會(huì)判斷是否立刻進(jìn)行收縮操作, 通常第一次不會(huì)進(jìn)行收縮操作, 然后會(huì)將decreaseNow設(shè)置為true, 代表下一次直接進(jìn)行收縮操作

假設(shè)需要立刻進(jìn)行收縮操作, 我們看收縮操作的相關(guān)邏輯:

 index = Math.max(index - INDEX_DECREMENT, minIndex) 這一步將索引縮進(jìn)一步, 但不能小于最小索引值

然后通過 nextReceiveBufferSize = SIZE_TABLE[index] 獲取設(shè)置索引之后的內(nèi)存, 賦值在nextReceiveBufferSize, 也就是下次需要分配的大小, 下次就會(huì)根據(jù)這個(gè)大小分配ByteBuf了, 這樣就實(shí)現(xiàn)了縮容操作

再看 else&nbsp;if (actualReadBytes >= nextReceiveBufferSize) 

這里判斷這次讀取字節(jié)的總量比上次分配的大小還要大, 則進(jìn)行擴(kuò)容操作

擴(kuò)容操作也很簡單, 索引步進(jìn), 然后拿到步進(jìn)后的索引所對(duì)應(yīng)的內(nèi)存值, 作為下次所需要分配的大小

再NioByteUnsafe的read()方法中:

經(jīng)過了縮容或者擴(kuò)容操作之后, 通過pipeline.fireChannelReadComplete()傳播ChannelReadComplete()事件

以上就是讀取客戶端消息的相關(guān)流程

章節(jié)總結(jié)

        本章主要剖析了ByteBuf的基本操作以及緩沖區(qū)分配等相關(guān)知識(shí).

        緩沖區(qū)分配, 分為通過調(diào)用jdk的api的方式和分配一塊連續(xù)內(nèi)存的方式

        其中, 通過分配連續(xù)內(nèi)存的方式分配緩沖區(qū)中, 又介紹了在page級(jí)別分配的邏輯和在subpage級(jí)別分配的邏輯

        page級(jí)別分配時(shí)通過操作內(nèi)存二叉樹的方式記錄分配情況

        subpage級(jí)別分配是通過位圖的方式記錄分配情況

        最后介紹了NioSocketChannel處理讀事件的相關(guān)邏輯

        總體來說, 這一章的內(nèi)容難度是比較大的, 希望同學(xué)課后通過多調(diào)試的方式進(jìn)行熟練掌握

更多關(guān)于ByteBuf使用SocketChannel讀取數(shù)據(jù)過程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java經(jīng)典排序算法之二分插入排序詳解

    Java經(jīng)典排序算法之二分插入排序詳解

    這篇文章主要為大家詳細(xì)介紹了Java經(jīng)典排序算法之二分插入排序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • java實(shí)現(xiàn)截取PDF指定頁并進(jìn)行圖片格式轉(zhuǎn)換功能

    java實(shí)現(xiàn)截取PDF指定頁并進(jìn)行圖片格式轉(zhuǎn)換功能

    這篇文章主要介紹了java實(shí)現(xiàn)截取PDF指定頁并進(jìn)行圖片格式轉(zhuǎn)換功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2019-09-09
  • Java split函數(shù)拆分后變成null問題解決方案

    Java split函數(shù)拆分后變成null問題解決方案

    這篇文章主要介紹了Java split函數(shù)拆分后變成null問題解決方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-10-10
  • Java網(wǎng)絡(luò)編程TCP實(shí)現(xiàn)聊天功能

    Java網(wǎng)絡(luò)編程TCP實(shí)現(xiàn)聊天功能

    這篇文章主要為大家詳細(xì)介紹了Java網(wǎng)絡(luò)編程TCP實(shí)現(xiàn)聊天功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • jdk動(dòng)態(tài)代理使用實(shí)例詳解

    jdk動(dòng)態(tài)代理使用實(shí)例詳解

    JDK動(dòng)態(tài)代理是代理模式的一種實(shí)現(xiàn)方式,因?yàn)樗腔诮涌趤碜龃淼?所以也常被稱為接口代理,下面這篇文章主要給大家介紹了關(guān)于jdk動(dòng)態(tài)代理使用的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-06-06
  • Java中的SkyWalking監(jiān)控告警詳解

    Java中的SkyWalking監(jiān)控告警詳解

    這篇文章主要介紹了Java中的SkyWalking監(jiān)控告警詳解,SkyWalking在6.x版本中新增了告警功能,其核心在于config/alarm-settings.yaml文件中,該文件分為rules和webhooks兩部分,需要的朋友可以參考下
    2023-11-11
  • Java輸出數(shù)組的3種方法

    Java輸出數(shù)組的3種方法

    這篇文章主要給大家介紹了關(guān)于Java輸出數(shù)組的3種方法,對(duì)于初學(xué)者來說,數(shù)組的輸入輸出是一個(gè)麻煩的問題,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2023-07-07
  • 詳解Kotlin 高階函數(shù) 與 Lambda 表達(dá)式

    詳解Kotlin 高階函數(shù) 與 Lambda 表達(dá)式

    這篇文章主要介紹了詳解Kotlin 高階函數(shù) 與 Lambda 表達(dá)式的相關(guān)資料,需要的朋友可以參考下
    2017-06-06
  • 深入了解Java虛擬機(jī)棧以及內(nèi)存模型

    深入了解Java虛擬機(jī)棧以及內(nèi)存模型

    這篇文章主要介紹了深入了解Java虛擬機(jī)棧以及內(nèi)存模型,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很大的幫助,需要的朋友可以參考下
    2021-04-04
  • SpringSecurity攔截器鏈的使用詳解

    SpringSecurity攔截器鏈的使用詳解

    這篇文章主要介紹了SpringSecurity攔截器鏈的使用詳解,webSecurity的build方法最終調(diào)用的是doBuild方法,doBuild方法調(diào)用的是webSecurity的performBuild方法,webSecurity完成所有過濾器的插件,最終返回的是過濾器鏈代理類filterChainProxy,需要的朋友可以參考下
    2023-11-11

最新評(píng)論