Netty之使用DelimiterBasedFrameDecoder進(jìn)行消息分隔詳解
DelimiterBasedFrameDecoder消息分隔
在使用Netty進(jìn)行TCP消息傳輸時(shí),為了上層協(xié)議能夠?qū)ο⒄_區(qū)分,避免粘包和拆包導(dǎo)致的問題。
一般可以通過消息定長、將回車換行符作為消息結(jié)束符、將特殊的分隔符作為消息的結(jié)束標(biāo)志或者在消息頭中定義長度字段來標(biāo)識(shí)消息的總長度。
其中常用的通過分隔符作為消息的結(jié)束標(biāo)志就涉及到Netty的DelimiterBasedFrameDecoder類,服務(wù)端如下:
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class EchoServer { public void bind(int port)throws Exception{ //配置服務(wù)端的NIO線程組 EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup=new NioEventLoopGroup(); try { ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //控制臺(tái)輸出服務(wù)端運(yùn)行日志 .handler(new LoggingHandler(LogLevel.INFO)) //編寫服務(wù)端接收和發(fā)送消息的具體邏輯 .childHandler(new ChildChannleHandler()); //綁定啟動(dòng)端口,同步等待成功 ChannelFuture f=b.bind(port).sync(); //等待服務(wù)端監(jiān)聽端口關(guān)閉 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //釋放線程資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } //服務(wù)端接收到客戶端的消息時(shí)會(huì)先執(zhí)行該類的initChannel()方法進(jìn)行channel的初始化操作 private class ChildChannleHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel arg0) throws Exception { //創(chuàng)建分隔符緩沖對(duì)象,使用"$_"作為分隔符 ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes()); //創(chuàng)建DelimiterBasedFrameDecoder對(duì)象,將其加入到ChannelPipeline //參數(shù)1024表示單條消息的最大長度,當(dāng)達(dá)到該長度仍然沒有找到分隔符就拋出TooLongFrame異常,第二個(gè)參數(shù)就是分隔符 //由于DelimiterBasedFrameDecoder自動(dòng)對(duì)請(qǐng)求消息進(jìn)行了解碼,下面的ChannelHandler接收到的msg對(duì)象就是完整的消息包 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); //StringDecoder解碼器將ByteBuf解碼成字符串對(duì)象,這樣在ChannelHandlerAdapter中讀取消息時(shí)就不需要通過ByteBuf獲取了 arg0.pipeline().addLast(new StringDecoder()); //對(duì)網(wǎng)絡(luò)事件進(jìn)行讀寫操作的類 arg0.pipeline().addLast(new EchoServerHandler()); } } public static void main(String[] args)throws Exception { int port =8888; if (args!=null&&args.length>0) { port=Integer.valueOf(args[0]); } new EchoServer().bind(port); } }
服務(wù)端消息讀寫操作:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; //網(wǎng)絡(luò)I/O事件讀寫操作 public class EchoServerHandler extends ChannelHandlerAdapter { int counter=0; //接收客戶端發(fā)送的消息并返回響應(yīng) @Override public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ //獲取String類型的請(qǐng)求消息(StringDecoder已經(jīng)對(duì)消息進(jìn)行解碼) String body=(String)msg; System.out.println("This is "+ ++counter+"times receive client : ["+body+"]"); //由于設(shè)置了DelimiterBasedFrameDecoder過濾掉了分隔符"$_", 因此需要將返回消息尾部拼接上分隔符 body+="$_"; //將接收到的消息再放到ByteBuf中重新發(fā)送給客戶端 ByteBuf buf=Unpooled.copiedBuffer(body.getBytes()); //把待發(fā)送的消息放到發(fā)送緩沖數(shù)組中,并把緩沖區(qū)中的消息全部寫入SockChannel發(fā)送給客戶端 ctx.writeAndFlush(buf); } //發(fā)生異常時(shí)關(guān)閉ChannelHandlerContext,釋放和ChannelHandlerContext相關(guān)聯(lián)的句柄等資源 @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ cause.printStackTrace(); ctx.close(); } }
客戶端:
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; //客戶端 public class EchoClient { public void connect(int port,String host)throws Exception{ //創(chuàng)建客戶端進(jìn)行I/O讀寫的線程組 EventLoopGroup g=new NioEventLoopGroup(); try { //創(chuàng)建客戶端啟動(dòng)輔助類Bootstrap Bootstrap b=new Bootstrap(); b.group(g) //設(shè)置Channel .channel(NioSocketChannel.class) //配置Channel .option(ChannelOption.TCP_NODELAY, true) //添加處理類,這里為了方便直接使用了匿名內(nèi)部類 .handler(new ChannelInitializer<SocketChannel>() { //當(dāng)創(chuàng)建NioSocketChannel成功后,將ChannelHandler設(shè)置到ChannelPipeline中處理網(wǎng)絡(luò)I/O事件 @Override protected void initChannel(SocketChannel arg0) throws Exception { //與服務(wù)端相同,需要配置一系列的ChannelHandler ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes()); arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); arg0.pipeline().addLast(new StringDecoder()); //客戶端的處理類加入ChannelPipeline arg0.pipeline().addLast(new EchoClientHandler()); } }); //調(diào)用connect方法發(fā)起異步連接,并調(diào)用同步方法等待連接成功 ChannelFuture f=b.connect(host, port).sync(); //f.channel().writeAndFlush(Unpooled.wrappedBuffer("111$_".getBytes())); //等待客戶端連接關(guān)閉 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //釋放線程組 g.shutdownGracefully(); } } public static void main(String[] args)throws Exception { int port=8888; if (args!=null&&args.length>0) { port=Integer.valueOf(args[0]); } new EchoClient().connect(port, "127.0.0.1"); } }
客戶端網(wǎng)絡(luò)I/O事件處理:
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; //客戶端讀寫網(wǎng)絡(luò)I/O事件類 public class EchoClientHandler extends ChannelHandlerAdapter { int counter; //發(fā)送到服務(wù)端的消息,注意結(jié)尾的分隔符一定要和服務(wù)端配置的分隔符一致,否則服務(wù)端ChannelInitializer.initChannel()方法雖然能夠調(diào)用,但是DelimiterBasedFrameDecoder無法找到分隔符,不會(huì)調(diào)用讀取消息的channelRead方法 static final String ECHO_REQ="Hi,Welcome to Netty.$_"; public EchoClientHandler(){ } //客戶端發(fā)送消息的方法 @Override public void channelActive(ChannelHandlerContext ctx)throws Exception{ for (int i = 0; i < 10; i++ ) { //Unpooled.copiedBuffer()方法是深克隆,也可以使用Unpooled.buffer()寫入消息發(fā)送 ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } //讀取服務(wù)端發(fā)送的消息 @Override public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ String body=(String)msg; System.out.println("This is "+ ++counter+" times receive server:["+body+"]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{ //將消息發(fā)送隊(duì)列中的消息寫入到SocketChannel中發(fā)送給對(duì)方,channelActive使用了writeAndFlush這里可以不重寫 ctx.flush(); } //異常處理,關(guān)閉ChannelHandlerContext @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ cause.printStackTrace(); ctx.close(); } }
啟動(dòng)服務(wù)端:
啟動(dòng)客戶端發(fā)送消息:
到此這篇關(guān)于Netty之使用DelimiterBasedFrameDecoder進(jìn)行消息分隔詳解的文章就介紹到這了,更多相關(guān)DelimiterBasedFrameDecoder進(jìn)行消息分隔內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- SpringBoot+netty-socketio實(shí)現(xiàn)服務(wù)器端消息推送
- Springboot+Netty+Websocket實(shí)現(xiàn)消息推送實(shí)例
- SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼
- SpringBoot+WebSocket+Netty實(shí)現(xiàn)消息推送的示例代碼
- Spring Boot實(shí)戰(zhàn)之netty-socketio實(shí)現(xiàn)簡單聊天室(給指定用戶推送消息)
相關(guān)文章
Java通過 Socket 實(shí)現(xiàn) TCP服務(wù)端
這篇文章主要介紹了Java通過 Socket 實(shí)現(xiàn) TCP服務(wù)端的相關(guān)資料,需要的朋友可以參考下2017-05-05Jenkins Pipeline 部署 SpringBoot 應(yīng)用的教程詳解
這篇文章主要介紹了Jenkins Pipeline 部署 SpringBoot 應(yīng)用的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07spring aop execution表達(dá)式的用法
這篇文章主要介紹了spring aop execution表達(dá)式的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07SpringMVC中事務(wù)是否可以加在Controller層的問題
這篇文章主要介紹了SpringMVC中事務(wù)是否可以加在Controller層的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02Spring MVC請(qǐng)求參數(shù)接收的全面總結(jié)教程
這篇文章主要給大家總結(jié)介紹了關(guān)于Spring MVC請(qǐng)求參數(shù)接收的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08