SpringBoot整合Netty開發(fā)MQTT服務(wù)端
Netty認(rèn)知
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)開發(fā)的網(wǎng)絡(luò)通信框架,相比傳統(tǒng)Socket,在并發(fā)性方面有著很大的提升。關(guān)于NIO,BIO,AIO之間的區(qū)別,可以參考這篇博客:Java中AIO、BIO、NIO應(yīng)用場(chǎng)景及區(qū)別
MQTT服務(wù)端實(shí)現(xiàn)
首先我們啟動(dòng)一個(gè)tcp服務(wù),這里我用到了Redis與RabbitMQ,主要是與分布式WEB平臺(tái)之間好對(duì)接
@Component public class ApplicationEventListener implements CommandLineRunner { @Value("${spring.application.name}") private String nodeName; @Value("${gnss.mqttserver.tcpPort}") private int tcpPort; @Override public void run(String... args) throws Exception { //啟動(dòng)TCP服務(wù) startTcpServer(); //清除Redis所有此節(jié)點(diǎn)的在線終端 RedisService redisService = SpringBeanService.getBean(RedisService.class); redisService.deleteAllOnlineTerminals(nodeName); //將所有此節(jié)點(diǎn)的終端設(shè)置為離線 RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class); messageSender.noticeAllOffline(nodeName); } /** * 啟動(dòng)TCP服務(wù) * * @throws Exception */ private void startTcpServer() throws Exception { //計(jì)數(shù)器,必須等到所有服務(wù)啟動(dòng)成功才能進(jìn)行后續(xù)的操作 final CountDownLatch countDownLatch = new CountDownLatch(1); //啟動(dòng)TCP服務(wù) TcpServer tcpServer = new TcpServer(tcpPort, ProtocolEnum.MqttCommon, countDownLatch); tcpServer.start(); //等待啟動(dòng)完成 countDownLatch.await(); } }
接下來(lái)我們編寫一個(gè)TcpServer類實(shí)現(xiàn)TCP服務(wù)
@Slf4j public class TcpServer extends Thread{ private int port; private ProtocolEnum protocolType; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap serverBootstrap = new ServerBootstrap(); private CountDownLatch countDownLatch; public TcpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) { this.port = port; this.protocolType = protocolType; this.countDownLatch = countDownLatch; bossGroup = new NioEventLoopGroup(1); workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class); final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(MqttConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS)); ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE); ch.pipeline().addLast("decoder", new MqttDecoder()); ch.pipeline().addLast(executorGroup, MqttBusinessHandler.INSTANCE); } }); } @Override public void run() { bind(); } /** * 綁定端口啟動(dòng)服務(wù) */ private void bind() { serverBootstrap.bind(port).addListener(future -> { if (future.isSuccess()) { log.info("{} MQTT服務(wù)器啟動(dòng),端口:{}", protocolType, port); countDownLatch.countDown(); } else { log.error("{} MQTT服務(wù)器啟動(dòng)失敗,端口:{}", protocolType, port, future.cause()); System.exit(-1); } }); } /** * 關(guān)閉服務(wù)端 */ public void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); log.info("{} TCP服務(wù)器關(guān)閉,端口:{}", protocolType, port); } }
編寫一個(gè)解碼器MqttBusinessHandler,實(shí)現(xiàn)對(duì)MQTT消息接收與處理
@Slf4j @ChannelHandler.Sharable public class MqttBusinessHandler extends SimpleChannelInboundHandler<Object> { public static final MqttBusinessHandler INSTANCE = new MqttBusinessHandler(); private MqttMsgBack mqttMsgBack; private MqttBusinessHandler() { mqttMsgBack= MqttMsgBack.INSTANCE; } /** * 接收到消息后處理 * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg) { MqttMessage mqttMessage = (MqttMessage) msg; MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader(); Channel channel = ctx.channel(); if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){ //在一個(gè)網(wǎng)絡(luò)連接上,客戶端只能發(fā)送一次CONNECT報(bào)文。服務(wù)端必須將客戶端發(fā)送的第二個(gè)CONNECT報(bào)文當(dāng)作協(xié)議違規(guī)處理并斷開客戶端的連接 //建議connect消息單獨(dú)處理,用來(lái)對(duì)客戶端進(jìn)行認(rèn)證管理等 這里直接返回一個(gè)CONNACK消息 mqttMsgBack.connectionAck(ctx, mqttMessage); } switch (mqttFixedHeader.messageType()){ //客戶端發(fā)布消息 case PUBLISH: mqttMsgBack.publishAck(ctx, mqttMessage); break; //發(fā)布釋放 case PUBREL: mqttMsgBack.publishComp(ctx, mqttMessage); break; //訂閱主題 case SUBSCRIBE: mqttMsgBack.subscribeAck(ctx, mqttMessage); break; //取消訂閱主題 case UNSUBSCRIBE: mqttMsgBack.unsubscribeAck(ctx, mqttMessage); break; //客戶端發(fā)送心跳報(bào)文 case PINGREQ: mqttMsgBack.pingResp(ctx, mqttMessage); break; //客戶端主動(dòng)斷開連接 case DISCONNECT: break; default: break; } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("終端關(guān)閉連接,IP信息:{}", CommonUtil.getClientAddress(ctx)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); log.error("終端連接異常,IP信息:{}", CommonUtil.getClientAddress(ctx), cause); } /** * 服務(wù)端當(dāng)讀超時(shí)時(shí)會(huì)調(diào)用這個(gè)方法 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { ctx.close(); log.error("讀超時(shí),IP信息:{}", CommonUtil.getClientAddress(ctx), evt); }
我們對(duì)接收到的消息進(jìn)行業(yè)務(wù)處理
@Slf4j public class MqttMsgBack { public static final MqttMsgBack INSTANCE = new MqttMsgBack(); private RedisService redisService; private RabbitMessageSender messageSender; private Environment environment; private MessageServiceProvider messageServiceProvider; private MqttMsgBack() { redisService = SpringBeanService.getBean(RedisService.class); messageSender = SpringBeanService.getBean(RabbitMessageSender.class); environment = SpringBeanService.getBean(Environment.class); messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class); } /** * 確認(rèn)連接請(qǐng)求 * @param ctx * @param mqttMessage */ public void connectionAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); //構(gòu)建返回報(bào)文, 可變報(bào)頭 MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession()); //構(gòu)建返回報(bào)文, 固定報(bào)頭 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); //構(gòu)建連接回復(fù)消息體 MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); ctx.writeAndFlush(connAck); //獲取連接者的ClientId String clientIdentifier = mqttConnectMessage.payload().clientIdentifier(); //查詢終端號(hào)碼有無(wú)在平臺(tái)注冊(cè) TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(clientIdentifier); if (terminalInfo == null) { log.error("終端登錄失敗,未找到終端信息,終端號(hào):{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx)); ctx.close(); return; } //設(shè)置節(jié)點(diǎn)名 terminalInfo.setNodeName(environment.getProperty("spring.application.name")); //保存終端信息和消息流水號(hào)到上下文屬性中 Session session = new Session(terminalInfo); ChannelHandlerContext oldCtx = SessionUtil.bindSession(session, ctx); if (oldCtx == null) { log.info("終端登錄成功,終端ID:{},終端號(hào):{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx)); } else { log.info("終端重復(fù)登錄關(guān)閉上一個(gè)連接,終端ID:{},終端號(hào):{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx)); oldCtx.close(); } //通知上線 messageSender.noticeOnline(terminalInfo); log.info("終端登錄成功,終端號(hào):{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx)); } /** * 根據(jù)qos發(fā)布確認(rèn) * @param ctx * @param mqttMessage */ public void publishAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel(); //得到主題 String topicName = mqttPublishMessage.variableHeader().topicName(); //獲取消息體 ByteBuf msgBodyBuf = mqttPublishMessage.payload(); log.info("收到:{}", ByteBufUtil.hexDump(msgBodyBuf)); MqttCommonMessage msg=new MqttCommonMessage(); msg.setTerminalNum(SessionUtil.getTerminalInfo(ctx).getTerminalNum()); msg.setStrMsgId(topicName); //根據(jù)主題獲取對(duì)應(yīng)的主題消息處理器 BaseMessageService messageService = messageServiceProvider.getMessageService(topicName); try { Object result = messageService.process(ctx, msg, msgBodyBuf); log.info("收到{}({}),終端ID:{},內(nèi)容:{}", messageService.getDesc(), topicName,msg.getTerminalNum(), msg.getMsgBodyItems()); } catch (Exception e) { log.error("收到{}({}),消息異常,終端ID:{},消息體:{}", messageService.getDesc(), topicName,msg.getTerminalNum(),ByteBufUtil.hexDump(msgBodyBuf), e); } switch (qos) { //至多一次 case AT_MOST_ONCE: break; //至少一次 case AT_LEAST_ONCE: //構(gòu)建返回報(bào)文, 可變報(bào)頭 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); //構(gòu)建返回報(bào)文, 固定報(bào)頭 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); //構(gòu)建PUBACK消息體 MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); log.info("Qos:AT_LEAST_ONCE:{}",pubAck.toString()); ctx.writeAndFlush(pubAck); break; //剛好一次 case EXACTLY_ONCE: //構(gòu)建返回報(bào)文,固定報(bào)頭 MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02); //構(gòu)建返回報(bào)文,可變報(bào)頭 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2); log.info("Qos:EXACTLY_ONCE回復(fù):{}"+mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); break; default: break; } } /** * 發(fā)布完成 qos2 * @param ctx * @param mqttMessage */ public void publishComp (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); //構(gòu)建返回報(bào)文, 固定報(bào)頭 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02); //構(gòu)建返回報(bào)文, 可變報(bào)頭 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack); log.info("發(fā)布完成回復(fù):{}"+mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); } /** * 訂閱確認(rèn) * @param ctx * @param mqttMessage */ public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); //構(gòu)建返回報(bào)文, 可變報(bào)頭 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet()); List<Integer> grantedQoSLevels = new ArrayList<>(topics.size()); for (int i = 0; i < topics.size(); i++) { grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value()); } // 構(gòu)建返回報(bào)文 有效負(fù)載 MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels); // 構(gòu)建返回報(bào)文 固定報(bào)頭 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size()); // 構(gòu)建返回報(bào)文 訂閱確認(rèn) MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack); log.info("訂閱回復(fù):{}", subAck.toString()); ctx.writeAndFlush(subAck); } /** * 取消訂閱確認(rèn) * @param ctx * @param mqttMessage */ public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); // 構(gòu)建返回報(bào)文 可變報(bào)頭 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); // 構(gòu)建返回報(bào)文 固定報(bào)頭 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2); // 構(gòu)建返回報(bào)文 取消訂閱確認(rèn) MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack); log.info("取消訂閱回復(fù):{}",unSubAck.toString()); ctx.writeAndFlush(unSubAck); } /** * 心跳響應(yīng) * @param ctx * @param mqttMessage */ public void pingResp (ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage mqttMessageBack = new MqttMessage(fixedHeader); log.info("心跳回復(fù):{}", mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); } }
我們可以根據(jù)客戶端發(fā)布消息的主題匹配不同的處理器
最后,我們?cè)趯?duì)應(yīng)的處理器里面實(shí)現(xiàn)對(duì)主題消息的處理邏輯,比如:定位消息,指令消息等等,比如簡(jiǎn)單實(shí)現(xiàn)對(duì)定位數(shù)據(jù)Location主題的消息處理
@Slf4j @MessageService(strMessageId = "Location", desc = "定位") public class LocationMessageService extends BaseMessageService<MqttCommonMessage> { @Autowired private RabbitMessageSender messageSender; @Override public Object process(ChannelHandlerContext ctx, MqttCommonMessage msg, ByteBuf msgBodyBuf) throws Exception { byte[] msgByteArr = new byte[msgBodyBuf.readableBytes()]; msgBodyBuf.readBytes(msgByteArr); String data = new String(msgByteArr); msg.putMessageBodyItem("位置", data); return null; } }
后續(xù)
目前僅僅是實(shí)現(xiàn)MQTT服務(wù)端消息接收與消息回復(fù),后續(xù)可以根據(jù)接入的物聯(lián)網(wǎng)設(shè)備進(jìn)行對(duì)應(yīng)主題消息的業(yè)務(wù)處理
到此這篇關(guān)于SpringBoot整合Netty開發(fā)MQTT服務(wù)端的文章就介紹到這了,更多相關(guān)SpringBoot MQTT服務(wù)端內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot項(xiàng)目平滑關(guān)閉及自動(dòng)化關(guān)閉腳本
這篇文章主要為大家詳細(xì)介紹了Springboot項(xiàng)目平滑關(guān)閉及自動(dòng)化關(guān)閉腳本,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-05-05Java并發(fā)Map面試線程安全數(shù)據(jù)結(jié)構(gòu)全面分析
本文將探討如何在Java中有效地應(yīng)對(duì)這些挑戰(zhàn),介紹一種強(qiáng)大的工具并發(fā)Map,它能夠幫助您管理多線程環(huán)境下的共享數(shù)據(jù),確保數(shù)據(jù)的一致性和高性能,深入了解Java中的并發(fā)Map實(shí)現(xiàn),包括ConcurrentHashMap和ConcurrentSkipListMap,及相關(guān)知識(shí)點(diǎn)2023-09-09Java?實(shí)戰(zhàn)項(xiàng)目之學(xué)生信息管理系統(tǒng)的實(shí)現(xiàn)流程
讀萬(wàn)卷書不如行萬(wàn)里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+jsp+mysql+maven實(shí)現(xiàn)學(xué)生信息管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11Java 中的 NoSuchMethodException 異常及解決思路(最新推薦)
NoSuchMethodException異常是Java中使用反射機(jī)制時(shí)常見的錯(cuò)誤,它通常由方法名或參數(shù)不匹配、訪問(wèn)權(quán)限問(wèn)題、方法簽名不匹配等原因引發(fā),解決方法包括核實(shí)方法名及其參數(shù)類型、確認(rèn)方法訪問(wèn)權(quán)限、檢查方法簽名和重載問(wèn)題、確保方法存在于正確的類中,感興趣的朋友一起看看吧2025-01-01Java工廠模式用法之如何動(dòng)態(tài)選擇對(duì)象詳解
工廠設(shè)計(jì)模式可能是最常用的設(shè)計(jì)模式之一,我想大家在自己的項(xiàng)目中都用到過(guò)。本文不僅僅是關(guān)于工廠模式的基本知識(shí),更是討論如何在運(yùn)行時(shí)動(dòng)態(tài)選擇不同的方法進(jìn)行執(zhí)行,你們可以看看是不是和你們項(xiàng)目中用的一樣2023-03-03Java聊天室之實(shí)現(xiàn)接收和發(fā)送Socket
這篇文章主要為大家詳細(xì)介紹了Java簡(jiǎn)易聊天室之實(shí)現(xiàn)接收和發(fā)送Socket功能,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的可以了解一下2022-10-10簡(jiǎn)化API提升開發(fā)效率RestTemplate與HttpClient?OkHttp關(guān)系詳解
這篇文章主要為大家介紹了簡(jiǎn)化API,提升開發(fā)效率,RestTemplate與HttpClient?OkHttp關(guān)系介紹,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10Springboot集成JWT實(shí)現(xiàn)登錄注冊(cè)的示例代碼
本文主要介紹了Springboot集成JWT實(shí)現(xiàn)登錄注冊(cè)的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06Java判斷是否為簡(jiǎn)體中文字符的實(shí)現(xiàn)方法
在應(yīng)用開發(fā)中,判斷簡(jiǎn)體中文字符是一個(gè)重要但常被忽視的任務(wù),簡(jiǎn)體中文和繁體中文在字符上有顯著的區(qū)別,因此在某些場(chǎng)景下我們需要判斷輸入的文本是否為簡(jiǎn)體中文,本文將介紹如何使用Java進(jìn)行此判斷,并提供相應(yīng)的代碼示例,幫助開發(fā)者更好地理解這一過(guò)程2024-09-09