欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Netty?拆包沾包問題解決方案詳解

 更新時(shí)間:2022年11月25日 10:13:09   作者:鱷魚兒  
這篇文章主要為大家介紹了Netty?拆包沾包問題解決方案示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

上一篇說到Springboot整合Netty,自定義協(xié)議實(shí)現(xiàn),本文聊一些拆包/沾包問題。

拆包/沾包問題

TCP是面向字節(jié)流的協(xié)議,在發(fā)送方發(fā)送的若干包數(shù)據(jù)到接收方接收時(shí),這些數(shù)據(jù)包可能會(huì)被粘成一個(gè)數(shù)據(jù)包,而從接收緩沖區(qū)看,后一包數(shù)據(jù)的頭緊接著前一包數(shù)據(jù)的尾,這就形成沾包問題。

但如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP 就會(huì)將其拆分為多次發(fā)送,這就是拆包問題,也就是將一個(gè)大的包拆分為多個(gè)小包進(jìn)行發(fā)送,接收端接收到多個(gè)包才能組成一個(gè)完整數(shù)據(jù)。

為什么UDP沒有粘包

粘包/拆包問題在數(shù)據(jù)鏈路層、網(wǎng)絡(luò)層以及傳輸層都有可能發(fā)生。日常的網(wǎng)絡(luò)應(yīng)用開發(fā)大都在傳輸層進(jìn)行,由于UDP有消息保護(hù)邊界,不會(huì)發(fā)生粘包/拆包問題。

而TCP是面向字節(jié)流,沒有邊界,操作系統(tǒng)在發(fā)送 TCP 數(shù)據(jù)的時(shí)候,底層會(huì)有一個(gè)緩沖區(qū),通過這個(gè)緩沖區(qū)來進(jìn)行優(yōu)化,例如緩沖區(qū)為1024個(gè)字節(jié)大小,如果一次發(fā)送數(shù)據(jù)量小于1024,則會(huì)合并多個(gè)數(shù)據(jù)作為一個(gè)數(shù)據(jù)包發(fā)送;如果一次發(fā)送數(shù)據(jù)量大于1024,則會(huì)將這個(gè)包拆分成多個(gè)數(shù)據(jù)包進(jìn)行發(fā)送。上述兩種情況也是沾包和拆包問題。

上圖出現(xiàn)的四種情況包括:

  • 正常發(fā)送,兩個(gè)包恰好滿足TCP緩沖區(qū)的大小或達(dá)到TCP等待時(shí)長(zhǎng),分別發(fā)送兩個(gè)包。
  • 沾包:D1、D2都過小,兩者進(jìn)行了沾包處理。
  • 拆包沾包:D2過大,進(jìn)行了拆包處理,而拆出去的一部分D2_1又與D1進(jìn)行粘包處理。
  • 沾包拆包:D1過大,進(jìn)行了拆包處理,而拆出去的一部分D1_2又與D2進(jìn)行粘包處理。

解決方案

對(duì)于粘包和拆包問題,通??梢允褂眠@四種解決方案:

  • 使用固定數(shù)據(jù)長(zhǎng)度進(jìn)行發(fā)送,發(fā)送端將每個(gè)包都封裝成固定的長(zhǎng)度,比如100字節(jié)大小。如果不足100字節(jié)可通過補(bǔ)0等填充到指定長(zhǎng)度再發(fā)送。
  • 發(fā)送端在每個(gè)包的末尾使用固定的分隔符,例如##@##。如果發(fā)生拆包需等待多個(gè)包發(fā)送過來之后再找到其中的##@##進(jìn)行合并。如果發(fā)送沾包則找到其中的##@##進(jìn)行拆分。
  • 將消息分為頭部和消息體,頭部中保存整個(gè)消息的長(zhǎng)度,這種情況下接收端只有在讀取到足夠長(zhǎng)度的消息之后,才算是接收到一個(gè)完整的消息。
  • 通過自定義協(xié)議進(jìn)行粘包和拆包的處理。

Netty拆包沾包處理

Netty對(duì)解決粘包和拆包的方案做了抽象,提供了一些解碼器(Decoder)來解決粘包和拆包的問題。如:

LineBasedFrameDecoder:以行為單位進(jìn)行數(shù)據(jù)包的解碼,使用換行符\n或者\r\n作為依據(jù),遇到\n或者\r\n都認(rèn)為是一條完整的消息。

DelimiterBasedFrameDecoder:以特殊的符號(hào)作為分隔來進(jìn)行數(shù)據(jù)包的解碼。 FixedLengthFrameDecoder:以固定長(zhǎng)度進(jìn)行數(shù)據(jù)包的解碼。

LenghtFieldBasedFrameDecode:適用于消息頭包含消息長(zhǎng)度的協(xié)議(最常用)。

基于Netty進(jìn)行網(wǎng)絡(luò)讀寫的程序,可以直接使用這些Decoder來完成數(shù)據(jù)包的解碼。對(duì)于高并發(fā)、大流量的系統(tǒng)來說,每個(gè)數(shù)據(jù)包都不應(yīng)該傳輸多余的數(shù)據(jù)(所以補(bǔ)齊的方式不可取),LenghtFieldBasedFrameDecode更適合這樣的場(chǎng)景。

LineBasedFrameDecoder

使用LineBasedFrameDecoder解決粘包問題,其會(huì)根據(jù)"\n"或"\r\n"對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行拆分,封裝到不同的ByteBuf實(shí)例中

    /**
     * 服務(wù)啟動(dòng)器
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的線程組
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 指定連接超時(shí)時(shí)間
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())          
                // 通過換行符處理沾包/拆包
                .childHandler(new NettyServerLineBasedHandler());
        return serverBootstrap;
    }
public class NettyServerLineBasedHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 使用LineBasedFrameDecoder解決粘包問題,其會(huì)根據(jù)"\n"或"\r\n"對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行拆分,封裝到不同的ByteBuf實(shí)例中,并且每次查找的最大長(zhǎng)度為1024字節(jié)
        pipeline.addLast(new LineBasedFrameDecoder(1024, true, true));
        // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
        pipeline.addLast(new MessageDecodeHandler());
        // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼
        pipeline.addLast(new MessageEncodeHandler());
        // 對(duì)數(shù)據(jù)進(jìn)行最終處理
        pipeline.addLast(new ServerListenerHandler());
    }
}

DelimiterBasedFrameDecoder

以特殊的符號(hào)作為分隔來進(jìn)行數(shù)據(jù)包的解碼,上文中就是以##@##作為分割符作為示例展開講解的。這里再粘貼一下關(guān)鍵代碼: 使用DelimiterBasedFrameDecoder處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié)。

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 數(shù)據(jù)分割符
        String delimiterStr = "##@##";
        ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 使用自定義分隔符處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié)
        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
        // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
        pipeline.addLast(new MessageDecodeHandler());
        // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,并添加數(shù)據(jù)分隔符
        pipeline.addLast(new MessageEncodeHandler(delimiterStr));
        // 對(duì)數(shù)據(jù)進(jìn)行最終處理
        pipeline.addLast(new ServerListenerHandler());
    }

MessageEncodeHandler對(duì)發(fā)送數(shù)據(jù)進(jìn)行添加分割符并編碼操作

public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
    // 數(shù)據(jù)分割符
    String delimiter;
    public MessageEncodeHandler(String delimiter) {
        this.delimiter = delimiter;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
        out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
    }
}

FixedLengthFrameDecoder

服務(wù)端代碼設(shè)置,在NettyConfig配置中將worker處理器改為NettyServerFixedLengthHandler,使用固定100字節(jié)長(zhǎng)度處理消息。

    /**
     * 服務(wù)啟動(dòng)器
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的線程組
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 指定連接超時(shí)時(shí)間
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                // 指定為固定長(zhǎng)度字節(jié)的處理器
                .childHandler(new NettyServerFixedLengthHandler());
        return serverBootstrap;
    }

NettyServerFixedLengthHandler類代碼,使用FixedLengthFrameDecoder設(shè)置按固定100字節(jié)數(shù)去拆分接收到的ByteBuf。并自定義一個(gè)消息編碼器,對(duì)字節(jié)長(zhǎng)度不足100字節(jié)的消息進(jìn)行補(bǔ)0操作。

public class NettyServerFixedLengthHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 固定字節(jié)長(zhǎng)度
        Integer length = 100;
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 按固定100字節(jié)數(shù)拆分接收到的ByteBuf的解碼器
        pipeline.addLast(new FixedLengthFrameDecoder(length));
        // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
        pipeline.addLast(new MessageDecodeHandler());
        // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行自定義編碼,并設(shè)置字節(jié)長(zhǎng)度不足補(bǔ)0
        pipeline.addLast(new MessageEncodeFixedLengthHandler(length));
        // 對(duì)數(shù)據(jù)進(jìn)行最終處理
        pipeline.addLast(new ServerListenerHandler());
    }
}

自定義MessageEncodeFixedLengthHandler編碼類,使用固定字節(jié)長(zhǎng)度編碼消息,字節(jié)長(zhǎng)度不足時(shí)補(bǔ)0。

public class MessageEncodeFixedLengthHandler extends MessageToByteEncoder<Message> {
    private int length;
    public MessageEncodeFixedLengthHandler(int length) {
        this.length = length;
    }
    /**
     * 使用固定字節(jié)長(zhǎng)度編碼消息,字節(jié)長(zhǎng)度不足時(shí)補(bǔ)0
     *
     * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
     * @param msg the message to encode
     * @param out the {@link ByteBuf} into which the encoded message will be written
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        String jsonStr = msg.toJsonString();
        // 如果長(zhǎng)度不足,則進(jìn)行補(bǔ)0
        if (jsonStr.length() < length) {
            jsonStr = addSpace(jsonStr);
        }
        // 使用Unpooled.wrappedBuffer實(shí)現(xiàn)零拷貝,將字符串轉(zhuǎn)為ByteBuf
        ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes()));
    }
    /**
     * 如果沒有達(dá)到指定長(zhǎng)度進(jìn)行補(bǔ)0
     *
     * @param msg
     * @return
     */
    private String addSpace(String msg) {
        StringBuilder builder = new StringBuilder(msg);
        for (int i = 0; i < length - msg.length(); i++) {
            builder.append(0);
        }
        return builder.toString();
    }
}

LenghtFieldBasedFrameDecode

LenghtFieldBasedFrameDecode適用于消息頭包含消息長(zhǎng)度的協(xié)議,根據(jù)消息長(zhǎng)度判斷是否讀取完一個(gè)數(shù)據(jù)包。

    /**
     * 服務(wù)啟動(dòng)器
     *
     * @return
     */
    @Bean
    public ServerBootstrap serverBootstrap() {
        ServerBootstrap serverBootstrap = new ServerBootstrap()
                // 指定使用的線程組
                .group(boosGroup(), workerGroup())
                // 指定使用的通道
                .channel(NioServerSocketChannel.class)
                // 指定連接超時(shí)時(shí)間
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
                // 請(qǐng)求頭包含數(shù)據(jù)長(zhǎng)度
                .childHandler(new NettyServerLenghtFieldBasedHandler());
        return serverBootstrap;
    }
public class NettyServerLenghtFieldBasedHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 請(qǐng)求頭包含數(shù)據(jù)長(zhǎng)度,根據(jù)長(zhǎng)度進(jìn)行沾包拆包處理
        /**
         * maxFrameLength:指定了每個(gè)包所能傳遞的最大數(shù)據(jù)包大??;
         * lengthFieldOffset:指定了長(zhǎng)度字段在字節(jié)碼中的偏移量;
         * lengthFieldLength:指定了長(zhǎng)度字段所占用的字節(jié)長(zhǎng)度;
         * lengthAdjustment:對(duì)一些不僅包含有消息頭和消息體的數(shù)據(jù)進(jìn)行消息頭的長(zhǎng)度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長(zhǎng)度;
         * initialBytesToStrip:對(duì)于長(zhǎng)度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長(zhǎng)度字段占用的字節(jié)。
         */
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
        // 在請(qǐng)求頭添加字節(jié)長(zhǎng)度字段
        pipeline.addLast(new LengthFieldPrepender(2));
        // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
        pipeline.addLast(new MessageDecodeHandler());
        // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,字節(jié)長(zhǎng)度不足補(bǔ)0
        pipeline.addLast(new MessageEncodeHandler());
        // 對(duì)數(shù)據(jù)進(jìn)行最終處理
        pipeline.addLast(new ServerListenerHandler());
    }
}

總結(jié)

造成TCP協(xié)議粘包/拆包問題的原因是TCP協(xié)議數(shù)據(jù)傳輸是基于字節(jié)流的,它不包含消息、數(shù)據(jù)包等概念,是無界的,需要應(yīng)用層協(xié)議自己設(shè)計(jì)消息的邊界,即消息幀(Message Framing)。如果應(yīng)用層協(xié)議沒有使用基于長(zhǎng)度或者基于分隔符(終結(jié)符)劃分邊界等方式進(jìn)行處理,則會(huì)導(dǎo)致多個(gè)消息的粘包和拆包。

以上就是Netty 拆包沾包問題解決方案示例的詳細(xì)內(nèi)容,更多關(guān)于Netty 拆包沾包解決方案的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論