如何在Spring Boot中使用MQTT
為什么選擇MQTT
MQTT的定義相信很多人都能講的頭頭是道,本文章也不討論什么高大上的東西,旨在用最簡(jiǎn)單直觀的方式讓每一位剛接觸的同行們可以最快的應(yīng)用起來(lái)
先從使用MQTT需要什么開(kāi)始分析:
- 消息服務(wù)器
- 不同應(yīng)用/設(shè)備之間的頻繁交互
- 可能涉及一對(duì)多的消息傳遞
根據(jù)上面列舉的這三點(diǎn),我們大概可以了解到, MQTT最適合的場(chǎng)景是消息做為系統(tǒng)的重要組成部分,且參與著系統(tǒng)關(guān)鍵業(yè)務(wù)邏輯的情形
MQTT, 啟動(dòng)!
既然決定使用它,我們首先要研究的是如何讓MQTT正常工作,畢竟它不是簡(jiǎn)單的在maven里加入個(gè)依賴就完事的
我們總共需要干如下兩件事:
- 下載EMQX消息服務(wù)器, 作為broker
- 在maven中引入依賴
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.2.RELEASE</version> </dependency>
完成上面兩步后, 啟動(dòng)EMQX服務(wù)器, 正式進(jìn)入我們的MQTT旅途
使用方式
在Spring Boot中使用MQTT的代碼, 筆者總結(jié)了如下兩種方式:
- 使用spring-integration的消息通道概念
- 使用傳統(tǒng)的Client客戶端概念
第一種會(huì)產(chǎn)生一定程度的心智負(fù)擔(dān),但在筆者成功搭配(抄襲+造輪子)自動(dòng)注冊(cè)后, 比后者要方便許多
在介紹具體代碼之前, 我們先簡(jiǎn)單整理下使用中最常見(jiàn)的概念:
- 主題: MQTT消息的主要傳播途徑, 我們向主題發(fā)布消息, 訂閱主題, 從主題中讀取消息并進(jìn)行業(yè)務(wù)邏輯處理, 主題是消息的通道
- 生產(chǎn)者: MQTT消息的發(fā)送者, 他們向主題發(fā)送消息
- 消費(fèi)者: MQTT消息的接收者, 他們訂閱自己需要的主題, 并從中獲取消息
- broker: 消息轉(zhuǎn)發(fā)器, 消息是通過(guò)它來(lái)承載的, EMQX就是我們的broker, 在使用中我們不用關(guān)心它的具體實(shí)現(xiàn)
其實(shí), MQTT的使用流程就是: 生產(chǎn)者給主題發(fā)消息->broker進(jìn)行消息的傳遞->訂閱該主題的消費(fèi)者拿到消息并進(jìn)行相應(yīng)的業(yè)務(wù)邏輯
Client模式
本模式和傳統(tǒng)的數(shù)據(jù)庫(kù)鏈接,Redis鏈接基本一致,有開(kāi)發(fā)經(jīng)驗(yàn)的小伙伴們可以很輕松的駕馭,我們需要考慮的就是如果創(chuàng)建對(duì)應(yīng)的工廠,是單例模式,還是原型,亦或是造個(gè)池子呢?
我們使用單例模式來(lái)進(jìn)行本次的介紹
創(chuàng)建工廠類
首先, 我們創(chuàng)造一個(gè)工廠(就不承認(rèn)設(shè)計(jì)模式中毒)
public class MqttFactory { private static MqttProperties configuration; private static MqttClient client; /** * 獲取客戶端實(shí)例 * 單例模式, 存在則返回, 不存在則初始化 */ public static MqttClient getInstance() { if (client == null) { init(); } return client; } /** * 初始化客戶端 */ public static void init() { try { client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis()); // MQTT配置對(duì)象 MqttConnectOptions options = new MqttConnectOptions(); // 設(shè)置自動(dòng)重連, 其它具體參數(shù)可以查看MqttConnectOptions options.setAutomaticReconnect(true); if (!client.isConnected()) { client.connect(options); } } catch (MqttException e) { LOGGER.error(String.format("MQTT: 連接消息服務(wù)器[%s]失敗", configuration.getAddress())); } } }
關(guān)于MQTT的具體配置可以查看MqttConnectOptions, 在這里就不做說(shuō)明了
多嘴一句, 文檔永遠(yuǎn)比某些博客給力!!!
創(chuàng)建工具類
接下來(lái), 我們創(chuàng)建MqttUtil, 用于消息的發(fā)送以及主題的訂閱
public class MqttUtil { /** * 發(fā)送消息 * @param topic 主題 * @param data 消息內(nèi)容 */ public static void send(String topic, Object data) { // 獲取客戶端實(shí)例 MqttClient client = MqttFactory.getInstance(); ObjectMapper mapper = new ObjectMapper(); try { // 轉(zhuǎn)換消息為json字符串 String json = mapper.writeValueAsString(data); client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8))); } catch (JsonProcessingException e) { LOGGER.error(String.format("MQTT: 主題[%s]發(fā)送消息轉(zhuǎn)換json失敗", topic)); } catch (MqttException e) { LOGGER.error(String.format("MQTT: 主題[%s]發(fā)送消息失敗", topic)); } } /** * 訂閱主題 * @param topic 主題 * @param listener 消息監(jiān)聽(tīng)處理器 */ public static void subscribe(String topic, IMqttMessageListener listener) { MqttClient client = MqttFactory.getInstance(); try { client.subscribe(topic, listener); } catch (MqttException e) { LOGGER.error(String.format("MQTT: 訂閱主題[%s]失敗", topic)); } } }
相信小伙伴們注意到了IMqttMessageListener這個(gè)東西, 我們只需要?jiǎng)?chuàng)建一個(gè)監(jiān)聽(tīng)類, 實(shí)現(xiàn)IMqttMessageListener接口, 就可以處理消息啦, 代碼如下:
public class MessageListener implements IMqttMessageListener { /** * 處理消息 * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { LOGGER.info(String.format("MQTT: 訂閱主題[%s]發(fā)來(lái)消息[%s]", topic, new String(mqttMessage.getPayload()))); } public static void main(String[] args) { //訂閱主題test01, 使用MessageListener來(lái)處理它的消息 MqttUtil.subscribe("test01", new MessageListener()); } }
無(wú)論是發(fā)送還是訂閱,是不是都很好理解?
舒服的事情結(jié)束后, 帶來(lái)的是無(wú)盡的折磨和空虛, 來(lái)吧, 讓我們挑戰(zhàn)下心智負(fù)擔(dān)大的第二種模式!
Spring Integration
什么是Spring Integration?對(duì)不起,我不知道,我也不想知道
為什么使用Spring Integration?因?yàn)樗娴暮芎镁S護(hù)
網(wǎng)上大部分教程都是針對(duì)Spring Integration的, 可能是我第一次接觸, 千篇一律看的我莫名其妙, 所以我選擇放棄了他們, 選擇了大神的自動(dòng)配置方式,并在其基礎(chǔ)上,針對(duì)心智負(fù)擔(dān)進(jìn)行了相應(yīng)的調(diào)整
還記得我們之前討論過(guò)的概念嗎?主題/生產(chǎn)者/消費(fèi)者
在Spring Integration中,我們新加入一些概念, 并把之前的進(jìn)行微調(diào):
- 通道: 消息傳輸和接受的管道, 每一條消息都是通過(guò)它鉆進(jìn)鉆出
- 客戶端工廠: 用于創(chuàng)建MQTT客戶端, 和模式一中的類似
- 消息適配器: 用于接收MQTT消息, 進(jìn)行轉(zhuǎn)換, 但不參與業(yè)務(wù)邏輯
- 入站通道: 搭配消息適配器, 消息進(jìn)入站臺(tái)的通道
- 出站通道: 搭配客戶端工廠, 消息發(fā)出站臺(tái)的通道
- 主題: 還是主題, 它不變
- 生產(chǎn)者: 擁有出站通道的家伙
- 消費(fèi)者: 擁有入站通道的家伙
如果能漸漸理解上面定義的話, 這種模式的流程其實(shí)可以變成這樣:
- 生產(chǎn)者: 創(chuàng)建指定客戶端工廠的出站通道->發(fā)送消息
- 消費(fèi)者: 創(chuàng)建指定消息適配器的入站通道->接收消息->進(jìn)入消息攔截器->業(yè)務(wù)邏輯
其實(shí)在筆者看來(lái), 這符合Spring Boot的理念, 約定優(yōu)于配置
代碼已挪入公司私服, 待后續(xù)個(gè)人私服配置好后再補(bǔ)充筆記
總結(jié)
MQTT作為消息服務(wù), 能夠滿足我們大部分的開(kāi)發(fā)需求, 但還有一些遺留問(wèn)題筆者還沒(méi)進(jìn)行過(guò)深入思考和實(shí)踐:
- 如何利用qos機(jī)制保證數(shù)據(jù)不會(huì)丟失
- 消息的隊(duì)列和排序
- 集群模式下的應(yīng)用
以上就是如何在Spring Boot中使用MQTT的詳細(xì)內(nèi)容,更多關(guān)于在Spring Boot中使用MQTT的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- springboot集成mqtt的實(shí)踐開(kāi)發(fā)
- SpringBoot+MQTT+apollo實(shí)現(xiàn)訂閱發(fā)布功能的示例
- springboot 實(shí)現(xiàn)mqtt物聯(lián)網(wǎng)的示例代碼
- SpringBoot2.0集成MQTT消息推送功能實(shí)現(xiàn)
- SpringBoot集成mqtt的多模塊項(xiàng)目配置詳解
- SpringBoot整合MQTT并實(shí)現(xiàn)異步線程調(diào)用的問(wèn)題
- springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- Springboot整合mqtt服務(wù)的示例代碼
- Spring?boot?集成?MQTT詳情
- Spring?Boot?MQTT?Too?many?publishes?in?progress錯(cuò)誤的解決方案
相關(guān)文章
idea導(dǎo)入項(xiàng)目爆紅問(wèn)題記錄以及解決
這篇文章主要介紹了idea導(dǎo)入項(xiàng)目爆紅問(wèn)題記錄以及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07springboot打包實(shí)現(xiàn)項(xiàng)目JAR包和依賴JAR包分離
這篇文章主要介紹了springboot打包實(shí)現(xiàn)項(xiàng)目JAR包和依賴JAR包分離,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02SpringCloud Feign傳遞HttpServletRequest對(duì)象流程
HttpServletRequest接口的對(duì)象代表客戶端的請(qǐng)求,當(dāng)客戶端通過(guò)HTTP協(xié)議訪問(wèn)Tomcat服務(wù)器時(shí),HTTP請(qǐng)求中的所有信息都封裝在HttpServletRequest接口的對(duì)象中,這篇文章介紹了Feign傳遞HttpServletRequest對(duì)象的流程,感興趣的同學(xué)可以參考下文2023-05-05Java基于NIO實(shí)現(xiàn)群聊系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java基于NIO實(shí)現(xiàn)群聊系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11Java微信公眾平臺(tái)之群發(fā)接口(高級(jí)群發(fā))
這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺(tái)之群發(fā)接口,高級(jí)群發(fā)功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之棧和隊(duì)列
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)學(xué)習(xí)之棧和隊(duì)列,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有一定的幫助,需要的朋友可以參考下2021-05-05