Springboot實(shí)現(xiàn)MQTT通信的示例代碼
MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模型的輕量級(jí)消息傳輸協(xié)議,常用于物聯(lián)網(wǎng)(IoT)場(chǎng)景中。它設(shè)計(jì)簡(jiǎn)潔、帶寬占用少,非常適合資源受限的設(shè)備和網(wǎng)絡(luò)環(huán)境。
一、MQ協(xié)議
MQTT 特點(diǎn)
輕量級(jí)協(xié)議:
- 設(shè)計(jì)簡(jiǎn)單,占用帶寬少,特別適合嵌入式設(shè)備和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境。
發(fā)布/訂閱模型:
- 客戶端通過(guò)主題(Topic)發(fā)布消息,訂閱者通過(guò)主題接收消息,彼此不直接通信。
可靠性保障:
- 提供三種服務(wù)質(zhì)量(QoS)等級(jí),確保消息可靠傳輸:
- QoS 0:至多一次(不確認(rèn),可能丟失)。
- QoS 1:至少一次(需要確認(rèn),但可能重復(fù))。
- QoS 2:僅一次(確保消息不丟失且不重復(fù))。
- 提供三種服務(wù)質(zhì)量(QoS)等級(jí),確保消息可靠傳輸:
持續(xù)連接:
- 使用 TCP/IP 連接,通過(guò)心跳包(Keep-Alive)保持連接穩(wěn)定。
支持離線消息:
- 使用“保留消息”和“持久會(huì)話”功能,實(shí)現(xiàn)離線設(shè)備接收消息。
安全性:
- 支持 SSL/TLS 加密,結(jié)合用戶名和密碼進(jìn)行身份驗(yàn)證。
MQTT 工作原理
連接:
- 客戶端通過(guò)
CONNECT
消息向服務(wù)器建立連接,服務(wù)器返回CONNACK
消息。
- 客戶端通過(guò)
發(fā)布:
- 客戶端通過(guò)
PUBLISH
消息向服務(wù)器發(fā)布消息,指定消息的主題。
- 客戶端通過(guò)
訂閱:
- 客戶端通過(guò)
SUBSCRIBE
消息訂閱一個(gè)或多個(gè)主題,服務(wù)器將匹配主題的消息推送給客戶端。
- 客戶端通過(guò)
心跳:
- 客戶端和服務(wù)器定期發(fā)送心跳包(PINGREQ 和 PINGRESP),確保連接有效。
斷開(kāi):
- 客戶端通過(guò)
DISCONNECT
消息通知服務(wù)器主動(dòng)斷開(kāi)連接。
- 客戶端通過(guò)
MQTT 主要應(yīng)用場(chǎng)景
物聯(lián)網(wǎng)(IoT):
- 設(shè)備狀態(tài)監(jiān)控、數(shù)據(jù)收集和遠(yuǎn)程控制。
智能家居:
- 控制家電、監(jiān)控傳感器數(shù)據(jù)。
車聯(lián)網(wǎng):
- 實(shí)時(shí)車輛數(shù)據(jù)傳輸、位置追蹤。
移動(dòng)應(yīng)用:
- 消息推送、實(shí)時(shí)聊天。
工業(yè)領(lǐng)域:
- 設(shè)備數(shù)據(jù)采集和分析。
MQTT 配置與注意事項(xiàng)
主題命名:
- 使用層級(jí)結(jié)構(gòu)(如
/iot/device/status
),便于管理。 - 避免過(guò)于復(fù)雜的主題結(jié)構(gòu)。
- 使用層級(jí)結(jié)構(gòu)(如
QoS 選擇:
- 根據(jù)應(yīng)用需求選擇適合的 QoS 等級(jí),平衡可靠性和性能。
安全措施:
- 啟用 SSL/TLS 加密。
- 配置用戶名和密碼,限制匿名連接。
- 控制主題的訪問(wèn)權(quán)限。
性能優(yōu)化:
- 控制消息大小,減少帶寬占用。
- 調(diào)整心跳時(shí)間,優(yōu)化連接穩(wěn)定性。
二、MQTT服務(wù)器搭建
1、在springboot項(xiàng)目工程pom文件下引入相關(guān)依賴
<!--mqtt相關(guān)依賴--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2、修改application.yml配置文件
spring: application: name: provider #MQTT配置信息 mqtt: #MQTT服務(wù)地址,端口號(hào)默認(rèn)1883,如果有多個(gè),用逗號(hào)隔開(kāi) url: tcp://127.0.0.1:1883 #用戶名 username: guest #密碼 password: guest #客戶端id(不能重復(fù)) client: id: provider-id #MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口是指定 default: topic: topic server: port: 8080
3、消息發(fā)布者客戶端配置
? package com.three.demo.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.time.LocalDateTime; @Configuration @Slf4j public class MqttClientConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; /** * 客戶端對(duì)象 */ private MqttAsyncClient client; /** * 在bean初始化后連接到服務(wù)器 */ @PostConstruct public void init() { connect(); } /** * 客戶端連接服務(wù)端 */ public void connect() { //連接設(shè)置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開(kāi)連接期間推送的消息 //設(shè)置為true表示每次連接服務(wù)器都是以新的身份 options.setCleanSession(false); //設(shè)置連接用戶名 options.setUserName(username); //設(shè)置連接密碼 options.setPassword(password.toCharArray()); //設(shè)置超時(shí)時(shí)間,單位為秒 options.setConnectionTimeout(60); //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*10秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線 options.setKeepAliveInterval(20); // 開(kāi)啟自動(dòng)重連 options.setAutomaticReconnect(true); // 設(shè)置最大重連時(shí)間間隔 (可選),單位是毫秒,設(shè)置為 5000 表示最多等待 5 秒再嘗試重連 options.setMaxReconnectDelay(5000); //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開(kāi),服務(wù)器將發(fā)布客戶端的遺囑信息 options.setWill("willTopic", (clientId + "與服務(wù)器斷開(kāi)連接").getBytes(), 0, false); try { //創(chuàng)建MQTT客戶端對(duì)象 client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence()); //設(shè)置回調(diào) client.setCallback(mqttClientCallBack); // 使用異步連接 client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { log.info("MQTT連接成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("MQTT連接失敗:" + exception.getMessage()); } }); } catch (MqttException e) { log.error("mqtt連接失敗。。" + e.getMessage()); } } public void publish(int qos, boolean retained) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(pushLog.getData().getBytes()); try { // 使用異步客戶端發(fā)布消息,并處理結(jié)果 client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { System.out.println("發(fā)送成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("發(fā)送失?。? + exception.getMessage()); } }); } catch (MqttException e) { log.error("發(fā)送失?。? + e.getMessage()); } } /** * 斷開(kāi)連接 */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } ?
4、消息發(fā)布客戶端回調(diào)
package com.three.demo.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttClientCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 與服務(wù)器斷開(kāi)的回調(diào) */ @Override public void connectionLost(Throwable cause) { log.error(clientId + "與服務(wù)器斷開(kāi)連接!!" + cause.getMessage()); } /** * 消息發(fā)布成功的回調(diào) */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId()+"發(fā)布消息成功!"); } }
5、創(chuàng)建控制器測(cè)試發(fā)布信息
package com.three.demo.mqtt.controller; import com.three.demo.mqtt.config.MqttClientConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class SendController { @Autowired private MqttClientConfig client; @RequestMapping("/sendMessage") @ResponseBody public String sendMessage(int qos,boolean retained,String topic,String message){ try { client.publish(qos, retained, topic, message); return "發(fā)送成功"; } catch (Exception e) { e.printStackTrace(); return "發(fā)送失敗"; } } }
6、消息接收者配置
這里我對(duì)之前的代碼進(jìn)行改造
/** * 客戶端連接服務(wù)端 */ public void connect() { //連接設(shè)置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開(kāi)連接期間推送的消息 //設(shè)置為true表示每次連接服務(wù)器都是以新的身份 options.setCleanSession(false); //設(shè)置連接用戶名 options.setUserName(username); //設(shè)置連接密碼 options.setPassword(password.toCharArray()); //設(shè)置超時(shí)時(shí)間,單位為秒 options.setConnectionTimeout(60); //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*10秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線 options.setKeepAliveInterval(20); // 開(kāi)啟自動(dòng)重連 options.setAutomaticReconnect(true); // 設(shè)置最大重連時(shí)間間隔 (可選),單位是毫秒,設(shè)置為 5000 表示最多等待 5 秒再嘗試重連 options.setMaxReconnectDelay(5000); //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開(kāi),服務(wù)器將發(fā)布客戶端的遺囑信息 options.setWill("willTopic", (clientId + "與服務(wù)器斷開(kāi)連接").getBytes(), 0, false); try { //創(chuàng)建MQTT客戶端對(duì)象 client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence()); //設(shè)置回調(diào) client.setCallback(mqttClientCallBack); // 使用異步連接 client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { log.info("MQTT連接成功"); // 連接成功后訂閱主題 try { //訂閱主題 //消息等級(jí),和主題數(shù)組一一對(duì)應(yīng),服務(wù)端將按照指定等級(jí)給訂閱了主題的客戶端推送消息 int[] qos = {2, 2}; String[] topics = { "/iot/msg/topic1", "/iot/msg/topic2" }; client.subscribe(topics, qos); log.info("訂閱主題成功"); } catch (MqttException e) { log.error("訂閱主題失敗:" + e.getMessage()); } } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("MQTT連接失?。? + exception.getMessage()); } }); } catch (MqttException e) { e.printStackTrace(); log.error("mqtt連接失敗。。" + e.getMessage()); } }
然后在消息客戶端回調(diào)類這里
package com.ruoyi.yyt.mqtt.config; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @Slf4j @Component public class MqttClientCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 客戶端斷開(kāi)連接的回調(diào) */ @Override public void connectionLost(Throwable throwable) { log.error(clientId + "與服務(wù)器斷開(kāi)連接??!" + cause.getMessage()); } /** * 消息到達(dá)的回調(diào) */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主題 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息發(fā)布成功的回調(diào) */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId() + "發(fā)布消息成功!"); } }
這個(gè)時(shí)候我們啟動(dòng)服務(wù),調(diào)用測(cè)試接口
就可以看到接口返回發(fā)布成功,并且能看到后臺(tái)服務(wù)的打印日志了
至此大功告成了!
到此這篇關(guān)于Springboot實(shí)現(xiàn)MQTT通信的示例代碼的文章就介紹到這了,更多相關(guān)Springboot MQTT通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼
微信小程序支付是常見(jiàn)的一種功能,本文主要介紹了SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-05-05Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單FTP軟件 FTP軟件主界面(4)
這篇文章主要為大家詳細(xì)介紹了Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單FTP軟件,F(xiàn)TP軟件主界面編寫的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-03-03圖解Java經(jīng)典算法冒泡排序的原理與實(shí)現(xiàn)
冒泡排序是一種簡(jiǎn)單的排序算法,它也是一種穩(wěn)定排序算法。其實(shí)現(xiàn)原理是重復(fù)掃描待排序序列,并比較每一對(duì)相鄰的元素,當(dāng)該對(duì)元素順序不正確時(shí)進(jìn)行交換。一直重復(fù)這個(gè)過(guò)程,直到?jīng)]有任何兩個(gè)相鄰元素可以交換,就表明完成了排序2022-09-09Spring使用Configuration注解管理bean的方式詳解
在Spring的世界里,Configuration注解就像是一位細(xì)心的園丁,它的主要職責(zé)是在這個(gè)繁花似錦的園子里,幫助我們聲明和管理各種各樣的bean,本文給大家介紹了在Spring中如何優(yōu)雅地管理你的bean,需要的朋友可以參考下2024-05-05怎樣提高mybatis-plus中saveBatch方法的效率
這篇文章主要介紹了怎樣提高mybatis-plus中saveBatch方法的效率問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07如何將DeepSeek 集成到 Java 的 Spring Boot&
本文介紹了如何將DeepSeek集成到Java的SpringBoot項(xiàng)目中,包括準(zhǔn)備工作、集成步驟和示例說(shuō)明,感興趣的朋友一起看看吧2025-02-02JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo)的操作代碼
這篇文章主要介紹了JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo),本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-09-09Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理分析
這篇文章主要介紹了Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-02-02淺談?dòng)肧pringBoot實(shí)現(xiàn)策略模式
本文主要介紹了SpringBoot實(shí)現(xiàn)策略模式,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10