Springboot整合Netty自定義協(xié)議實(shí)現(xiàn)示例詳解
引言
Netty是由JBOSS提供的一個(gè)java開(kāi)源框架,現(xiàn)為 Github上的獨(dú)立項(xiàng)目。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
也就是說(shuō),Netty 是一個(gè)基于NIO的客戶、服務(wù)器端的編程框架,使用Netty 可以確保你快速和簡(jiǎn)單的開(kāi)發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶、服務(wù)端應(yīng)用。Netty相當(dāng)于簡(jiǎn)化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開(kāi)發(fā)過(guò)程。
Springboot整合Netty
導(dǎo)入netty包
新建springboot項(xiàng)目,并在項(xiàng)目以來(lái)中導(dǎo)入netty包,用fastjson包處理jsonStr。
<!-- netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version> </dependency> <!-- Json處理 --> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.16</version> </dependency>
創(chuàng)建netty相關(guān)配置信息文件
- yml配置文件——application.yml
# netty 配置 netty: # boss線程數(shù)量 boss: 4 # worker線程數(shù)量 worker: 2 # 連接超時(shí)時(shí)間 timeout: 6000 # 服務(wù)器主端口 port: 18023 # 服務(wù)器備用端口 portSalve: 18026 # 服務(wù)器地址 host: 127.0.0.1
- netty配置實(shí)體類——NettyProperties與yml配置文件綁定 通過(guò)@ConfigurationProperties(prefix = "netty")注解讀取配置文件中的netty配置,通過(guò)反射注入值,需要在實(shí)體類中提供對(duì)應(yīng)的setter和getter方法。
@ConfigurationProperties(prefix = "netty")對(duì)應(yīng)的實(shí)體類屬性名稱不要求一定相同,只需保證“set”字符串拼接配置文件的屬性和setter方法名相同即可。
@Configuration @ConfigurationProperties(prefix = "netty") public class NettyProperties { /** * boss線程數(shù)量 */ private Integer boss; /** * worker線程數(shù)量 */ private Integer worker; /** * 連接超時(shí)時(shí)間 */ private Integer timeout = 30000; /** * 服務(wù)器主端口 */ private Integer port = 18023; /** * 服務(wù)器備用端口 */ private Integer portSalve = 18026; /** * 服務(wù)器地址 默認(rèn)為本地 */ private String host = "127.0.0.1"; // setter、getter 。。。。 }
- 對(duì)netty進(jìn)行配置,綁定netty相關(guān)配置設(shè)置 Netty通常由一個(gè)Bootstrap開(kāi)始,主要作用是配置整個(gè)Netty程序,串聯(lián)各個(gè)組件,Netty中Bootstrap類是客戶端程序的啟動(dòng)引導(dǎo)類,ServerBootstrap是服務(wù)端啟動(dòng)引導(dǎo)類。
@Configuration @EnableConfigurationProperties public class NettyConfig { final NettyProperties nettyProperties; public NettyConfig(NettyProperties nettyProperties) { this.nettyProperties = nettyProperties; } /** * boss線程池-進(jìn)行客戶端連接 * * @return */ @Bean public NioEventLoopGroup boosGroup() { return new NioEventLoopGroup(nettyProperties.getBoss()); } /** * worker線程池-進(jìn)行業(yè)務(wù)處理 * * @return */ @Bean public NioEventLoopGroup workerGroup() { return new NioEventLoopGroup(nettyProperties.getWorker()); } /** * 服務(wù)端啟動(dòng)器,監(jiān)聽(tīng)客戶端連接 * * @return */ @Bean public ServerBootstrap serverBootstrap() { ServerBootstrap serverBootstrap = new ServerBootstrap() // 指定使用的線程組 .group(boosGroup(), workerGroup()) // 指定使用的通道 .channel(NioServerSocketChannel.class) // 指定連接超時(shí)時(shí)間 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout()) // 指定worker處理器 .childHandler(new NettyServerHandler()); return serverBootstrap; } }
- worker處理器,初始化通道以及配置對(duì)應(yīng)管道的處理器 自定義了
##@##
分割符 - 通過(guò)
DelimiterBasedFrameDecoder
來(lái)處理拆包沾包問(wèn)題; - 通過(guò)
MessageDecodeHandler
將接收消息解碼處理成對(duì)象實(shí)例; - 通過(guò)
MessageEncodeHandler
將發(fā)送消息增加分割符后并編碼; 最后通過(guò)ServerListenerHandler
根據(jù)消息類型對(duì)應(yīng)處理不同消息。
public class NettyServerHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 數(shù)據(jù)分割符 String delimiterStr = "##@##"; ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes()); ChannelPipeline pipeline = socketChannel.pipeline(); // 使用自定義處理拆包/沾包,并且每次查找的最大長(zhǎng)度為1024字節(jié) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); // 將上一步解碼后的數(shù)據(jù)轉(zhuǎn)碼為Message實(shí)例 pipeline.addLast(new MessageDecodeHandler()); // 對(duì)發(fā)送客戶端的數(shù)據(jù)進(jìn)行編碼,并添加數(shù)據(jù)分隔符 pipeline.addLast(new MessageEncodeHandler(delimiterStr)); // 對(duì)數(shù)據(jù)進(jìn)行最終處理 pipeline.addLast(new ServerListenerHandler()); } }
數(shù)據(jù)解碼 編碼
數(shù)據(jù)解碼和編碼都采用UTF8格式
public class MessageDecodeHandler extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception { ByteBuf frame = in.retainedDuplicate(); final String content = frame.toString(CharsetUtil.UTF_8); Message message = new Message(content); list.add(message); in.skipBytes(in.readableBytes()); } }
- 數(shù)據(jù)解碼轉(zhuǎn)換的實(shí)例 Message類用于承載消息、轉(zhuǎn)JsonString
public class Message { /** * 數(shù)據(jù)長(zhǎng)度 */ private Integer len; /** * 接收的通訊數(shù)據(jù)body */ private String content; /** * 消息類型 */ private Integer msgType; public Message(Object object) { String str = object.toString(); JSONObject jsonObject = JSONObject.parseObject(str); msgType = Integer.valueOf(jsonObject.getString("msg_type")); content = jsonObject.getString("body"); len = str.length(); } public String toJsonString() { return "{" + "\"msg_type\": " + msgType + ",\n" + "\"body\": " + content + "}"; } // setter、getter 。。。。 }
- 數(shù)據(jù)編碼 netty服務(wù)端回復(fù)消息時(shí),對(duì)消息轉(zhuǎn)JsonString增加分割符,并進(jìn)行編碼。
public class MessageEncodeHandler extends MessageToByteEncoder<Message> { // 數(shù)據(jù)分割符 String delimiter; public MessageEncodeHandler(String delimiter) { this.delimiter = delimiter; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8)); } }
- 數(shù)據(jù)處理器,針對(duì)不同類型數(shù)據(jù)分類處理 在處理不同接收數(shù)據(jù)時(shí)使用了枚舉類型,在使用switch時(shí)可以做下處理,具體參考代碼,這里只演示如何操作,并沒(méi)實(shí)現(xiàn)數(shù)據(jù)處理業(yè)務(wù)類。
public class ServerListenerHandler extends SimpleChannelInboundHandler<Message> { private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class); /** * 設(shè)備接入連接時(shí)處理 * * @param ctx */ @Override public void handlerAdded(ChannelHandlerContext ctx) { log.info("有新的連接:[{}]", ctx.channel().id().asLongText()); } /** * 數(shù)據(jù)處理 * * @param ctx * @param msg */ @Override protected void channelRead0(ChannelHandlerContext ctx, Message msg) { // 獲取消息實(shí)例中的消息體 String content = msg.getContent(); // 對(duì)不同消息類型進(jìn)行處理 MessageEnum type = MessageEnum.getStructureEnum(msg); switch (type) { case CONNECT: // TODO 心跳消息處理 case STATE: // TODO 設(shè)備狀態(tài) default: System.out.println(type.content + "消息內(nèi)容" + content); } } /** * 設(shè)備下線處理 * * @param ctx */ @Override public void handlerRemoved(ChannelHandlerContext ctx) { log.info("設(shè)備下線了:{}", ctx.channel().id().asLongText()); } /** * 設(shè)備連接異常處理 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印異常 log.info("異常:{}", cause.getMessage()); // 關(guān)閉連接 ctx.close(); } }
- 數(shù)據(jù)類型枚舉類
public enum MessageEnum { CONNECT(1, "心跳消息"), STATE(2, "設(shè)備狀態(tài)"); public final Integer type; public final String content; MessageEnum(Integer type, String content) { this.type = type; this.content = content; } // case中判斷使用 public static MessageEnum getStructureEnum(Message msg) { Integer type = Optional.ofNullable(msg) .map(Message::getMsgType) .orElse(0); if (type == 0) { return null; } else { List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values()) .filter((item) -> item.getType() == type) .distinct() .collect(Collectors.toList()); if (objectEnums.size() > 0) { return objectEnums.get(0); } return null; } } // setter、getter。。。。 }
到此Netty整個(gè)配置已經(jīng)完成,但如果要跟隨springboot一起啟動(dòng),仍需要做一些配置。
netty啟動(dòng)類配置
@Component public class NettyServerBoot { private static final Logger log = LoggerFactory.getLogger(NettyServerBoot.class); @Resource NioEventLoopGroup boosGroup; @Resource NioEventLoopGroup workerGroup; final ServerBootstrap serverBootstrap; final NettyProperties nettyProperties; public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) { this.serverBootstrap = serverBootstrap; this.nettyProperties = nettyProperties; } /** * 啟動(dòng)netty * * @throws InterruptedException */ @PostConstruct public void start() throws InterruptedException { // 綁定端口啟動(dòng) serverBootstrap.bind(nettyProperties.getPort()).sync(); // 備用端口 serverBootstrap.bind(nettyProperties.getPortSalve()).sync(); log.info("啟動(dòng)Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve()); } /** * 關(guān)閉netty */ @PreDestroy public void close() { log.info("關(guān)閉Netty"); boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
增加NettyServerBoot配置后,啟動(dòng)application時(shí),netty服務(wù)端會(huì)跟隨一起啟動(dòng)。
同時(shí),在springboot關(guān)閉前,會(huì)先銷毀netty服務(wù)。
完整源碼
以上就是Springboot整合Netty自定義協(xié)議實(shí)現(xiàn)示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Springboot整合Netty自定義協(xié)議的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot如何集成Netty
- SpringBoot集成netty實(shí)現(xiàn)websocket通信功能
- SpringBoot整合Netty+Websocket實(shí)現(xiàn)消息推送的示例代碼
- SpringBoot 整合 Netty 多端口監(jiān)聽(tīng)的操作方法
- SpringBoot整合Netty的流程步驟
- springboot之springboot與netty整合方案
- springboot整合netty框架實(shí)現(xiàn)站內(nèi)信
- springboot整合netty框架的方式小結(jié)
- Springboot+netty實(shí)現(xiàn)Web聊天室
- SpringBoot整合Netty服務(wù)端的實(shí)現(xiàn)示例
相關(guān)文章
如何在spring事務(wù)提交之后進(jìn)行異步操作
這篇文章主要為大家介紹了如何在spring事務(wù)提交之后進(jìn)行異步操作,這些異步操作必須得在該事務(wù)成功提交后才執(zhí)行,回滾則不執(zhí)行,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2023-09-09java常用工具類 Reflect反射工具類、String字符串工具類
這篇文章主要為大家詳細(xì)介紹了java常用工具類,包括Reflect反射工具類、String字符串工具類,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-05-05Springboot實(shí)現(xiàn)獲取實(shí)時(shí)天氣
這篇文章主要為大家詳細(xì)介紹了如何使用Springboot實(shí)現(xiàn)獲取實(shí)時(shí)天氣功能,文中的示例代碼講解詳細(xì),有需要的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-04-04淺談MySQL中是如何實(shí)現(xiàn)事務(wù)提交和回滾的
本文主要介紹了MySQL中是如何實(shí)現(xiàn)事務(wù)提交和回滾的,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02POI通用導(dǎo)出Excel(.xls,.xlsx)的方法
這篇文章主要介紹了POI通用導(dǎo)出Excel(.xls,.xlsx)的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05SpringBoot對(duì)PDF進(jìn)行模板內(nèi)容填充與電子簽名合并詳解
這篇文章主要為大家詳細(xì)介紹了SpringBoot對(duì)PDF進(jìn)行模板內(nèi)容填充與電子簽名合并的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以參考下2023-12-12Java實(shí)現(xiàn)廣度優(yōu)先遍歷的示例詳解
廣度優(yōu)先遍歷:廣度優(yōu)先遍歷是連通圖的一種遍歷策略,因?yàn)樗乃枷胧菑囊粋€(gè)頂點(diǎn)V0開(kāi)始,輻射狀地優(yōu)先遍歷其周圍較廣的區(qū)域故得名。本文詳細(xì)介紹了Java如何實(shí)現(xiàn)廣度優(yōu)先遍歷,感興趣的小伙伴可以學(xué)習(xí)一下2022-02-02