Netty?拆包沾包問題解決方案詳解
上一篇說到Springboot整合Netty,自定義協(xié)議實(shí)現(xiàn),本文聊一些拆包/沾包問題。
拆包/沾包問題
TCP是面向字節(jié)流的協(xié)議,在發(fā)送方發(fā)送的若干包數(shù)據(jù)到接收方接收時(shí),這些數(shù)據(jù)包可能會(huì)被粘成一個(gè)數(shù)據(jù)包,而從接收緩沖區(qū)看,后一包數(shù)據(jù)的頭緊接著前一包數(shù)據(jù)的尾,這就形成沾包問題。
但如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP 就會(huì)將其拆分為多次發(fā)送,這就是拆包問題,也就是將一個(gè)大的包拆分為多個(gè)小包進(jìn)行發(fā)送,接收端接收到多個(gè)包才能組成一個(gè)完整數(shù)據(jù)。
為什么UDP沒有粘包
粘包/拆包問題在數(shù)據(jù)鏈路層、網(wǎng)絡(luò)層以及傳輸層都有可能發(fā)生。日常的網(wǎng)絡(luò)應(yīng)用開發(fā)大都在傳輸層進(jìn)行,由于UDP有消息保護(hù)邊界,不會(huì)發(fā)生粘包/拆包問題。
而TCP是面向字節(jié)流,沒有邊界,操作系統(tǒng)在發(fā)送 TCP 數(shù)據(jù)的時(shí)候,底層會(huì)有一個(gè)緩沖區(qū),通過這個(gè)緩沖區(qū)來進(jìn)行優(yōu)化,例如緩沖區(qū)為1024個(gè)字節(jié)大小,如果一次發(fā)送數(shù)據(jù)量小于1024,則會(huì)合并多個(gè)數(shù)據(jù)作為一個(gè)數(shù)據(jù)包發(fā)送;如果一次發(fā)送數(shù)據(jù)量大于1024,則會(huì)將這個(gè)包拆分成多個(gè)數(shù)據(jù)包進(jìn)行發(fā)送。上述兩種情況也是沾包和拆包問題。
上圖出現(xiàn)的四種情況包括:
- 正常發(fā)送,兩個(gè)包恰好滿足TCP緩沖區(qū)的大小或達(dá)到TCP等待時(shí)長(zhǎng),分別發(fā)送兩個(gè)包。
- 沾包:D1、D2都過小,兩者進(jìn)行了沾包處理。
- 拆包沾包:D2過大,進(jìn)行了拆包處理,而拆出去的一部分D2_1又與D1進(jìn)行粘包處理。
- 沾包拆包:D1過大,進(jìn)行了拆包處理,而拆出去的一部分D1_2又與D2進(jìn)行粘包處理。
解決方案
對(duì)于粘包和拆包問題,通??梢允褂眠@四種解決方案:
- 使用固定數(shù)據(jù)長(zhǎng)度進(jìn)行發(fā)送,發(fā)送端將每個(gè)包都封裝成固定的長(zhǎng)度,比如100字節(jié)大小。如果不足100字節(jié)可通過補(bǔ)0等填充到指定長(zhǎng)度再發(fā)送。
- 發(fā)送端在每個(gè)包的末尾使用固定的分隔符,例如
##@##
。如果發(fā)生拆包需等待多個(gè)包發(fā)送過來之后再找到其中的##@##
進(jìn)行合并。如果發(fā)送沾包則找到其中的##@##
進(jìn)行拆分。 - 將消息分為頭部和消息體,頭部中保存整個(gè)消息的長(zhǎng)度,這種情況下接收端只有在讀取到足夠長(zhǎng)度的消息之后,才算是接收到一個(gè)完整的消息。
- 通過自定義協(xié)議進(jìn)行粘包和拆包的處理。
Netty拆包沾包處理
Netty對(duì)解決粘包和拆包的方案做了抽象,提供了一些解碼器(Decoder)來解決粘包和拆包的問題。如:
LineBasedFrameDecoder
:以行為單位進(jìn)行數(shù)據(jù)包的解碼,使用換行符\n
或者\r\n
作為依據(jù),遇到\n
或者\r\n
都認(rèn)為是一條完整的消息。
DelimiterBasedFrameDecoder
:以特殊的符號(hào)作為分隔來進(jìn)行數(shù)據(jù)包的解碼。 FixedLengthFrameDecoder
:以固定長(zhǎng)度進(jìn)行數(shù)據(jù)包的解碼。
LenghtFieldBasedFrameDecode
:適用于消息頭包含消息長(zhǎng)度的協(xié)議(最常用)。
基于Netty進(jìn)行網(wǎng)絡(luò)讀寫的程序,可以直接使用這些Decoder來完成數(shù)據(jù)包的解碼。對(duì)于高并發(fā)、大流量的系統(tǒng)來說,每個(gè)數(shù)據(jù)包都不應(yīng)該傳輸多余的數(shù)據(jù)(所以補(bǔ)齊的方式不可取),LenghtFieldBasedFrameDecode
更適合這樣的場(chǎng)景。
LineBasedFrameDecoder
使用LineBasedFrameDecoder解決粘包問題,其會(huì)根據(jù)"\n"或"\r\n"對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行拆分,封裝到不同的ByteBuf實(shí)例中
/** * 服務(wù)啟動(dòng)器 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時(shí)時(shí)間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 通過換行符處理沾包/拆包 .childHandler(new NettyServerLineBasedHandler()); return serverBootstrap; }
public class NettyServerLineBasedHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 使用LineBasedFrameDecoder解決粘包問題,其會(huì)根據(jù)"\n"或"\r\n"對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行拆分,封裝到不同的ByteBuf實(shí)例中,并且每次查找的最大長(zhǎng)度為1024字節(jié) pipeline.addLast(new LineBasedFrameDecoder(1024, true, true)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例 pipeline.addLast(new MessageDecodeHandler()); // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼 pipeline.addLast(new MessageEncodeHandler()); // 對(duì)數(shù)據(jù)進(jìn)行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
DelimiterBasedFrameDecoder
以特殊的符號(hào)作為分隔來進(jìn)行數(shù)據(jù)包的解碼,上文中就是以##@##
作為分割符作為示例展開講解的。這里再粘貼一下關(guān)鍵代碼: 使用DelimiterBasedFrameDecoder
處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié)。
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 數(shù)據(jù)分割符 String delimiterStr = "##@##"; ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes()); ChannelPipeline pipeline = socketChannel.pipeline(); // 使用自定義分隔符處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例 pipeline.addLast(new MessageDecodeHandler()); // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,并添加數(shù)據(jù)分隔符 pipeline.addLast(new MessageEncodeHandler(delimiterStr)); // 對(duì)數(shù)據(jù)進(jìn)行最終處理 pipeline.addLast(new ServerListenerHandler()); }
MessageEncodeHandler
對(duì)發(fā)送數(shù)據(jù)進(jìn)行添加分割符并編碼操作
public class MessageEncodeHandler extends MessageToByteEncoder<Message> { // 數(shù)據(jù)分割符 String delimiter; public MessageEncodeHandler(String delimiter) { this.delimiter = delimiter; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8)); } }
FixedLengthFrameDecoder
服務(wù)端代碼設(shè)置,在NettyConfig
配置中將worker處理器改為NettyServerFixedLengthHandler
,使用固定100字節(jié)長(zhǎng)度處理消息。
/** * 服務(wù)啟動(dòng)器 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時(shí)時(shí)間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 指定為固定長(zhǎng)度字節(jié)的處理器 .childHandler(new NettyServerFixedLengthHandler()); return serverBootstrap; }
NettyServerFixedLengthHandler
類代碼,使用FixedLengthFrameDecoder
設(shè)置按固定100字節(jié)數(shù)去拆分接收到的ByteBuf。并自定義一個(gè)消息編碼器,對(duì)字節(jié)長(zhǎng)度不足100字節(jié)的消息進(jìn)行補(bǔ)0操作。
public class NettyServerFixedLengthHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 固定字節(jié)長(zhǎng)度 Integer length = 100; ChannelPipeline pipeline = socketChannel.pipeline(); // 按固定100字節(jié)數(shù)拆分接收到的ByteBuf的解碼器 pipeline.addLast(new FixedLengthFrameDecoder(length)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例 pipeline.addLast(new MessageDecodeHandler()); // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行自定義編碼,并設(shè)置字節(jié)長(zhǎng)度不足補(bǔ)0 pipeline.addLast(new MessageEncodeFixedLengthHandler(length)); // 對(duì)數(shù)據(jù)進(jìn)行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
自定義MessageEncodeFixedLengthHandler
編碼類,使用固定字節(jié)長(zhǎng)度編碼消息,字節(jié)長(zhǎng)度不足時(shí)補(bǔ)0。
public class MessageEncodeFixedLengthHandler extends MessageToByteEncoder<Message> { private int length; public MessageEncodeFixedLengthHandler(int length) { this.length = length; } /** * 使用固定字節(jié)長(zhǎng)度編碼消息,字節(jié)長(zhǎng)度不足時(shí)補(bǔ)0 * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to * @param msg the message to encode * @param out the {@link ByteBuf} into which the encoded message will be written * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { String jsonStr = msg.toJsonString(); // 如果長(zhǎng)度不足,則進(jìn)行補(bǔ)0 if (jsonStr.length() < length) { jsonStr = addSpace(jsonStr); } // 使用Unpooled.wrappedBuffer實(shí)現(xiàn)零拷貝,將字符串轉(zhuǎn)為ByteBuf ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes())); } /** * 如果沒有達(dá)到指定長(zhǎng)度進(jìn)行補(bǔ)0 * * @param msg * @return */ private String addSpace(String msg) { StringBuilder builder = new StringBuilder(msg); for (int i = 0; i < length - msg.length(); i++) { builder.append(0); } return builder.toString(); } }
LenghtFieldBasedFrameDecode
LenghtFieldBasedFrameDecode
適用于消息頭包含消息長(zhǎng)度的協(xié)議,根據(jù)消息長(zhǎng)度判斷是否讀取完一個(gè)數(shù)據(jù)包。
/** * 服務(wù)啟動(dòng)器 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時(shí)時(shí)間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 請(qǐng)求頭包含數(shù)據(jù)長(zhǎng)度 .childHandler(new NettyServerLenghtFieldBasedHandler()); return serverBootstrap; }
public class NettyServerLenghtFieldBasedHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 請(qǐng)求頭包含數(shù)據(jù)長(zhǎng)度,根據(jù)長(zhǎng)度進(jìn)行沾包拆包處理 /** * maxFrameLength:指定了每個(gè)包所能傳遞的最大數(shù)據(jù)包大??; * lengthFieldOffset:指定了長(zhǎng)度字段在字節(jié)碼中的偏移量; * lengthFieldLength:指定了長(zhǎng)度字段所占用的字節(jié)長(zhǎng)度; * lengthAdjustment:對(duì)一些不僅包含有消息頭和消息體的數(shù)據(jù)進(jìn)行消息頭的長(zhǎng)度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長(zhǎng)度; * initialBytesToStrip:對(duì)于長(zhǎng)度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長(zhǎng)度字段占用的字節(jié)。 */ pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // 在請(qǐng)求頭添加字節(jié)長(zhǎng)度字段 pipeline.addLast(new LengthFieldPrepender(2)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例 pipeline.addLast(new MessageDecodeHandler()); // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,字節(jié)長(zhǎng)度不足補(bǔ)0 pipeline.addLast(new MessageEncodeHandler()); // 對(duì)數(shù)據(jù)進(jìn)行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
總結(jié)
造成TCP協(xié)議粘包/拆包問題的原因是TCP協(xié)議數(shù)據(jù)傳輸是基于字節(jié)流的,它不包含消息、數(shù)據(jù)包等概念,是無界的,需要應(yīng)用層協(xié)議自己設(shè)計(jì)消息的邊界,即消息幀(Message Framing)。如果應(yīng)用層協(xié)議沒有使用基于長(zhǎng)度或者基于分隔符(終結(jié)符)劃分邊界等方式進(jìn)行處理,則會(huì)導(dǎo)致多個(gè)消息的粘包和拆包。
以上就是Netty 拆包沾包問題解決方案示例的詳細(xì)內(nèi)容,更多關(guān)于Netty 拆包沾包解決方案的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot2整合activiti6環(huán)境搭建過程解析
這篇文章主要介紹了SpringBoot2整合activiti6環(huán)境搭建過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11spring boot入門開始你的第一個(gè)應(yīng)用
這篇文章主要介紹了spring boot入門開始你的第一個(gè)應(yīng)用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06簡(jiǎn)單工廠模式_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了簡(jiǎn)單工廠模式的相關(guān)資料,和大家一起學(xué)習(xí)靜態(tài)工廠方法模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08java學(xué)生信息管理系統(tǒng)MVC架構(gòu)詳解
這篇文章主要為大家詳細(xì)介紹了java學(xué)生信息管理系統(tǒng)MVC架構(gòu)的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11Spring boot從安裝到交互功能實(shí)現(xiàn)零基礎(chǔ)全程詳解
這篇文章主要介紹了Spring boot從安裝到交互功能得實(shí)現(xiàn)全程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Java狀態(tài)機(jī)的一種優(yōu)雅寫法分享
狀態(tài)機(jī)是一種數(shù)學(xué)模型,對(duì)于我們業(yè)務(wù)實(shí)現(xiàn)有很大的幫助。我們可以用非常多的方法實(shí)現(xiàn)狀態(tài)機(jī),這篇文章就來介紹一個(gè)狀態(tài)機(jī)優(yōu)雅的實(shí)現(xiàn)方法,希望對(duì)大家有所幫助2023-04-04Java實(shí)現(xiàn)刪除PDF中指定頁(yè)面
這篇文章主要為大家詳細(xì)介紹了如何使用一個(gè)免費(fèi)的國(guó)產(chǎn)Java庫(kù)來刪除PDF中的指定頁(yè)面或者刪除PDF中的空白頁(yè),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-11-11java隨機(jī)抽取指定范圍內(nèi)不重復(fù)的n個(gè)數(shù)
這篇文章主要為大家詳細(xì)介紹了java隨機(jī)抽取指定范圍內(nèi)不重復(fù)的n個(gè)數(shù),感興趣的小伙伴們可以參考一下2016-02-02