Netty之使用DelimiterBasedFrameDecoder進行消息分隔詳解
DelimiterBasedFrameDecoder消息分隔
在使用Netty進行TCP消息傳輸時,為了上層協(xié)議能夠?qū)ο⒄_區(qū)分,避免粘包和拆包導(dǎo)致的問題。
一般可以通過消息定長、將回車換行符作為消息結(jié)束符、將特殊的分隔符作為消息的結(jié)束標(biāo)志或者在消息頭中定義長度字段來標(biāo)識消息的總長度。
其中常用的通過分隔符作為消息的結(jié)束標(biāo)志就涉及到Netty的DelimiterBasedFrameDecoder類,服務(wù)端如下:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class EchoServer
{
public void bind(int port)throws Exception{
//配置服務(wù)端的NIO線程組
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try
{
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//控制臺輸出服務(wù)端運行日志
.handler(new LoggingHandler(LogLevel.INFO))
//編寫服務(wù)端接收和發(fā)送消息的具體邏輯
.childHandler(new ChildChannleHandler());
//綁定啟動端口,同步等待成功
ChannelFuture f=b.bind(port).sync();
//等待服務(wù)端監(jiān)聽端口關(guān)閉
f.channel().closeFuture().sync();
}
catch (Exception e)
{
e.printStackTrace();
}
finally{
//釋放線程資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
//服務(wù)端接收到客戶端的消息時會先執(zhí)行該類的initChannel()方法進行channel的初始化操作
private class ChildChannleHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0)
throws Exception
{
//創(chuàng)建分隔符緩沖對象,使用"$_"作為分隔符
ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
//創(chuàng)建DelimiterBasedFrameDecoder對象,將其加入到ChannelPipeline
//參數(shù)1024表示單條消息的最大長度,當(dāng)達到該長度仍然沒有找到分隔符就拋出TooLongFrame異常,第二個參數(shù)就是分隔符
//由于DelimiterBasedFrameDecoder自動對請求消息進行了解碼,下面的ChannelHandler接收到的msg對象就是完整的消息包
arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
//StringDecoder解碼器將ByteBuf解碼成字符串對象,這樣在ChannelHandlerAdapter中讀取消息時就不需要通過ByteBuf獲取了
arg0.pipeline().addLast(new StringDecoder());
//對網(wǎng)絡(luò)事件進行讀寫操作的類
arg0.pipeline().addLast(new EchoServerHandler());
}
}
public static void main(String[] args)throws Exception
{
int port =8888;
if (args!=null&&args.length>0)
{
port=Integer.valueOf(args[0]);
}
new EchoServer().bind(port);
}
}服務(wù)端消息讀寫操作:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
//網(wǎng)絡(luò)I/O事件讀寫操作
public class EchoServerHandler extends ChannelHandlerAdapter
{
int counter=0;
//接收客戶端發(fā)送的消息并返回響應(yīng)
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
//獲取String類型的請求消息(StringDecoder已經(jīng)對消息進行解碼)
String body=(String)msg;
System.out.println("This is "+ ++counter+"times receive client : ["+body+"]");
//由于設(shè)置了DelimiterBasedFrameDecoder過濾掉了分隔符"$_", 因此需要將返回消息尾部拼接上分隔符
body+="$_";
//將接收到的消息再放到ByteBuf中重新發(fā)送給客戶端
ByteBuf buf=Unpooled.copiedBuffer(body.getBytes());
//把待發(fā)送的消息放到發(fā)送緩沖數(shù)組中,并把緩沖區(qū)中的消息全部寫入SockChannel發(fā)送給客戶端
ctx.writeAndFlush(buf);
}
//發(fā)生異常時關(guān)閉ChannelHandlerContext,釋放和ChannelHandlerContext相關(guān)聯(lián)的句柄等資源
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}客戶端:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
//客戶端
public class EchoClient
{
public void connect(int port,String host)throws Exception{
//創(chuàng)建客戶端進行I/O讀寫的線程組
EventLoopGroup g=new NioEventLoopGroup();
try
{
//創(chuàng)建客戶端啟動輔助類Bootstrap
Bootstrap b=new Bootstrap();
b.group(g)
//設(shè)置Channel
.channel(NioSocketChannel.class)
//配置Channel
.option(ChannelOption.TCP_NODELAY, true)
//添加處理類,這里為了方便直接使用了匿名內(nèi)部類
.handler(new ChannelInitializer<SocketChannel>()
{
//當(dāng)創(chuàng)建NioSocketChannel成功后,將ChannelHandler設(shè)置到ChannelPipeline中處理網(wǎng)絡(luò)I/O事件
@Override
protected void initChannel(SocketChannel arg0)
throws Exception
{
//與服務(wù)端相同,需要配置一系列的ChannelHandler
ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes());
arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
arg0.pipeline().addLast(new StringDecoder());
//客戶端的處理類加入ChannelPipeline
arg0.pipeline().addLast(new EchoClientHandler());
}
});
//調(diào)用connect方法發(fā)起異步連接,并調(diào)用同步方法等待連接成功
ChannelFuture f=b.connect(host, port).sync();
//f.channel().writeAndFlush(Unpooled.wrappedBuffer("111$_".getBytes()));
//等待客戶端連接關(guān)閉
f.channel().closeFuture().sync();
}
catch (Exception e)
{
e.printStackTrace();
}
finally{
//釋放線程組
g.shutdownGracefully();
}
}
public static void main(String[] args)throws Exception
{
int port=8888;
if (args!=null&&args.length>0)
{
port=Integer.valueOf(args[0]);
}
new EchoClient().connect(port, "127.0.0.1");
}
}客戶端網(wǎng)絡(luò)I/O事件處理:
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
//客戶端讀寫網(wǎng)絡(luò)I/O事件類
public class EchoClientHandler extends ChannelHandlerAdapter
{
int counter;
//發(fā)送到服務(wù)端的消息,注意結(jié)尾的分隔符一定要和服務(wù)端配置的分隔符一致,否則服務(wù)端ChannelInitializer.initChannel()方法雖然能夠調(diào)用,但是DelimiterBasedFrameDecoder無法找到分隔符,不會調(diào)用讀取消息的channelRead方法
static final String ECHO_REQ="Hi,Welcome to Netty.$_";
public EchoClientHandler(){
}
//客戶端發(fā)送消息的方法
@Override
public void channelActive(ChannelHandlerContext ctx)throws Exception{
for (int i = 0; i < 10; i++ )
{
//Unpooled.copiedBuffer()方法是深克隆,也可以使用Unpooled.buffer()寫入消息發(fā)送
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}
//讀取服務(wù)端發(fā)送的消息
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
String body=(String)msg;
System.out.println("This is "+ ++counter+" times receive server:["+body+"]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
//將消息發(fā)送隊列中的消息寫入到SocketChannel中發(fā)送給對方,channelActive使用了writeAndFlush這里可以不重寫
ctx.flush();
}
//異常處理,關(guān)閉ChannelHandlerContext
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}啟動服務(wù)端:

啟動客戶端發(fā)送消息:

到此這篇關(guān)于Netty之使用DelimiterBasedFrameDecoder進行消息分隔詳解的文章就介紹到這了,更多相關(guān)DelimiterBasedFrameDecoder進行消息分隔內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java通過 Socket 實現(xiàn) TCP服務(wù)端
這篇文章主要介紹了Java通過 Socket 實現(xiàn) TCP服務(wù)端的相關(guān)資料,需要的朋友可以參考下2017-05-05
Jenkins Pipeline 部署 SpringBoot 應(yīng)用的教程詳解
這篇文章主要介紹了Jenkins Pipeline 部署 SpringBoot 應(yīng)用的詳細教程,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07
SpringMVC中事務(wù)是否可以加在Controller層的問題
這篇文章主要介紹了SpringMVC中事務(wù)是否可以加在Controller層的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
Spring MVC請求參數(shù)接收的全面總結(jié)教程
這篇文章主要給大家總結(jié)介紹了關(guān)于Spring MVC請求參數(shù)接收的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08

