欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot使用EMQX(MQTT協(xié)議)的實(shí)現(xiàn)

 更新時(shí)間:2023年10月25日 11:25:37   作者:怪 咖@  
最近由于iot越來(lái)越火, 物聯(lián)網(wǎng)的需求越來(lái)越多, 那么理所當(dāng)然的使用mqtt的場(chǎng)景也就越來(lái)越多,本文主要介紹了springboot使用EMQX(MQTT協(xié)議)的實(shí)現(xiàn),感興趣的可以了解一下

1、MQTT協(xié)議

1.1、MQTT簡(jiǎn)介

在了解EMQX前首先了解一下MQTT協(xié)議,MQTT 全稱(chēng)為 Message Queuing Telemetry Transport(消息隊(duì)列遙測(cè)傳輸),是一種基于 發(fā)布/訂閱 模式的 輕量級(jí)物聯(lián)網(wǎng)消息傳輸協(xié)議。IBM 公司的安迪·斯坦福-克拉克及 Arcom 公司的阿蘭·尼普于 1999 年撰寫(xiě)了該協(xié)議的第一個(gè)版本1,之后 MQTT 便以簡(jiǎn)單易實(shí)現(xiàn)、支持 QoS、輕量且省帶寬等眾多特性逐漸成為了 IoT 通訊的標(biāo)準(zhǔn)。

MQTT 協(xié)議每個(gè)消息最少僅需 2 個(gè)字節(jié) (其中報(bào)頭僅需 1 個(gè)字節(jié),其余字節(jié)可以全部作為消息載荷)就可以完成通信,專(zhuān)為那些資源和空間有限、功耗敏感的硬件所打造。

1.2、MQTT 協(xié)議基本特點(diǎn)

  • 使用發(fā)布/訂閱消息模式,提供了一對(duì)多的消息分發(fā)和應(yīng)用程序的解耦。
  • 不關(guān)心負(fù)載內(nèi)容的消息傳輸。
  • 提供 3 種消息服務(wù)質(zhì)量等級(jí),滿足不同投遞需求。
  • 很小的傳輸消耗和協(xié)議數(shù)據(jù)交換,最大限度減少網(wǎng)絡(luò)流量。
  • 提供連接異常斷開(kāi)時(shí)通知相關(guān)各方的機(jī)制。

1.3、MQTT 應(yīng)用行業(yè)

MQTT 作為一種低開(kāi)銷(xiāo),低帶寬占用的即時(shí)通訊協(xié)議,可以用極少的代碼和帶寬為聯(lián)網(wǎng)設(shè)備提供實(shí)時(shí)可靠的消息服務(wù),它適用于硬件資源有限的設(shè)備及帶寬有限的網(wǎng)絡(luò)環(huán)境。因此,MQTT 協(xié)議廣泛應(yīng)用于物聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)、智能硬件、車(chē)聯(lián)網(wǎng)、電力能源等行業(yè)。

1.4、MQTT 協(xié)議原理

基于發(fā)布/訂閱模式的 MQTT 協(xié)議中有三種角色:發(fā)布者(Publisher)、代理(Broker)、訂閱者(Subscriber)。發(fā)布者向代理發(fā)布消息,代理向訂閱者轉(zhuǎn)發(fā)這些消息。通常情況下,客戶端的角色是發(fā)布者和訂閱者,服務(wù)器的角色是代理,但實(shí)際上,服務(wù)器也可能主動(dòng)發(fā)布消息或者訂閱主題,客串一下客戶端的角色。

在這里插入圖片描述

為了方便理解,MQTT 傳輸?shù)南⒖梢院?jiǎn)化為:主題(Topic)和載荷(Payload)兩部分:

  • Topic,消息主題,訂閱者向代理訂閱主題后,一旦代理收到相應(yīng)主題的消息,就會(huì)向訂閱者轉(zhuǎn)發(fā)該消息。
  • Payload,消息載荷(也可以理解為傳輸?shù)臄?shù)據(jù)),訂閱者在消息中真正關(guān)心的部分,通常是業(yè)務(wù)相關(guān)的。

1.5、MQTT 協(xié)議基礎(chǔ)概念

1.5.1、會(huì)話(Session)

每個(gè)客戶端與服務(wù)器建立連接后就是一個(gè)會(huì)話,客戶端和服務(wù)器之間有狀態(tài)交互。會(huì)話可以存在于一個(gè)網(wǎng)絡(luò)連接之間,也可以跨越多個(gè)連續(xù)的網(wǎng)絡(luò)連接存在。

1.5.2、訂閱(Subscription)

訂閱包含一個(gè)主題過(guò)濾器(Topic Filter)和一個(gè)最大的服務(wù)質(zhì)量(QoS)等級(jí)。訂閱與單個(gè)會(huì)話(Session)關(guān)聯(lián)。會(huì)話可以包含多于一個(gè)的訂閱。

1.5.3、主題名(Topic Name)

附加在應(yīng)用消息上的一個(gè)標(biāo)簽,被用于匹配服務(wù)端已存在的訂閱。服務(wù)端會(huì)向所有匹配訂閱的客戶端發(fā)送此應(yīng)用消息。

1.5.4、主題過(guò)濾器(Topic Filter)

僅在訂閱時(shí)使用的主題表達(dá)式,可以包含通配符,以匹配多個(gè)主題名。就是可以通過(guò)通配符達(dá)到,發(fā)一條消息,多個(gè)主題能接受到消息的效果。

1.5.5、載荷(Payload)

對(duì)于 PUBLISH 報(bào)文來(lái)說(shuō)載荷就是業(yè)務(wù)消息(就是指發(fā)送的消息內(nèi)容),它可以是任意格式(二進(jìn)制、十六進(jìn)制、普通字符串、JSON 字符串、Base64)的數(shù)據(jù)。

1.6、MQTT 協(xié)議進(jìn)階

1.6.1、消息服務(wù)質(zhì)量(QoS)

MQTT 協(xié)議提供了 3 種消息服務(wù)質(zhì)量等級(jí)(Quality of Service),它保證了在不同的網(wǎng)絡(luò)環(huán)境下消息傳遞的可靠性。這里有一點(diǎn)要明白,必須先訂閱,發(fā)布消息才會(huì)收到。假如沒(méi)訂閱,他發(fā)送消息了,我再訂閱,這時(shí)候不管QoS設(shè)置幾,都是收不到消息的。

1.6.1.1、QoS 0 - 最多分發(fā)一次

當(dāng) QoS 為 0 時(shí),消息的分發(fā)依賴(lài)于底層網(wǎng)絡(luò)的能力。發(fā)布者只會(huì)發(fā)布一次消息,接收者不會(huì)應(yīng)答消息,發(fā)布者也不會(huì)儲(chǔ)存和重發(fā)消息。消息在這個(gè)等級(jí)下具有最高的傳輸效率,但可能送達(dá)一次也可能根本沒(méi)送達(dá)。

1.6.1.2、Qos 1 - 至少分發(fā)一次

當(dāng) QoS 為 1 時(shí),可以保證消息至少送達(dá)一次。MQTT 通過(guò)簡(jiǎn)單的 ACK 機(jī)制來(lái)保證 QoS 1。

  • 發(fā)送者:發(fā)布消息,并等待接收者的 PUBACK 報(bào)文的應(yīng)答,在規(guī)定的時(shí)間內(nèi)沒(méi)有收到 PUBACK 的應(yīng)答,發(fā)布者會(huì)將消息的 DUP 置為1 并重發(fā)消息。
  • 接受者:接收到 QoS 為 1 的消息時(shí)應(yīng)該回應(yīng) PUBACK 報(bào)文,可能因?yàn)榫W(wǎng)絡(luò)延遲等原因沒(méi)有及時(shí)發(fā)出,這時(shí)接收者可能會(huì)多次接受同一個(gè)消息,無(wú)論 DUP標(biāo)志如何,接收者都會(huì)將收到的消息當(dāng)作一個(gè)新的消息并發(fā)送 PUBACK 報(bào)文應(yīng)答。

核心:就是發(fā)送消息的時(shí)候,接受者需要確認(rèn)一次,規(guī)定時(shí)間內(nèi)沒(méi)有確認(rèn)就會(huì)重新發(fā)。如果使用這種方式,寫(xiě)業(yè)務(wù)的時(shí)候需要保證冪等性

1.6.1.3、QoS 2 - 只分發(fā)一次

當(dāng) QoS 為 2 時(shí),發(fā)布者和訂閱者通過(guò)兩次會(huì)話來(lái)保證消息只被傳遞一次,這是最高等級(jí)的服務(wù)質(zhì)量,消息丟失和重復(fù)都是不可接受的。使用這個(gè)服務(wù)質(zhì)量等級(jí)會(huì)有額外的開(kāi)銷(xiāo)。

  • 發(fā)送者:發(fā)布 QoS 為 2 的消息之后,消息儲(chǔ)存起來(lái)并等待接收者回復(fù) PUBREC 的消息。
  • 接受者:收到一條 QoS 為 2 的消息時(shí),他會(huì)處理此消息并返回一條 PUBREC 進(jìn)行應(yīng)答。
  • 發(fā)送者:收到 PUBREC 消息后,丟棄掉之前的發(fā)布消息。保存 PUBREC 消息,并應(yīng)答一個(gè) PUBREL。等待接收者回復(fù) PUBCOMP 消息
  • 接受者:當(dāng)接收者收到 PUBREL 消息之后,它會(huì)丟棄掉所有已保存的狀態(tài),并回復(fù) PUBCOMP。
  • 發(fā)送者:當(dāng)發(fā)送者收到 PUBCOMP 消息之后會(huì)清空之前所保存的狀態(tài)。

核心:發(fā)送消息的時(shí)候,接受者需要確認(rèn)兩次,來(lái)保證消息確實(shí)已經(jīng)送到。

無(wú)論在傳輸過(guò)程中何時(shí)出現(xiàn)丟包,發(fā)送端都負(fù)責(zé)重發(fā)上一條消息。不管發(fā)送端是 Publisher(發(fā)送端) 還是 Broker(服務(wù)器),都是如此。因此,接收端也需要對(duì)每一條命令消息都進(jìn)行應(yīng)答。

1.6.2、QoS 在發(fā)布與訂閱中的區(qū)別

發(fā)布時(shí)的 QoS 表示消息發(fā)送到服務(wù)端時(shí)使用的 QoS
訂閱時(shí)的 QoS 表示服務(wù)端向自己轉(zhuǎn)發(fā)消息時(shí)可以使用的最大 QoS

  • 客戶端 A 的發(fā)布 QoS 大于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 B 的訂閱QoS。
  • 客戶端 A 的發(fā)布 QoS 小于客戶端 B 的訂閱 QoS 時(shí),服務(wù)端向客戶端 B 轉(zhuǎn)發(fā)消息時(shí)使用的 QoS 為客戶端 A 的發(fā)布 QoS。

總結(jié):接收端可以設(shè)置訂閱Qos為2,這樣就可以接所有qos等級(jí)消息。也就是發(fā)布消息qos為多少,那我這邊接受消息就是多少。主要以發(fā)布消息的qos為準(zhǔn)。

1.6.3、如何選擇 MQTT QoS 等級(jí)

QoS 級(jí)別越高,流程越復(fù)雜,系統(tǒng)資源消耗越大。應(yīng)用程序可以根據(jù)自己的網(wǎng)絡(luò)場(chǎng)景和業(yè)務(wù)需求,選擇合適的 QoS 級(jí)別。

以下情況下可以選擇 QoS 0

  • 可以接受消息偶爾丟失。
  • 在同一個(gè)子網(wǎng)內(nèi)部的服務(wù)間的消息交互,或其他客戶端與服務(wù)端 網(wǎng)絡(luò)非常穩(wěn)定的場(chǎng)景。

以下情況下可以選擇 QoS 1

  • 對(duì)系統(tǒng)資源消耗較為關(guān)注,希望性能最優(yōu)化。
  • 消息不能丟失,但能接受并處理重復(fù)的消息。

以下情況下可以選擇 QoS 2

  • 不能忍受消息丟失(消息的丟失會(huì)造成生命或財(cái)產(chǎn)的損失),且不希望收到重復(fù)的消息。
  • 數(shù)據(jù)完整性與及時(shí)性要求較高的銀行、消防、航空等行業(yè)。

1.6.4、清除會(huì)話(Clean Session)

MQTT 客戶端向服務(wù)器發(fā)起 CONNECT 請(qǐng)求時(shí),可以通過(guò) Clean Session 標(biāo)志設(shè)置是否創(chuàng)建全新的會(huì)話。

Clean Session 設(shè)置為 0 時(shí):

  • 如果存在一個(gè)關(guān)聯(lián)此客戶標(biāo)識(shí)符的會(huì)話,服務(wù)端必須基于此會(huì)話的狀態(tài)恢復(fù)與客戶端的通信。
  • 如果不存在任何關(guān)聯(lián)此客戶標(biāo)識(shí)符的會(huì)話,服務(wù)端必須創(chuàng)建一個(gè)新的會(huì)話。

Clean Session 設(shè)置為 1:

  • 客戶端和服務(wù)端必須丟棄任何已存在的會(huì)話,并開(kāi)始一個(gè)新的會(huì)話。

總結(jié):監(jiān)聽(tīng)端建議設(shè)置為0,一般監(jiān)聽(tīng)端,我們都會(huì)配置單例,并且項(xiàng)目啟動(dòng)就開(kāi)始創(chuàng)建連接監(jiān)聽(tīng),設(shè)置為0,這樣可以保證連接的唯一性,和消息的安全性。

1.6.5、保活心跳(Keep Alive)

MQTT 客戶端向服務(wù)器發(fā)起 CONNECT 請(qǐng)求時(shí),通過(guò) Keep Alive 參數(shù)設(shè)置保活周期。

客戶端在無(wú)報(bào)文發(fā)送時(shí),按 Keep Alive 周期定時(shí)發(fā)送 2 字節(jié)的 PINGREQ 心跳報(bào)文,服務(wù)端收到 PINGREQ 報(bào)文后,回復(fù) 2 字節(jié)的 PINGRESP 報(bào)文。

服務(wù)端在 1.5 個(gè)心跳周期內(nèi),既沒(méi)有收到客戶端發(fā)布訂閱報(bào)文,也沒(méi)有收到 PINGREQ 心跳報(bào)文時(shí),將斷開(kāi)客戶端連接。

1.6.6、保留消息(Retained Message)

MQTT 客戶端向服務(wù)器發(fā)布(PUBLISH)消息時(shí),可以設(shè)置保留消息(Retained Message)標(biāo)志。保留消息會(huì)駐留在消息服務(wù)器,后來(lái)的訂閱者訂閱主題時(shí)可以接收到最新一條(注意,是只有最近的一條)保留消息。

1.6.7、遺囑消息(Will Message)

MQTT 客戶端向服務(wù)端發(fā)送 CONNECT 請(qǐng)求時(shí),可以攜帶遺囑消息。MQTT 客戶端異常下線時(shí)(客戶端斷開(kāi)前未向服務(wù)器發(fā)送 DISCONNECT 消息),MQTT 消息服務(wù)器會(huì)發(fā)布遺囑消息。

在連接的時(shí)候通過(guò)調(diào)用 MqttConnectOptions 實(shí)例的 setWill 方法來(lái)設(shè)定。任何訂閱了下面的主題的客戶端都可以收到該遺囑消息。

//方法1MqttConnectOptions.setWill(MqttTopic topic, byte[] payload, int qos, boolean retained)
//方法2MqttConnectOptions.setWill(java.lang.String topic, byte[] payload, int qos, boolean retained)

以下情況下會(huì)發(fā)送 Will Message:

  • 服務(wù)端發(fā)生了I/O 錯(cuò)誤或者網(wǎng)絡(luò)失?。?/li>
  • 客戶端在定義的心跳時(shí)期失聯(lián);
  • 客戶端在發(fā)送下線包( DISCONNECT)之前關(guān)閉網(wǎng)絡(luò)連接;
  • 服務(wù)端在收到下線包之前關(guān)閉網(wǎng)絡(luò)連接。

總結(jié):發(fā)送遺囑信息可以理解為,創(chuàng)建客戶端連接的時(shí)候,告訴服務(wù)器(mqtt服務(wù)器)我掛了之后,給哪些主題發(fā)這些消息。當(dāng)訂閱到遺囑消息之后,他就知道監(jiān)聽(tīng)端掛了,我不能給他發(fā)消息了,遺囑消息在客戶端正常調(diào)用 disconnect 方法之后并不會(huì)被發(fā)送。

高級(jí)使用場(chǎng)景:
這里介紹一下如何將 Retained(保留) 消息與Will (遺囑)消息結(jié)合起來(lái)進(jìn)行使用。

  • 客戶端 A 遺囑消息設(shè)定為”offline“,該遺囑主題與一個(gè)普通發(fā)送狀態(tài)的主題設(shè)定成同一個(gè) A/status;
  • 當(dāng)客戶端 A 連接時(shí),向主題 A/status 發(fā)送 “online” 的 Retained 消息,其它客戶端訂閱主題 A/status的時(shí)候,獲取 Retained 消息為 “online” ;
  • 當(dāng)客戶端 A 異常斷開(kāi)時(shí),系統(tǒng)自動(dòng)向主題 A/status 發(fā)送”offline“的消息,其它訂閱了此主題的客戶端會(huì)馬上收到”offline“消息;如果遺囑消息被設(shè)定了 Retained 的話,這時(shí)有新的訂閱A/status主題的客戶端上線的時(shí)候,獲取到的消息為“offline”。

2、EMQ X Cloud

2.1、EMQ X Cloud簡(jiǎn)介

通過(guò)開(kāi)放標(biāo)準(zhǔn)的物聯(lián)網(wǎng)協(xié)議 MQTT、MQTT over WebSocket、CoAP/LwM2M 將數(shù)以億計(jì)的物聯(lián)網(wǎng)設(shè)備可靠地連接到 EMQ X Cloud。通過(guò) TLS/SSL 和基于 X.509 證書(shū)的認(rèn)證確保安全的雙向通信。

在這里插入圖片描述

在該模型中,EMQ X Cloud 提供的 MQTT 服務(wù)不僅為設(shè)備與設(shè)備、設(shè)備與應(yīng)用間架起橋梁,同時(shí)可將需要的數(shù)據(jù)進(jìn)行持久化,以便非實(shí)時(shí)應(yīng)用在后續(xù)對(duì)獲取的數(shù)據(jù)加以利用。

2.2、EMQ X Cloud優(yōu)勢(shì)

2.2.1、協(xié)議支持完整

支持 MQTT v3.1,v3.1.1 與 v5.0 協(xié)議版本,是全球首個(gè)支持 MQTT 5.0 的公有云服務(wù),支持 MQTT WebSocket 服務(wù),完整支持 QoS0, QoS1 與 QoS2 級(jí)別 MQTT 消息。

2.2.2、多種協(xié)議接入

支持包含 MQTT、MQTT-SN、CoAP、LwM2M、私有 TCP 協(xié)議在內(nèi)的多種通信協(xié)議接入,覆蓋各類(lèi)行業(yè)應(yīng)用;可根據(jù)您的特殊使用場(chǎng)景定制私有化功能,充分契合業(yè)務(wù)需求。

2.2.3、容量預(yù)估與伸縮

通過(guò)連接數(shù)與消息吞吐量自動(dòng)預(yù)估容量,通過(guò)緊密的監(jiān)控來(lái)制定伸縮計(jì)劃,集群大小可隨業(yè)務(wù)規(guī)模平滑調(diào)整。

2.3、EMQ X 和 RabbitMQ對(duì)比

EMQ X 是基于高并發(fā)的 Erlang/OTP 語(yǔ)言平臺(tái)開(kāi)發(fā),支持百萬(wàn)級(jí)連接、分布式集群架構(gòu)、發(fā)布訂閱模式的開(kāi)源 MQTT 消息服務(wù)器。開(kāi)源至今,EMQ X 在全球物聯(lián)網(wǎng)市場(chǎng)得到了廣泛應(yīng)用。在開(kāi)源版基礎(chǔ)上,還陸續(xù)發(fā)展了商業(yè)版和提供云版本(cloud-hosting)(https://www.emqx.com/zh/cloud)。EMQ X 支持很多插件,具有強(qiáng)大拓展能力,用戶依靠插件可以實(shí)現(xiàn)更多的功能。

RabbitMQ 是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開(kāi)源消息代理軟件(亦稱(chēng)面向消息的中間件)。RabbitMQ 服務(wù)器也是基于 Erlang 語(yǔ)言開(kāi)發(fā)的,現(xiàn)在可以通過(guò)插件配置的形式,使其支持 MQTT 協(xié)議。

2.3.1、測(cè)試場(chǎng)景

以下的測(cè)試均使用了 QoS 1 的消息。當(dāng)發(fā)送 QoS 1 的消息時(shí),這些消息每次都要作為可持久化的備份保存在硬盤(pán)上。所以隊(duì)列空間的使用也尤為重要。

這次評(píng)測(cè)使用了一個(gè)云主機(jī) M5 large 的實(shí)例,每個(gè) MQTT 消息服務(wù)器集群由 3 個(gè)節(jié)點(diǎn)組成,每個(gè)節(jié)點(diǎn)的配置是雙核,8GB 內(nèi)存。需要強(qiáng)調(diào)的是,我們對(duì)于 EMQ X 和 RabbitMQ 的測(cè)試使用了完全一致的硬件資源以消除變量。

壓力測(cè)試將會(huì)有兩個(gè)場(chǎng)景,「多對(duì)一」 和 「一對(duì)多」。

多對(duì)一:
許多設(shè)備作為發(fā)布者,如溫度傳感器或者是壓力傳感器,發(fā)送數(shù)據(jù)給一個(gè)服務(wù)器。服務(wù)器再將這些數(shù)據(jù)發(fā)送給一個(gè)控制器(即訂閱者)處理這些數(shù)據(jù)。

在這里插入圖片描述

一對(duì)多:
一個(gè)控制器作為發(fā)布者將消息傳送給服務(wù)器,再由服務(wù)器將這些消息傳送給多個(gè)作為訂閱者的設(shè)備。

在這里插入圖片描述

在每個(gè)場(chǎng)景里,「多」的那一方的數(shù)量將會(huì)從 2000 個(gè)逐漸上升到 10000 個(gè)。每個(gè)場(chǎng)景里,每一秒會(huì)發(fā)送一條載荷為 256 字節(jié)的消息。這樣的發(fā)布并不會(huì)造成過(guò)大的吞吐量。僅僅使用 256 字節(jié)載荷是為了展示出這兩個(gè)服務(wù)器的工作原理,以及他們的集群模式如何對(duì)這些場(chǎng)景作出反應(yīng)的。

2.3.2、測(cè)試結(jié)果

左側(cè)Y軸是指 CPU 占用,底部X軸是指「多」側(cè)的客戶端數(shù)量變化。

多對(duì)一:
從 「多對(duì)一」 的結(jié)果可以看出,EMQ X 和 RabbitMQ 相比并沒(méi)有太大差別。

在這里插入圖片描述

一對(duì)多:
但是從「一對(duì)多」的結(jié)果來(lái)看,RabbitMQ 相比于 EMQ X 確實(shí)有很明顯的差距。

在這里插入圖片描述

2.3.3、測(cè)試總結(jié)

結(jié)果表明:在「多對(duì)一」 場(chǎng)景中,EMQ X 和 RabbitMQ 相比并沒(méi)有太大差別;而在「一對(duì)多」場(chǎng)景中,RabbitMQ 則較 EMQ X 產(chǎn)生了較為明顯的差距。相比較而言,rabbitmq使用MQTT協(xié)議,和EMQX使用MQTT協(xié)議存在著一定的差距。

2.3.4、注意

使用MQTT的發(fā)布-訂閱模型不能滿足使用要求??梢赃x擇使用AMQP。

3、Eclipse Paho Java

Paho Java客戶端是用Java編寫(xiě)的MQTT客戶端庫(kù),用于開(kāi)發(fā)在JVM或其他Java兼容平臺(tái)(例如Android)上運(yùn)行的應(yīng)用程序。
Paho不僅可以對(duì)接EMQ X Broker,還可以對(duì)接滿足符合MQTT協(xié)議規(guī)范的消息代理服務(wù)端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1協(xié)議版本基本能滿足百分之九十多的接入場(chǎng)景。

4、SpringBoot整合Eclipse Paho Java

EMQX是消息服務(wù)器,而我們java想要發(fā)送消息,和訂閱消息都是和服務(wù)器打交道,想要和服務(wù)器打交道就需要想辦法連上他,這時(shí)候就需要用到了Eclipse Paho Java客戶端,用來(lái)在java當(dāng)中連接EMQX消息服務(wù)器。

下面案例是按照我的應(yīng)用場(chǎng)景來(lái)寫(xiě)的,監(jiān)聽(tīng)單獨(dú)用了一個(gè)客戶端存入了內(nèi)存,使用了static變量,啟動(dòng)項(xiàng)目的時(shí)候初始化,發(fā)送客戶端并沒(méi)有存入內(nèi)存,而是發(fā)送一條,創(chuàng)建一個(gè)客戶端。這里有一點(diǎn)需要注意,客戶端id一定不要重復(fù),就是對(duì)于MQTT服務(wù)器來(lái)說(shuō),clientid一定要保持唯一。

4.1、導(dǎo)入依賴(lài)

我用的springboot版本是2.3.9.RELEASE

    <!-- mqtt -->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!--配置文件報(bào)錯(cuò)問(wèn)題-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>

4.2、讀取配置

在application.yml當(dāng)中添加

mqtt:
  hostUrl: tcp://192.168.56.103:1883
  username: admin
  password: public
  client-id: equipment_main
  cleanSession: true
  reconnect: true
  timeout: 100
  keepAlive: 100
  defaultTopic: client:report:1
  isOpen: true
  qos: 1

通過(guò)這個(gè)文件來(lái)讀取配置

@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * 連接地址
     */
    private String hostUrl;

    /**
     * 客戶端Id,同一臺(tái)服務(wù)器下,不允許出現(xiàn)重復(fù)的客戶端id
     */
    private String clientId;

    /**
     * 默認(rèn)連接主題
     */
    private String defaultTopic;

    /**
     * 超時(shí)時(shí)間
     */
    private int timeout;

    /**
     * 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端
     * 發(fā)送個(gè)消息判斷客戶端是否在線,但這個(gè)方法并沒(méi)有重連的機(jī)制
     */
    private int keepAlive;

    /**
     * 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連
     * 接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
     */
    private Boolean cleanSession;

    /**
     * 是否斷線重連
     */
    private Boolean reconnect;

    /**
     * 啟動(dòng)的時(shí)候是否關(guān)閉mqtt
     */
    private Boolean isOpen;

    /**
     * 連接方式
     */
    private Integer qos;
}

4.3、添加mqtt接受服務(wù)的客戶端

@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    @Autowired
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客戶端連接
     */
    public void connect() {
        MqttClient client;
        try {
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            try {
                // 設(shè)置回調(diào)
                client.setCallback(mqttAcceptCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新連接
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個(gè)主題
     *
     * @param topic 主題
     * @param qos   連接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("==============開(kāi)始訂閱主題==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消訂閱某個(gè)主題
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        logger.info("==============開(kāi)始取消訂閱主題==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

4.4、添加mqtt接受服務(wù)的回調(diào)類(lèi)

@Component
public class MqttAcceptCallback implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 客戶端斷開(kāi)后觸發(fā)
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("連接斷開(kāi),可以做重連");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            logger.info("emqx重新連接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客戶端收到消息觸發(fā)
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主題 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息內(nèi)容 : " + new String(mqttMessage.getPayload()));
//        int i = 1/0;
    }

    /**
     * 發(fā)布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主題:" + topic + "發(fā)送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            logger.info("消息的內(nèi)容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq服務(wù)器后觸發(fā)
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        logger.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
        // 以/#結(jié)尾表示訂閱所有以test開(kāi)頭的主題
        // 訂閱所有機(jī)構(gòu)主題
        mqttAcceptClient.subscribe("client:report:1", 0);
    }
}

4.5、添加mqtt發(fā)送客戶端

@Component
public class MqttSendClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-","");
            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            try {
                // 設(shè)置回調(diào)
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 發(fā)布消息
     * 主題格式: server:report:$orgCode(參數(shù)實(shí)際使用機(jī)構(gòu)代碼)
     *
     * @param retained    是否保留
     * @param orgCode     orgId
     * @param pushMessage 消息體
     */
    public void publish(boolean retained, String orgCode, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttDeliveryToken token;
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish("server:report:" + orgCode, message);
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 關(guān)閉連接
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null) mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) mqttClient.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

4.6、添加mqtt發(fā)送客戶端的回調(diào)類(lèi)

@Component
public class MqttSendCallBack implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);

    /**
     * 客戶端斷開(kāi)后觸發(fā)
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("連接斷開(kāi),可以做重連");
    }

    /**
     * 客戶端收到消息觸發(fā)
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主題 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息內(nèi)容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發(fā)布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主題:" + topic + "發(fā)送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            logger.info("消息的內(nèi)容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq服務(wù)器后觸發(fā)
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        logger.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
    }
}

4.7、添加配置類(lèi)

自定義配置,通過(guò)這個(gè)配置,來(lái)控制啟動(dòng)項(xiàng)目的時(shí)候是否啟動(dòng)mqtt

public class MqttCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
        //1、能獲取到ioc使用的beanfactory
        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
        //2、獲取類(lèi)加載器
        ClassLoader classLoader = context.getClassLoader();
        //3、獲取當(dāng)前環(huán)境信息
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        return Boolean.valueOf(isOpen);
    }
}

4.8、啟動(dòng)服務(wù)的時(shí)候開(kāi)啟監(jiān)聽(tīng)客戶端

@Configuration
public class MqttConfig {

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 訂閱mqtt
     *
     * @return
     */
    @Conditional(MqttCondition.class)
    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}

4.9、測(cè)試類(lèi)

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttSendClient MqttSendClient;

    @GetMapping(value = "/publishTopic")
    public Object publishTopic(String sendMessage) {
        System.out.println("message:"+sendMessage);
        sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";
        MqttSendClient.publish(false,"client:report:2",sendMessage);
        return null;
    }

}

5、發(fā)送和監(jiān)聽(tīng)消息測(cè)試

在這里插入圖片描述

測(cè)試發(fā)送消息:
訪問(wèn):http://localhost:8080/mqtt/publishTopic?sendMessage=222

在這里插入圖片描述

測(cè)試監(jiān)聽(tīng):

在這里插入圖片描述

6、總結(jié)

EMQX官網(wǎng):https://www.emqx.com/zh/blog/introduction-to-mqtt-qos

到此這篇關(guān)于springboot使用EMQX(MQTT協(xié)議)的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)springboot使用EMQX內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 解決Maven依賴(lài)沖突的方法

    解決Maven依賴(lài)沖突的方法

    本文主要介紹了解決Maven依賴(lài)沖突的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • Spring注解中@Autowired和@Bean的區(qū)別詳解

    Spring注解中@Autowired和@Bean的區(qū)別詳解

    這篇文章主要詳細(xì)介紹了Spring注解中@Autowired和@Bean二者有什么區(qū)別,文中通過(guò)兩個(gè)注解的使用場(chǎng)景介紹了二者的區(qū)別,感興趣的同學(xué)可以參考閱讀
    2023-06-06
  • Servlet+MyBatis項(xiàng)目轉(zhuǎn)Spring Cloud微服務(wù),多數(shù)據(jù)源配置修改建議

    Servlet+MyBatis項(xiàng)目轉(zhuǎn)Spring Cloud微服務(wù),多數(shù)據(jù)源配置修改建議

    今天小編就為大家分享一篇關(guān)于Servlet+MyBatis項(xiàng)目轉(zhuǎn)Spring Cloud微服務(wù),多數(shù)據(jù)源配置修改建議,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-01-01
  • javaweb servlet中使用請(qǐng)求轉(zhuǎn)發(fā)亂碼的實(shí)現(xiàn)

    javaweb servlet中使用請(qǐng)求轉(zhuǎn)發(fā)亂碼的實(shí)現(xiàn)

    下面小編就為大家?guī)?lái)一篇javaweb servlet中使用請(qǐng)求轉(zhuǎn)發(fā)亂碼的實(shí)現(xiàn)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2016-08-08
  • SpringBoot集成JWT實(shí)現(xiàn)Token登錄驗(yàn)證的示例代碼

    SpringBoot集成JWT實(shí)現(xiàn)Token登錄驗(yàn)證的示例代碼

    隨著技術(shù)的發(fā)展,分布式web應(yīng)用的普及,通過(guò)session管理用戶登錄狀態(tài)成本越來(lái)越高,因此慢慢發(fā)展成為token的方式做登錄身份校驗(yàn),本文就來(lái)介紹一下SpringBoot集成JWT實(shí)現(xiàn)Token登錄驗(yàn)證的示例代碼,感興趣的可以了解一下
    2023-12-12
  • Java實(shí)現(xiàn)復(fù)原IP地址的方法

    Java實(shí)現(xiàn)復(fù)原IP地址的方法

    這篇文章主要介紹了Java實(shí)現(xiàn)復(fù)原IP地址的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-02-02
  • java shiro實(shí)現(xiàn)退出登陸清空緩存

    java shiro實(shí)現(xiàn)退出登陸清空緩存

    本篇文章主要介紹了java shiro實(shí)現(xiàn)退出登陸清空緩存,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-02-02
  • Java中的runnable 和 callable 區(qū)別解析

    Java中的runnable 和 callable 區(qū)別解析

    Runnable接口用于定義不需要返回結(jié)果的任務(wù),而Callable接口可以返回結(jié)果并拋出異常,通常與Future結(jié)合使用,Runnable適用于簡(jiǎn)單的后臺(tái)任務(wù)和定時(shí)任務(wù),而Callable適用于并行計(jì)算、異步操作和復(fù)雜任務(wù),選擇使用哪個(gè)接口取決于具體的應(yīng)用場(chǎng)景,感興趣的朋友一起看看吧
    2025-03-03
  • java中的session對(duì)象如何獲取

    java中的session對(duì)象如何獲取

    這篇文章主要介紹了java中的session對(duì)象如何獲取,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析

    Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析

    這篇文章主要介紹了Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-08-08

最新評(píng)論