SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布(示例代碼)
SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布
背景 java框架SpringBoot通過(guò)mQTT通信 控制物聯(lián)網(wǎng)設(shè)備
還是直接上代碼
第一步依賴(lài):
<!--mqtt相關(guān)依賴(lài)--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.14</version> </dependency>
第二步配置文件
#mqtt mqtt: mqttUrl: tcp://127.0.0.1 mqttPort: 1883 mqttUsername: admin mqttPassword: public mqttClientId: aaa # MQTT回調(diào)類(lèi)型 按一個(gè)MQTT服務(wù)區(qū)分 # 如果MQTT服務(wù)端換了 回調(diào)處理的是新的業(yè)務(wù)需求 就把這個(gè)換了 # 然后在MQTT配置文件中擴(kuò)展新的回調(diào)類(lèi) mqttTypeCallback: breakerCallback
第三步 config類(lèi)
package com.xxx.iotjava.mqtt.config; import com.xxx.iotjava.mqtt.callback.BreakerCallback; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; /** * User:Json * Date: 2024/6/17 **/ @Configuration @Slf4j public class MqttConfig { @Value("${mqtt.mqttUsername}") private String mqttUsername; @Value("${mqtt.mqttPassword}") private String mqttPassword; @Value("${mqtt.mqttUrl}") private String mqttUrl; @Value("${mqtt.mqttPort}") private Integer mqttPort; @Value("${mqtt.mqttClientId}") private String mqttClientId; @Value("${mqttTypeCallback}") private String mqttTypeCallback; private static String breakerCallback = "breakerCallback"; /** * 客戶(hù)端對(duì)象 */ private MqttClient client; /** * 客戶(hù)端連接服務(wù)端 * 目前只支持一個(gè) MQTT服務(wù)端 如果后續(xù)一個(gè)項(xiàng)目有多個(gè)MQTT服務(wù)端那就設(shè)計(jì)成工廠模式 */ public boolean connect() { if (isMqtt()){ return false; } try { //new MemoryPersistence() 使用內(nèi)存持久化 // 優(yōu)點(diǎn):不會(huì)在文件系統(tǒng)中創(chuàng)建任何文件(如 .lck 文件),適合對(duì)會(huì)話持久性沒(méi)有要求的場(chǎng)景。 // 缺點(diǎn): 客戶(hù)端斷開(kāi)連接或重啟后,會(huì)話數(shù)據(jù)會(huì)丟失,無(wú)法保留訂閱信息和未發(fā)送的消息 // String persistenceDirectory = "/path/to/your/mqtt/persistence"; //new MqttDefaultFilePersistence(persistenceDirectory) 使用文件持久化 //如果persistenceDirectory 不寫(xiě) 他默認(rèn)創(chuàng)建 根目錄 linux要給權(quán)限 // 優(yōu)點(diǎn): 客戶(hù)端斷開(kāi)連接或重啟后,能夠保留訂閱信息和未發(fā)送的消息。這對(duì)于需要保持會(huì)話狀態(tài)的應(yīng)用非常重要 // 缺點(diǎn) 會(huì)在指定的目錄中創(chuàng)建文件(如 .lck 文件),需要確保指定的目錄是有效的,并且應(yīng)用有權(quán)限訪問(wèn)該目錄 //創(chuàng)建MQTT客戶(hù)端對(duì)象 client = new MqttClient(mqttUrl + ":" + mqttPort, mqttClientId,new MemoryPersistence()); //連接設(shè)置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶(hù)端的連接記錄(訂閱主題,qos),客戶(hù)端重連之后能獲取到服務(wù)器在客戶(hù)端斷開(kāi)連接期間推送的消息 //設(shè)置為true表示每次連接服務(wù)器都是以新的身份 //如果他為true 會(huì)出現(xiàn)一個(gè)問(wèn)題 //當(dāng)網(wǎng)絡(luò)斷開(kāi)后,客戶(hù)端會(huì)進(jìn)行重連,但是重連之前訂閱的主題就失效了,不再接受之前訂閱主題的消息。 //因?yàn)榕渲美飳leanSession 設(shè)為 true ,當(dāng)客戶(hù)端掉線時(shí) , //服務(wù)器端會(huì)清除 客戶(hù)端 session 。 重連后 客戶(hù)端會(huì)有一個(gè)新的session。 // 所以如果大家把他為true 重新連接mqtt后,要注意需要手動(dòng)再訂閱一下主題 // 推薦文檔:https://www.cnblogs.com/A-yes/p/9894144.html options.setCleanSession(true); //設(shè)置連接用戶(hù)名 options.setUserName(mqttUsername); //設(shè)置連接密碼 options.setPassword(mqttPassword.toCharArray()); options.setAutomaticReconnect(true); // 啟用自動(dòng)重連 //設(shè)置超時(shí)時(shí)間,單位為秒 如果在指定的時(shí)間內(nèi)未能建立連接,客戶(hù)端會(huì)放棄連接嘗試并拋出異常。 options.setConnectionTimeout(100); //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*20秒的時(shí)間向客戶(hù)端發(fā)送心跳判斷客戶(hù)端是否在線 options.setKeepAliveInterval(20); //設(shè)置遺囑消息的話題,若客戶(hù)端和服務(wù)器之間的連接意外斷開(kāi),服務(wù)器將發(fā)布客戶(hù)端的遺囑信息 // options.setWill("willTopic",(mqttClientId + ":與服務(wù)器斷開(kāi)連接").getBytes(),0,false); if (StringUtils.isEmpty(mqttTypeCallback)) { log.error("MQTT回調(diào)類(lèi)型為空,請(qǐng)去java_config配置文件配置!"); } //設(shè)置回調(diào) if (breakerCallback.equals(mqttTypeCallback)) { //斷路器回調(diào) client.setCallback(new BreakerCallback()); } client.connect(options); return true; } catch (MqttException e) { log.error("MQTT啟動(dòng)報(bào)錯(cuò):" + e.getMessage()); e.printStackTrace(); return false; } } /** * qos * 0 最多一次傳遞【適用于對(duì)消息丟失不敏感的場(chǎng)景,如傳感器數(shù)據(jù)頻繁發(fā)送,可以接受偶爾的數(shù)據(jù)丟失】 * 1 至少一次傳遞 【消息至少傳遞一次,但可能會(huì)重復(fù)(即重復(fù)消息)】 * 2 僅一次傳遞 【消息確保僅傳遞一次,既不會(huì)丟失也不會(huì)重復(fù)?!? * retained * 保留消息:如果 retained 參數(shù)設(shè)置為 true,消息會(huì)被代理保留。代理將記住這個(gè)消息,并在新客戶(hù)端訂閱該主題時(shí)立即發(fā)送這個(gè)消息。 * 非保留消息:如果 retained 參數(shù)設(shè)置為 false,消息不會(huì)被保留,只會(huì)發(fā)送給當(dāng)前在線并訂閱該主題的客戶(hù)端。 * topic 主題 * message 內(nèi)容 */ public void publish(int qos, boolean retained, String topic, String message) { log.info("topic為:【"+topic+"】,qos為:【"+qos+"】 mqtt 發(fā)布數(shù)據(jù)為:"+message); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); //代理將記住這個(gè)消息,并在新客戶(hù)端訂閱該主題時(shí)立即發(fā)送這個(gè)消息。 mqttMessage.setPayload(message.getBytes()); //主題的目的地,用于發(fā)布信息 MqttTopic mqttTopic = client.getTopic(topic); MqttDeliveryToken token; try { //將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài) token = mqttTopic.publish(mqttMessage); //token.waitForCompletion(); // 等待完成 會(huì)堵塞 } catch (MqttException e) { log.warn("ClientId【" + mqttClientId + "】發(fā)布失敗!主題【" + topic + "】,發(fā)布數(shù)據(jù)為:" + message); e.printStackTrace(); } } /** * 斷開(kāi)連接 */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /*** * 手動(dòng)連接 * 可用于斷線后 手動(dòng)重連 * ***/ public boolean againConnect() { try { if (client != null && !client.isConnected()) { client.connect(); } return true; } catch (MqttException e) { e.printStackTrace(); return false; } } //驗(yàn)證是否啟動(dòng)mqtt連接 private boolean isMqtt(){ if (StringUtils.isEmpty(mqttUrl) || StringUtils.isEmpty(mqttPort) || StringUtils.isEmpty(mqttUsername) || StringUtils.isEmpty(mqttPassword) || StringUtils.isEmpty(mqttClientId) ) { log.info("==========mqtt 參數(shù)不全,無(wú)需啟動(dòng)MQTT連接=================="); return true; } return false; } /** * 訂閱指定主題 * @param topic 訂閱的主題 * @param qos 訂閱的服務(wù)質(zhì)量 */ public boolean subscribe(String topic, int qos) { if (isMqtt()){ return false; } try { if (client != null && client.isConnected()) { client.subscribe(topic, qos); log.info("訂閱主題 {} 成功!", topic); } else { log.error("MQTT客戶(hù)端尚未連接,無(wú)法訂閱主題 {}!", topic); } return true; } catch (MqttException e) { log.error("訂閱主題 {} 失?。簕}", topic, e.getMessage()); e.printStackTrace(); return false; } } /** * 批量訂閱主題 * 消息等級(jí),和主題數(shù)組一一對(duì)應(yīng),服務(wù)端將按照指定等級(jí)給訂閱了主題的客戶(hù)端推送消息 * @param topic 訂閱的主題集合 * @param qos 訂閱的服務(wù)質(zhì)量集合 */ public boolean subscribe(String[] topic, int[] qos) { if (isMqtt()){ return false; } try { if (client != null && client.isConnected()) { client.subscribe(topic, qos); log.info("訂閱主題 {} 成功!", topic); } else { log.error("MQTT客戶(hù)端尚未連接,無(wú)法訂閱主題 {}!", topic); } return true; } catch (MqttException e) { log.error("訂閱主題 {} 失?。簕}", topic, e.getMessage()); e.printStackTrace(); return false; } } }
第四步 回調(diào)類(lèi)
package com.xxx.iotjava.mqtt.callback; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xxx.iotjava.entities.BreakerData; import com.xxx.iotjava.enums.breaker.BreakerKeywordsEnum; import com.xxx.iotjava.enums.breaker.BreakerKeywordsValueEnum; import com.xxx.iotjava.enums.breaker.BreakerOperationEnum; import com.xxx.iotjava.service.inteface.IBreakerDataService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import com.alibaba.fastjson.JSONArray; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; /** * User:Json * Date: 2024/6/17 **/ @Component @Slf4j public class BreakerCallback implements MqttCallback { @Autowired IBreakerDataService iBreakerDataService; /** * 與服務(wù)器斷開(kāi)的回調(diào) * 這里可以做手動(dòng)連接 但是配置config類(lèi) 配置了 自動(dòng)檢測(cè)異常 true 這里可以也不做 * options.setAutomaticReconnect(true); // 啟用自動(dòng)重連 */ @Override public void connectionLost(Throwable throwable) { log.error("MQTT連接有異常:" + throwable.getMessage()); } /** * 訂閱的回調(diào) * 消息到達(dá)的回調(diào) * 注意 如果這個(gè)回調(diào)方法 如果有異常 報(bào)錯(cuò) ,mqtt會(huì)重新連接 * 因?yàn)榕渲梦募?設(shè)置了 options.setAutomaticReconnect(true); // 啟用自動(dòng)重連 * 如果自動(dòng)重連了 如果是開(kāi)啟新的會(huì)話 以前的訂閱會(huì)消失 具體操作 再上面的配置文件類(lèi)說(shuō)明過(guò)了 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { // System.out.println("上報(bào)時(shí)間:"+ LocalDateTime.now()); // System.out.println(String.format("接收消息主題 : %s",topic)); // System.out.println(String.format("接收消息Qos : %d",mqttMessage.getQos())); // System.out.println(String.format("接收消息內(nèi)容 : %s",new String(mqttMessage.getPayload()))); // System.out.println(String.format("接收消息retained : %b",mqttMessage.isRetained())); } /** * 發(fā)布的回調(diào) * 消息發(fā)布成功的回調(diào) */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); log.info(client.getClientId() + "發(fā)布消息成功!"); } }
第五步 mqtt工具類(lèi)
package com.xxx.iotjava.utils; import com.xxx.init.utils.AppContextUtil; import com.xxx.iotjava.enums.breaker.BreakerOperationTopicEnum; import com.xxx.iotjava.mqtt.config.MqttConfig; import lombok.extern.slf4j.Slf4j; /** * User:Json * Date: 2024/6/17 **/ @Slf4j public class MqttUtils { private static MqttConfig mqttConfig; public static MqttConfig getMqttConfig() { if (mqttConfig == null) mqttConfig = AppContextUtil.getBean(MqttConfig.class); return mqttConfig; } //初始化 訂閱 public static boolean subscribeInit(){ return getMqttConfig().subscribe(BreakerOperationTopicEnum.REPORTING_API.getTopic(), 0); } /** * 發(fā)送消息 * qos 0 最多一次傳遞 1 至少一次傳遞 2 僅一次傳遞 * retained true 保留消息 false 非保留消息 * topic 主題 * message 內(nèi)容 */ public static boolean sendMqttMsg(int qos, boolean retained, String topic, String message) { try { getMqttConfig().publish(qos, retained, topic, message); return true; } catch (Exception e) { e.printStackTrace(); log.error("MQtt發(fā)送消息報(bào)錯(cuò):" + e.getMessage()); return false; } } /* * topic 主題 * message 內(nèi)容 * */ public static boolean sendMqttMsg(String topic, String message) { return sendMqttMsg(1, false, topic, message); } }
第六步 調(diào)用測(cè)試
發(fā)布
MqttUtils.sendMqttMsg(topic, data)
//訂閱 我做的是啟動(dòng)的時(shí)候 初始化訂閱 所以 直接根據(jù)定義的 topic 常量進(jìn)行初始化訂閱
//BreakerOperationTopicEnum.REPORTING_API.getTopic() 我 定義的topic 枚舉類(lèi) 常量
// 這里就不分享了
MqttUtils.subscribeInit();
完成
到此這篇關(guān)于SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布功能的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息的訂閱和發(fā)布內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Cloud Hystrix入門(mén)和Hystrix命令原理分析
這篇文章主要介紹了Spring Cloud Hystrix入門(mén)和Hystrix命令原理分析,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-08-08淺談java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06SpringBoot JPA出現(xiàn)錯(cuò)誤:No identifier specified&nb
這篇文章主要介紹了SpringBoot JPA出現(xiàn)錯(cuò)誤:No identifier specified for en解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03springboot如何靜態(tài)加載@configurationProperties
這篇文章主要介紹了springboot如何靜態(tài)加載@configurationProperties,本文一個(gè)錯(cuò)誤案例和成功案例結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析
這篇文章主要介紹了Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03解決idea找不到或無(wú)法加載主類(lèi)的錯(cuò)誤處理
這篇文章主要介紹了解決idea找不到或無(wú)法加載主類(lèi)的錯(cuò)誤處理,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04