欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯剖析

 更新時間:2022年03月29日 09:25:18   作者:向南是個萬人迷  
這篇文章主要為大家介紹了Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

概述

在我們上一個章節(jié)遺留過一個問題, 就是如果Server在讀取客戶端的數(shù)據(jù)的時候, 如果一次讀取不完整, 就觸發(fā)channelRead事件, 那么Netty是如何處理這類問題的, 在這一章中, 會對此做詳細剖析

之前的章節(jié)我們學習過pipeline, 事件在pipeline中傳遞, handler可以將事件截取并對其處理, 而之后剖析的編解碼器, 其實就是一個handler, 截取byteBuf中的字節(jié), 然后組建成業(yè)務需要的數(shù)據(jù)進行繼續(xù)傳播

編碼器, 通常是OutBoundHandler, 也就是以自身為基準, 對那些對外流出的數(shù)據(jù)做處理, 所以也叫編碼器, 將數(shù)據(jù)經(jīng)過編碼發(fā)送出去

解碼器, 通常是inboundHandler, 也就是以自身為基準, 對那些流向自身的數(shù)據(jù)做處理, 所以也叫解碼器, 將對向的數(shù)據(jù)接收之后經(jīng)過解碼再進行使用

同樣, 在netty的編碼器中, 也會對半包和粘包問題做相應的處理

什么是半包, 顧名思義, 就是不完整的數(shù)據(jù)包, 因為netty在輪詢讀事件的時候, 每次將channel中讀取的數(shù)據(jù), 不一定是一個完整的數(shù)據(jù)包, 這種情況, 就叫半包

粘包同樣也不難理解, 如果client往server發(fā)送數(shù)據(jù)包, 如果發(fā)送頻繁很有可能會將多個數(shù)據(jù)包的數(shù)據(jù)都發(fā)送到通道中, 如果在server在讀取的時候可能會讀取到超過一個完整數(shù)據(jù)包的長度, 這種情況叫粘包

有關半包和粘包, 入下圖所示:

6-0-1

 netty對半包的或者粘包的處理其實也很簡單, 通過之前的學習, 我們知道, 每個handler是和channel唯一綁定的, 一個handler只對應一個channel, 所以將channel中的數(shù)據(jù)讀取時候經(jīng)過解析, 如果不是一個完整的數(shù)據(jù)包, 則解析失敗, 將這塊數(shù)據(jù)包進行保存, 等下次解析時再和這個數(shù)據(jù)包進行組裝解析, 直到解析到完整的數(shù)據(jù)包, 才會將數(shù)據(jù)包進行向下傳遞

具體流程是在代碼中如何體現(xiàn)的呢?我們進入到源碼分析中

第一節(jié): ByteToMessageDecoder

ByteToMessageDecoder解碼器, 顧名思義, 是一個將Byte解析成消息的解碼器,

我們看他的定義

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
    //類體省略
}

這里繼承了ChannelInboundHandlerAdapter, 根據(jù)之前的學習, 我們知道, 這是個inbound類型的handler, 也就是處理流向自身事件的handler

其次, 該類通過abstract關鍵字修飾, 說明是個抽象類, 在我們實際使用的時候, 并不是直接使用這個類, 而是使用其子類, 類定義了解碼器的骨架方法, 具體實現(xiàn)邏輯交給子類, 同樣, 在半包處理中也是由該類進行實現(xiàn)的

netty中很多解碼器都實現(xiàn)了這個類, 并且, 我們也可以通過實現(xiàn)該類進行自定義解碼器

我們重點關注一下該類的一個屬性:

ByteBuf cumulation;

這個屬性, 就是有關半包處理的關鍵屬性, 從概述中我們知道, netty會將不完整的數(shù)據(jù)包進行保存, 這個數(shù)據(jù)包就是保存在這個屬性中

之前的學習我們知道, ByteBuf讀取完數(shù)據(jù)會傳遞channelRead事件, 傳播過程中會調用handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是編碼的關鍵部分

我們看其channelRead方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //如果message是byteBuf類型
    if (msg instanceof ByteBuf) {
        //簡單當成一個arrayList, 用于盛放解析到的對象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            //當前累加器為空, 說明這是第一次從io流里面讀取數(shù)據(jù)
            first = cumulation == null;
            if (first) {
                //如果是第一次, 則將累加器賦值為剛讀進來的對象
                cumulation = data;
            } else {
                //如果不是第一次, 則把當前累加的數(shù)據(jù)和讀進來的數(shù)據(jù)進行累加
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            //調用子類的方法進行解析
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                numReads = 0;
                discardSomeReadBytes();
            }
            //記錄list長度
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            //向下傳播
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        //不是byteBuf類型則向下傳播
        ctx.fireChannelRead(msg);
    }
}

這方法比較長, 帶大家一步步剖析

首先判斷如果傳來的數(shù)據(jù)是ByteBuf, 則進入if塊中

 CodecOutputList out = CodecOutputList.newInstance() 這里就當成一個ArrayList就好, 用于盛放解碼完成的數(shù)據(jù)

 ByteBuf data = (ByteBuf) msg 這步將數(shù)據(jù)轉化成ByteBuf

 first = cumulation == null  這里表示如果cumulation == null, 說明沒有存儲板半包數(shù)據(jù), 則將當前的數(shù)據(jù)保存在屬性cumulation中

如果 cumulation != null , 說明存儲了半包數(shù)據(jù), 則通過cumulator.cumulate(ctx.alloc(), cumulation, data)將讀取到的數(shù)據(jù)和原來的數(shù)據(jù)進行累加, 保存在屬性cumulation中

我們看cumulator屬性

private Cumulator cumulator = MERGE_CUMULATOR;

這里調用了其靜態(tài)屬性MERGE_CUMULATOR, 我們跟過去:

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        ByteBuf buffer;
        //不能到過最大內存
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1) {
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        //將當前數(shù)據(jù)buffer
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

這里創(chuàng)建了Cumulator類型的靜態(tài)對象, 并重寫了cumulate方法, 這里cumulate方法, 就是用于將ByteBuf進行拼接的方法:

方法中, 首先判斷cumulation的寫指針+in的可讀字節(jié)數(shù)是否超過了cumulation的最大長度, 如果超過了, 將對cumulation進行擴容, 如果沒超過, 則將其賦值到局部變量buffer中

然后將in的數(shù)據(jù)寫到buffer中, 將in進行釋放, 返回寫入數(shù)據(jù)后的ByteBuf

回到channelRead方法中:

最后通過callDecode(ctx, cumulation, out)方法進行解碼, 這里傳入了Context對象, 緩沖區(qū)cumulation和集合out:

我們跟到callDecode(ctx, cumulation, out)方法中:

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        //只要累加器里面有數(shù)據(jù)
        while (in.isReadable()) {
            int outSize = out.size();
            //判斷當前List是否有對象
            if (outSize > 0) {
                //如果有對象, 則向下傳播事件
                fireChannelRead(ctx, out, outSize);
                //清空當前l(fā)ist
                out.clear();
                //解碼過程中如ctx被removed掉就break
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            //當前可讀數(shù)據(jù)長度
            int oldInputLength = in.readableBytes();
            //子類實現(xiàn)
            //子類解析, 解析玩對象放到out里面
            decode(ctx, in, out);
            if (ctx.isRemoved()) {
                break;
            }
            //List解析前大小 和解析后長度一樣(什么沒有解析出來)
            if (outSize == out.size()) {
                //原來可讀的長度==解析后可讀長度
                //說明沒有讀取數(shù)據(jù)(當前累加的數(shù)據(jù)并沒有拼成一個完整的數(shù)據(jù)包)
                if (oldInputLength == in.readableBytes()) {
                    //跳出循環(huán)(下次在讀取數(shù)據(jù)才能進行后續(xù)的解析)
                    break;
                } else {
                    //沒有解析到數(shù)據(jù), 但是進行讀取了
                    continue;
                }
            }
            //out里面有數(shù)據(jù), 但是沒有從累加器讀取數(shù)據(jù)
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

這里首先循環(huán)判斷傳入的ByteBuf是否有可讀字節(jié), 如果還有可讀字節(jié)說明沒有解碼完成, 則循環(huán)繼續(xù)解碼

然后判斷集合out的大小, 如果大小大于1, 說明out中盛放了解碼完成之后的數(shù)據(jù), 然后將事件向下傳播, 并清空out

因為我們第一次解碼out是空的, 所以這里不會進入if塊, 這部分我們稍后分析, 這里繼續(xù)往下看

通過 int oldInputLength = in.readableBytes() 獲取當前ByteBuf, 其實也就是屬性cumulation的可讀字節(jié)數(shù), 這里就是一個備份用于比較, 我們繼續(xù)往下看:

decode(ctx, in, out)方法是最終的解碼操作, 這部會讀取cumulation并且將解碼后的數(shù)據(jù)放入到集合out中, 在ByteToMessageDecoder中的該方法是一個抽象方法, 讓子類進行實現(xiàn), 我們使用的netty很多的解碼都是繼承了ByteToMessageDecoder并實現(xiàn)了decode方法從而完成了解碼操作, 同樣我們也可以遵循相應的規(guī)則進行自定義解碼器, 在之后的小節(jié)中會講解netty定義的解碼器, 并剖析相關的實現(xiàn)細節(jié), 這里我們繼續(xù)往下看:

 if (outSize == out.size()) 這個判斷表示解析之前的out大小和解析之后out大小進行比較, 如果相同, 說明并沒有解析出數(shù)據(jù), 我們進入到if塊中:

 if (oldInputLength == in.readableBytes()) 表示cumulation的可讀字節(jié)數(shù)在解析之前和解析之后是相同的, 說明解碼方法中并沒有解析數(shù)據(jù), 也就是當前的數(shù)據(jù)并不是一個完整的數(shù)據(jù)包, 則跳出循環(huán), 留給下次解析, 否則, 說明沒有解析到數(shù)據(jù), 但是讀取了, 所以跳過該次循環(huán)進入下次循環(huán)

最后判斷 if (oldInputLength == in.readableBytes()) , 這里代表out中有數(shù)據(jù), 但是并沒有從cumulation讀數(shù)據(jù), 說明這個out的內容是非法的, 直接拋出異常

我們回到channRead方法中

我們關注finally中的內容:

finally {
    if (cumulation != null && !cumulation.isReadable()) {
        numReads = 0;
        cumulation.release();
        cumulation = null;
    } else if (++ numReads >= discardAfterReads) {
        numReads = 0;
        discardSomeReadBytes();
    }
    //記錄list長度
    int size = out.size();
    decodeWasNull = !out.insertSinceRecycled();
    //向下傳播
    fireChannelRead(ctx, out, size);
    out.recycle();
}

首先判斷cumulation不為null, 并且沒有可讀字節(jié), 則將累加器進行釋放, 并設置為null

之后記錄out的長度, 通過fireChannelRead(ctx, out, size)將channelRead事件進行向下傳播, 并回收out對象

我們跟到fireChannelRead(ctx, out, size)方法中:

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    //遍歷List
    for (int i = 0; i < numElements; i ++) {
        //逐個向下傳遞
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

這里遍歷out集合, 并將里面的元素逐個向下傳遞

以上就是有關解碼的骨架邏輯

更多關于Netty分布式解碼器讀取數(shù)據(jù)的資料請關注腳本之家其它相關文章!

相關文章

  • SpringBoot實現(xiàn)文件上傳接口

    SpringBoot實現(xiàn)文件上傳接口

    這篇文章主要為大家詳細介紹了SpringBoot實現(xiàn)文件上傳接口,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-11-11
  • Java編寫網(wǎng)上超市購物結算功能程序

    Java編寫網(wǎng)上超市購物結算功能程序

    這篇文章主要為大家詳細介紹了Java編寫網(wǎng)上超市購物結算功能程序的具體代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-06-06
  • Spring boot創(chuàng)建自定義starter的完整步驟

    Spring boot創(chuàng)建自定義starter的完整步驟

    這篇文章主要給大家介紹了關于Spring boot創(chuàng)建自定義starter的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Spring boot具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-09-09
  • Java精品項目瑞吉外賣之登陸的完善與退出功能篇

    Java精品項目瑞吉外賣之登陸的完善與退出功能篇

    這篇文章主要為大家詳細介紹了java精品項目-瑞吉外賣訂餐系統(tǒng),此項目過大,分為多章獨立講解,本篇內容為新增菜品和分頁查詢功能的實現(xiàn),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • java枚舉使用詳細介紹及實現(xiàn)

    java枚舉使用詳細介紹及實現(xiàn)

    這篇文章主要介紹了java枚舉使用詳細介紹及實現(xiàn)的相關資料,需要的朋友可以參考下
    2017-06-06
  • ChatGPT在IDEA中使用的詳細過程

    ChatGPT在IDEA中使用的詳細過程

    這篇文章主要介紹了ChatGPT在IDEA中使用的詳細過程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-02-02
  • IDEA使用學生郵箱無法注冊問題:JetBrains Account connection error: 拒絕連接

    IDEA使用學生郵箱無法注冊問題:JetBrains Account connection error: 拒絕連接

    這篇文章主要介紹了IDEA使用學生郵箱無法注冊問題:JetBrains Account connection error: 拒絕連接,文中通過圖文及示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • 使用Spring掃描Mybatis的mapper接口的三種配置

    使用Spring掃描Mybatis的mapper接口的三種配置

    這篇文章主要介紹了使用Spring掃描Mybatis的mapper接口的三種配置,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Spring security實現(xiàn)權限管理示例

    Spring security實現(xiàn)權限管理示例

    這篇文章主要介紹了Spring security實現(xiàn)權限管理示例,這里整理了詳細的代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-01-01
  • SpringBoot淺析安全管理之Shiro框架

    SpringBoot淺析安全管理之Shiro框架

    安全管理是軟件系統(tǒng)必不可少的的功能。根據(jù)經(jīng)典的“墨菲定律”——凡是可能,總會發(fā)生。如果系統(tǒng)存在安全隱患,最終必然會出現(xiàn)問題,這篇文章主要介紹了SpringBoot安全管理Shiro框架的使用
    2022-08-08

最新評論