Netty實(shí)現(xiàn)自定義協(xié)議編解碼器
為什么要自定義協(xié)議
Netty自帶了一些編解碼器沒,比如 StringDecode,StringEncoder,在實(shí)際業(yè)務(wù)中,協(xié)議往往需要攜帶一些我們自定義的屬性,比如版本號(hào),imei號(hào),appId等,這時(shí)候Netty提供的編解碼器就無法滿足我們的需求,所以我們需要自定義協(xié)議和自定義的編解碼器
自定義協(xié)議設(shè)計(jì)
我們可以仿造HTTP協(xié)議,比如 請求頭 + 請求體 的格式
請求頭
HTTP協(xié)議的請求頭有 請求方法(GET,POST),版本號(hào)等,既然是自定義協(xié)議,那么肯定是要滿足自己實(shí)際業(yè)務(wù)需求的,所以我們的請求頭包含以下信息,也可以根據(jù)自己的業(yè)務(wù)去添加一些自定義的屬性
commond: 指令,比如說你發(fā)送給Netty的消息是【登錄】還是【單聊消息】
或者是【群發(fā)消息】又或者是【踢人下線】的請求.
version:版本號(hào),在后期如果升級(jí)版本的話,要兼容老版本,我們可以做判斷,如果是老版本的就走A邏輯分支,新版本就走B邏輯分支
clientType:客戶端訪問我們的IM系統(tǒng)是通過WEb端,還是IOS,或者是Android端
messageType:將客戶端發(fā)送的數(shù)據(jù)解析成哪種格式,比如JSON,Protobuf,還是Xml格式
imeiLen:imei號(hào)的長度(imei號(hào)在請求體中)
appId:我們的IM是以服務(wù)的方式提供出去的,我們需要知道這個(gè)請求是從哪個(gè)服務(wù)進(jìn)來的,每個(gè)服務(wù)都有一個(gè)自定義唯一的appId
bodyLen:我們的數(shù)據(jù)長度
請求體
imei號(hào):登錄設(shè)備的唯一標(biāo)識(shí),雖然有了clientType來判斷是從WEB端還是IOS端訪問的,
但是并不知道是從哪臺(tái)設(shè)備登錄的,后期我們要做踢人下線,比如一個(gè)賬號(hào)只能一臺(tái)設(shè)備登錄,
或者是一個(gè)賬號(hào)能同時(shí)登錄WEB,或者是IOS端或者是Android端,我們就需要跟clientType一起判斷
data:我們要發(fā)送的數(shù)據(jù)
自定義協(xié)議實(shí)現(xiàn)
1:創(chuàng)建一個(gè)Maven項(xiàng)目,引入Netty依賴,完整的依賴如下
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.0.M2</version>
</dependency>
</dependencies>
2:實(shí)現(xiàn)我們的協(xié)議請求頭
package com.chat.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class MessageHead implements Serializable {
/**
* 指令
*/
private Integer commond;
/**
* 版本號(hào)
*/
private Integer version;
/**
* clientType(WEB,IOS,Android)
*/
private Integer clientType;
/**
* 數(shù)據(jù)解析類型 和具體業(yè)務(wù)無關(guān),后續(xù)根據(jù)解析類型解析data數(shù)據(jù) 0x0:Json,0x1:ProtoBuf,0x2:Xml,默認(rèn):0x0
*/
private Integer messageType = 0x0;
/**
* imei號(hào)長度
*/
private Integer imeiLen;
/**
* appId
*/
private Integer appId;
/**
* bodyLen,數(shù)據(jù)長度
*/
private Integer bodyLen;
}
3:實(shí)現(xiàn)我們的協(xié)議請求體
package com.chat.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class MessageBody implements Serializable {
/**
* imei號(hào)
*/
private String imei;
/**
* 數(shù)據(jù)
*/
private Object data;
}
4:實(shí)現(xiàn)我們的協(xié)議請求類
package com.chat.model;
import lombok.Data;
import java.io.Serializable;
// Message就是我們Netty服務(wù)接收到的完整的(請求頭+請求體)數(shù)據(jù)包
@Data
public class Message implements Serializable {
private MessageHead messageHead;
private MessageBody messageBody;
}
5:實(shí)現(xiàn)自定義編碼器
package com.chat.codec;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.chat.model.Message;
import com.chat.model.MessageBody;
import com.chat.model.MessageHead;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 自定義編碼器
*/
public class MessageDecoder extends ByteToMessageDecoder {
/**
* 協(xié)議格式:請求頭 +imei號(hào) + 請求體
* 請求頭: 指令(commond) + 版本號(hào) + clientType + 消息解析類型 + imei長度 + appId + bodyLen
* 指令:這條消息是做什么的,比如是登錄,還是群發(fā)消息,還是單聊消息,還是踢人下線....
* 版本號(hào):協(xié)議的版本號(hào),對(duì)于版本升級(jí)有幫助,比如A版本的走A邏輯,B版本的走B邏輯
* clientType:web端,IOS,Android
* 消息解析類型:把這條消息解析成什么樣的類型,有JSON,還是String等
* imei:雖然有clientType來標(biāo)識(shí)出該用戶是從WEB訪問的還是IOS或者Android端登錄的,但是這時(shí)候有二臺(tái)IOS手機(jī)登錄你就分辨不了了
* 所以imei號(hào)是設(shè)備的唯一標(biāo)識(shí),這樣可以在用戶多端登錄的時(shí)候踢人下線,來實(shí)現(xiàn)一個(gè)賬號(hào)只能一臺(tái)設(shè)備登錄
* appId:如果我們的IM系統(tǒng)是以服務(wù)方式提供的,appId表示的是哪個(gè)服務(wù)來訪問的
* bodyLen:數(shù)據(jù)長度
* 所以請求頭的長度是:7 * 4 = 28字節(jié)
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
//我們的請求頭有7個(gè)屬性,每個(gè)屬性都是int型,所以占4個(gè)字節(jié),如果小于28個(gè)字節(jié)說明這個(gè)請求數(shù)據(jù)是有問題的,
if(in.readableBytes() < 28) {
return;
}
//拿到指令
int command = in.readInt();
//拿到版本號(hào)
int version = in.readInt();
//拿到clientType
int clientType = in.readInt();
//拿到消息解析類型
int messageType = in.readInt();
//拿到imei號(hào)的長度
int imeiLen = in.readInt();
//拿到appId
int appId = in.readInt();
//拿到數(shù)據(jù)內(nèi)容長度
int bodyLen = in.readInt();
//我們的數(shù)據(jù)是以流的形式讀取的,當(dāng)讀取到的數(shù)據(jù)長度小于 imei號(hào)長度+data長度,說明還沒有獲取到完整的請求數(shù)據(jù),需要重新再次讀取接下來TCP發(fā)送過來的數(shù)據(jù),直到等于了就代表
//我們已經(jīng)讀取到一條完整的數(shù)據(jù)了,其實(shí)這也是一種解決TCP粘包和拆包的問題
if(in.readableBytes() < (bodyLen + imeiLen)) {
//表示讀取的數(shù)據(jù)還不夠
in.resetReaderIndex();
return;
}
//通過imei號(hào)長度讀取imei號(hào)
byte[] imeiData = new byte[imeiLen];
in.readBytes(imeiData);
String imei = new String(imeiData);
//通過bodyLen讀取數(shù)據(jù)內(nèi)容
byte[] bodyData = new byte[bodyLen];
in.readBytes(bodyData);
/**
* 設(shè)置請求頭
*/
MessageHead messageHead = new MessageHead();
messageHead.setCommond(command);
messageHead.setAppId(appId);
messageHead.setBodyLen(bodyData.length);
messageHead.setImeiLen(imeiData.length);
messageHead.setVersion(version);
messageHead.setClientType(clientType);
messageHead.setMessageType(messageType);
/**
* 設(shè)置請求體
*/
MessageBody messageBody = new MessageBody();
messageBody.setImei(imei);
Message message = new Message();
message.setMessageHead(messageHead);
/**
* 根據(jù)messageType來封裝請求數(shù)據(jù)
*/
if(messageType == 0x0) {
//解析成JSON格式
String body = new String(bodyData);
com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
jsonObject.put("body",body);
messageBody.setData(jsonObject);
}else if(messageType == 0x1) {
//解析成Protobuf
}else if(messageType == 0x2) {
//解析成Xml
}
message.setMessageBody(messageBody);
//更新讀索引
in.markReaderIndex();
//最后通過管道寫出去
out.add(message);
}
}
6:實(shí)現(xiàn)自定義解碼器
package com.chat.codec;
import com.chat.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.Charset;
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
out.writeInt(message.getMessageHead().getCommond());
out.writeInt(message.getMessageHead().getVersion());
out.writeInt(message.getMessageHead().getClientType());
out.writeInt(message.getMessageHead().getMessageType());
out.writeInt(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")).length);
out.writeInt(message.getMessageHead().getAppId());
out.writeInt(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")).length);
out.writeBytes(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")));
out.writeBytes(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")));
}
}
7:Netty Server端
package com.chat.server;
import com.chat.codec.MessageDecoder;
import com.chat.codec.MessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void main(String[] args) throws Exception{
// 創(chuàng)建兩個(gè)線程組bossGroup和workerGroup, 含有的子線程N(yùn)ioEventLoop的個(gè)數(shù)默認(rèn)為cpu核數(shù)的兩倍
// bossGroup只是處理連接請求 ,真正的和客戶端業(yè)務(wù)處理,會(huì)交給workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用鏈?zhǔn)骄幊虂砼渲脜?shù)
bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個(gè)線程組
// 使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn)
.channel(NioServerSocketChannel.class)
// 初始化服務(wù)器連接隊(duì)列大小,服務(wù)端處理客戶端連接請求是順序處理的,所以同一時(shí)間只能處理一個(gè)客戶端連接。
// 多個(gè)客戶端同時(shí)來的時(shí)候,服務(wù)端將不能處理的客戶端連接請求放在隊(duì)列中等待處理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建通道初始化對(duì)象,設(shè)置初始化參數(shù),在 SocketChannel 建立起來之前執(zhí)行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MessageDecoder());
ch.pipeline().addLast(new MessageEncoder());
ch.pipeline().addLast(new MyServerHandler());
//ch.pipeline().addLast(new ServerHandler());
}
});
System.out.println("netty server start。。");
// 綁定一個(gè)端口并且同步, 生成了一個(gè)ChannelFuture異步對(duì)象,通過isDone()等方法可以判斷異步事件的執(zhí)行情況
// 啟動(dòng)服務(wù)器(并綁定端口),bind是異步操作,sync方法是等待異步操作執(zhí)行完畢
ChannelFuture cf = bootstrap.bind(9000).sync();
// 給cf注冊監(jiān)聽器,監(jiān)聽我們關(guān)心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("監(jiān)聽端口9000成功");
} else {
System.out.println("監(jiān)聽端口9000失敗");
}
}
});*/
// 等待服務(wù)端監(jiān)聽端口關(guān)閉,closeFuture是異步操作
// 通過sync方法同步等待通道關(guān)閉處理完畢,這里會(huì)阻塞等待通道關(guān)閉完成,內(nèi)部調(diào)用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
8:Netty Server端處理器
package com.chat.server;
import cn.hutool.json.JSONUtil;
import com.chat.model.Message;
import com.chat.model.MessageBody;
import com.chat.model.MessageHead;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyServerHandler extends SimpleChannelInboundHandler<Message> {
private final static Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
System.out.println("這是客戶端發(fā)送的消息" + JSONUtil.toJsonPrettyStr(message));
Message messageResponse = new Message();
MessageHead messageHead = new MessageHead();
messageHead.setCommond(9988);
messageHead.setMessageType(0x0);
messageHead.setClientType(1);
messageHead.setVersion(2);
messageHead.setAppId(3);
String msg = "這是服務(wù)端發(fā)送給你的消息";
messageHead.setBodyLen(msg.getBytes().length);
String imei = "12-euri-1234";
messageHead.setImeiLen(imei.getBytes().length);
MessageBody messageBody = new MessageBody();
messageBody.setImei(imei);
messageBody.setData(msg);
messageResponse.setMessageHead(messageHead);
messageResponse.setMessageBody(messageBody);
ctx.writeAndFlush(messageResponse);
}
}
9:Netty Client端處理器
package com.chat.client;
import cn.hutool.json.JSONUtil;
import com.chat.model.Message;
import com.chat.model.MessageBody;
import com.chat.model.MessageHead;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientHandler extends SimpleChannelInboundHandler<Message> {
/**
* 當(dāng)客戶端連接服務(wù)器完成就會(huì)觸發(fā)該方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
for(int i = 0; i < 20; i ++) {
Message message = new Message();
MessageHead messageHead = new MessageHead();
messageHead.setCommond(9988);
messageHead.setMessageType(0x0);
messageHead.setClientType(1);
messageHead.setVersion(2);
messageHead.setAppId(3);
String msg = "hello-" + i;
messageHead.setBodyLen(msg.getBytes().length);
String imei = "12-euri";
messageHead.setImeiLen(imei.getBytes().length);
MessageBody messageBody = new MessageBody();
messageBody.setImei(imei);
messageBody.setData(msg);
message.setMessageHead(messageHead);
message.setMessageBody(messageBody);
ctx.writeAndFlush(message);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
//當(dāng)通道有讀取事件時(shí)會(huì)觸發(fā),即服務(wù)端發(fā)送數(shù)據(jù)給客戶端
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
System.out.println(JSONUtil.toJsonPrettyStr(message));
}
}
10:Netty Client端
package com.chat.client;
import com.chat.codec.MessageDecoder;
import com.chat.codec.MessageEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Client {
public static void main(String[] args) throws Exception{
//客戶端需要一個(gè)事件循環(huán)組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創(chuàng)建客戶端啟動(dòng)對(duì)象
//注意客戶端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設(shè)置相關(guān)參數(shù)
bootstrap.group(group) //設(shè)置線程組
.channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實(shí)現(xiàn)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入處理器
ch.pipeline().addLast(new MessageDecoder());
ch.pipeline().addLast(new MessageEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
System.out.println("netty client start。。");
//啟動(dòng)客戶端去連接服務(wù)器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
//對(duì)通道關(guān)閉進(jìn)行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
11:測試
1: 先啟動(dòng)Server端的main方法
2:再啟動(dòng)Client端的main方法
3:查看控制臺(tái)
服務(wù)端控制臺(tái):

客戶端控制臺(tái):

完整代碼
全部代碼就是下圖這幾個(gè)類,上面已經(jīng)貼出每個(gè)類的全部代碼,直接復(fù)制就行了

以上就是Netty實(shí)現(xiàn)自定義協(xié)議編解碼器的詳細(xì)內(nèi)容,更多關(guān)于Netty自定義協(xié)議編解碼器的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Slf4j+logback實(shí)現(xiàn)JSON格式日志輸出方式
這篇文章主要介紹了Slf4j+logback實(shí)現(xiàn)JSON格式日志輸出方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Java實(shí)現(xiàn)圖片上文字內(nèi)容的動(dòng)態(tài)修改的操作步驟
在數(shù)字圖像處理領(lǐng)域,Java提供了強(qiáng)大的庫來處理圖片,包括讀取、修改和寫入圖片,如果你需要在Java應(yīng)用程序中修改圖片上的文字內(nèi)容,可以通過圖像處理技術(shù)來實(shí)現(xiàn),這篇博文將介紹如何使用Java實(shí)現(xiàn)圖片上文字內(nèi)容的動(dòng)態(tài)修改,需要的朋友可以參考下2024-07-07
通過實(shí)例學(xué)習(xí)Either 樹和模式匹配
這篇文章主要介紹了通過實(shí)例學(xué)習(xí)Either 樹和模式匹配,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,,需要的朋友可以參考下2019-06-06
使用curator實(shí)現(xiàn)zookeeper鎖服務(wù)的示例分享
這篇文章主要介紹了使用curator實(shí)現(xiàn)zookeeper鎖服務(wù)的示例,需要的朋友可以參考下2014-02-02

