SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布(示例代碼)
SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布
背景 java框架SpringBoot通過mQTT通信 控制物聯(lián)網(wǎng)設(shè)備
還是直接上代碼
第一步依賴:
<!--mqtt相關(guān)依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.14</version>
</dependency>第二步配置文件
#mqtt mqtt: mqttUrl: tcp://127.0.0.1 mqttPort: 1883 mqttUsername: admin mqttPassword: public mqttClientId: aaa # MQTT回調(diào)類型 按一個(gè)MQTT服務(wù)區(qū)分 # 如果MQTT服務(wù)端換了 回調(diào)處理的是新的業(yè)務(wù)需求 就把這個(gè)換了 # 然后在MQTT配置文件中擴(kuò)展新的回調(diào)類 mqttTypeCallback: breakerCallback
第三步 config類
package com.xxx.iotjava.mqtt.config;
import com.xxx.iotjava.mqtt.callback.BreakerCallback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
/**
* User:Json
* Date: 2024/6/17
**/
@Configuration
@Slf4j
public class MqttConfig {
@Value("${mqtt.mqttUsername}")
private String mqttUsername;
@Value("${mqtt.mqttPassword}")
private String mqttPassword;
@Value("${mqtt.mqttUrl}")
private String mqttUrl;
@Value("${mqtt.mqttPort}")
private Integer mqttPort;
@Value("${mqtt.mqttClientId}")
private String mqttClientId;
@Value("${mqttTypeCallback}")
private String mqttTypeCallback;
private static String breakerCallback = "breakerCallback";
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 客戶端連接服務(wù)端
* 目前只支持一個(gè) MQTT服務(wù)端 如果后續(xù)一個(gè)項(xiàng)目有多個(gè)MQTT服務(wù)端那就設(shè)計(jì)成工廠模式
*/
public boolean connect() {
if (isMqtt()){
return false;
}
try {
//new MemoryPersistence() 使用內(nèi)存持久化
// 優(yōu)點(diǎn):不會在文件系統(tǒng)中創(chuàng)建任何文件(如 .lck 文件),適合對會話持久性沒有要求的場景。
// 缺點(diǎn): 客戶端斷開連接或重啟后,會話數(shù)據(jù)會丟失,無法保留訂閱信息和未發(fā)送的消息
// String persistenceDirectory = "/path/to/your/mqtt/persistence";
//new MqttDefaultFilePersistence(persistenceDirectory) 使用文件持久化
//如果persistenceDirectory 不寫 他默認(rèn)創(chuàng)建 根目錄 linux要給權(quán)限
// 優(yōu)點(diǎn): 客戶端斷開連接或重啟后,能夠保留訂閱信息和未發(fā)送的消息。這對于需要保持會話狀態(tài)的應(yīng)用非常重要
// 缺點(diǎn) 會在指定的目錄中創(chuàng)建文件(如 .lck 文件),需要確保指定的目錄是有效的,并且應(yīng)用有權(quán)限訪問該目錄
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(mqttUrl + ":" + mqttPort, mqttClientId,new MemoryPersistence());
//連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設(shè)置false表示服務(wù)器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
//設(shè)置為true表示每次連接服務(wù)器都是以新的身份
//如果他為true 會出現(xiàn)一個(gè)問題
//當(dāng)網(wǎng)絡(luò)斷開后,客戶端會進(jìn)行重連,但是重連之前訂閱的主題就失效了,不再接受之前訂閱主題的消息。
//因?yàn)榕渲美飳leanSession 設(shè)為 true ,當(dāng)客戶端掉線時(shí) ,
//服務(wù)器端會清除 客戶端 session 。 重連后 客戶端會有一個(gè)新的session。
// 所以如果大家把他為true 重新連接mqtt后,要注意需要手動再訂閱一下主題
// 推薦文檔:https://www.cnblogs.com/A-yes/p/9894144.html
options.setCleanSession(true);
//設(shè)置連接用戶名
options.setUserName(mqttUsername);
//設(shè)置連接密碼
options.setPassword(mqttPassword.toCharArray());
options.setAutomaticReconnect(true); // 啟用自動重連
//設(shè)置超時(shí)時(shí)間,單位為秒 如果在指定的時(shí)間內(nèi)未能建立連接,客戶端會放棄連接嘗試并拋出異常。
options.setConnectionTimeout(100);
//設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
// options.setWill("willTopic",(mqttClientId + ":與服務(wù)器斷開連接").getBytes(),0,false);
if (StringUtils.isEmpty(mqttTypeCallback)) {
log.error("MQTT回調(diào)類型為空,請去java_config配置文件配置!");
}
//設(shè)置回調(diào)
if (breakerCallback.equals(mqttTypeCallback)) {
//斷路器回調(diào)
client.setCallback(new BreakerCallback());
}
client.connect(options);
return true;
} catch (MqttException e) {
log.error("MQTT啟動報(bào)錯:" + e.getMessage());
e.printStackTrace();
return false;
}
}
/**
* qos
* 0 最多一次傳遞【適用于對消息丟失不敏感的場景,如傳感器數(shù)據(jù)頻繁發(fā)送,可以接受偶爾的數(shù)據(jù)丟失】
* 1 至少一次傳遞 【消息至少傳遞一次,但可能會重復(fù)(即重復(fù)消息)】
* 2 僅一次傳遞 【消息確保僅傳遞一次,既不會丟失也不會重復(fù)。】
* retained
* 保留消息:如果 retained 參數(shù)設(shè)置為 true,消息會被代理保留。代理將記住這個(gè)消息,并在新客戶端訂閱該主題時(shí)立即發(fā)送這個(gè)消息。
* 非保留消息:如果 retained 參數(shù)設(shè)置為 false,消息不會被保留,只會發(fā)送給當(dāng)前在線并訂閱該主題的客戶端。
* topic 主題
* message 內(nèi)容
*/
public void publish(int qos, boolean retained, String topic, String message) {
log.info("topic為:【"+topic+"】,qos為:【"+qos+"】 mqtt 發(fā)布數(shù)據(jù)為:"+message);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained); //代理將記住這個(gè)消息,并在新客戶端訂閱該主題時(shí)立即發(fā)送這個(gè)消息。
mqttMessage.setPayload(message.getBytes());
//主題的目的地,用于發(fā)布信息
MqttTopic mqttTopic = client.getTopic(topic);
MqttDeliveryToken token;
try {
//將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
token = mqttTopic.publish(mqttMessage);
//token.waitForCompletion(); // 等待完成 會堵塞
} catch (MqttException e) {
log.warn("ClientId【" + mqttClientId + "】發(fā)布失?。≈黝}【" + topic + "】,發(fā)布數(shù)據(jù)為:" + message);
e.printStackTrace();
}
}
/**
* 斷開連接
*/
public void disConnect() {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/***
* 手動連接
* 可用于斷線后 手動重連
* ***/
public boolean againConnect() {
try {
if (client != null && !client.isConnected()) {
client.connect();
}
return true;
} catch (MqttException e) {
e.printStackTrace();
return false;
}
}
//驗(yàn)證是否啟動mqtt連接
private boolean isMqtt(){
if (StringUtils.isEmpty(mqttUrl) || StringUtils.isEmpty(mqttPort)
|| StringUtils.isEmpty(mqttUsername) || StringUtils.isEmpty(mqttPassword)
|| StringUtils.isEmpty(mqttClientId)
) {
log.info("==========mqtt 參數(shù)不全,無需啟動MQTT連接==================");
return true;
}
return false;
}
/**
* 訂閱指定主題
* @param topic 訂閱的主題
* @param qos 訂閱的服務(wù)質(zhì)量
*/
public boolean subscribe(String topic, int qos) {
if (isMqtt()){
return false;
}
try {
if (client != null && client.isConnected()) {
client.subscribe(topic, qos);
log.info("訂閱主題 {} 成功!", topic);
} else {
log.error("MQTT客戶端尚未連接,無法訂閱主題 {}!", topic);
}
return true;
} catch (MqttException e) {
log.error("訂閱主題 {} 失?。簕}", topic, e.getMessage());
e.printStackTrace();
return false;
}
}
/**
* 批量訂閱主題
* 消息等級,和主題數(shù)組一一對應(yīng),服務(wù)端將按照指定等級給訂閱了主題的客戶端推送消息
* @param topic 訂閱的主題集合
* @param qos 訂閱的服務(wù)質(zhì)量集合
*/
public boolean subscribe(String[] topic, int[] qos) {
if (isMqtt()){
return false;
}
try {
if (client != null && client.isConnected()) {
client.subscribe(topic, qos);
log.info("訂閱主題 {} 成功!", topic);
} else {
log.error("MQTT客戶端尚未連接,無法訂閱主題 {}!", topic);
}
return true;
} catch (MqttException e) {
log.error("訂閱主題 {} 失敗:{}", topic, e.getMessage());
e.printStackTrace();
return false;
}
}
}第四步 回調(diào)類
package com.xxx.iotjava.mqtt.callback;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxx.iotjava.entities.BreakerData;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsEnum;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsValueEnum;
import com.xxx.iotjava.enums.breaker.BreakerOperationEnum;
import com.xxx.iotjava.service.inteface.IBreakerDataService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import com.alibaba.fastjson.JSONArray;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* User:Json
* Date: 2024/6/17
**/
@Component
@Slf4j
public class BreakerCallback implements MqttCallback {
@Autowired
IBreakerDataService iBreakerDataService;
/**
* 與服務(wù)器斷開的回調(diào)
* 這里可以做手動連接 但是配置config類 配置了 自動檢測異常 true 這里可以也不做
* options.setAutomaticReconnect(true); // 啟用自動重連
*/
@Override
public void connectionLost(Throwable throwable) {
log.error("MQTT連接有異常:" + throwable.getMessage());
}
/**
* 訂閱的回調(diào)
* 消息到達(dá)的回調(diào)
* 注意 如果這個(gè)回調(diào)方法 如果有異常 報(bào)錯 ,mqtt會重新連接
* 因?yàn)榕渲梦募?設(shè)置了 options.setAutomaticReconnect(true); // 啟用自動重連
* 如果自動重連了 如果是開啟新的會話 以前的訂閱會消失 具體操作 再上面的配置文件類說明過了
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// System.out.println("上報(bào)時(shí)間:"+ LocalDateTime.now());
// System.out.println(String.format("接收消息主題 : %s",topic));
// System.out.println(String.format("接收消息Qos : %d",mqttMessage.getQos()));
// System.out.println(String.format("接收消息內(nèi)容 : %s",new String(mqttMessage.getPayload())));
// System.out.println(String.format("接收消息retained : %b",mqttMessage.isRetained()));
}
/**
* 發(fā)布的回調(diào)
* 消息發(fā)布成功的回調(diào)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
log.info(client.getClientId() + "發(fā)布消息成功!");
}
}第五步 mqtt工具類
package com.xxx.iotjava.utils;
import com.xxx.init.utils.AppContextUtil;
import com.xxx.iotjava.enums.breaker.BreakerOperationTopicEnum;
import com.xxx.iotjava.mqtt.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
/**
* User:Json
* Date: 2024/6/17
**/
@Slf4j
public class MqttUtils {
private static MqttConfig mqttConfig;
public static MqttConfig getMqttConfig() {
if (mqttConfig == null)
mqttConfig = AppContextUtil.getBean(MqttConfig.class);
return mqttConfig;
}
//初始化 訂閱
public static boolean subscribeInit(){
return getMqttConfig().subscribe(BreakerOperationTopicEnum.REPORTING_API.getTopic(), 0);
}
/**
* 發(fā)送消息
* qos 0 最多一次傳遞 1 至少一次傳遞 2 僅一次傳遞
* retained true 保留消息 false 非保留消息
* topic 主題
* message 內(nèi)容
*/
public static boolean sendMqttMsg(int qos, boolean retained, String topic, String message) {
try {
getMqttConfig().publish(qos, retained, topic, message);
return true;
} catch (Exception e) {
e.printStackTrace();
log.error("MQtt發(fā)送消息報(bào)錯:" + e.getMessage());
return false;
}
}
/*
* topic 主題
* message 內(nèi)容
* */
public static boolean sendMqttMsg(String topic, String message) {
return sendMqttMsg(1, false, topic, message);
}
}第六步 調(diào)用測試
發(fā)布
MqttUtils.sendMqttMsg(topic, data)
//訂閱 我做的是啟動的時(shí)候 初始化訂閱 所以 直接根據(jù)定義的 topic 常量進(jìn)行初始化訂閱
//BreakerOperationTopicEnum.REPORTING_API.getTopic() 我 定義的topic 枚舉類 常量
// 這里就不分享了
MqttUtils.subscribeInit();
完成
到此這篇關(guān)于SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布功能的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息的訂閱和發(fā)布內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Cloud Hystrix入門和Hystrix命令原理分析
這篇文章主要介紹了Spring Cloud Hystrix入門和Hystrix命令原理分析,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08
淺談java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制
今天給大家?guī)淼氖顷P(guān)于Java的相關(guān)知識,文章圍繞著java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
SpringBoot JPA出現(xiàn)錯誤:No identifier specified&nb
這篇文章主要介紹了SpringBoot JPA出現(xiàn)錯誤:No identifier specified for en解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
springboot如何靜態(tài)加載@configurationProperties
這篇文章主要介紹了springboot如何靜態(tài)加載@configurationProperties,本文一個(gè)錯誤案例和成功案例結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07
Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析
這篇文章主要介紹了Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03

