Springboot實(shí)現(xiàn)MQTT通信的示例代碼
MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模型的輕量級消息傳輸協(xié)議,常用于物聯(lián)網(wǎng)(IoT)場景中。它設(shè)計(jì)簡潔、帶寬占用少,非常適合資源受限的設(shè)備和網(wǎng)絡(luò)環(huán)境。
一、MQ協(xié)議
MQTT 特點(diǎn)
輕量級協(xié)議:
- 設(shè)計(jì)簡單,占用帶寬少,特別適合嵌入式設(shè)備和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境。
發(fā)布/訂閱模型:
- 客戶端通過主題(Topic)發(fā)布消息,訂閱者通過主題接收消息,彼此不直接通信。
可靠性保障:
- 提供三種服務(wù)質(zhì)量(QoS)等級,確保消息可靠傳輸:
- QoS 0:至多一次(不確認(rèn),可能丟失)。
- QoS 1:至少一次(需要確認(rèn),但可能重復(fù))。
- QoS 2:僅一次(確保消息不丟失且不重復(fù))。
- 提供三種服務(wù)質(zhì)量(QoS)等級,確保消息可靠傳輸:
持續(xù)連接:
- 使用 TCP/IP 連接,通過心跳包(Keep-Alive)保持連接穩(wěn)定。
支持離線消息:
- 使用“保留消息”和“持久會話”功能,實(shí)現(xiàn)離線設(shè)備接收消息。
安全性:
- 支持 SSL/TLS 加密,結(jié)合用戶名和密碼進(jìn)行身份驗(yàn)證。
MQTT 工作原理
連接:
- 客戶端通過
CONNECT消息向服務(wù)器建立連接,服務(wù)器返回CONNACK消息。
- 客戶端通過
發(fā)布:
- 客戶端通過
PUBLISH消息向服務(wù)器發(fā)布消息,指定消息的主題。
- 客戶端通過
訂閱:
- 客戶端通過
SUBSCRIBE消息訂閱一個或多個主題,服務(wù)器將匹配主題的消息推送給客戶端。
- 客戶端通過
心跳:
- 客戶端和服務(wù)器定期發(fā)送心跳包(PINGREQ 和 PINGRESP),確保連接有效。
斷開:
- 客戶端通過
DISCONNECT消息通知服務(wù)器主動斷開連接。
- 客戶端通過
MQTT 主要應(yīng)用場景
物聯(lián)網(wǎng)(IoT):
- 設(shè)備狀態(tài)監(jiān)控、數(shù)據(jù)收集和遠(yuǎn)程控制。
智能家居:
- 控制家電、監(jiān)控傳感器數(shù)據(jù)。
車聯(lián)網(wǎng):
- 實(shí)時車輛數(shù)據(jù)傳輸、位置追蹤。
移動應(yīng)用:
- 消息推送、實(shí)時聊天。
工業(yè)領(lǐng)域:
- 設(shè)備數(shù)據(jù)采集和分析。
MQTT 配置與注意事項(xiàng)
主題命名:
- 使用層級結(jié)構(gòu)(如
/iot/device/status),便于管理。 - 避免過于復(fù)雜的主題結(jié)構(gòu)。
- 使用層級結(jié)構(gòu)(如
QoS 選擇:
- 根據(jù)應(yīng)用需求選擇適合的 QoS 等級,平衡可靠性和性能。
安全措施:
- 啟用 SSL/TLS 加密。
- 配置用戶名和密碼,限制匿名連接。
- 控制主題的訪問權(quán)限。
性能優(yōu)化:
- 控制消息大小,減少帶寬占用。
- 調(diào)整心跳時間,優(yōu)化連接穩(wěn)定性。
二、MQTT服務(wù)器搭建
1、在springboot項(xiàng)目工程pom文件下引入相關(guān)依賴
<!--mqtt相關(guān)依賴-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>2、修改application.yml配置文件
spring:
application:
name: provider
#MQTT配置信息
mqtt:
#MQTT服務(wù)地址,端口號默認(rèn)1883,如果有多個,用逗號隔開
url: tcp://127.0.0.1:1883
#用戶名
username: guest
#密碼
password: guest
#客戶端id(不能重復(fù))
client:
id: provider-id
#MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口是指定
default:
topic: topic
server:
port: 80803、消息發(fā)布者客戶端配置
?
package com.three.demo.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
@Configuration
@Slf4j
public class MqttClientConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 客戶端對象
*/
private MqttAsyncClient client;
/**
* 在bean初始化后連接到服務(wù)器
*/
@PostConstruct
public void init() {
connect();
}
/**
* 客戶端連接服務(wù)端
*/
public void connect() {
//連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設(shè)置false表示服務(wù)器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
//設(shè)置為true表示每次連接服務(wù)器都是以新的身份
options.setCleanSession(false);
//設(shè)置連接用戶名
options.setUserName(username);
//設(shè)置連接密碼
options.setPassword(password.toCharArray());
//設(shè)置超時時間,單位為秒
options.setConnectionTimeout(60);
//設(shè)置心跳時間 單位為秒,表示服務(wù)器每隔 1.5*10秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
// 開啟自動重連
options.setAutomaticReconnect(true);
// 設(shè)置最大重連時間間隔 (可選),單位是毫秒,設(shè)置為 5000 表示最多等待 5 秒再嘗試重連
options.setMaxReconnectDelay(5000);
//設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic", (clientId + "與服務(wù)器斷開連接").getBytes(), 0, false);
try {
//創(chuàng)建MQTT客戶端對象
client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
//設(shè)置回調(diào)
client.setCallback(mqttClientCallBack);
// 使用異步連接
client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.info("MQTT連接成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("MQTT連接失?。? + exception.getMessage());
}
});
} catch (MqttException e) {
log.error("mqtt連接失敗。。" + e.getMessage());
}
}
public void publish(int qos, boolean retained) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(pushLog.getData().getBytes());
try {
// 使用異步客戶端發(fā)布消息,并處理結(jié)果
client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
System.out.println("發(fā)送成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("發(fā)送失敗:" + exception.getMessage());
}
});
} catch (MqttException e) {
log.error("發(fā)送失?。? + e.getMessage());
}
}
/**
* 斷開連接
*/
public void disConnect() {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
?4、消息發(fā)布客戶端回調(diào)
package com.three.demo.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 與服務(wù)器斷開的回調(diào)
*/
@Override
public void connectionLost(Throwable cause) {
log.error(clientId + "與服務(wù)器斷開連接??!" + cause.getMessage());
}
/**
* 消息發(fā)布成功的回調(diào)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
System.out.println(client.getClientId()+"發(fā)布消息成功!");
}
}5、創(chuàng)建控制器測試發(fā)布信息
package com.three.demo.mqtt.controller;
import com.three.demo.mqtt.config.MqttClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class SendController {
@Autowired
private MqttClientConfig client;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(int qos,boolean retained,String topic,String message){
try {
client.publish(qos, retained, topic, message);
return "發(fā)送成功";
} catch (Exception e) {
e.printStackTrace();
return "發(fā)送失敗";
}
}
}6、消息接收者配置
這里我對之前的代碼進(jìn)行改造
/**
* 客戶端連接服務(wù)端
*/
public void connect() {
//連接設(shè)置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,設(shè)置false表示服務(wù)器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開連接期間推送的消息
//設(shè)置為true表示每次連接服務(wù)器都是以新的身份
options.setCleanSession(false);
//設(shè)置連接用戶名
options.setUserName(username);
//設(shè)置連接密碼
options.setPassword(password.toCharArray());
//設(shè)置超時時間,單位為秒
options.setConnectionTimeout(60);
//設(shè)置心跳時間 單位為秒,表示服務(wù)器每隔 1.5*10秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
// 開啟自動重連
options.setAutomaticReconnect(true);
// 設(shè)置最大重連時間間隔 (可選),單位是毫秒,設(shè)置為 5000 表示最多等待 5 秒再嘗試重連
options.setMaxReconnectDelay(5000);
//設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開,服務(wù)器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic", (clientId + "與服務(wù)器斷開連接").getBytes(), 0, false);
try {
//創(chuàng)建MQTT客戶端對象
client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
//設(shè)置回調(diào)
client.setCallback(mqttClientCallBack);
// 使用異步連接
client.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
log.info("MQTT連接成功");
// 連接成功后訂閱主題
try {
//訂閱主題
//消息等級,和主題數(shù)組一一對應(yīng),服務(wù)端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {2, 2};
String[] topics = {
"/iot/msg/topic1",
"/iot/msg/topic2"
};
client.subscribe(topics, qos);
log.info("訂閱主題成功");
} catch (MqttException e) {
log.error("訂閱主題失?。? + e.getMessage());
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
log.error("MQTT連接失?。? + exception.getMessage());
}
});
} catch (MqttException e) {
e.printStackTrace();
log.error("mqtt連接失敗。。" + e.getMessage());
}
}然后在消息客戶端回調(diào)類這里
package com.ruoyi.yyt.mqtt.config;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 客戶端斷開連接的回調(diào)
*/
@Override
public void connectionLost(Throwable throwable) {
log.error(clientId + "與服務(wù)器斷開連接?。? + cause.getMessage());
}
/**
* 消息到達(dá)的回調(diào)
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload())));
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息發(fā)布成功的回調(diào)
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
System.out.println(client.getClientId() + "發(fā)布消息成功!");
}
}這個時候我們啟動服務(wù),調(diào)用測試接口

就可以看到接口返回發(fā)布成功,并且能看到后臺服務(wù)的打印日志了

至此大功告成了!
到此這篇關(guān)于Springboot實(shí)現(xiàn)MQTT通信的示例代碼的文章就介紹到這了,更多相關(guān)Springboot MQTT通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼
微信小程序支付是常見的一種功能,本文主要介紹了SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-05-05
Java語言實(shí)現(xiàn)簡單FTP軟件 FTP軟件主界面(4)
這篇文章主要為大家詳細(xì)介紹了Java語言實(shí)現(xiàn)簡單FTP軟件,F(xiàn)TP軟件主界面編寫的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-03-03
圖解Java經(jīng)典算法冒泡排序的原理與實(shí)現(xiàn)
冒泡排序是一種簡單的排序算法,它也是一種穩(wěn)定排序算法。其實(shí)現(xiàn)原理是重復(fù)掃描待排序序列,并比較每一對相鄰的元素,當(dāng)該對元素順序不正確時進(jìn)行交換。一直重復(fù)這個過程,直到?jīng)]有任何兩個相鄰元素可以交換,就表明完成了排序2022-09-09
Spring使用Configuration注解管理bean的方式詳解
在Spring的世界里,Configuration注解就像是一位細(xì)心的園丁,它的主要職責(zé)是在這個繁花似錦的園子里,幫助我們聲明和管理各種各樣的bean,本文給大家介紹了在Spring中如何優(yōu)雅地管理你的bean,需要的朋友可以參考下2024-05-05
怎樣提高mybatis-plus中saveBatch方法的效率
這篇文章主要介紹了怎樣提高mybatis-plus中saveBatch方法的效率問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
如何將DeepSeek 集成到 Java 的 Spring Boot&
本文介紹了如何將DeepSeek集成到Java的SpringBoot項(xiàng)目中,包括準(zhǔn)備工作、集成步驟和示例說明,感興趣的朋友一起看看吧2025-02-02
JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo)的操作代碼
這篇文章主要介紹了JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo),本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-09-09
Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理分析
這篇文章主要介紹了Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02
淺談用SpringBoot實(shí)現(xiàn)策略模式
本文主要介紹了SpringBoot實(shí)現(xiàn)策略模式,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10

