Springboot實現(xiàn)MQTT通信的示例代碼
MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模型的輕量級消息傳輸協(xié)議,常用于物聯(lián)網(IoT)場景中。它設計簡潔、帶寬占用少,非常適合資源受限的設備和網絡環(huán)境。
一、MQ協(xié)議
MQTT 特點
輕量級協(xié)議:
- 設計簡單,占用帶寬少,特別適合嵌入式設備和不穩(wěn)定的網絡環(huán)境。
發(fā)布/訂閱模型:
- 客戶端通過主題(Topic)發(fā)布消息,訂閱者通過主題接收消息,彼此不直接通信。
可靠性保障:
- 提供三種服務質量(QoS)等級,確保消息可靠傳輸:
- QoS 0:至多一次(不確認,可能丟失)。
- QoS 1:至少一次(需要確認,但可能重復)。
- QoS 2:僅一次(確保消息不丟失且不重復)。
- 提供三種服務質量(QoS)等級,確保消息可靠傳輸:
持續(xù)連接:
- 使用 TCP/IP 連接,通過心跳包(Keep-Alive)保持連接穩(wěn)定。
支持離線消息:
- 使用“保留消息”和“持久會話”功能,實現(xiàn)離線設備接收消息。
安全性:
- 支持 SSL/TLS 加密,結合用戶名和密碼進行身份驗證。
MQTT 工作原理
連接:
- 客戶端通過
CONNECT
消息向服務器建立連接,服務器返回CONNACK
消息。
- 客戶端通過
發(fā)布:
- 客戶端通過
PUBLISH
消息向服務器發(fā)布消息,指定消息的主題。
- 客戶端通過
訂閱:
- 客戶端通過
SUBSCRIBE
消息訂閱一個或多個主題,服務器將匹配主題的消息推送給客戶端。
- 客戶端通過
心跳:
- 客戶端和服務器定期發(fā)送心跳包(PINGREQ 和 PINGRESP),確保連接有效。
斷開:
- 客戶端通過
DISCONNECT
消息通知服務器主動斷開連接。
- 客戶端通過
MQTT 主要應用場景
物聯(lián)網(IoT):
- 設備狀態(tài)監(jiān)控、數(shù)據(jù)收集和遠程控制。
智能家居:
- 控制家電、監(jiān)控傳感器數(shù)據(jù)。
車聯(lián)網:
- 實時車輛數(shù)據(jù)傳輸、位置追蹤。
移動應用:
- 消息推送、實時聊天。
工業(yè)領域:
- 設備數(shù)據(jù)采集和分析。
MQTT 配置與注意事項
主題命名:
- 使用層級結構(如
/iot/device/status
),便于管理。 - 避免過于復雜的主題結構。
- 使用層級結構(如
QoS 選擇:
- 根據(jù)應用需求選擇適合的 QoS 等級,平衡可靠性和性能。
安全措施:
- 啟用 SSL/TLS 加密。
- 配置用戶名和密碼,限制匿名連接。
- 控制主題的訪問權限。
性能優(yōu)化:
- 控制消息大小,減少帶寬占用。
- 調整心跳時間,優(yōu)化連接穩(wěn)定性。
二、MQTT服務器搭建
1、在springboot項目工程pom文件下引入相關依賴
<!--mqtt相關依賴--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2、修改application.yml配置文件
spring: application: name: provider #MQTT配置信息 mqtt: #MQTT服務地址,端口號默認1883,如果有多個,用逗號隔開 url: tcp://127.0.0.1:1883 #用戶名 username: guest #密碼 password: guest #客戶端id(不能重復) client: id: provider-id #MQTT默認的消息推送主題,實際可在調用接口是指定 default: topic: topic server: port: 8080
3、消息發(fā)布者客戶端配置
? package com.three.demo.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.time.LocalDateTime; @Configuration @Slf4j public class MqttClientConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; /** * 客戶端對象 */ private MqttAsyncClient client; /** * 在bean初始化后連接到服務器 */ @PostConstruct public void init() { connect(); } /** * 客戶端連接服務端 */ public void connect() { //連接設置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設置false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息 //設置為true表示每次連接服務器都是以新的身份 options.setCleanSession(false); //設置連接用戶名 options.setUserName(username); //設置連接密碼 options.setPassword(password.toCharArray()); //設置超時時間,單位為秒 options.setConnectionTimeout(60); //設置心跳時間 單位為秒,表示服務器每隔 1.5*10秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線 options.setKeepAliveInterval(20); // 開啟自動重連 options.setAutomaticReconnect(true); // 設置最大重連時間間隔 (可選),單位是毫秒,設置為 5000 表示最多等待 5 秒再嘗試重連 options.setMaxReconnectDelay(5000); //設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息 options.setWill("willTopic", (clientId + "與服務器斷開連接").getBytes(), 0, false); try { //創(chuàng)建MQTT客戶端對象 client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence()); //設置回調 client.setCallback(mqttClientCallBack); // 使用異步連接 client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { log.info("MQTT連接成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("MQTT連接失?。? + exception.getMessage()); } }); } catch (MqttException e) { log.error("mqtt連接失敗。。" + e.getMessage()); } } public void publish(int qos, boolean retained) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(pushLog.getData().getBytes()); try { // 使用異步客戶端發(fā)布消息,并處理結果 client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { System.out.println("發(fā)送成功"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("發(fā)送失?。? + exception.getMessage()); } }); } catch (MqttException e) { log.error("發(fā)送失?。? + e.getMessage()); } } /** * 斷開連接 */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } ?
4、消息發(fā)布客戶端回調
package com.three.demo.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttClientCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 與服務器斷開的回調 */ @Override public void connectionLost(Throwable cause) { log.error(clientId + "與服務器斷開連接!!" + cause.getMessage()); } /** * 消息發(fā)布成功的回調 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId()+"發(fā)布消息成功!"); } }
5、創(chuàng)建控制器測試發(fā)布信息
package com.three.demo.mqtt.controller; import com.three.demo.mqtt.config.MqttClientConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class SendController { @Autowired private MqttClientConfig client; @RequestMapping("/sendMessage") @ResponseBody public String sendMessage(int qos,boolean retained,String topic,String message){ try { client.publish(qos, retained, topic, message); return "發(fā)送成功"; } catch (Exception e) { e.printStackTrace(); return "發(fā)送失敗"; } } }
6、消息接收者配置
這里我對之前的代碼進行改造
/** * 客戶端連接服務端 */ public void connect() { //連接設置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,設置false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息 //設置為true表示每次連接服務器都是以新的身份 options.setCleanSession(false); //設置連接用戶名 options.setUserName(username); //設置連接密碼 options.setPassword(password.toCharArray()); //設置超時時間,單位為秒 options.setConnectionTimeout(60); //設置心跳時間 單位為秒,表示服務器每隔 1.5*10秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線 options.setKeepAliveInterval(20); // 開啟自動重連 options.setAutomaticReconnect(true); // 設置最大重連時間間隔 (可選),單位是毫秒,設置為 5000 表示最多等待 5 秒再嘗試重連 options.setMaxReconnectDelay(5000); //設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息 options.setWill("willTopic", (clientId + "與服務器斷開連接").getBytes(), 0, false); try { //創(chuàng)建MQTT客戶端對象 client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence()); //設置回調 client.setCallback(mqttClientCallBack); // 使用異步連接 client.connect(options, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { log.info("MQTT連接成功"); // 連接成功后訂閱主題 try { //訂閱主題 //消息等級,和主題數(shù)組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息 int[] qos = {2, 2}; String[] topics = { "/iot/msg/topic1", "/iot/msg/topic2" }; client.subscribe(topics, qos); log.info("訂閱主題成功"); } catch (MqttException e) { log.error("訂閱主題失?。? + e.getMessage()); } } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log.error("MQTT連接失?。? + exception.getMessage()); } }); } catch (MqttException e) { e.printStackTrace(); log.error("mqtt連接失敗。。" + e.getMessage()); } }
然后在消息客戶端回調類這里
package com.ruoyi.yyt.mqtt.config; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @Slf4j @Component public class MqttClientCallBack implements MqttCallback { @Value("${spring.mqtt.client.id}") private String clientId; /** * 客戶端斷開連接的回調 */ @Override public void connectionLost(Throwable throwable) { log.error(clientId + "與服務器斷開連接?。? + cause.getMessage()); } /** * 消息到達的回調 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("接收消息主題 : %s",topic)); System.out.println(String.format("接收消息Qos : %d",message.getQos())); System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload()))); System.out.println(String.format("接收消息retained : %b",message.isRetained())); } /** * 消息發(fā)布成功的回調 */ @Override public void deliveryComplete(IMqttDeliveryToken token) { IMqttAsyncClient client = token.getClient(); System.out.println(client.getClientId() + "發(fā)布消息成功!"); } }
這個時候我們啟動服務,調用測試接口
就可以看到接口返回發(fā)布成功,并且能看到后臺服務的打印日志了
至此大功告成了!
到此這篇關于Springboot實現(xiàn)MQTT通信的示例代碼的文章就介紹到這了,更多相關Springboot MQTT通信內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringBoot整合weixin-java-pay實現(xiàn)微信小程序支付的示例代碼
微信小程序支付是常見的一種功能,本文主要介紹了SpringBoot整合weixin-java-pay實現(xiàn)微信小程序支付的示例代碼,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學習學習吧2024-05-05Java語言實現(xiàn)簡單FTP軟件 FTP軟件主界面(4)
這篇文章主要為大家詳細介紹了Java語言實現(xiàn)簡單FTP軟件,F(xiàn)TP軟件主界面編寫的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-03-03Spring使用Configuration注解管理bean的方式詳解
在Spring的世界里,Configuration注解就像是一位細心的園丁,它的主要職責是在這個繁花似錦的園子里,幫助我們聲明和管理各種各樣的bean,本文給大家介紹了在Spring中如何優(yōu)雅地管理你的bean,需要的朋友可以參考下2024-05-05怎樣提高mybatis-plus中saveBatch方法的效率
這篇文章主要介紹了怎樣提高mybatis-plus中saveBatch方法的效率問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07如何將DeepSeek 集成到 Java 的 Spring Boot&
本文介紹了如何將DeepSeek集成到Java的SpringBoot項目中,包括準備工作、集成步驟和示例說明,感興趣的朋友一起看看吧2025-02-02Java并發(fā)編程之synchronized底層實現(xiàn)原理分析
這篇文章主要介紹了Java并發(fā)編程之synchronized底層實現(xiàn)原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02