springboot使用EMQX(MQTT協(xié)議)的實(shí)現(xiàn)
1、MQTT協(xié)議
1.1、MQTT簡介
在了解EMQX前首先了解一下MQTT協(xié)議,MQTT 全稱為 Message Queuing Telemetry Transport(消息隊(duì)列遙測傳輸),是一種基于 發(fā)布/訂閱 模式的 輕量級(jí)物聯(lián)網(wǎng)消息傳輸協(xié)議。IBM 公司的安迪·斯坦福-克拉克及 Arcom 公司的阿蘭·尼普于 1999 年撰寫了該協(xié)議的第一個(gè)版本1,之后 MQTT 便以簡單易實(shí)現(xiàn)、支持 QoS、輕量且省帶寬等眾多特性逐漸成為了 IoT 通訊的標(biāo)準(zhǔn)。
MQTT 協(xié)議每個(gè)消息最少僅需 2 個(gè)字節(jié) (其中報(bào)頭僅需 1 個(gè)字節(jié),其余字節(jié)可以全部作為消息載荷)就可以完成通信,專為那些資源和空間有限、功耗敏感的硬件所打造。
1.2、MQTT 協(xié)議基本特點(diǎn)
- 使用發(fā)布/訂閱消息模式,提供了一對(duì)多的消息分發(fā)和應(yīng)用程序的解耦。
- 不關(guān)心負(fù)載內(nèi)容的消息傳輸。
- 提供 3 種消息服務(wù)質(zhì)量等級(jí),滿足不同投遞需求。
- 很小的傳輸消耗和協(xié)議數(shù)據(jù)交換,最大限度減少網(wǎng)絡(luò)流量。
- 提供連接異常斷開時(shí)通知相關(guān)各方的機(jī)制。
1.3、MQTT 應(yīng)用行業(yè)
MQTT 作為一種低開銷,低帶寬占用的即時(shí)通訊協(xié)議,可以用極少的代碼和帶寬為聯(lián)網(wǎng)設(shè)備提供實(shí)時(shí)可靠的消息服務(wù),它適用于硬件資源有限的設(shè)備及帶寬有限的網(wǎng)絡(luò)環(huán)境。因此,MQTT 協(xié)議廣泛應(yīng)用于物聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)、智能硬件、車聯(lián)網(wǎng)、電力能源等行業(yè)。
1.4、MQTT 協(xié)議原理
基于發(fā)布/訂閱模式的 MQTT 協(xié)議中有三種角色:發(fā)布者(Publisher)、代理(Broker)、訂閱者(Subscriber)。發(fā)布者向代理發(fā)布消息,代理向訂閱者轉(zhuǎn)發(fā)這些消息。通常情況下,客戶端的角色是發(fā)布者和訂閱者,服務(wù)器的角色是代理,但實(shí)際上,服務(wù)器也可能主動(dòng)發(fā)布消息或者訂閱主題,客串一下客戶端的角色。

為了方便理解,MQTT 傳輸?shù)南⒖梢院喕癁椋褐黝}(Topic)和載荷(Payload)兩部分:
- Topic,消息主題,訂閱者向代理訂閱主題后,一旦代理收到相應(yīng)主題的消息,就會(huì)向訂閱者轉(zhuǎn)發(fā)該消息。
- Payload,消息載荷(也可以理解為傳輸?shù)臄?shù)據(jù)),訂閱者在消息中真正關(guān)心的部分,通常是業(yè)務(wù)相關(guān)的。
1.5、MQTT 協(xié)議基礎(chǔ)概念
1.5.1、會(huì)話(Session)
每個(gè)客戶端與服務(wù)器建立連接后就是一個(gè)會(huì)話,客戶端和服務(wù)器之間有狀態(tài)交互。會(huì)話可以存在于一個(gè)網(wǎng)絡(luò)連接之間,也可以跨越多個(gè)連續(xù)的網(wǎng)絡(luò)連接存在。
1.5.2、訂閱(Subscription)
訂閱包含一個(gè)主題過濾器(Topic Filter)和一個(gè)最大的服務(wù)質(zhì)量(QoS)等級(jí)。訂閱與單個(gè)會(huì)話(Session)關(guān)聯(lián)。會(huì)話可以包含多于一個(gè)的訂閱。
1.5.3、主題名(Topic Name)
附加在應(yīng)用消息上的一個(gè)標(biāo)簽,被用于匹配服務(wù)端已存在的訂閱。服務(wù)端會(huì)向所有匹配訂閱的客戶端發(fā)送此應(yīng)用消息。
1.5.4、主題過濾器(Topic Filter)
僅在訂閱時(shí)使用的主題表達(dá)式,可以包含通配符,以匹配多個(gè)主題名。就是可以通過通配符達(dá)到,發(fā)一條消息,多個(gè)主題能接受到消息的效果。
1.5.5、載荷(Payload)
對(duì)于 PUBLISH 報(bào)文來說載荷就是業(yè)務(wù)消息(就是指發(fā)送的消息內(nèi)容),它可以是任意格式(二進(jìn)制、十六進(jìn)制、普通字符串、JSON 字符串、Base64)的數(shù)據(jù)。
1.6、MQTT 協(xié)議進(jìn)階
1.6.1、消息服務(wù)質(zhì)量(QoS)
MQTT 協(xié)議提供了 3 種消息服務(wù)質(zhì)量等級(jí)(Quality of Service),它保證了在不同的網(wǎng)絡(luò)環(huán)境下消息傳遞的可靠性。這里有一點(diǎn)要明白,必須先訂閱,發(fā)布消息才會(huì)收到。假如沒訂閱,他發(fā)送消息了,我再訂閱,這時(shí)候不管QoS設(shè)置幾,都是收不到消息的。
1.6.1.1、QoS 0 - 最多分發(fā)一次
當(dāng) QoS 為 0 時(shí),消息的分發(fā)依賴于底層網(wǎng)絡(luò)的能力。發(fā)布者只會(huì)發(fā)布一次消息,接收者不會(huì)應(yīng)答消息,發(fā)布者也不會(huì)儲(chǔ)存和重發(fā)消息。消息在這個(gè)等級(jí)下具有最高的傳輸效率,但可能送達(dá)一次也可能根本沒送達(dá)。
1.6.1.2、Qos 1 - 至少分發(fā)一次
當(dāng) QoS 為 1 時(shí),可以保證消息至少送達(dá)一次。MQTT 通過簡單的 ACK 機(jī)制來保證 QoS 1。
- 發(fā)送者:發(fā)布消息,并等待接收者的 PUBACK 報(bào)文的應(yīng)答,在規(guī)定的時(shí)間內(nèi)沒有收到 PUBACK 的應(yīng)答,發(fā)布者會(huì)將消息的 DUP 置為1 并重發(fā)消息。
- 接受者:接收到 QoS 為 1 的消息時(shí)應(yīng)該回應(yīng) PUBACK 報(bào)文,可能因?yàn)榫W(wǎng)絡(luò)延遲等原因沒有及時(shí)發(fā)出,這時(shí)接收者可能會(huì)多次接受同一個(gè)消息,無論 DUP標(biāo)志如何,接收者都會(huì)將收到的消息當(dāng)作一個(gè)新的消息并發(fā)送 PUBACK 報(bào)文應(yīng)答。
核心:就是發(fā)送消息的時(shí)候,接受者需要確認(rèn)一次,規(guī)定時(shí)間內(nèi)沒有確認(rèn)就會(huì)重新發(fā)。如果使用這種方式,寫業(yè)務(wù)的時(shí)候需要保證冪等性。
1.6.1.3、QoS 2 - 只分發(fā)一次
當(dāng) QoS 為 2 時(shí),發(fā)布者和訂閱者通過兩次會(huì)話來保證消息只被傳遞一次,這是最高等級(jí)的服務(wù)質(zhì)量,消息丟失和重復(fù)都是不可接受的。使用這個(gè)服務(wù)質(zhì)量等級(jí)會(huì)有額外的開銷。
- 發(fā)送者:發(fā)布 QoS 為 2 的消息之后,消息儲(chǔ)存起來并等待接收者回復(fù) PUBREC 的消息。
- 接受者:收到一條 QoS 為 2 的消息時(shí),他會(huì)處理此消息并返回一條 PUBREC 進(jìn)行應(yīng)答。
- 發(fā)送者:收到 PUBREC 消息后,丟棄掉之前的發(fā)布消息。保存 PUBREC 消息,并應(yīng)答一個(gè) PUBREL。等待接收者回復(fù) PUBCOMP 消息
- 接受者:當(dāng)接收者收到 PUBREL 消息之后,它會(huì)丟棄掉所有已保存的狀態(tài),并回復(fù) PUBCOMP。
- 發(fā)送者:當(dāng)發(fā)送者收到 PUBCOMP 消息之后會(huì)清空之前所保存的狀態(tài)。
核心:發(fā)送消息的時(shí)候,接受者需要確認(rèn)兩次,來保證消息確實(shí)已經(jīng)送到。
無論在傳輸過程中何時(shí)出現(xiàn)丟包,發(fā)送端都負(fù)責(zé)重發(fā)上一條消息。不管發(fā)送端是 Publisher(發(fā)送端) 還是 Broker(服務(wù)器),都是如此。因此,接收端也需要對(duì)每一條命令消息都進(jìn)行應(yīng)答。
1.6.2、QoS 在發(fā)布與訂閱中的區(qū)別
發(fā)布時(shí)的 QoS 表示消息發(fā)送到服務(wù)端時(shí)使用的 QoS
訂閱時(shí)的 QoS 表示服務(wù)端向自己轉(zhuǎn)發(fā)消息時(shí)可以使用的最大 QoS
- 客戶端 A 的發(fā)布 QoS 大于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 B 的訂閱QoS。
- 客戶端 A 的發(fā)布 QoS 小于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 A 的發(fā)布 QoS。
總結(jié):接收端可以設(shè)置訂閱Qos為2,這樣就可以接所有qos等級(jí)消息。也就是發(fā)布消息qos為多少,那我這邊接受消息就是多少。主要以發(fā)布消息的qos為準(zhǔn)。
1.6.3、如何選擇 MQTT QoS 等級(jí)
QoS 級(jí)別越高,流程越復(fù)雜,系統(tǒng)資源消耗越大。應(yīng)用程序可以根據(jù)自己的網(wǎng)絡(luò)場景和業(yè)務(wù)需求,選擇合適的 QoS 級(jí)別。
以下情況下可以選擇 QoS 0
- 可以接受消息偶爾丟失。
- 在同一個(gè)子網(wǎng)內(nèi)部的服務(wù)間的消息交互,或其他客戶端與服務(wù)端 網(wǎng)絡(luò)非常穩(wěn)定的場景。
以下情況下可以選擇 QoS 1
- 對(duì)系統(tǒng)資源消耗較為關(guān)注,希望性能最優(yōu)化。
- 消息不能丟失,但能接受并處理重復(fù)的消息。
以下情況下可以選擇 QoS 2
- 不能忍受消息丟失(消息的丟失會(huì)造成生命或財(cái)產(chǎn)的損失),且不希望收到重復(fù)的消息。
- 數(shù)據(jù)完整性與及時(shí)性要求較高的銀行、消防、航空等行業(yè)。
1.6.4、清除會(huì)話(Clean Session)
MQTT 客戶端向服務(wù)器發(fā)起 CONNECT 請(qǐng)求時(shí),可以通過 Clean Session 標(biāo)志設(shè)置是否創(chuàng)建全新的會(huì)話。
Clean Session 設(shè)置為 0 時(shí):
- 如果存在一個(gè)關(guān)聯(lián)此客戶標(biāo)識(shí)符的會(huì)話,服務(wù)端必須基于此會(huì)話的狀態(tài)恢復(fù)與客戶端的通信。
- 如果不存在任何關(guān)聯(lián)此客戶標(biāo)識(shí)符的會(huì)話,服務(wù)端必須創(chuàng)建一個(gè)新的會(huì)話。
Clean Session 設(shè)置為 1:
- 客戶端和服務(wù)端必須丟棄任何已存在的會(huì)話,并開始一個(gè)新的會(huì)話。
總結(jié):監(jiān)聽端建議設(shè)置為0,一般監(jiān)聽端,我們都會(huì)配置單例,并且項(xiàng)目啟動(dòng)就開始創(chuàng)建連接監(jiān)聽,設(shè)置為0,這樣可以保證連接的唯一性,和消息的安全性。
1.6.5、保活心跳(Keep Alive)
MQTT 客戶端向服務(wù)器發(fā)起 CONNECT 請(qǐng)求時(shí),通過 Keep Alive 參數(shù)設(shè)置保活周期。
客戶端在無報(bào)文發(fā)送時(shí),按 Keep Alive 周期定時(shí)發(fā)送 2 字節(jié)的 PINGREQ 心跳報(bào)文,服務(wù)端收到 PINGREQ 報(bào)文后,回復(fù) 2 字節(jié)的 PINGRESP 報(bào)文。
服務(wù)端在 1.5 個(gè)心跳周期內(nèi),既沒有收到客戶端發(fā)布訂閱報(bào)文,也沒有收到 PINGREQ 心跳報(bào)文時(shí),將斷開客戶端連接。
1.6.6、保留消息(Retained Message)
MQTT 客戶端向服務(wù)器發(fā)布(PUBLISH)消息時(shí),可以設(shè)置保留消息(Retained Message)標(biāo)志。保留消息會(huì)駐留在消息服務(wù)器,后來的訂閱者訂閱主題時(shí)可以接收到最新一條(注意,是只有最近的一條)保留消息。
1.6.7、遺囑消息(Will Message)
MQTT 客戶端向服務(wù)端發(fā)送 CONNECT 請(qǐng)求時(shí),可以攜帶遺囑消息。MQTT 客戶端異常下線時(shí)(客戶端斷開前未向服務(wù)器發(fā)送 DISCONNECT 消息),MQTT 消息服務(wù)器會(huì)發(fā)布遺囑消息。
在連接的時(shí)候通過調(diào)用 MqttConnectOptions 實(shí)例的 setWill 方法來設(shè)定。任何訂閱了下面的主題的客戶端都可以收到該遺囑消息。
//方法1MqttConnectOptions.setWill(MqttTopic topic, byte[] payload, int qos, boolean retained) //方法2MqttConnectOptions.setWill(java.lang.String topic, byte[] payload, int qos, boolean retained)
以下情況下會(huì)發(fā)送 Will Message:
- 服務(wù)端發(fā)生了I/O 錯(cuò)誤或者網(wǎng)絡(luò)失?。?/li>
- 客戶端在定義的心跳時(shí)期失聯(lián);
- 客戶端在發(fā)送下線包( DISCONNECT)之前關(guān)閉網(wǎng)絡(luò)連接;
- 服務(wù)端在收到下線包之前關(guān)閉網(wǎng)絡(luò)連接。
總結(jié):發(fā)送遺囑信息可以理解為,創(chuàng)建客戶端連接的時(shí)候,告訴服務(wù)器(mqtt服務(wù)器)我掛了之后,給哪些主題發(fā)這些消息。當(dāng)訂閱到遺囑消息之后,他就知道監(jiān)聽端掛了,我不能給他發(fā)消息了,遺囑消息在客戶端正常調(diào)用 disconnect 方法之后并不會(huì)被發(fā)送。
高級(jí)使用場景:
這里介紹一下如何將 Retained(保留) 消息與Will (遺囑)消息結(jié)合起來進(jìn)行使用。
- 客戶端 A 遺囑消息設(shè)定為”offline“,該遺囑主題與一個(gè)普通發(fā)送狀態(tài)的主題設(shè)定成同一個(gè) A/status;
- 當(dāng)客戶端 A 連接時(shí),向主題 A/status 發(fā)送 “online” 的 Retained 消息,其它客戶端訂閱主題 A/status的時(shí)候,獲取 Retained 消息為 “online” ;
- 當(dāng)客戶端 A 異常斷開時(shí),系統(tǒng)自動(dòng)向主題 A/status 發(fā)送”offline“的消息,其它訂閱了此主題的客戶端會(huì)馬上收到”offline“消息;如果遺囑消息被設(shè)定了 Retained 的話,這時(shí)有新的訂閱A/status主題的客戶端上線的時(shí)候,獲取到的消息為“offline”。
2、EMQ X Cloud
2.1、EMQ X Cloud簡介
通過開放標(biāo)準(zhǔn)的物聯(lián)網(wǎng)協(xié)議 MQTT、MQTT over WebSocket、CoAP/LwM2M 將數(shù)以億計(jì)的物聯(lián)網(wǎng)設(shè)備可靠地連接到 EMQ X Cloud。通過 TLS/SSL 和基于 X.509 證書的認(rèn)證確保安全的雙向通信。

在該模型中,EMQ X Cloud 提供的 MQTT 服務(wù)不僅為設(shè)備與設(shè)備、設(shè)備與應(yīng)用間架起橋梁,同時(shí)可將需要的數(shù)據(jù)進(jìn)行持久化,以便非實(shí)時(shí)應(yīng)用在后續(xù)對(duì)獲取的數(shù)據(jù)加以利用。
2.2、EMQ X Cloud優(yōu)勢
2.2.1、協(xié)議支持完整
支持 MQTT v3.1,v3.1.1 與 v5.0 協(xié)議版本,是全球首個(gè)支持 MQTT 5.0 的公有云服務(wù),支持 MQTT WebSocket 服務(wù),完整支持 QoS0, QoS1 與 QoS2 級(jí)別 MQTT 消息。
2.2.2、多種協(xié)議接入
支持包含 MQTT、MQTT-SN、CoAP、LwM2M、私有 TCP 協(xié)議在內(nèi)的多種通信協(xié)議接入,覆蓋各類行業(yè)應(yīng)用;可根據(jù)您的特殊使用場景定制私有化功能,充分契合業(yè)務(wù)需求。
2.2.3、容量預(yù)估與伸縮
通過連接數(shù)與消息吞吐量自動(dòng)預(yù)估容量,通過緊密的監(jiān)控來制定伸縮計(jì)劃,集群大小可隨業(yè)務(wù)規(guī)模平滑調(diào)整。
2.3、EMQ X 和 RabbitMQ對(duì)比
EMQ X 是基于高并發(fā)的 Erlang/OTP 語言平臺(tái)開發(fā),支持百萬級(jí)連接、分布式集群架構(gòu)、發(fā)布訂閱模式的開源 MQTT 消息服務(wù)器。開源至今,EMQ X 在全球物聯(lián)網(wǎng)市場得到了廣泛應(yīng)用。在開源版基礎(chǔ)上,還陸續(xù)發(fā)展了商業(yè)版和提供云版本(cloud-hosting)(https://www.emqx.com/zh/cloud)。EMQ X 支持很多插件,具有強(qiáng)大拓展能力,用戶依靠插件可以實(shí)現(xiàn)更多的功能。
RabbitMQ 是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ 服務(wù)器也是基于 Erlang 語言開發(fā)的,現(xiàn)在可以通過插件配置的形式,使其支持 MQTT 協(xié)議。
2.3.1、測試場景
以下的測試均使用了 QoS 1 的消息。當(dāng)發(fā)送 QoS 1 的消息時(shí),這些消息每次都要作為可持久化的備份保存在硬盤上。所以隊(duì)列空間的使用也尤為重要。
這次評(píng)測使用了一個(gè)云主機(jī) M5 large 的實(shí)例,每個(gè) MQTT 消息服務(wù)器集群由 3 個(gè)節(jié)點(diǎn)組成,每個(gè)節(jié)點(diǎn)的配置是雙核,8GB 內(nèi)存。需要強(qiáng)調(diào)的是,我們對(duì)于 EMQ X 和 RabbitMQ 的測試使用了完全一致的硬件資源以消除變量。
壓力測試將會(huì)有兩個(gè)場景,「多對(duì)一」 和 「一對(duì)多」。
多對(duì)一:
許多設(shè)備作為發(fā)布者,如溫度傳感器或者是壓力傳感器,發(fā)送數(shù)據(jù)給一個(gè)服務(wù)器。服務(wù)器再將這些數(shù)據(jù)發(fā)送給一個(gè)控制器(即訂閱者)處理這些數(shù)據(jù)。

一對(duì)多:
一個(gè)控制器作為發(fā)布者將消息傳送給服務(wù)器,再由服務(wù)器將這些消息傳送給多個(gè)作為訂閱者的設(shè)備。

在每個(gè)場景里,「多」的那一方的數(shù)量將會(huì)從 2000 個(gè)逐漸上升到 10000 個(gè)。每個(gè)場景里,每一秒會(huì)發(fā)送一條載荷為 256 字節(jié)的消息。這樣的發(fā)布并不會(huì)造成過大的吞吐量。僅僅使用 256 字節(jié)載荷是為了展示出這兩個(gè)服務(wù)器的工作原理,以及他們的集群模式如何對(duì)這些場景作出反應(yīng)的。
2.3.2、測試結(jié)果
左側(cè)Y軸是指 CPU 占用,底部X軸是指「多」側(cè)的客戶端數(shù)量變化。
多對(duì)一:
從 「多對(duì)一」 的結(jié)果可以看出,EMQ X 和 RabbitMQ 相比并沒有太大差別。

一對(duì)多:
但是從「一對(duì)多」的結(jié)果來看,RabbitMQ 相比于 EMQ X 確實(shí)有很明顯的差距。

2.3.3、測試總結(jié)
結(jié)果表明:在「多對(duì)一」 場景中,EMQ X 和 RabbitMQ 相比并沒有太大差別;而在「一對(duì)多」場景中,RabbitMQ 則較 EMQ X 產(chǎn)生了較為明顯的差距。相比較而言,rabbitmq使用MQTT協(xié)議,和EMQX使用MQTT協(xié)議存在著一定的差距。
2.3.4、注意
使用MQTT的發(fā)布-訂閱模型不能滿足使用要求??梢赃x擇使用AMQP。
3、Eclipse Paho Java
Paho Java客戶端是用Java編寫的MQTT客戶端庫,用于開發(fā)在JVM或其他Java兼容平臺(tái)(例如Android)上運(yùn)行的應(yīng)用程序。
Paho不僅可以對(duì)接EMQ X Broker,還可以對(duì)接滿足符合MQTT協(xié)議規(guī)范的消息代理服務(wù)端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1協(xié)議版本基本能滿足百分之九十多的接入場景。
4、SpringBoot整合Eclipse Paho Java
EMQX是消息服務(wù)器,而我們java想要發(fā)送消息,和訂閱消息都是和服務(wù)器打交道,想要和服務(wù)器打交道就需要想辦法連上他,這時(shí)候就需要用到了Eclipse Paho Java客戶端,用來在java當(dāng)中連接EMQX消息服務(wù)器。
下面案例是按照我的應(yīng)用場景來寫的,監(jiān)聽單獨(dú)用了一個(gè)客戶端存入了內(nèi)存,使用了static變量,啟動(dòng)項(xiàng)目的時(shí)候初始化,發(fā)送客戶端并沒有存入內(nèi)存,而是發(fā)送一條,創(chuàng)建一個(gè)客戶端。這里有一點(diǎn)需要注意,客戶端id一定不要重復(fù),就是對(duì)于MQTT服務(wù)器來說,clientid一定要保持唯一。
4.1、導(dǎo)入依賴
我用的springboot版本是2.3.9.RELEASE
<!-- mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--配置文件報(bào)錯(cuò)問題-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
4.2、讀取配置
在application.yml當(dāng)中添加
mqtt: hostUrl: tcp://192.168.56.103:1883 username: admin password: public client-id: equipment_main cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: client:report:1 isOpen: true qos: 1
通過這個(gè)文件來讀取配置
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {
/**
* 用戶名
*/
private String username;
/**
* 密碼
*/
private String password;
/**
* 連接地址
*/
private String hostUrl;
/**
* 客戶端Id,同一臺(tái)服務(wù)器下,不允許出現(xiàn)重復(fù)的客戶端id
*/
private String clientId;
/**
* 默認(rèn)連接主題
*/
private String defaultTopic;
/**
* 超時(shí)時(shí)間
*/
private int timeout;
/**
* 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端
* 發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制
*/
private int keepAlive;
/**
* 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連
* 接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
*/
private Boolean cleanSession;
/**
* 是否斷線重連
*/
private Boolean reconnect;
/**
* 啟動(dòng)的時(shí)候是否關(guān)閉mqtt
*/
private Boolean isOpen;
/**
* 連接方式
*/
private Integer qos;
}
4.3、添加mqtt接受服務(wù)的客戶端
@Component
public class MqttAcceptClient {
private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);
@Autowired
private MqttAcceptCallback mqttAcceptCallback;
@Autowired
private MqttProperties mqttProperties;
public static MqttClient client;
private static MqttClient getClient() {
return client;
}
private static void setClient(MqttClient client) {
MqttAcceptClient.client = client;
}
/**
* 客戶端連接
*/
public void connect() {
MqttClient client;
try {
client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setAutomaticReconnect(mqttProperties.getReconnect());
options.setCleanSession(mqttProperties.getCleanSession());
MqttAcceptClient.setClient(client);
try {
// 設(shè)置回調(diào)
client.setCallback(mqttAcceptCallback);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 重新連接
*/
public void reconnection() {
try {
client.connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱某個(gè)主題
*
* @param topic 主題
* @param qos 連接方式
*/
public void subscribe(String topic, int qos) {
logger.info("==============開始訂閱主題==============" + topic);
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 取消訂閱某個(gè)主題
*
* @param topic
*/
public void unsubscribe(String topic) {
logger.info("==============開始取消訂閱主題==============" + topic);
try {
client.unsubscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
4.4、添加mqtt接受服務(wù)的回調(diào)類
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);
@Autowired
private MqttAcceptClient mqttAcceptClient;
/**
* 客戶端斷開后觸發(fā)
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("連接斷開,可以做重連");
if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
logger.info("emqx重新連接....................................................");
mqttAcceptClient.reconnection();
}
}
/**
* 客戶端收到消息觸發(fā)
*
* @param topic 主題
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("接收消息主題 : " + topic);
logger.info("接收消息Qos : " + mqttMessage.getQos());
logger.info("接收消息內(nèi)容 : " + new String(mqttMessage.getPayload()));
// int i = 1/0;
}
/**
* 發(fā)布消息成功
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
for (String topic : topics) {
logger.info("向主題:" + topic + "發(fā)送消息成功!");
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
logger.info("消息的內(nèi)容是:" + s);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 連接emq服務(wù)器后觸發(fā)
*
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
logger.info("--------------------ClientId:"
+ MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
// 以/#結(jié)尾表示訂閱所有以test開頭的主題
// 訂閱所有機(jī)構(gòu)主題
mqttAcceptClient.subscribe("client:report:1", 0);
}
}
4.5、添加mqtt發(fā)送客戶端
@Component
public class MqttSendClient {
private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);
@Autowired
private MqttSendCallBack mqttSendCallBack;
@Autowired
private MqttProperties mqttProperties;
public MqttClient connect() {
MqttClient client = null;
try {
String uuid = UUID.randomUUID().toString().replaceAll("-","");
client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setCleanSession(true);
options.setAutomaticReconnect(false);
try {
// 設(shè)置回調(diào)
client.setCallback(mqttSendCallBack);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
return client;
}
/**
* 發(fā)布消息
* 主題格式: server:report:$orgCode(參數(shù)實(shí)際使用機(jī)構(gòu)代碼)
*
* @param retained 是否保留
* @param orgCode orgId
* @param pushMessage 消息體
*/
public void publish(boolean retained, String orgCode, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(mqttProperties.getQos());
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttDeliveryToken token;
MqttClient mqttClient = connect();
try {
mqttClient.publish("server:report:" + orgCode, message);
} catch (MqttException e) {
e.printStackTrace();
} finally {
disconnect(mqttClient);
close(mqttClient);
}
}
/**
* 關(guān)閉連接
*
* @param mqttClient
*/
public static void disconnect(MqttClient mqttClient) {
try {
if (mqttClient != null) mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 釋放資源
*
* @param mqttClient
*/
public static void close(MqttClient mqttClient) {
try {
if (mqttClient != null) mqttClient.close();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
4.6、添加mqtt發(fā)送客戶端的回調(diào)類
@Component
public class MqttSendCallBack implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);
/**
* 客戶端斷開后觸發(fā)
*
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
logger.info("連接斷開,可以做重連");
}
/**
* 客戶端收到消息觸發(fā)
*
* @param topic 主題
* @param mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
logger.info("接收消息主題 : " + topic);
logger.info("接收消息Qos : " + mqttMessage.getQos());
logger.info("接收消息內(nèi)容 : " + new String(mqttMessage.getPayload()));
}
/**
* 發(fā)布消息成功
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
String[] topics = token.getTopics();
for (String topic : topics) {
logger.info("向主題:" + topic + "發(fā)送消息成功!");
}
try {
MqttMessage message = token.getMessage();
byte[] payload = message.getPayload();
String s = new String(payload, "UTF-8");
logger.info("消息的內(nèi)容是:" + s);
} catch (MqttException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 連接emq服務(wù)器后觸發(fā)
*
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
logger.info("--------------------ClientId:"
+ MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
}
}
4.7、添加配置類
自定義配置,通過這個(gè)配置,來控制啟動(dòng)項(xiàng)目的時(shí)候是否啟動(dòng)mqtt
public class MqttCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
//1、能獲取到ioc使用的beanfactory
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
//2、獲取類加載器
ClassLoader classLoader = context.getClassLoader();
//3、獲取當(dāng)前環(huán)境信息
Environment environment = context.getEnvironment();
String isOpen = environment.getProperty("mqtt.isOpen");
return Boolean.valueOf(isOpen);
}
}
4.8、啟動(dòng)服務(wù)的時(shí)候開啟監(jiān)聽客戶端
@Configuration
public class MqttConfig {
@Autowired
private MqttAcceptClient mqttAcceptClient;
/**
* 訂閱mqtt
*
* @return
*/
@Conditional(MqttCondition.class)
@Bean
public MqttAcceptClient getMqttPushClient() {
mqttAcceptClient.connect();
return mqttAcceptClient;
}
}
4.9、測試類
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttSendClient MqttSendClient;
@GetMapping(value = "/publishTopic")
public Object publishTopic(String sendMessage) {
System.out.println("message:"+sendMessage);
sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";
MqttSendClient.publish(false,"client:report:2",sendMessage);
return null;
}
}
5、發(fā)送和監(jiān)聽消息測試

測試發(fā)送消息:
訪問:http://localhost:8080/mqtt/publishTopic?sendMessage=222

測試監(jiān)聽:

6、總結(jié)
EMQX官網(wǎng):https://www.emqx.com/zh/blog/introduction-to-mqtt-qos
到此這篇關(guān)于springboot使用EMQX(MQTT協(xié)議)的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)springboot使用EMQX內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring注解中@Autowired和@Bean的區(qū)別詳解
這篇文章主要詳細(xì)介紹了Spring注解中@Autowired和@Bean二者有什么區(qū)別,文中通過兩個(gè)注解的使用場景介紹了二者的區(qū)別,感興趣的同學(xué)可以參考閱讀2023-06-06
Servlet+MyBatis項(xiàng)目轉(zhuǎn)Spring Cloud微服務(wù),多數(shù)據(jù)源配置修改建議
今天小編就為大家分享一篇關(guān)于Servlet+MyBatis項(xiàng)目轉(zhuǎn)Spring Cloud微服務(wù),多數(shù)據(jù)源配置修改建議,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-01-01
javaweb servlet中使用請(qǐng)求轉(zhuǎn)發(fā)亂碼的實(shí)現(xiàn)
下面小編就為大家?guī)硪黄猨avaweb servlet中使用請(qǐng)求轉(zhuǎn)發(fā)亂碼的實(shí)現(xiàn)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-08-08
SpringBoot集成JWT實(shí)現(xiàn)Token登錄驗(yàn)證的示例代碼
隨著技術(shù)的發(fā)展,分布式web應(yīng)用的普及,通過session管理用戶登錄狀態(tài)成本越來越高,因此慢慢發(fā)展成為token的方式做登錄身份校驗(yàn),本文就來介紹一下SpringBoot集成JWT實(shí)現(xiàn)Token登錄驗(yàn)證的示例代碼,感興趣的可以了解一下2023-12-12
Java實(shí)現(xiàn)復(fù)原IP地址的方法
這篇文章主要介紹了Java實(shí)現(xiàn)復(fù)原IP地址的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-02-02
java shiro實(shí)現(xiàn)退出登陸清空緩存
本篇文章主要介紹了java shiro實(shí)現(xiàn)退出登陸清空緩存,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-02-02
Java中的runnable 和 callable 區(qū)別解析
Runnable接口用于定義不需要返回結(jié)果的任務(wù),而Callable接口可以返回結(jié)果并拋出異常,通常與Future結(jié)合使用,Runnable適用于簡單的后臺(tái)任務(wù)和定時(shí)任務(wù),而Callable適用于并行計(jì)算、異步操作和復(fù)雜任務(wù),選擇使用哪個(gè)接口取決于具體的應(yīng)用場景,感興趣的朋友一起看看吧2025-03-03
Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析
這篇文章主要介紹了Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08

