Netty實(shí)現(xiàn)自定義協(xié)議編解碼器
為什么要自定義協(xié)議
Netty自帶了一些編解碼器沒,比如 StringDecode,StringEncoder,在實(shí)際業(yè)務(wù)中,協(xié)議往往需要攜帶一些我們自定義的屬性,比如版本號(hào),imei號(hào),appId等,這時(shí)候Netty提供的編解碼器就無法滿足我們的需求,所以我們需要自定義協(xié)議和自定義的編解碼器
自定義協(xié)議設(shè)計(jì)
我們可以仿造HTTP協(xié)議,比如 請(qǐng)求頭 + 請(qǐng)求體 的格式
請(qǐng)求頭
HTTP協(xié)議的請(qǐng)求頭有 請(qǐng)求方法(GET,POST),版本號(hào)等,既然是自定義協(xié)議,那么肯定是要滿足自己實(shí)際業(yè)務(wù)需求的,所以我們的請(qǐng)求頭包含以下信息,也可以根據(jù)自己的業(yè)務(wù)去添加一些自定義的屬性
commond: 指令,比如說你發(fā)送給Netty的消息是【登錄】還是【單聊消息】
或者是【群發(fā)消息】又或者是【踢人下線】的請(qǐng)求.
version:版本號(hào),在后期如果升級(jí)版本的話,要兼容老版本,我們可以做判斷,如果是老版本的就走A邏輯分支,新版本就走B邏輯分支
clientType:客戶端訪問我們的IM系統(tǒng)是通過WEb端,還是IOS,或者是Android端
messageType:將客戶端發(fā)送的數(shù)據(jù)解析成哪種格式,比如JSON,Protobuf,還是Xml格式
imeiLen:imei號(hào)的長度(imei號(hào)在請(qǐng)求體中)
appId:我們的IM是以服務(wù)的方式提供出去的,我們需要知道這個(gè)請(qǐng)求是從哪個(gè)服務(wù)進(jìn)來的,每個(gè)服務(wù)都有一個(gè)自定義唯一的appId
bodyLen:我們的數(shù)據(jù)長度
請(qǐng)求體
imei號(hào):登錄設(shè)備的唯一標(biāo)識(shí),雖然有了clientType來判斷是從WEB端還是IOS端訪問的,
但是并不知道是從哪臺(tái)設(shè)備登錄的,后期我們要做踢人下線,比如一個(gè)賬號(hào)只能一臺(tái)設(shè)備登錄,
或者是一個(gè)賬號(hào)能同時(shí)登錄WEB,或者是IOS端或者是Android端,我們就需要跟clientType一起判斷
data:我們要發(fā)送的數(shù)據(jù)
自定義協(xié)議實(shí)現(xiàn)
1:創(chuàng)建一個(gè)Maven項(xiàng)目,引入Netty依賴,完整的依賴如下
<dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.69.Final</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.0.M2</version> </dependency> </dependencies>
2:實(shí)現(xiàn)我們的協(xié)議請(qǐng)求頭
package com.chat.model; import lombok.Data; import java.io.Serializable; @Data public class MessageHead implements Serializable { /** * 指令 */ private Integer commond; /** * 版本號(hào) */ private Integer version; /** * clientType(WEB,IOS,Android) */ private Integer clientType; /** * 數(shù)據(jù)解析類型 和具體業(yè)務(wù)無關(guān),后續(xù)根據(jù)解析類型解析data數(shù)據(jù) 0x0:Json,0x1:ProtoBuf,0x2:Xml,默認(rèn):0x0 */ private Integer messageType = 0x0; /** * imei號(hào)長度 */ private Integer imeiLen; /** * appId */ private Integer appId; /** * bodyLen,數(shù)據(jù)長度 */ private Integer bodyLen; }
3:實(shí)現(xiàn)我們的協(xié)議請(qǐng)求體
package com.chat.model; import lombok.Data; import java.io.Serializable; @Data public class MessageBody implements Serializable { /** * imei號(hào) */ private String imei; /** * 數(shù)據(jù) */ private Object data; }
4:實(shí)現(xiàn)我們的協(xié)議請(qǐng)求類
package com.chat.model; import lombok.Data; import java.io.Serializable; // Message就是我們Netty服務(wù)接收到的完整的(請(qǐng)求頭+請(qǐng)求體)數(shù)據(jù)包 @Data public class Message implements Serializable { private MessageHead messageHead; private MessageBody messageBody; }
5:實(shí)現(xiàn)自定義編碼器
package com.chat.codec; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.chat.model.Message; import com.chat.model.MessageBody; import com.chat.model.MessageHead; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 自定義編碼器 */ public class MessageDecoder extends ByteToMessageDecoder { /** * 協(xié)議格式:請(qǐng)求頭 +imei號(hào) + 請(qǐng)求體 * 請(qǐng)求頭: 指令(commond) + 版本號(hào) + clientType + 消息解析類型 + imei長度 + appId + bodyLen * 指令:這條消息是做什么的,比如是登錄,還是群發(fā)消息,還是單聊消息,還是踢人下線.... * 版本號(hào):協(xié)議的版本號(hào),對(duì)于版本升級(jí)有幫助,比如A版本的走A邏輯,B版本的走B邏輯 * clientType:web端,IOS,Android * 消息解析類型:把這條消息解析成什么樣的類型,有JSON,還是String等 * imei:雖然有clientType來標(biāo)識(shí)出該用戶是從WEB訪問的還是IOS或者Android端登錄的,但是這時(shí)候有二臺(tái)IOS手機(jī)登錄你就分辨不了了 * 所以imei號(hào)是設(shè)備的唯一標(biāo)識(shí),這樣可以在用戶多端登錄的時(shí)候踢人下線,來實(shí)現(xiàn)一個(gè)賬號(hào)只能一臺(tái)設(shè)備登錄 * appId:如果我們的IM系統(tǒng)是以服務(wù)方式提供的,appId表示的是哪個(gè)服務(wù)來訪問的 * bodyLen:數(shù)據(jù)長度 * 所以請(qǐng)求頭的長度是:7 * 4 = 28字節(jié) */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception { //我們的請(qǐng)求頭有7個(gè)屬性,每個(gè)屬性都是int型,所以占4個(gè)字節(jié),如果小于28個(gè)字節(jié)說明這個(gè)請(qǐng)求數(shù)據(jù)是有問題的, if(in.readableBytes() < 28) { return; } //拿到指令 int command = in.readInt(); //拿到版本號(hào) int version = in.readInt(); //拿到clientType int clientType = in.readInt(); //拿到消息解析類型 int messageType = in.readInt(); //拿到imei號(hào)的長度 int imeiLen = in.readInt(); //拿到appId int appId = in.readInt(); //拿到數(shù)據(jù)內(nèi)容長度 int bodyLen = in.readInt(); //我們的數(shù)據(jù)是以流的形式讀取的,當(dāng)讀取到的數(shù)據(jù)長度小于 imei號(hào)長度+data長度,說明還沒有獲取到完整的請(qǐng)求數(shù)據(jù),需要重新再次讀取接下來TCP發(fā)送過來的數(shù)據(jù),直到等于了就代表 //我們已經(jīng)讀取到一條完整的數(shù)據(jù)了,其實(shí)這也是一種解決TCP粘包和拆包的問題 if(in.readableBytes() < (bodyLen + imeiLen)) { //表示讀取的數(shù)據(jù)還不夠 in.resetReaderIndex(); return; } //通過imei號(hào)長度讀取imei號(hào) byte[] imeiData = new byte[imeiLen]; in.readBytes(imeiData); String imei = new String(imeiData); //通過bodyLen讀取數(shù)據(jù)內(nèi)容 byte[] bodyData = new byte[bodyLen]; in.readBytes(bodyData); /** * 設(shè)置請(qǐng)求頭 */ MessageHead messageHead = new MessageHead(); messageHead.setCommond(command); messageHead.setAppId(appId); messageHead.setBodyLen(bodyData.length); messageHead.setImeiLen(imeiData.length); messageHead.setVersion(version); messageHead.setClientType(clientType); messageHead.setMessageType(messageType); /** * 設(shè)置請(qǐng)求體 */ MessageBody messageBody = new MessageBody(); messageBody.setImei(imei); Message message = new Message(); message.setMessageHead(messageHead); /** * 根據(jù)messageType來封裝請(qǐng)求數(shù)據(jù) */ if(messageType == 0x0) { //解析成JSON格式 String body = new String(bodyData); com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject(); jsonObject.put("body",body); messageBody.setData(jsonObject); }else if(messageType == 0x1) { //解析成Protobuf }else if(messageType == 0x2) { //解析成Xml } message.setMessageBody(messageBody); //更新讀索引 in.markReaderIndex(); //最后通過管道寫出去 out.add(message); } }
6:實(shí)現(xiàn)自定義解碼器
package com.chat.codec; import com.chat.model.Message; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.nio.charset.Charset; public class MessageEncoder extends MessageToByteEncoder<Message> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { out.writeInt(message.getMessageHead().getCommond()); out.writeInt(message.getMessageHead().getVersion()); out.writeInt(message.getMessageHead().getClientType()); out.writeInt(message.getMessageHead().getMessageType()); out.writeInt(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")).length); out.writeInt(message.getMessageHead().getAppId()); out.writeInt(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")).length); out.writeBytes(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8"))); out.writeBytes(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8"))); } }
7:Netty Server端
package com.chat.server; import com.chat.codec.MessageDecoder; import com.chat.codec.MessageEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public static void main(String[] args) throws Exception{ // 創(chuàng)建兩個(gè)線程組bossGroup和workerGroup, 含有的子線程N(yùn)ioEventLoop的個(gè)數(shù)默認(rèn)為cpu核數(shù)的兩倍 // bossGroup只是處理連接請(qǐng)求 ,真正的和客戶端業(yè)務(wù)處理,會(huì)交給workerGroup完成 EventLoopGroup bossGroup = new NioEventLoopGroup(3); EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { // 創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象 ServerBootstrap bootstrap = new ServerBootstrap(); // 使用鏈?zhǔn)骄幊虂砼渲脜?shù) bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個(gè)線程組 // 使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn) .channel(NioServerSocketChannel.class) // 初始化服務(wù)器連接隊(duì)列大小,服務(wù)端處理客戶端連接請(qǐng)求是順序處理的,所以同一時(shí)間只能處理一個(gè)客戶端連接。 // 多個(gè)客戶端同時(shí)來的時(shí)候,服務(wù)端將不能處理的客戶端連接請(qǐng)求放在隊(duì)列中等待處理 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建通道初始化對(duì)象,設(shè)置初始化參數(shù),在 SocketChannel 建立起來之前執(zhí)行 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageEncoder()); ch.pipeline().addLast(new MyServerHandler()); //ch.pipeline().addLast(new ServerHandler()); } }); System.out.println("netty server start。。"); // 綁定一個(gè)端口并且同步, 生成了一個(gè)ChannelFuture異步對(duì)象,通過isDone()等方法可以判斷異步事件的執(zhí)行情況 // 啟動(dòng)服務(wù)器(并綁定端口),bind是異步操作,sync方法是等待異步操作執(zhí)行完畢 ChannelFuture cf = bootstrap.bind(9000).sync(); // 給cf注冊(cè)監(jiān)聽器,監(jiān)聽我們關(guān)心的事件 /*cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("監(jiān)聽端口9000成功"); } else { System.out.println("監(jiān)聽端口9000失敗"); } } });*/ // 等待服務(wù)端監(jiān)聽端口關(guān)閉,closeFuture是異步操作 // 通過sync方法同步等待通道關(guān)閉處理完畢,這里會(huì)阻塞等待通道關(guān)閉完成,內(nèi)部調(diào)用的是Object的wait()方法 cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
8:Netty Server端處理器
package com.chat.server; import cn.hutool.json.JSONUtil; import com.chat.model.Message; import com.chat.model.MessageBody; import com.chat.model.MessageHead; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyServerHandler extends SimpleChannelInboundHandler<Message> { private final static Logger logger = LoggerFactory.getLogger(MyServerHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception { System.out.println("這是客戶端發(fā)送的消息" + JSONUtil.toJsonPrettyStr(message)); Message messageResponse = new Message(); MessageHead messageHead = new MessageHead(); messageHead.setCommond(9988); messageHead.setMessageType(0x0); messageHead.setClientType(1); messageHead.setVersion(2); messageHead.setAppId(3); String msg = "這是服務(wù)端發(fā)送給你的消息"; messageHead.setBodyLen(msg.getBytes().length); String imei = "12-euri-1234"; messageHead.setImeiLen(imei.getBytes().length); MessageBody messageBody = new MessageBody(); messageBody.setImei(imei); messageBody.setData(msg); messageResponse.setMessageHead(messageHead); messageResponse.setMessageBody(messageBody); ctx.writeAndFlush(messageResponse); } }
9:Netty Client端處理器
package com.chat.client; import cn.hutool.json.JSONUtil; import com.chat.model.Message; import com.chat.model.MessageBody; import com.chat.model.MessageHead; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ClientHandler extends SimpleChannelInboundHandler<Message> { /** * 當(dāng)客戶端連接服務(wù)器完成就會(huì)觸發(fā)該方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) { for(int i = 0; i < 20; i ++) { Message message = new Message(); MessageHead messageHead = new MessageHead(); messageHead.setCommond(9988); messageHead.setMessageType(0x0); messageHead.setClientType(1); messageHead.setVersion(2); messageHead.setAppId(3); String msg = "hello-" + i; messageHead.setBodyLen(msg.getBytes().length); String imei = "12-euri"; messageHead.setImeiLen(imei.getBytes().length); MessageBody messageBody = new MessageBody(); messageBody.setImei(imei); messageBody.setData(msg); message.setMessageHead(messageHead); message.setMessageBody(messageBody); ctx.writeAndFlush(message); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } //當(dāng)通道有讀取事件時(shí)會(huì)觸發(fā),即服務(wù)端發(fā)送數(shù)據(jù)給客戶端 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception { System.out.println(JSONUtil.toJsonPrettyStr(message)); } }
10:Netty Client端
package com.chat.client; import com.chat.codec.MessageDecoder; import com.chat.codec.MessageEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Client { public static void main(String[] args) throws Exception{ //客戶端需要一個(gè)事件循環(huán)組 EventLoopGroup group = new NioEventLoopGroup(); try { //創(chuàng)建客戶端啟動(dòng)對(duì)象 //注意客戶端使用的不是ServerBootstrap而是Bootstrap Bootstrap bootstrap = new Bootstrap(); //設(shè)置相關(guān)參數(shù) bootstrap.group(group) //設(shè)置線程組 .channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實(shí)現(xiàn) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //加入處理器 ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); System.out.println("netty client start。。"); //啟動(dòng)客戶端去連接服務(wù)器端 ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync(); //對(duì)通道關(guān)閉進(jìn)行監(jiān)聽 cf.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
11:測(cè)試
1: 先啟動(dòng)Server端的main方法
2:再啟動(dòng)Client端的main方法
3:查看控制臺(tái)
服務(wù)端控制臺(tái):
客戶端控制臺(tái):
完整代碼
全部代碼就是下圖這幾個(gè)類,上面已經(jīng)貼出每個(gè)類的全部代碼,直接復(fù)制就行了
以上就是Netty實(shí)現(xiàn)自定義協(xié)議編解碼器的詳細(xì)內(nèi)容,更多關(guān)于Netty自定義協(xié)議編解碼器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Slf4j+logback實(shí)現(xiàn)JSON格式日志輸出方式
這篇文章主要介紹了Slf4j+logback實(shí)現(xiàn)JSON格式日志輸出方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Java實(shí)現(xiàn)圖片上文字內(nèi)容的動(dòng)態(tài)修改的操作步驟
在數(shù)字圖像處理領(lǐng)域,Java提供了強(qiáng)大的庫來處理圖片,包括讀取、修改和寫入圖片,如果你需要在Java應(yīng)用程序中修改圖片上的文字內(nèi)容,可以通過圖像處理技術(shù)來實(shí)現(xiàn),這篇博文將介紹如何使用Java實(shí)現(xiàn)圖片上文字內(nèi)容的動(dòng)態(tài)修改,需要的朋友可以參考下2024-07-07通過實(shí)例學(xué)習(xí)Either 樹和模式匹配
這篇文章主要介紹了通過實(shí)例學(xué)習(xí)Either 樹和模式匹配,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06使用curator實(shí)現(xiàn)zookeeper鎖服務(wù)的示例分享
這篇文章主要介紹了使用curator實(shí)現(xiàn)zookeeper鎖服務(wù)的示例,需要的朋友可以參考下2014-02-02