springboot使用EMQX(MQTT協(xié)議)的實現
1、MQTT協(xié)議
1.1、MQTT簡介
在了解EMQX前首先了解一下MQTT協(xié)議,MQTT 全稱為 Message Queuing Telemetry Transport(消息隊列遙測傳輸),是一種基于 發(fā)布/訂閱 模式的 輕量級物聯(lián)網消息傳輸協(xié)議。IBM 公司的安迪·斯坦福-克拉克
及 Arcom 公司的阿蘭·尼普
于 1999
年撰寫了該協(xié)議的第一個版本1,之后 MQTT 便以簡單易實現、支持 QoS、輕量且省帶寬等眾多特性逐漸成為了 IoT 通訊的標準。
MQTT 協(xié)議每個消息最少僅需 2 個字節(jié) (其中報頭僅需 1 個字節(jié),其余字節(jié)可以全部作為消息載荷)就可以完成通信,專為那些資源和空間有限、功耗敏感的硬件所打造。
1.2、MQTT 協(xié)議基本特點
- 使用發(fā)布/訂閱消息模式,提供了一對多的消息分發(fā)和應用程序的解耦。
- 不關心負載內容的消息傳輸。
- 提供 3 種消息服務質量等級,滿足不同投遞需求。
- 很小的傳輸消耗和協(xié)議數據交換,最大限度減少網絡流量。
- 提供連接異常斷開時通知相關各方的機制。
1.3、MQTT 應用行業(yè)
MQTT 作為一種低開銷,低帶寬占用的即時通訊協(xié)議,可以用極少的代碼和帶寬為聯(lián)網設備提供實時可靠的消息服務,它適用于硬件資源有限的設備及帶寬有限的網絡環(huán)境。因此,MQTT 協(xié)議廣泛應用于物聯(lián)網、移動互聯(lián)網、智能硬件、車聯(lián)網、電力能源等行業(yè)。
1.4、MQTT 協(xié)議原理
基于發(fā)布/訂閱模式的 MQTT 協(xié)議中有三種角色:發(fā)布者(Publisher)
、代理(Broker)
、訂閱者(Subscriber)
。發(fā)布者向代理發(fā)布消息,代理向訂閱者轉發(fā)這些消息。通常情況下,客戶端的角色是發(fā)布者和訂閱者,服務器的角色是代理,但實際上,服務器也可能主動發(fā)布消息或者訂閱主題,客串一下客戶端的角色。
為了方便理解,MQTT 傳輸的消息可以簡化為:主題(Topic)和載荷(Payload)兩部分:
- Topic,消息主題,訂閱者向代理訂閱主題后,一旦代理收到相應主題的消息,就會向訂閱者轉發(fā)該消息。
- Payload,消息載荷(也可以理解為傳輸的數據),訂閱者在消息中真正關心的部分,通常是業(yè)務相關的。
1.5、MQTT 協(xié)議基礎概念
1.5.1、會話(Session)
每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態(tài)交互。會話可以存在于一個網絡連接之間,也可以跨越多個連續(xù)的網絡連接存在。
1.5.2、訂閱(Subscription)
訂閱包含一個主題過濾器(Topic Filter)和一個最大的服務質量(QoS)等級。訂閱與單個會話(Session)關聯(lián)。會話可以包含多于一個的訂閱。
1.5.3、主題名(Topic Name)
附加在應用消息上的一個標簽,被用于匹配服務端已存在的訂閱。服務端會向所有匹配訂閱的客戶端發(fā)送此應用消息。
1.5.4、主題過濾器(Topic Filter)
僅在訂閱時使用的主題表達式,可以包含通配符,以匹配多個主題名。就是可以通過通配符達到,發(fā)一條消息,多個主題能接受到消息的效果。
1.5.5、載荷(Payload)
對于 PUBLISH 報文來說載荷就是業(yè)務消息(就是指發(fā)送的消息內容),它可以是任意格式(二進制、十六進制、普通字符串、JSON 字符串、Base64)的數據。
1.6、MQTT 協(xié)議進階
1.6.1、消息服務質量(QoS)
MQTT 協(xié)議提供了 3 種消息服務質量等級(Quality of Service),它保證了在不同的網絡環(huán)境下消息傳遞的可靠性。這里有一點要明白,必須先訂閱,發(fā)布消息才會收到
。假如沒訂閱,他發(fā)送消息了,我再訂閱,這時候不管QoS設置幾,都是收不到消息的。
1.6.1.1、QoS 0 - 最多分發(fā)一次
當 QoS 為 0 時,消息的分發(fā)依賴于底層網絡的能力。發(fā)布者只會發(fā)布一次消息,接收者不會應答消息,發(fā)布者也不會儲存和重發(fā)消息。消息在這個等級下具有最高的傳輸效率,但可能送達一次也可能根本沒送達。
1.6.1.2、Qos 1 - 至少分發(fā)一次
當 QoS 為 1 時,可以保證消息至少送達一次。MQTT 通過簡單的 ACK 機制來保證 QoS 1。
- 發(fā)送者:發(fā)布消息,并等待接收者的 PUBACK 報文的應答,在規(guī)定的時間內沒有收到 PUBACK 的應答,發(fā)布者會將消息的 DUP 置為1 并重發(fā)消息。
- 接受者:接收到 QoS 為 1 的消息時應該回應 PUBACK 報文,可能因為網絡延遲等原因沒有及時發(fā)出,這時接收者可能會多次接受同一個消息,無論 DUP標志如何,接收者都會將收到的消息當作一個新的消息并發(fā)送 PUBACK 報文應答。
核心
:就是發(fā)送消息的時候,接受者需要確認一次,規(guī)定時間內沒有確認就會重新發(fā)。如果使用這種方式,寫業(yè)務的時候需要保證冪等性
。
1.6.1.3、QoS 2 - 只分發(fā)一次
當 QoS 為 2 時,發(fā)布者和訂閱者通過兩次會話來保證消息只被傳遞一次,這是最高等級的服務質量,消息丟失和重復都是不可接受的。使用這個服務質量等級會有額外的開銷。
- 發(fā)送者:發(fā)布 QoS 為 2 的消息之后,消息儲存起來并等待接收者回復 PUBREC 的消息。
- 接受者:收到一條 QoS 為 2 的消息時,他會處理此消息并返回一條 PUBREC 進行應答。
- 發(fā)送者:收到 PUBREC 消息后,丟棄掉之前的發(fā)布消息。保存 PUBREC 消息,并應答一個 PUBREL。等待接收者回復 PUBCOMP 消息
- 接受者:當接收者收到 PUBREL 消息之后,它會丟棄掉所有已保存的狀態(tài),并回復 PUBCOMP。
- 發(fā)送者:當發(fā)送者收到 PUBCOMP 消息之后會清空之前所保存的狀態(tài)。
核心
:發(fā)送消息的時候,接受者需要確認兩次,來保證消息確實已經送到。
無論在傳輸過程中何時出現丟包,發(fā)送端都負責重發(fā)上一條消息。不管發(fā)送端是 Publisher(發(fā)送端) 還是 Broker(服務器),都是如此。因此,接收端也需要對每一條命令消息都進行應答。
1.6.2、QoS 在發(fā)布與訂閱中的區(qū)別
發(fā)布時的 QoS 表示消息發(fā)送到服務端時使用的 QoS
訂閱時的 QoS 表示服務端向自己轉發(fā)消息時可以使用的最大 QoS
- 客戶端 A 的發(fā)布 QoS 大于客戶端 B 的訂閱 QoS 時,服務端向客戶端 B 轉發(fā)消息時使用的 QoS 為客戶端 B 的訂閱QoS。
- 客戶端 A 的發(fā)布 QoS 小于客戶端 B 的訂閱 QoS 時,服務端向客戶端 B 轉發(fā)消息時使用的 QoS 為客戶端 A 的發(fā)布 QoS。
總結:接收端可以設置訂閱Qos為2,這樣就可以接所有qos等級消息。也就是發(fā)布消息qos為多少,那我這邊接受消息就是多少。主要以發(fā)布消息的qos為準。
1.6.3、如何選擇 MQTT QoS 等級
QoS 級別越高,流程越復雜,系統(tǒng)資源消耗越大。應用程序可以根據自己的網絡場景和業(yè)務需求,選擇合適的 QoS 級別。
以下情況下可以選擇 QoS 0
- 可以接受消息偶爾丟失。
- 在同一個子網內部的服務間的消息交互,或其他客戶端與服務端 網絡非常穩(wěn)定的場景。
以下情況下可以選擇 QoS 1
- 對系統(tǒng)資源消耗較為關注,希望性能最優(yōu)化。
- 消息不能丟失,但能接受并處理重復的消息。
以下情況下可以選擇 QoS 2
- 不能忍受消息丟失(消息的丟失會造成生命或財產的損失),且不希望收到重復的消息。
- 數據完整性與及時性要求較高的銀行、消防、航空等行業(yè)。
1.6.4、清除會話(Clean Session)
MQTT 客戶端向服務器發(fā)起 CONNECT 請求時,可以通過 Clean Session 標志設置是否創(chuàng)建全新的會話。
Clean Session 設置為 0 時:
- 如果存在一個關聯(lián)此客戶標識符的會話,服務端必須基于此會話的狀態(tài)恢復與客戶端的通信。
- 如果不存在任何關聯(lián)此客戶標識符的會話,服務端必須創(chuàng)建一個新的會話。
Clean Session 設置為 1:
- 客戶端和服務端必須丟棄任何已存在的會話,并開始一個新的會話。
總結:監(jiān)聽端建議設置為0,一般監(jiān)聽端,我們都會配置單例,并且項目啟動就開始創(chuàng)建連接監(jiān)聽,設置為0,這樣可以保證連接的唯一性,和消息的安全性。
1.6.5、?;钚奶↘eep Alive)
MQTT 客戶端向服務器發(fā)起 CONNECT 請求時,通過 Keep Alive 參數設置?;钪芷?。
客戶端在無報文發(fā)送時,按 Keep Alive 周期定時發(fā)送 2 字節(jié)的 PINGREQ 心跳報文,服務端收到 PINGREQ 報文后,回復 2 字節(jié)的 PINGRESP 報文。
服務端在 1.5 個心跳周期內,既沒有收到客戶端發(fā)布訂閱報文,也沒有收到 PINGREQ 心跳報文時,將斷開客戶端連接。
1.6.6、保留消息(Retained Message)
MQTT 客戶端向服務器發(fā)布(PUBLISH)消息時,可以設置保留消息(Retained Message)標志。保留消息會駐留在消息服務器,后來的訂閱者訂閱主題時可以接收到最新一條(注意,是只有最近的一條)
保留消息。
1.6.7、遺囑消息(Will Message)
MQTT 客戶端向服務端發(fā)送 CONNECT 請求時,可以攜帶遺囑消息。MQTT 客戶端異常下線時(客戶端斷開前未向服務器發(fā)送 DISCONNECT 消息),MQTT 消息服務器會發(fā)布遺囑消息。
在連接的時候通過調用 MqttConnectOptions 實例的 setWill 方法來設定。任何訂閱了下面的主題的客戶端都可以收到該遺囑消息。
//方法1MqttConnectOptions.setWill(MqttTopic topic, byte[] payload, int qos, boolean retained) //方法2MqttConnectOptions.setWill(java.lang.String topic, byte[] payload, int qos, boolean retained)
以下情況下會發(fā)送 Will Message:
- 服務端發(fā)生了I/O 錯誤或者網絡失敗;
- 客戶端在定義的心跳時期失聯(lián);
- 客戶端在發(fā)送下線包( DISCONNECT)之前關閉網絡連接;
- 服務端在收到下線包之前關閉網絡連接。
總結:發(fā)送遺囑信息可以理解為,創(chuàng)建客戶端連接的時候,告訴服務器(mqtt服務器)我掛了之后,給哪些主題發(fā)這些消息。
當訂閱到遺囑消息之后,他就知道監(jiān)聽端掛了,我不能給他發(fā)消息了,遺囑消息在客戶端正常調用 disconnect 方法之后并不會被發(fā)送。
高級使用場景:
這里介紹一下如何將 Retained(保留) 消息與Will (遺囑)消息結合起來進行使用。
- 客戶端 A 遺囑消息設定為”offline“,該遺囑主題與一個普通發(fā)送狀態(tài)的主題設定成同一個 A/status;
- 當客戶端 A 連接時,向主題 A/status 發(fā)送 “online” 的 Retained 消息,其它客戶端訂閱主題 A/status的時候,獲取 Retained 消息為 “online” ;
- 當客戶端 A 異常斷開時,系統(tǒng)自動向主題 A/status 發(fā)送”offline“的消息,其它訂閱了此主題的客戶端會馬上收到”offline“消息;如果遺囑消息被設定了 Retained 的話,這時有新的訂閱A/status主題的客戶端上線的時候,獲取到的消息為“offline”。
2、EMQ X Cloud
2.1、EMQ X Cloud簡介
通過開放標準的物聯(lián)網協(xié)議 MQTT、MQTT over WebSocket、CoAP/LwM2M 將數以億計的物聯(lián)網設備可靠地連接到 EMQ X Cloud。通過 TLS/SSL 和基于 X.509 證書的認證確保安全的雙向通信。
在該模型中,EMQ X Cloud 提供的 MQTT 服務不僅為設備與設備、設備與應用間架起橋梁,同時可將需要的數據進行持久化,以便非實時應用在后續(xù)對獲取的數據加以利用。
2.2、EMQ X Cloud優(yōu)勢
2.2.1、協(xié)議支持完整
支持 MQTT v3.1,v3.1.1 與 v5.0 協(xié)議版本,是全球首個支持 MQTT 5.0 的公有云服務,支持 MQTT WebSocket 服務,完整支持 QoS0, QoS1 與 QoS2 級別 MQTT 消息。
2.2.2、多種協(xié)議接入
支持包含 MQTT、MQTT-SN、CoAP、LwM2M、私有 TCP 協(xié)議在內的多種通信協(xié)議接入,覆蓋各類行業(yè)應用;可根據您的特殊使用場景定制私有化功能,充分契合業(yè)務需求。
2.2.3、容量預估與伸縮
通過連接數與消息吞吐量自動預估容量,通過緊密的監(jiān)控來制定伸縮計劃,集群大小可隨業(yè)務規(guī)模平滑調整。
2.3、EMQ X 和 RabbitMQ對比
EMQ X 是基于高并發(fā)的 Erlang/OTP 語言平臺開發(fā),支持百萬級連接、分布式集群架構、發(fā)布訂閱模式的開源 MQTT 消息服務器。開源至今,EMQ X 在全球物聯(lián)網市場得到了廣泛應用。在開源版基礎上,還陸續(xù)發(fā)展了商業(yè)版和提供云版本(cloud-hosting)(https://www.emqx.com/zh/cloud)。EMQ X 支持很多插件,具有強大拓展能力,用戶依靠插件可以實現更多的功能。
RabbitMQ 是實現了高級消息隊列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ 服務器也是基于 Erlang 語言開發(fā)的,現在可以通過插件配置的形式,使其支持 MQTT 協(xié)議。
2.3.1、測試場景
以下的測試均使用了 QoS 1 的消息。當發(fā)送 QoS 1 的消息時,這些消息每次都要作為可持久化的備份保存在硬盤上。所以隊列空間的使用也尤為重要。
這次評測使用了一個云主機 M5 large 的實例,每個 MQTT 消息服務器集群由 3 個節(jié)點組成,每個節(jié)點的配置是雙核,8GB 內存。需要強調的是,我們對于 EMQ X 和 RabbitMQ 的測試使用了完全一致的硬件資源以消除變量。
壓力測試將會有兩個場景,「多對一」 和 「一對多」。
多對一:
許多設備作為發(fā)布者,如溫度傳感器或者是壓力傳感器,發(fā)送數據給一個服務器。服務器再將這些數據發(fā)送給一個控制器(即訂閱者)處理這些數據。
一對多:
一個控制器作為發(fā)布者將消息傳送給服務器,再由服務器將這些消息傳送給多個作為訂閱者的設備。
在每個場景里,「多」的那一方的數量將會從 2000 個逐漸上升到 10000 個。每個場景里,每一秒會發(fā)送一條載荷為 256 字節(jié)的消息。這樣的發(fā)布并不會造成過大的吞吐量。僅僅使用 256 字節(jié)載荷是為了展示出這兩個服務器的工作原理,以及他們的集群模式如何對這些場景作出反應的。
2.3.2、測試結果
左側Y軸是指 CPU 占用,底部X軸是指「多」側的客戶端數量變化。
多對一:
從 「多對一」 的結果可以看出,EMQ X 和 RabbitMQ 相比并沒有太大差別。
一對多:
但是從「一對多」的結果來看,RabbitMQ 相比于 EMQ X 確實有很明顯的差距。
2.3.3、測試總結
結果表明:在「多對一」 場景中,EMQ X 和 RabbitMQ 相比并沒有太大差別;而在「一對多」場景中,RabbitMQ 則較 EMQ X 產生了較為明顯的差距。相比較而言,rabbitmq使用MQTT協(xié)議,和EMQX使用MQTT協(xié)議存在著一定的差距。
2.3.4、注意
使用MQTT的發(fā)布-訂閱模型不能滿足使用要求??梢赃x擇使用AMQP。
3、Eclipse Paho Java
Paho Java客戶端是用Java編寫的MQTT客戶端庫,用于開發(fā)在JVM或其他Java兼容平臺(例如Android)上運行的應用程序。
Paho不僅可以對接EMQ X Broker,還可以對接滿足符合MQTT協(xié)議規(guī)范的消息代理服務端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1協(xié)議版本基本能滿足百分之九十多的接入場景。
4、SpringBoot整合Eclipse Paho Java
EMQX是消息服務器,而我們java想要發(fā)送消息,和訂閱消息都是和服務器打交道,想要和服務器打交道就需要想辦法連上他,這時候就需要用到了Eclipse Paho Java客戶端,用來在java當中連接EMQX消息服務器。
下面案例是按照我的應用場景來寫的,監(jiān)聽單獨用了一個客戶端存入了內存,使用了static變量,啟動項目的時候初始化,發(fā)送客戶端并沒有存入內存,而是發(fā)送一條,創(chuàng)建一個客戶端。這里有一點需要注意,客戶端id一定不要重復,就是對于MQTT服務器來說,clientid一定要保持唯一。
4.1、導入依賴
我用的springboot版本是2.3.9.RELEASE
<!-- mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <!--配置文件報錯問題--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> <scope>provided</scope> </dependency>
4.2、讀取配置
在application.yml當中添加
mqtt: hostUrl: tcp://192.168.56.103:1883 username: admin password: public client-id: equipment_main cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: client:report:1 isOpen: true qos: 1
通過這個文件來讀取配置
@Component @ConfigurationProperties("mqtt") @Data public class MqttProperties { /** * 用戶名 */ private String username; /** * 密碼 */ private String password; /** * 連接地址 */ private String hostUrl; /** * 客戶端Id,同一臺服務器下,不允許出現重復的客戶端id */ private String clientId; /** * 默認連接主題 */ private String defaultTopic; /** * 超時時間 */ private int timeout; /** * 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端 * 發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制 */ private int keepAlive; /** * 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連 * 接記錄,這里設置為true表示每次連接到服務器都以新的身份連接 */ private Boolean cleanSession; /** * 是否斷線重連 */ private Boolean reconnect; /** * 啟動的時候是否關閉mqtt */ private Boolean isOpen; /** * 連接方式 */ private Integer qos; }
4.3、添加mqtt接受服務的客戶端
@Component public class MqttAcceptClient { private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class); @Autowired private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 客戶端連接 */ public void connect() { MqttClient client; try { client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); try { // 設置回調 client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 重新連接 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題 * * @param topic 主題 * @param qos 連接方式 */ public void subscribe(String topic, int qos) { logger.info("==============開始訂閱主題==============" + topic); try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消訂閱某個主題 * * @param topic */ public void unsubscribe(String topic) { logger.info("==============開始取消訂閱主題==============" + topic); try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } }
4.4、添加mqtt接受服務的回調類
@Component public class MqttAcceptCallback implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class); @Autowired private MqttAcceptClient mqttAcceptClient; /** * 客戶端斷開后觸發(fā) * * @param throwable */ @Override public void connectionLost(Throwable throwable) { logger.info("連接斷開,可以做重連"); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { logger.info("emqx重新連接...................................................."); mqttAcceptClient.reconnection(); } } /** * 客戶端收到消息觸發(fā) * * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { logger.info("接收消息主題 : " + topic); logger.info("接收消息Qos : " + mqttMessage.getQos()); logger.info("接收消息內容 : " + new String(mqttMessage.getPayload())); // int i = 1/0; } /** * 發(fā)布消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { logger.info("向主題:" + topic + "發(fā)送消息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); logger.info("消息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq服務器后觸發(fā) * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { logger.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------"); // 以/#結尾表示訂閱所有以test開頭的主題 // 訂閱所有機構主題 mqttAcceptClient.subscribe("client:report:1", 0); } }
4.5、添加mqtt發(fā)送客戶端
@Component public class MqttSendClient { private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class); @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect() { MqttClient client = null; try { String uuid = UUID.randomUUID().toString().replaceAll("-",""); client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true); options.setAutomaticReconnect(false); try { // 設置回調 client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } return client; } /** * 發(fā)布消息 * 主題格式: server:report:$orgCode(參數實際使用機構代碼) * * @param retained 是否保留 * @param orgCode orgId * @param pushMessage 消息體 */ public void publish(boolean retained, String orgCode, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttDeliveryToken token; MqttClient mqttClient = connect(); try { mqttClient.publish("server:report:" + orgCode, message); } catch (MqttException e) { e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } /** * 關閉連接 * * @param mqttClient */ public static void disconnect(MqttClient mqttClient) { try { if (mqttClient != null) mqttClient.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param mqttClient */ public static void close(MqttClient mqttClient) { try { if (mqttClient != null) mqttClient.close(); } catch (MqttException e) { e.printStackTrace(); } } }
4.6、添加mqtt發(fā)送客戶端的回調類
@Component public class MqttSendCallBack implements MqttCallbackExtended { private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class); /** * 客戶端斷開后觸發(fā) * * @param throwable */ @Override public void connectionLost(Throwable throwable) { logger.info("連接斷開,可以做重連"); } /** * 客戶端收到消息觸發(fā) * * @param topic 主題 * @param mqttMessage 消息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { logger.info("接收消息主題 : " + topic); logger.info("接收消息Qos : " + mqttMessage.getQos()); logger.info("接收消息內容 : " + new String(mqttMessage.getPayload())); } /** * 發(fā)布消息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { logger.info("向主題:" + topic + "發(fā)送消息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); logger.info("消息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連接emq服務器后觸發(fā) * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { logger.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------"); } }
4.7、添加配置類
自定義配置,通過這個配置,來控制啟動項目的時候是否啟動mqtt
public class MqttCondition implements Condition { @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) { //1、能獲取到ioc使用的beanfactory ConfigurableListableBeanFactory beanFactory = context.getBeanFactory(); //2、獲取類加載器 ClassLoader classLoader = context.getClassLoader(); //3、獲取當前環(huán)境信息 Environment environment = context.getEnvironment(); String isOpen = environment.getProperty("mqtt.isOpen"); return Boolean.valueOf(isOpen); } }
4.8、啟動服務的時候開啟監(jiān)聽客戶端
@Configuration public class MqttConfig { @Autowired private MqttAcceptClient mqttAcceptClient; /** * 訂閱mqtt * * @return */ @Conditional(MqttCondition.class) @Bean public MqttAcceptClient getMqttPushClient() { mqttAcceptClient.connect(); return mqttAcceptClient; } }
4.9、測試類
@RestController @RequestMapping("/mqtt") public class MqttController { @Autowired private MqttSendClient MqttSendClient; @GetMapping(value = "/publishTopic") public Object publishTopic(String sendMessage) { System.out.println("message:"+sendMessage); sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}"; MqttSendClient.publish(false,"client:report:2",sendMessage); return null; } }
5、發(fā)送和監(jiān)聽消息測試
測試發(fā)送消息:
訪問:http://localhost:8080/mqtt/publishTopic?sendMessage=222
測試監(jiān)聽:
6、總結
EMQX官網:https://www.emqx.com/zh/blog/introduction-to-mqtt-qos
到此這篇關于springboot使用EMQX(MQTT協(xié)議)的實現的文章就介紹到這了,更多相關springboot使用EMQX內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring注解中@Autowired和@Bean的區(qū)別詳解
這篇文章主要詳細介紹了Spring注解中@Autowired和@Bean二者有什么區(qū)別,文中通過兩個注解的使用場景介紹了二者的區(qū)別,感興趣的同學可以參考閱讀2023-06-06Servlet+MyBatis項目轉Spring Cloud微服務,多數據源配置修改建議
今天小編就為大家分享一篇關于Servlet+MyBatis項目轉Spring Cloud微服務,多數據源配置修改建議,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-01-01javaweb servlet中使用請求轉發(fā)亂碼的實現
下面小編就為大家?guī)硪黄猨avaweb servlet中使用請求轉發(fā)亂碼的實現。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-08-08SpringBoot集成JWT實現Token登錄驗證的示例代碼
隨著技術的發(fā)展,分布式web應用的普及,通過session管理用戶登錄狀態(tài)成本越來越高,因此慢慢發(fā)展成為token的方式做登錄身份校驗,本文就來介紹一下SpringBoot集成JWT實現Token登錄驗證的示例代碼,感興趣的可以了解一下2023-12-12Java中的runnable 和 callable 區(qū)別解析
Runnable接口用于定義不需要返回結果的任務,而Callable接口可以返回結果并拋出異常,通常與Future結合使用,Runnable適用于簡單的后臺任務和定時任務,而Callable適用于并行計算、異步操作和復雜任務,選擇使用哪個接口取決于具體的應用場景,感興趣的朋友一起看看吧2025-03-03