Netty之使用DelimiterBasedFrameDecoder進(jìn)行消息分隔詳解
DelimiterBasedFrameDecoder消息分隔
在使用Netty進(jìn)行TCP消息傳輸時(shí),為了上層協(xié)議能夠?qū)ο⒄_區(qū)分,避免粘包和拆包導(dǎo)致的問(wèn)題。
一般可以通過(guò)消息定長(zhǎng)、將回車(chē)換行符作為消息結(jié)束符、將特殊的分隔符作為消息的結(jié)束標(biāo)志或者在消息頭中定義長(zhǎng)度字段來(lái)標(biāo)識(shí)消息的總長(zhǎng)度。
其中常用的通過(guò)分隔符作為消息的結(jié)束標(biāo)志就涉及到Netty的DelimiterBasedFrameDecoder類(lèi),服務(wù)端如下:
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class EchoServer { public void bind(int port)throws Exception{ //配置服務(wù)端的NIO線程組 EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup=new NioEventLoopGroup(); try { ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //控制臺(tái)輸出服務(wù)端運(yùn)行日志 .handler(new LoggingHandler(LogLevel.INFO)) //編寫(xiě)服務(wù)端接收和發(fā)送消息的具體邏輯 .childHandler(new ChildChannleHandler()); //綁定啟動(dòng)端口,同步等待成功 ChannelFuture f=b.bind(port).sync(); //等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //釋放線程資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } //服務(wù)端接收到客戶(hù)端的消息時(shí)會(huì)先執(zhí)行該類(lèi)的initChannel()方法進(jìn)行channel的初始化操作 private class ChildChannleHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel arg0) throws Exception { //創(chuàng)建分隔符緩沖對(duì)象,使用"$_"作為分隔符 ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes()); //創(chuàng)建DelimiterBasedFrameDecoder對(duì)象,將其加入到ChannelPipeline //參數(shù)1024表示單條消息的最大長(zhǎng)度,當(dāng)達(dá)到該長(zhǎng)度仍然沒(méi)有找到分隔符就拋出TooLongFrame異常,第二個(gè)參數(shù)就是分隔符 //由于DelimiterBasedFrameDecoder自動(dòng)對(duì)請(qǐng)求消息進(jìn)行了解碼,下面的ChannelHandler接收到的msg對(duì)象就是完整的消息包 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); //StringDecoder解碼器將ByteBuf解碼成字符串對(duì)象,這樣在ChannelHandlerAdapter中讀取消息時(shí)就不需要通過(guò)ByteBuf獲取了 arg0.pipeline().addLast(new StringDecoder()); //對(duì)網(wǎng)絡(luò)事件進(jìn)行讀寫(xiě)操作的類(lèi) arg0.pipeline().addLast(new EchoServerHandler()); } } public static void main(String[] args)throws Exception { int port =8888; if (args!=null&&args.length>0) { port=Integer.valueOf(args[0]); } new EchoServer().bind(port); } }
服務(wù)端消息讀寫(xiě)操作:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; //網(wǎng)絡(luò)I/O事件讀寫(xiě)操作 public class EchoServerHandler extends ChannelHandlerAdapter { int counter=0; //接收客戶(hù)端發(fā)送的消息并返回響應(yīng) @Override public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ //獲取String類(lèi)型的請(qǐng)求消息(StringDecoder已經(jīng)對(duì)消息進(jìn)行解碼) String body=(String)msg; System.out.println("This is "+ ++counter+"times receive client : ["+body+"]"); //由于設(shè)置了DelimiterBasedFrameDecoder過(guò)濾掉了分隔符"$_", 因此需要將返回消息尾部拼接上分隔符 body+="$_"; //將接收到的消息再放到ByteBuf中重新發(fā)送給客戶(hù)端 ByteBuf buf=Unpooled.copiedBuffer(body.getBytes()); //把待發(fā)送的消息放到發(fā)送緩沖數(shù)組中,并把緩沖區(qū)中的消息全部寫(xiě)入SockChannel發(fā)送給客戶(hù)端 ctx.writeAndFlush(buf); } //發(fā)生異常時(shí)關(guān)閉ChannelHandlerContext,釋放和ChannelHandlerContext相關(guān)聯(lián)的句柄等資源 @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ cause.printStackTrace(); ctx.close(); } }
客戶(hù)端:
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; //客戶(hù)端 public class EchoClient { public void connect(int port,String host)throws Exception{ //創(chuàng)建客戶(hù)端進(jìn)行I/O讀寫(xiě)的線程組 EventLoopGroup g=new NioEventLoopGroup(); try { //創(chuàng)建客戶(hù)端啟動(dòng)輔助類(lèi)Bootstrap Bootstrap b=new Bootstrap(); b.group(g) //設(shè)置Channel .channel(NioSocketChannel.class) //配置Channel .option(ChannelOption.TCP_NODELAY, true) //添加處理類(lèi),這里為了方便直接使用了匿名內(nèi)部類(lèi) .handler(new ChannelInitializer<SocketChannel>() { //當(dāng)創(chuàng)建NioSocketChannel成功后,將ChannelHandler設(shè)置到ChannelPipeline中處理網(wǎng)絡(luò)I/O事件 @Override protected void initChannel(SocketChannel arg0) throws Exception { //與服務(wù)端相同,需要配置一系列的ChannelHandler ByteBuf delimiter=Unpooled.copiedBuffer("$_".getBytes()); arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); arg0.pipeline().addLast(new StringDecoder()); //客戶(hù)端的處理類(lèi)加入ChannelPipeline arg0.pipeline().addLast(new EchoClientHandler()); } }); //調(diào)用connect方法發(fā)起異步連接,并調(diào)用同步方法等待連接成功 ChannelFuture f=b.connect(host, port).sync(); //f.channel().writeAndFlush(Unpooled.wrappedBuffer("111$_".getBytes())); //等待客戶(hù)端連接關(guān)閉 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //釋放線程組 g.shutdownGracefully(); } } public static void main(String[] args)throws Exception { int port=8888; if (args!=null&&args.length>0) { port=Integer.valueOf(args[0]); } new EchoClient().connect(port, "127.0.0.1"); } }
客戶(hù)端網(wǎng)絡(luò)I/O事件處理:
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; //客戶(hù)端讀寫(xiě)網(wǎng)絡(luò)I/O事件類(lèi) public class EchoClientHandler extends ChannelHandlerAdapter { int counter; //發(fā)送到服務(wù)端的消息,注意結(jié)尾的分隔符一定要和服務(wù)端配置的分隔符一致,否則服務(wù)端ChannelInitializer.initChannel()方法雖然能夠調(diào)用,但是DelimiterBasedFrameDecoder無(wú)法找到分隔符,不會(huì)調(diào)用讀取消息的channelRead方法 static final String ECHO_REQ="Hi,Welcome to Netty.$_"; public EchoClientHandler(){ } //客戶(hù)端發(fā)送消息的方法 @Override public void channelActive(ChannelHandlerContext ctx)throws Exception{ for (int i = 0; i < 10; i++ ) { //Unpooled.copiedBuffer()方法是深克隆,也可以使用Unpooled.buffer()寫(xiě)入消息發(fā)送 ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } //讀取服務(wù)端發(fā)送的消息 @Override public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ String body=(String)msg; System.out.println("This is "+ ++counter+" times receive server:["+body+"]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{ //將消息發(fā)送隊(duì)列中的消息寫(xiě)入到SocketChannel中發(fā)送給對(duì)方,channelActive使用了writeAndFlush這里可以不重寫(xiě) ctx.flush(); } //異常處理,關(guān)閉ChannelHandlerContext @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ cause.printStackTrace(); ctx.close(); } }
啟動(dòng)服務(wù)端:
啟動(dòng)客戶(hù)端發(fā)送消息:
到此這篇關(guān)于Netty之使用DelimiterBasedFrameDecoder進(jìn)行消息分隔詳解的文章就介紹到這了,更多相關(guān)DelimiterBasedFrameDecoder進(jìn)行消息分隔內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- SpringBoot+netty-socketio實(shí)現(xiàn)服務(wù)器端消息推送
- Springboot+Netty+Websocket實(shí)現(xiàn)消息推送實(shí)例
- SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼
- SpringBoot+WebSocket+Netty實(shí)現(xiàn)消息推送的示例代碼
- Spring Boot實(shí)戰(zhàn)之netty-socketio實(shí)現(xiàn)簡(jiǎn)單聊天室(給指定用戶(hù)推送消息)
相關(guān)文章
零基礎(chǔ)寫(xiě)Java知乎爬蟲(chóng)之進(jìn)階篇
前面幾篇文章,我們都是簡(jiǎn)單的實(shí)現(xiàn)了java爬蟲(chóng)抓取內(nèi)容的問(wèn)題,那么如果遇到復(fù)雜情況,我們還能繼續(xù)那么做嗎?答案當(dāng)然是否定的,之前的僅僅是入門(mén)篇,都是些基礎(chǔ)知識(shí),給大家練手用的,本文我們就來(lái)點(diǎn)高大上的東西2014-11-11Java通過(guò) Socket 實(shí)現(xiàn) TCP服務(wù)端
這篇文章主要介紹了Java通過(guò) Socket 實(shí)現(xiàn) TCP服務(wù)端的相關(guān)資料,需要的朋友可以參考下2017-05-05Jenkins Pipeline 部署 SpringBoot 應(yīng)用的教程詳解
這篇文章主要介紹了Jenkins Pipeline 部署 SpringBoot 應(yīng)用的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07spring aop execution表達(dá)式的用法
這篇文章主要介紹了spring aop execution表達(dá)式的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07SpringMVC中事務(wù)是否可以加在Controller層的問(wèn)題
這篇文章主要介紹了SpringMVC中事務(wù)是否可以加在Controller層的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02Spring MVC請(qǐng)求參數(shù)接收的全面總結(jié)教程
這篇文章主要給大家總結(jié)介紹了關(guān)于Spring MVC請(qǐng)求參數(shù)接收的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08