Netty解決半包和粘包問題的方案
什么是半包和粘包?
半包問題
半包問題是指一個(gè)完整的應(yīng)用層消息被分成多個(gè) TCP 數(shù)據(jù)包發(fā)送,接收端在一次讀取操作中只接收到消息的一部分。
例如,發(fā)送端發(fā)送了一條 100 字節(jié)的消息,但由于網(wǎng)絡(luò)原因,這條消息被拆分成了兩個(gè) TCP 數(shù)據(jù)包,一個(gè) 60 字節(jié),另一個(gè) 40 字節(jié)。接收端可能在第一次讀取時(shí)只接收到前 60 字節(jié)的數(shù)據(jù),剩下的 40 字節(jié)需要在后續(xù)的讀取操作中才能接收到。
粘包問題
粘包問題是指多個(gè)應(yīng)用層消息在傳輸過程中被粘在一起,接收端在一次讀取操作中接收到大于 1個(gè)消息的情況。
例如,發(fā)送端發(fā)送了兩條消息,每條 50 字節(jié),但接收端在一次讀取操作中收到了 80 字節(jié)的數(shù)據(jù),超過了 1條消息的內(nèi)容。
產(chǎn)生原因
產(chǎn)生半包和粘包問題主要是以下 3個(gè)原因:
- TCP 的流式特性:TCP 是面向字節(jié)流的協(xié)議,沒有消息邊界的概念,它保證數(shù)據(jù)的順序和可靠性,但不保證每次發(fā)送的數(shù)據(jù)對(duì)應(yīng)每次接收的數(shù)據(jù)。
- 網(wǎng)絡(luò)狀況:網(wǎng)絡(luò)的擁塞、延遲、抖動(dòng)等因素可能導(dǎo)致數(shù)據(jù)包的拆分和重組。
- 操作系統(tǒng)和緩沖區(qū):操作系統(tǒng) TCP/IP 協(xié)議棧和應(yīng)用程序的緩沖區(qū)大小也會(huì)影響數(shù)據(jù)的讀取方式。
示例
假設(shè)發(fā)送端發(fā)送了兩條消息:
- 消息1:Hello
- 消息2:World
在半包情況下,接收端可能會(huì)這樣接收:
- 第一次讀取:Hel
- 第二次讀?。簂oWo
- 第三次讀?。簉ld
在粘包情況下,接收端可能會(huì)這樣接收:
- 第一次讀?。篐elloWor
- 第二次讀取:ld
解決方案
基于固定長(zhǎng)度的解碼器
基于固定長(zhǎng)度的解碼器是指發(fā)消息時(shí),每條消息的長(zhǎng)度固定,讀消息時(shí)也通過固定長(zhǎng)度來讀取消息,從而解決半包和粘包問題。
實(shí)現(xiàn)方式
Netty 提供了 FixedLengthFrameDecoder 類來實(shí)現(xiàn)這一功能,核心源碼如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { this.frameLength = frameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= frameLength) { ByteBuf buf = in.readBytes(frameLength); out.add(buf); } } }
注意點(diǎn)
使用定長(zhǎng)幀需要注意以下幾點(diǎn):
- 固定長(zhǎng)度:消息長(zhǎng)度必須是固定的,發(fā)送端需要確保消息長(zhǎng)度一致。如果長(zhǎng)度超出固定長(zhǎng)度,解包時(shí)消息就會(huì)錯(cuò)位,如果消息不足固定長(zhǎng)度,需要使用填充字符補(bǔ)齊。
- 填充字符:選擇合適的填充字符(如空格)來補(bǔ)齊消息長(zhǎng)度,接收端在處理時(shí)需要去除這些填充字符。
優(yōu)點(diǎn)
- 簡(jiǎn)單易實(shí)現(xiàn):實(shí)現(xiàn)起來非常簡(jiǎn)單,不需要額外的頭部信息或分隔符。
- 解析效率高:由于每個(gè)消息長(zhǎng)度固定,接收端解析時(shí)只需按照固定長(zhǎng)度讀取。
缺點(diǎn)
- 不靈活:消息長(zhǎng)度固定,可能會(huì)造成空間浪費(fèi)(如果消息長(zhǎng)度較短)或不足(如果消息長(zhǎng)度較長(zhǎng))。
- 適用場(chǎng)景有限:適用于固定格式和長(zhǎng)度的協(xié)議,不適用于可變長(zhǎng)度消息的場(chǎng)景。
示例
下面我們通過一個(gè)示例來展示使用定長(zhǎng)幀是如何解決半包粘包問題的。
發(fā)送端,確保每個(gè)消息的長(zhǎng)度固定。如果實(shí)際消息長(zhǎng)度不足,可以使用填充字符(如空格)來補(bǔ)齊。
public class FixedLengthFrameSender { private static final int FRAME_LENGTH = 10; // 固定消息長(zhǎng)度 public static void send(Channel channel, String message) { // 確保消息長(zhǎng)度不超過固定長(zhǎng)度 if (message.length() > FRAME_LENGTH) { throw new IllegalArgumentException("Message too long"); } // 使用空格填充消息到固定長(zhǎng)度 String paddedMessage = String.format("%-" + FRAME_LENGTH + "s", message); // 將消息轉(zhuǎn)換為字節(jié)數(shù)組并發(fā)送 ByteBuf buffer = Unpooled.copiedBuffer(paddedMessage.getBytes()); channel.writeAndFlush(buffer); } }
接收端,使用 Netty 提供的 FixedLengthFrameDecoder 解碼器來處理固定長(zhǎng)度的消息。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class FixedLengthFrameReceiver { private static final int FRAME_LENGTH = 10; // 固定消息長(zhǎng)度 public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 添加定長(zhǎng)幀解碼器 p.addLast(new FixedLengthFrameDecoder(FRAME_LENGTH)); // 添加自定義處理器 p.addLast(new FixedLengthFrameHandler()); } }); // 啟動(dòng)服務(wù)器 b.bind(8888).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static class FixedLengthFrameHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; byte[] receivedBytes = new byte[in.readableBytes()]; in.readBytes(receivedBytes); String receivedMsg = new String(receivedBytes).trim(); // 去除填充字符 System.out.println("Received: " + receivedMsg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
基于換行符解碼器
自定義分隔符解碼器
基于換行符解碼器和自定義分隔符解碼器(比如 特殊字符)來劃分消息邊界,從而解決半包和粘包問題,使用者可以根據(jù)自己的需求靈活確定分隔符。
實(shí)現(xiàn)方式
Netty 提供了 DelimiterBasedFrameDecoder 類來實(shí)現(xiàn)這一功能,核心源碼如下:
public DelimiterBasedFrameDecoder( int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) { validateMaxFrameLength(maxFrameLength); ObjectUtil.checkNonEmpty(delimiters, "delimiters"); if (isLineBased(delimiters) && !isSubclass()) { lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this.delimiters = null; } else { this.delimiters = new ByteBuf[delimiters.length]; for (int i = 0; i < delimiters.length; i ++) { ByteBuf d = delimiters[i]; validateDelimiter(d); this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes()); } lineBasedDecoder = null; } this.maxFrameLength = maxFrameLength; this.stripDelimiter = stripDelimiter; this.failFast = failFast; }
注意點(diǎn)
- 分隔符選擇:選擇一個(gè)不會(huì)出現(xiàn)在消息內(nèi)容中的分隔符(如換行符 \n 或特定字符 |)。
- 消息格式:發(fā)送端在每個(gè)消息的末尾添加分隔符,確保接收端能夠正確解析消息邊界。
優(yōu)點(diǎn)
- 靈活性高:可以處理可變長(zhǎng)度的消息。
- 實(shí)現(xiàn)相對(duì)簡(jiǎn)單:只需在消息末尾添加特定的分隔符,接收端根據(jù)分隔符拆分消息。
缺點(diǎn)
- 分隔符沖突:如果消息內(nèi)容中包含分隔符,可能導(dǎo)致解析錯(cuò)誤,需要對(duì)消息內(nèi)容進(jìn)行轉(zhuǎn)義處理。
- 解析效率低:需要掃描整個(gè)數(shù)據(jù)流尋找分隔符,效率較低。
示例
下面我們通過一個(gè)示例來展示使用分隔符是如何解決半包粘包問題的。
發(fā)送端,確保每個(gè)消息以特定的分隔符結(jié)尾。常用的分隔符包括換行符(\n)、特定字符(如 |)等。
public class DelimiterBasedFrameSender { private static final String DELIMITER = "\n"; // 分隔符 public static void send(Channel channel, String message) { // 在消息末尾添加分隔符 String delimitedMessage = message + DELIMITER; // 將消息轉(zhuǎn)換為字節(jié)數(shù)組并發(fā)送 ByteBuf buffer = Unpooled.copiedBuffer(delimitedMessage.getBytes()); channel.writeAndFlush(buffer); } }
接收端,使用 Netty 提供的 DelimiterBasedFrameDecoder 解碼器來處理以分隔符結(jié)尾的消息。
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class DelimiterBasedFrameReceiver { private static final String DELIMITER = "\n"; // 分隔符 private static final int MAX_FRAME_LENGTH = 1024; // 最大幀長(zhǎng)度 public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 添加分隔符解碼器 ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes()); p.addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter)); // 添加字符串解碼器 p.addLast(new StringDecoder()); // 添加自定義處理器 p.addLast(new DelimiterBasedFrameHandler()); } }); // 啟動(dòng)服務(wù)器 b.bind(8888).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static class DelimiterBasedFrameHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { String receivedMsg = (String) msg; System.out.println("Received: " + receivedMsg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
基于長(zhǎng)度字段的解碼器
基于長(zhǎng)度字段的解碼器是指在消息頭部添加長(zhǎng)度字段,指示消息的總長(zhǎng)度。
實(shí)現(xiàn)方式
Netty 提供了 LengthFieldBasedFrameDecoder 類來實(shí)現(xiàn)這一功能,核心源碼如下:
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < lengthFieldOffset + lengthFieldLength) { return; } in.markReaderIndex(); int length = in.getInt(in.readerIndex() + lengthFieldOffset); if (in.readableBytes() < lengthFieldOffset + lengthFieldLength + length) { in.resetReaderIndex(); return; } in.skipBytes(lengthFieldOffset + lengthFieldLength); ByteBuf frame = in.readBytes(length); out.add(frame); } }
關(guān)鍵點(diǎn)
- 長(zhǎng)度字段位置:長(zhǎng)度字段通常位于消息的頭部,用于指示消息的總長(zhǎng)度。
- 解碼器參數(shù):
- maxFrameLength:消息的最大長(zhǎng)度,防止內(nèi)存溢出。
- lengthFieldOffset:長(zhǎng)度字段在消息中的偏移量。
- lengthFieldLength:長(zhǎng)度字段的字節(jié)數(shù)(通常為 4 字節(jié))。
- lengthAdjustment:長(zhǎng)度調(diào)整值,如果長(zhǎng)度字段不包含消息頭的長(zhǎng)度,需要進(jìn)行調(diào)整。
- initialBytesToStrip:解碼后跳過的字節(jié)數(shù),通常為長(zhǎng)度字段的長(zhǎng)度。
優(yōu)點(diǎn)
- 靈活性高:支持可變長(zhǎng)度的消息。
- 解析效率高:通過長(zhǎng)度字段可以直接讀取完整消息,無需掃描整個(gè)數(shù)據(jù)流。
缺點(diǎn)
- 實(shí)現(xiàn)復(fù)雜:需要在消息頭部添加長(zhǎng)度字段,接收端需要解析頭部信息。
- 額外開銷:消息頭部的長(zhǎng)度字段會(huì)增加一些額外的字節(jié)數(shù)。
示例
下面我們通過一個(gè)示例來展示使用長(zhǎng)度字段是如何解決半包粘包問題的。
發(fā)送端,確保每個(gè)消息在發(fā)送前都包含長(zhǎng)度字段。長(zhǎng)度字段通常放在消息的頭部,用于指示消息的總長(zhǎng)度。
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; public class LengthFieldBasedFrameSender { public static void send(Channel channel, String message) { // 將消息轉(zhuǎn)換為字節(jié)數(shù)組 byte[] messageBytes = message.getBytes(); int messageLength = messageBytes.length; // 創(chuàng)建一個(gè) ByteBuf 來存儲(chǔ)長(zhǎng)度字段和消息內(nèi)容 ByteBuf buffer = Unpooled.buffer(4 + messageLength); // 寫入長(zhǎng)度字段(4 字節(jié),表示消息長(zhǎng)度) buffer.writeInt(messageLength); // 寫入消息內(nèi)容 buffer.writeBytes(messageBytes); // 發(fā)送消息 channel.writeAndFlush(buffer); } }
接收端,使用 Netty 提供的 LengthFieldBasedFrameDecoder 解碼器來處理包含長(zhǎng)度字段的消息。
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class LengthFieldBasedFrameReceiver { private static final int MAX_FRAME_LENGTH = 1024; // 最大幀長(zhǎng)度 public static void main(String[] args) throws Exception { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 添加長(zhǎng)度字段解碼器 p.addLast(new LengthFieldBasedFrameDecoder( MAX_FRAME_LENGTH, 0, 4, 0, 4)); // 添加字符串解碼器 p.addLast(new StringDecoder()); // 添加自定義處理器 p.addLast(new LengthFieldBasedFrameHandler()); } }); // 啟動(dòng)服務(wù)器 b.bind(8888).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static class LengthFieldBasedFrameHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { String receivedMsg = (String) msg; System.out.println("Received: " + receivedMsg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
自定義解碼器
如果上述 Netty提供的方案無法滿足業(yè)務(wù)需求的話,Netty還提供了一個(gè)擴(kuò)展點(diǎn),使用者可以通過自定義解碼器來處理消息,
實(shí)現(xiàn)方式
例如,自定義頭部信息來表示消息長(zhǎng)度或結(jié)束標(biāo)志,示例代碼如下:
public class CustomProtocolDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 根據(jù)自定義協(xié)議解析消息 if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int length = in.readInt(); if (in.readableBytes() < length) { in.resetReaderIndex(); return; } ByteBuf frame = in.readBytes(length); out.add(frame); } }
優(yōu)點(diǎn)
- 高度靈活:可以根據(jù)具體需求設(shè)計(jì)協(xié)議,適應(yīng)各種復(fù)雜場(chǎng)景。
- 功能豐富:可以在自定義協(xié)議中添加其他信息(如校驗(yàn)和、序列號(hào)等),增強(qiáng)協(xié)議的功能和可靠性。
缺點(diǎn)
- 實(shí)現(xiàn)復(fù)雜:設(shè)計(jì)和實(shí)現(xiàn)自定義協(xié)議需要更多的工作量。
- 維護(hù)成本高:自定義協(xié)議可能需要更多的維護(hù)和更新工作。
總結(jié)
本文我們分析了產(chǎn)生半包和粘包的原因以及在Netty中的 5種解決方案:
- 基于固定長(zhǎng)度解碼器
- 基于換行符解碼器
- 自定義分隔符解碼器
- 基于長(zhǎng)度字段解碼器
- 自定義解碼器
通過學(xué)習(xí)這些內(nèi)容,我們不僅掌握了半包和粘包問題的理論知識(shí),同時(shí)學(xué)會(huì)了多種解決方法的具體實(shí)現(xiàn)。
以上就是Netty解決半包和粘包問題的方案的詳細(xì)內(nèi)容,更多關(guān)于Netty解決半包和粘包的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python機(jī)器學(xué)習(xí)三大件之二pandas
這篇文章主要介紹了Python機(jī)器學(xué)習(xí)三大件之二pandas,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)Python的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05java數(shù)據(jù)結(jié)構(gòu)之樹基本概念解析及代碼示例
這篇文章主要介紹了java數(shù)據(jù)結(jié)構(gòu)之樹基本概念解析及代碼示例,介紹了樹的定義,基本術(shù)語(yǔ),主要操作及實(shí)現(xiàn)等相關(guān)內(nèi)容,具有一定參考價(jià)值,需要的朋友可了解下。2017-11-11Java多線程導(dǎo)致CPU占用100%解決及線程池正確關(guān)閉方式
1000萬(wàn)表數(shù)據(jù)導(dǎo)入內(nèi)存數(shù)據(jù)庫(kù),按分頁(yè)大小10000查詢,多線程,15條線程跑,最后發(fā)現(xiàn)CPU占用100%卡死,那么如何解決,本文就來介紹一下,感興趣的朋友可以了解一下2021-05-05Java實(shí)現(xiàn)MySQL數(shù)據(jù)實(shí)時(shí)同步至Elasticsearch的方法詳解
MySQL擅長(zhǎng)事務(wù)處理,而Elasticsearch(ES)則專注于搜索與分析,將MySQL數(shù)據(jù)實(shí)時(shí)同步到ES,可以充分發(fā)揮兩者的優(yōu)勢(shì),下面我們就來看看如何使用Java實(shí)現(xiàn)這一功能吧2025-03-03Java并發(fā)LinkedBlockingQueue源碼分析
這篇文章主要為大家介紹了Java并發(fā)LinkedBlockingQueue源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02SpringBoot實(shí)現(xiàn)mysql與clickhouse多數(shù)據(jù)源的項(xiàng)目實(shí)踐
本文主要介紹了SpringBoot實(shí)現(xiàn)mysql與clickhouse多數(shù)據(jù)源的項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-11-11基于java中的null類型---有關(guān)null的9件事
這篇文章主要介紹了java中的null類型---有關(guān)null的9件事,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Java本地高性能緩存的幾種常見實(shí)現(xiàn)方式
在Java中緩存是一種常用的性能優(yōu)化技術(shù),用于在應(yīng)用程序中加速訪問和查詢數(shù)據(jù)的速度,下面這篇文章主要給大家介紹了關(guān)于Java本地高性能緩存的幾種常見實(shí)現(xiàn)方式,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-07-07