Netty粘包拆包問題解決方案
TCP黏包拆包
TCP是一個流協(xié)議,就是沒有界限的一長串二進制數(shù)據(jù)。TCP作為傳輸層協(xié)議并不不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會根據(jù)TCP緩沖區(qū)的實際情況進行數(shù)據(jù)包的劃分,所以在業(yè)務(wù)上認為是一個完整的包,可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。
怎么解決?
- • 消息定長度,傳輸?shù)臄?shù)據(jù)大小固定長度,例如每段的長度固定為100字節(jié),如果不夠空位補空格
- • 在數(shù)據(jù)包尾部添加特殊分隔符,比如下劃線,中劃線等
- • 將消息分為消息頭和消息體,消息頭中包含表示信息的總長度
Netty提供了多個解碼器,可以進行分包的操作,分別是:
- • LineBasedFrameDecoder (回車換行分包)
- • DelimiterBasedFrameDecoder(特殊分隔符分包)
- • FixedLengthFrameDecoder(固定長度報文來分包)
- • LengthFieldBasedFrameDecoder(自定義長度來分包)
制造粘包和拆包問題
為了驗證我們的解碼器能夠解決這種粘包和拆包帶來的問題,首先我們就制造一個這樣的問題,以此用來做對比。
服務(wù)端:
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.err.println("server:" + msg.toString());
ctx.writeAndFlush(msg.toString() + "你好" );
}
});
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture f = bootstrap.bind(2222).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
客戶端我們發(fā)送一個比較長的字符串,如果服務(wù)端收到的消息是一條,那么就是對的,如果是多條,那么就有問題了。
public static void main(String[] args) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Channel channel = null;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.err.println("client:" + msg.toString());
}
});
}
});
ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg);
} catch(Exception e) {
e.printStackTrace();
}
}
首先啟動服務(wù)端,然后再啟動客戶端,通過控制臺可以看到服務(wù)接收的數(shù)據(jù)分成了2次,這就是我們要解決的問題。
server:hello yinjihuanhello....
server:o yinjihuanhello...
LineBasedFrameDecoder
用LineBasedFrameDecoder 來解決需要在發(fā)送的數(shù)據(jù)結(jié)尾加上回車換行符,這樣LineBasedFrameDecoder 才知道這段數(shù)據(jù)有沒有讀取完整。
改造服務(wù)端代碼,只需加上LineBasedFrameDecoder 解碼器即可,構(gòu)造函數(shù)的參數(shù)是數(shù)據(jù)包的最大長度。
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(10240));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.err.println("server:" + msg.toString());
ctx.writeAndFlush(msg.toString() + "你好");
}
});
}
改造客戶端發(fā)送代碼,再數(shù)據(jù)后面加上回車換行符
ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg + System.getProperty("line.separator"));
DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder和LineBasedFrameDecoder差不多,DelimiterBasedFrameDecoder可以自己定義需要分割的符號,比如下劃線,中劃線等等。
改造服務(wù)端代碼,只需加上DelimiterBasedFrameDecoder解碼器即可,構(gòu)造函數(shù)的參數(shù)是數(shù)據(jù)包的最大長度。我們用下劃線來分割。
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, Unpooled.copiedBuffer("_".getBytes())));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.err.println("server:" + msg.toString());
ctx.writeAndFlush(msg.toString() + "你好");
}
});
}
改造客戶端發(fā)送代碼,再數(shù)據(jù)后面加上下劃線
ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg + "_");
FixedLengthFrameDecoder
FixedLengthFrameDecoder是按固定的數(shù)據(jù)長度來進行解碼的,也就是說你客戶端發(fā)送的每條消息的長度是固定的,下面我們看看怎么使用。
服務(wù)端還是一樣,增加FixedLengthFrameDecoder解碼器即可。
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1500));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.err.println("server:" + msg.toString());
ctx.writeAndFlush(msg.toString() + "你好");
}
});
}
客戶端,msg輸出的長度就是1500
ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
msg.append("hello yinjihuan");
}
System.out.println(msg.length());
channel.writeAndFlush(msg);
服務(wù)端代碼:
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.err.println("server:" + msg.toString());
ctx.writeAndFlush(msg.toString() + "你好");
}
});
}
客戶端,直接發(fā)送就行
ChannelFuture f = b.connect("127.0.0.1", 2222).sync();
channel = f.channel();
StringBuilder msg = new StringBuilder();
for (int i = 0; i < 100; i++) {
msg.append("hello yinjihuan");
}
channel.writeAndFlush(msg);
源碼參考:https://github.com/yinjihuan/netty-im
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
基于servlet的執(zhí)行原理與生命周期(全面解析)
下面小編就為大家分享一篇servlet的執(zhí)行原理與生命周期全面解析,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2017-12-12
spring整合redis以及使用RedisTemplate的方法
本篇文章主要介紹了spring整合redis以及使用RedisTemplate的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05
了解spring中的CloudNetflix Hystrix彈性客戶端
這篇文章主要介紹了了解spring中的CloudNetflix Hystrix彈性客戶端,客戶端彈性模式是在遠程服務(wù)發(fā)生錯誤或表現(xiàn)不佳時保護遠程資源(另一個微服務(wù)調(diào)用或者數(shù)據(jù)庫查詢)免于崩潰。,需要的朋友可以參考下2019-06-06
Spring?Cloud?Eureka基礎(chǔ)應(yīng)用及原理
這篇文章主要介紹了Spring?Cloud?Eureka基礎(chǔ)應(yīng)用,Eureka?Client中內(nèi)置一個負載均衡器,用來進行基本的負載均衡,下面我們將通過搭建一個簡單的Eureka例子來了解Eureka的運作原理,感興趣的朋友一起看看吧2022-05-05

