Netty?拆包沾包問題解決方案詳解
上一篇說到Springboot整合Netty,自定義協(xié)議實(shí)現(xiàn),本文聊一些拆包/沾包問題。
拆包/沾包問題
TCP是面向字節(jié)流的協(xié)議,在發(fā)送方發(fā)送的若干包數(shù)據(jù)到接收方接收時(shí),這些數(shù)據(jù)包可能會(huì)被粘成一個(gè)數(shù)據(jù)包,而從接收緩沖區(qū)看,后一包數(shù)據(jù)的頭緊接著前一包數(shù)據(jù)的尾,這就形成沾包問題。
但如果一次請(qǐng)求發(fā)送的數(shù)據(jù)量比較大,超過了緩沖區(qū)大小,TCP 就會(huì)將其拆分為多次發(fā)送,這就是拆包問題,也就是將一個(gè)大的包拆分為多個(gè)小包進(jìn)行發(fā)送,接收端接收到多個(gè)包才能組成一個(gè)完整數(shù)據(jù)。
為什么UDP沒有粘包
粘包/拆包問題在數(shù)據(jù)鏈路層、網(wǎng)絡(luò)層以及傳輸層都有可能發(fā)生。日常的網(wǎng)絡(luò)應(yīng)用開發(fā)大都在傳輸層進(jìn)行,由于UDP有消息保護(hù)邊界,不會(huì)發(fā)生粘包/拆包問題。
而TCP是面向字節(jié)流,沒有邊界,操作系統(tǒng)在發(fā)送 TCP 數(shù)據(jù)的時(shí)候,底層會(huì)有一個(gè)緩沖區(qū),通過這個(gè)緩沖區(qū)來進(jìn)行優(yōu)化,例如緩沖區(qū)為1024個(gè)字節(jié)大小,如果一次發(fā)送數(shù)據(jù)量小于1024,則會(huì)合并多個(gè)數(shù)據(jù)作為一個(gè)數(shù)據(jù)包發(fā)送;如果一次發(fā)送數(shù)據(jù)量大于1024,則會(huì)將這個(gè)包拆分成多個(gè)數(shù)據(jù)包進(jìn)行發(fā)送。上述兩種情況也是沾包和拆包問題。

上圖出現(xiàn)的四種情況包括:
- 正常發(fā)送,兩個(gè)包恰好滿足TCP緩沖區(qū)的大小或達(dá)到TCP等待時(shí)長(zhǎng),分別發(fā)送兩個(gè)包。
- 沾包:D1、D2都過小,兩者進(jìn)行了沾包處理。
- 拆包沾包:D2過大,進(jìn)行了拆包處理,而拆出去的一部分D2_1又與D1進(jìn)行粘包處理。
- 沾包拆包:D1過大,進(jìn)行了拆包處理,而拆出去的一部分D1_2又與D2進(jìn)行粘包處理。
解決方案
對(duì)于粘包和拆包問題,通常可以使用這四種解決方案:
- 使用固定數(shù)據(jù)長(zhǎng)度進(jìn)行發(fā)送,發(fā)送端將每個(gè)包都封裝成固定的長(zhǎng)度,比如100字節(jié)大小。如果不足100字節(jié)可通過補(bǔ)0等填充到指定長(zhǎng)度再發(fā)送。
- 發(fā)送端在每個(gè)包的末尾使用固定的分隔符,例如
##@##。如果發(fā)生拆包需等待多個(gè)包發(fā)送過來之后再找到其中的##@##進(jìn)行合并。如果發(fā)送沾包則找到其中的##@##進(jìn)行拆分。 - 將消息分為頭部和消息體,頭部中保存整個(gè)消息的長(zhǎng)度,這種情況下接收端只有在讀取到足夠長(zhǎng)度的消息之后,才算是接收到一個(gè)完整的消息。
- 通過自定義協(xié)議進(jìn)行粘包和拆包的處理。
Netty拆包沾包處理
Netty對(duì)解決粘包和拆包的方案做了抽象,提供了一些解碼器(Decoder)來解決粘包和拆包的問題。如:
LineBasedFrameDecoder:以行為單位進(jìn)行數(shù)據(jù)包的解碼,使用換行符\n或者\r\n作為依據(jù),遇到\n或者\r\n都認(rèn)為是一條完整的消息。
DelimiterBasedFrameDecoder:以特殊的符號(hào)作為分隔來進(jìn)行數(shù)據(jù)包的解碼。 FixedLengthFrameDecoder:以固定長(zhǎng)度進(jìn)行數(shù)據(jù)包的解碼。
LenghtFieldBasedFrameDecode:適用于消息頭包含消息長(zhǎng)度的協(xié)議(最常用)。
基于Netty進(jìn)行網(wǎng)絡(luò)讀寫的程序,可以直接使用這些Decoder來完成數(shù)據(jù)包的解碼。對(duì)于高并發(fā)、大流量的系統(tǒng)來說,每個(gè)數(shù)據(jù)包都不應(yīng)該傳輸多余的數(shù)據(jù)(所以補(bǔ)齊的方式不可?。?,LenghtFieldBasedFrameDecode更適合這樣的場(chǎng)景。
LineBasedFrameDecoder
使用LineBasedFrameDecoder解決粘包問題,其會(huì)根據(jù)"\n"或"\r\n"對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行拆分,封裝到不同的ByteBuf實(shí)例中
/**
* 服務(wù)啟動(dòng)器
*
* @return
*/
@Bean
public ServerBootstrap serverBootstrap() {
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的線程組
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定連接超時(shí)時(shí)間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// 通過換行符處理沾包/拆包
.childHandler(new NettyServerLineBasedHandler());
return serverBootstrap;
}
public class NettyServerLineBasedHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 使用LineBasedFrameDecoder解決粘包問題,其會(huì)根據(jù)"\n"或"\r\n"對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行拆分,封裝到不同的ByteBuf實(shí)例中,并且每次查找的最大長(zhǎng)度為1024字節(jié)
pipeline.addLast(new LineBasedFrameDecoder(1024, true, true));
// 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
pipeline.addLast(new MessageDecodeHandler());
// 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼
pipeline.addLast(new MessageEncodeHandler());
// 對(duì)數(shù)據(jù)進(jìn)行最終處理
pipeline.addLast(new ServerListenerHandler());
}
}
DelimiterBasedFrameDecoder
以特殊的符號(hào)作為分隔來進(jìn)行數(shù)據(jù)包的解碼,上文中就是以##@##作為分割符作為示例展開講解的。這里再粘貼一下關(guān)鍵代碼: 使用DelimiterBasedFrameDecoder處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié)。
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 數(shù)據(jù)分割符
String delimiterStr = "##@##";
ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
ChannelPipeline pipeline = socketChannel.pipeline();
// 使用自定義分隔符處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié)
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
pipeline.addLast(new MessageDecodeHandler());
// 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,并添加數(shù)據(jù)分隔符
pipeline.addLast(new MessageEncodeHandler(delimiterStr));
// 對(duì)數(shù)據(jù)進(jìn)行最終處理
pipeline.addLast(new ServerListenerHandler());
}
MessageEncodeHandler對(duì)發(fā)送數(shù)據(jù)進(jìn)行添加分割符并編碼操作
public class MessageEncodeHandler extends MessageToByteEncoder<Message> {
// 數(shù)據(jù)分割符
String delimiter;
public MessageEncodeHandler(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
}
}
FixedLengthFrameDecoder
服務(wù)端代碼設(shè)置,在NettyConfig配置中將worker處理器改為NettyServerFixedLengthHandler,使用固定100字節(jié)長(zhǎng)度處理消息。
/**
* 服務(wù)啟動(dòng)器
*
* @return
*/
@Bean
public ServerBootstrap serverBootstrap() {
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的線程組
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定連接超時(shí)時(shí)間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// 指定為固定長(zhǎng)度字節(jié)的處理器
.childHandler(new NettyServerFixedLengthHandler());
return serverBootstrap;
}
NettyServerFixedLengthHandler類代碼,使用FixedLengthFrameDecoder設(shè)置按固定100字節(jié)數(shù)去拆分接收到的ByteBuf。并自定義一個(gè)消息編碼器,對(duì)字節(jié)長(zhǎng)度不足100字節(jié)的消息進(jìn)行補(bǔ)0操作。
public class NettyServerFixedLengthHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 固定字節(jié)長(zhǎng)度
Integer length = 100;
ChannelPipeline pipeline = socketChannel.pipeline();
// 按固定100字節(jié)數(shù)拆分接收到的ByteBuf的解碼器
pipeline.addLast(new FixedLengthFrameDecoder(length));
// 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
pipeline.addLast(new MessageDecodeHandler());
// 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行自定義編碼,并設(shè)置字節(jié)長(zhǎng)度不足補(bǔ)0
pipeline.addLast(new MessageEncodeFixedLengthHandler(length));
// 對(duì)數(shù)據(jù)進(jìn)行最終處理
pipeline.addLast(new ServerListenerHandler());
}
}
自定義MessageEncodeFixedLengthHandler編碼類,使用固定字節(jié)長(zhǎng)度編碼消息,字節(jié)長(zhǎng)度不足時(shí)補(bǔ)0。
public class MessageEncodeFixedLengthHandler extends MessageToByteEncoder<Message> {
private int length;
public MessageEncodeFixedLengthHandler(int length) {
this.length = length;
}
/**
* 使用固定字節(jié)長(zhǎng)度編碼消息,字節(jié)長(zhǎng)度不足時(shí)補(bǔ)0
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
* @param msg the message to encode
* @param out the {@link ByteBuf} into which the encoded message will be written
* @throws Exception
*/
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
String jsonStr = msg.toJsonString();
// 如果長(zhǎng)度不足,則進(jìn)行補(bǔ)0
if (jsonStr.length() < length) {
jsonStr = addSpace(jsonStr);
}
// 使用Unpooled.wrappedBuffer實(shí)現(xiàn)零拷貝,將字符串轉(zhuǎn)為ByteBuf
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes()));
}
/**
* 如果沒有達(dá)到指定長(zhǎng)度進(jìn)行補(bǔ)0
*
* @param msg
* @return
*/
private String addSpace(String msg) {
StringBuilder builder = new StringBuilder(msg);
for (int i = 0; i < length - msg.length(); i++) {
builder.append(0);
}
return builder.toString();
}
}
LenghtFieldBasedFrameDecode
LenghtFieldBasedFrameDecode適用于消息頭包含消息長(zhǎng)度的協(xié)議,根據(jù)消息長(zhǎng)度判斷是否讀取完一個(gè)數(shù)據(jù)包。
/**
* 服務(wù)啟動(dòng)器
*
* @return
*/
@Bean
public ServerBootstrap serverBootstrap() {
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的線程組
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定連接超時(shí)時(shí)間
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// 請(qǐng)求頭包含數(shù)據(jù)長(zhǎng)度
.childHandler(new NettyServerLenghtFieldBasedHandler());
return serverBootstrap;
}
public class NettyServerLenghtFieldBasedHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 請(qǐng)求頭包含數(shù)據(jù)長(zhǎng)度,根據(jù)長(zhǎng)度進(jìn)行沾包拆包處理
/**
* maxFrameLength:指定了每個(gè)包所能傳遞的最大數(shù)據(jù)包大??;
* lengthFieldOffset:指定了長(zhǎng)度字段在字節(jié)碼中的偏移量;
* lengthFieldLength:指定了長(zhǎng)度字段所占用的字節(jié)長(zhǎng)度;
* lengthAdjustment:對(duì)一些不僅包含有消息頭和消息體的數(shù)據(jù)進(jìn)行消息頭的長(zhǎng)度的調(diào)整,這樣就可以只得到消息體的數(shù)據(jù),這里的lengthAdjustment指定的就是消息頭的長(zhǎng)度;
* initialBytesToStrip:對(duì)于長(zhǎng)度字段在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長(zhǎng)度字段占用的字節(jié)。
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// 在請(qǐng)求頭添加字節(jié)長(zhǎng)度字段
pipeline.addLast(new LengthFieldPrepender(2));
// 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例
pipeline.addLast(new MessageDecodeHandler());
// 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,字節(jié)長(zhǎng)度不足補(bǔ)0
pipeline.addLast(new MessageEncodeHandler());
// 對(duì)數(shù)據(jù)進(jìn)行最終處理
pipeline.addLast(new ServerListenerHandler());
}
}
總結(jié)
造成TCP協(xié)議粘包/拆包問題的原因是TCP協(xié)議數(shù)據(jù)傳輸是基于字節(jié)流的,它不包含消息、數(shù)據(jù)包等概念,是無界的,需要應(yīng)用層協(xié)議自己設(shè)計(jì)消息的邊界,即消息幀(Message Framing)。如果應(yīng)用層協(xié)議沒有使用基于長(zhǎng)度或者基于分隔符(終結(jié)符)劃分邊界等方式進(jìn)行處理,則會(huì)導(dǎo)致多個(gè)消息的粘包和拆包。
以上就是Netty 拆包沾包問題解決方案示例的詳細(xì)內(nèi)容,更多關(guān)于Netty 拆包沾包解決方案的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot2整合activiti6環(huán)境搭建過程解析
這篇文章主要介紹了SpringBoot2整合activiti6環(huán)境搭建過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
spring boot入門開始你的第一個(gè)應(yīng)用
這篇文章主要介紹了spring boot入門開始你的第一個(gè)應(yīng)用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06
簡(jiǎn)單工廠模式_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了簡(jiǎn)單工廠模式的相關(guān)資料,和大家一起學(xué)習(xí)靜態(tài)工廠方法模式,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08
java學(xué)生信息管理系統(tǒng)MVC架構(gòu)詳解
這篇文章主要為大家詳細(xì)介紹了java學(xué)生信息管理系統(tǒng)MVC架構(gòu)的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-11-11
Spring boot從安裝到交互功能實(shí)現(xiàn)零基礎(chǔ)全程詳解
這篇文章主要介紹了Spring boot從安裝到交互功能得實(shí)現(xiàn)全程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Java狀態(tài)機(jī)的一種優(yōu)雅寫法分享
狀態(tài)機(jī)是一種數(shù)學(xué)模型,對(duì)于我們業(yè)務(wù)實(shí)現(xiàn)有很大的幫助。我們可以用非常多的方法實(shí)現(xiàn)狀態(tài)機(jī),這篇文章就來介紹一個(gè)狀態(tài)機(jī)優(yōu)雅的實(shí)現(xiàn)方法,希望對(duì)大家有所幫助2023-04-04
java隨機(jī)抽取指定范圍內(nèi)不重復(fù)的n個(gè)數(shù)
這篇文章主要為大家詳細(xì)介紹了java隨機(jī)抽取指定范圍內(nèi)不重復(fù)的n個(gè)數(shù),感興趣的小伙伴們可以參考一下2016-02-02

