springboot整合mqtt實(shí)現(xiàn)消息訂閱和推送功能
前言
mica-mqtt-client-spring-boot-starter是一個(gè)基于Spring Boot的MQTT客戶端啟動(dòng)器,它集成了mica-mqtt客戶端,提供了在Spring Boot應(yīng)用程序中使用MQTT協(xié)議進(jìn)行消息通信的能力。以下是關(guān)于mica-mqtt-client-spring-boot-starter的簡介:
特點(diǎn):
- 簡單易用:通過Spring Boot的自動(dòng)配置,可以輕松地集成到Spring應(yīng)用程序中,并使用Spring的注解或Java配置進(jìn)行MQTT客戶端的配置。
- 低延遲:支持MQTT協(xié)議,能夠?qū)崿F(xiàn)實(shí)時(shí)消息通信,具有較低的延遲。
- 高性能:基于mica-mqtt客戶端,具有高效的消息處理和網(wǎng)絡(luò)通信能力,能夠處理大量的并發(fā)連接和消息。
- 集群支持:支持基于Redis的發(fā)布/訂閱模式的集群,可以實(shí)現(xiàn)多個(gè)節(jié)點(diǎn)之間的消息同步和負(fù)載均衡。
- 使用場景:適用于需要使用MQTT協(xié)議進(jìn)行消息通信的物聯(lián)網(wǎng)、實(shí)時(shí)應(yīng)用、移動(dòng)應(yīng)用等領(lǐng)域??梢栽谠贫嘶蜻吘壎耸褂?,實(shí)現(xiàn)設(shè)備與設(shè)備之間、設(shè)備與服務(wù)器之間的消息通信。
- 集成方式:通過在Spring Boot項(xiàng)目中添加相關(guān)依賴,并配置MQTT客戶端的相關(guān)參數(shù),即可快速集成mica-mqtt-client-spring-boot-starter。具體的使用方法可以參考官方文檔和示例代碼。
- 注意事項(xiàng):在使用過程中需要注意確保網(wǎng)絡(luò)連接的穩(wěn)定性和安全性,并根據(jù)實(shí)際需求進(jìn)行適當(dāng)?shù)呐渲煤蛢?yōu)化。同時(shí),也需要關(guān)注數(shù)據(jù)安全和隱私保護(hù)等方面的問題。
總之,mica-mqtt-client-spring-boot-starter是一個(gè)方便、高效、可靠的MQTT客戶端啟動(dòng)器,適用于需要使用MQTT協(xié)議進(jìn)行消息通信的Spring Boot應(yīng)用程序。
功能
- 支持 MQTT v3.1、v3.1.1 以及 v5.0 協(xié)議。
- 支持 websocket mqtt 子協(xié)議(支持 mqtt.js)。
- 支持 http rest api,http api 文檔詳見[1]。
- 支持 MQTT client 客戶端。
- 支持 MQTT server 服務(wù)端。
- 支持 MQTT client、server 共享訂閱支持(捐助VIP版采用 topic 樹存儲(chǔ),跟 topic 數(shù)無關(guān),百萬 topic 性能依舊)。
- 支持 MQTT 遺囑消息。
- 支持 MQTT 保留消息。
- 支持自定義消息(mq)處理轉(zhuǎn)發(fā)實(shí)現(xiàn)集群。
- MQTT 客戶端 阿里云 mqtt 連接 demo。
- 支持 GraalVM 編譯成本機(jī)可執(zhí)行程序。
- 支持 Spring boot 項(xiàng)目快速接入(mica-mqtt-spring-boot-starter)。
- mica-mqtt-spring-boot-starter 支持對接 Prometheus + Grafana。
- 基于 redis pub/sub 實(shí)現(xiàn)集群,詳見 mica-mqtt-broker 模塊[2]
教程
添加依賴
在springboot項(xiàng)目中添加maven依賴:
<!-- https://mvnrepository.com/artifact/net.dreamlu/mica-mqtt-client-spring-boot-starter -->
<dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
<version>2.2.8</version>
</dependency>配置參數(shù)
在spring配置文件中配置mqtt相關(guān)參數(shù),配置如下:
mqtt:
server:
enabled: false # 是否開啟服務(wù)端,默認(rèn):false
client:
enabled: true # 是否開啟客戶端,默認(rèn):false
ip: 172.16.10.203 # 連接的服務(wù)端 ip ,默認(rèn):127.0.0.1
port: 1883 # 端口:默認(rèn):1883
name: Mica2-Mqtt2-Client # 名稱,默認(rèn):Mica-Mqtt-Client
clientId: coalface_safety_3d # 客戶端Id(非常重要,一般為設(shè)備 sn,不可重復(fù))
user-name: admin # 認(rèn)證的用戶名 你的用戶名
password: 3@!cHy@j # 認(rèn)證的密碼
timeout: 5 # 連接超時(shí)時(shí)間,單位:秒,默認(rèn):5秒
reconnect: true # 是否重連,默認(rèn):true
re-interval: 5000 # 重連時(shí)間,默認(rèn) 5000 毫秒
version: MQTT_3_1 # mqtt 協(xié)議版本,默認(rèn):3.1.1
read-buffer-size: 8092 # 接收數(shù)據(jù)的 buffer size,默認(rèn):8092
max-bytes-in-message: 8092 # 消息解析最大 bytes 長度,默認(rèn):8092
buffer-allocator: heap # 堆內(nèi)存和堆外內(nèi)存,默認(rèn):堆內(nèi)存
keep-alive-secs: 60 # keep-alive 心跳維持時(shí)間,單位:秒
clean-session: false # mqtt clean session,默認(rèn):true
will-message: # 消息遺囑
qos: at_least_once
ssl:
enabled: false # 是否開啟 ssl 認(rèn)證,2.1.0 開始支持雙向認(rèn)證
keystore-path: # 可選參數(shù):ssl 雙向認(rèn)證 keystore 目錄,支持 classpath:/ 路徑。
keystore-pass: # 可選參數(shù):ssl 雙向認(rèn)證 keystore 密碼
truststore-path: # 可選參數(shù):ssl 雙向認(rèn)證 truststore 目錄,支持 classpath:/ 路徑。
truststore-pass: # 可選參數(shù):ssl 雙向認(rèn)證 truststore 密碼注意:ssl 存在三種情況
| 服務(wù)端開啟ssl | 客戶端 |
|---|---|
| ClientAuth 為 NONE(不需要客戶端驗(yàn)證) | 僅僅需要開啟 ssl 即可不用配置證書 |
| ClientAuth 為 OPTIONAL(與客戶端協(xié)商) | 需開啟 ssl 并且配置 truststore 證書 |
| ClientAuth 為 REQUIRE (必須的客戶端驗(yàn)證) | 需開啟 ssl 并且配置 truststore、 keystore證書 |
創(chuàng)建訂閱
創(chuàng)建一個(gè)mqtt訂閱消息監(jiān)聽類,例如SimulationSubscriber,代碼如下:
import com.alibaba.fastjson.JSONObject;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.tio.utils.buffer.ByteBufferUtil;
/**
* @author tarzan
*/
@Component
@Slf4j
public class SimulationSubscriber {
@MqttClientSubscribe("tuoyuan/publish/zj/#")
public void zjOne(String topic, byte[] payload){
String[] strs=topic.split("/");
String ID=strs[strs.length-1];
log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID);
}
@MqttClientSubscribe("/sys/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, byte[] payload) {
// 1.3.8 開始支持,@MqttClientSubscribe 注解支持 ${} 變量替換,會(huì)默認(rèn)替換成 +
// 注意:mica-mqtt 會(huì)先從 Spring boot 配置中替換參數(shù) ${},如果存在配置會(huì)優(yōu)先被替換。
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe("/tianma/publish/cmj")
public void cmj(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//業(yè)務(wù)的處理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/zj")
public void zj(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//業(yè)務(wù)的處理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/gbj")
public void gbj(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//業(yè)務(wù)的處理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/ltl")
public void ltl(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//業(yè)務(wù)的處理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/ntl")
public void ntl(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//業(yè)務(wù)的處理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/ccl")
public void ccl(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//業(yè)務(wù)的處理
System.out.println("*****************test**************************************"+jsonObject);
}
}- @Header(“topic”) 和@Payload 注解可以省略
- tuoyuan/publish/zj/# 中的# 是通配符
- 在MQTT協(xié)議中,#是一個(gè)通配符,代表匹配該主題的所有子主題。例如,如果你訂閱了主題sports/baseball/#,那么你將接收到所有以sports/baseball/開頭的主題的消息。
- 請注意,通配符#只能用于多層的主題名稱中,并且只能用于最后一個(gè)級別。例如,sports/baseball/#是有效的,但#sports/baseball或sports/#/baseball都是無效的。
- 除了#之外,MQTT協(xié)議還支持一個(gè)單層通配符+,它代表只匹配該級別的主題。例如,如果你訂閱了主題sports/baseball/+,那么你將只接收到以sports/baseball/開頭,且后面跟著至少一個(gè)字符的主題的消息。
- 請注意,使用通配符時(shí)需要謹(jǐn)慎,因?yàn)樗鼈兛赡軙?huì)匹配到意外的主題。確保你的訂閱主題明確,并且只匹配你感興趣的主題。
- /sys/${deviceName}/thing/sub/register
- 1.3.8 開始支持,@MqttClientSubscribe 注解支持 ${} 變量替換,會(huì)默認(rèn)替換成 +
- 注意:mica-mqtt 會(huì)先從 Spring boot 配置中替換參數(shù) ${},如果存在配置會(huì)優(yōu)先被替換。
創(chuàng)建發(fā)布
創(chuàng)建一個(gè)mqtt消息發(fā)布接口類,例如 MqttTestController,代碼如下:
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.springblade.core.secure.annotation.NoToken;
import org.springblade.core.tool.api.R;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
/**
* @author tarzan
*/
@RestController
@Api(tags = "mqtt測試")
@NoToken
@RequestMapping("/mqtt")
@AllArgsConstructor
@Slf4j
public class MqttTestController {
private final MqttClientTemplate mqttClientTemplate;
@ApiOperation(value = "消息發(fā)送")
@PostMapping("/publish")
private R<Boolean> publish(String topic, String msg) {
return R.status(mqttClientTemplate.publish(topic, msg.getBytes(StandardCharsets.UTF_8)));
}
}接口測試
接口調(diào)用

控制臺(tái)輸出

到此這篇關(guān)于springboot整合mqtt實(shí)現(xiàn)消息訂閱和推送的文章就介紹到這了,更多相關(guān)springboot整合mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring boot actuator端點(diǎn)啟用和暴露操作
這篇文章主要介紹了Spring boot actuator端點(diǎn)啟用和暴露操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
springboot lua檢查redis庫存的實(shí)現(xiàn)示例
本文主要介紹了springboot lua檢查redis庫存的實(shí)現(xiàn)示例,為了優(yōu)化性能,通過Lua腳本實(shí)現(xiàn)對多個(gè)馬戲場次下的座位等席的庫存余量檢查,感興趣的可以了解一下2024-09-09
java.lang.IllegalArgumentException:Invalid character&nb
本文介紹了java.lang.IllegalArgumentException: Invalid character found異常的解決,方法包括檢查代碼中的方法名,使用合適的HTTP請求方法常量,使用第三方HTTP庫,檢查請求URL以及使用調(diào)試和日志工具,通過這些方法,我們可以解決異常并確保網(wǎng)絡(luò)應(yīng)用程序的正常運(yùn)行2023-10-10
Springboot通過配置WebMvcConfig處理Cors非同源訪問跨域問題
這篇文章主要介紹了Springboot通過配置WebMvcConfig處理Cors非同源訪問跨域問題,關(guān)于Cors跨域的問題,前端有代理和jsonp的常用方式解決這種非同源的訪問拒絕策略2023-04-04
SpringBoot集成iText實(shí)現(xiàn)電子簽章功能
這篇文章主要為大家詳細(xì)介紹了SpringBoot如何集成iText實(shí)現(xiàn)電子簽章功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-10-10

