欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot使用EMQX(MQTT協(xié)議)的實現

 更新時間:2023年10月25日 11:25:37   作者:怪 咖@  
最近由于iot越來越火, 物聯(lián)網的需求越來越多, 那么理所當然的使用mqtt的場景也就越來越多,本文主要介紹了springboot使用EMQX(MQTT協(xié)議)的實現,感興趣的可以了解一下

1、MQTT協(xié)議

1.1、MQTT簡介

在了解EMQX前首先了解一下MQTT協(xié)議,MQTT 全稱為 Message Queuing Telemetry Transport(消息隊列遙測傳輸),是一種基于 發(fā)布/訂閱 模式的 輕量級物聯(lián)網消息傳輸協(xié)議。IBM 公司的安迪·斯坦福-克拉克及 Arcom 公司的阿蘭·尼普于 1999 年撰寫了該協(xié)議的第一個版本1,之后 MQTT 便以簡單易實現、支持 QoS、輕量且省帶寬等眾多特性逐漸成為了 IoT 通訊的標準。

MQTT 協(xié)議每個消息最少僅需 2 個字節(jié) (其中報頭僅需 1 個字節(jié),其余字節(jié)可以全部作為消息載荷)就可以完成通信,專為那些資源和空間有限、功耗敏感的硬件所打造。

1.2、MQTT 協(xié)議基本特點

  • 使用發(fā)布/訂閱消息模式,提供了一對多的消息分發(fā)和應用程序的解耦。
  • 不關心負載內容的消息傳輸。
  • 提供 3 種消息服務質量等級,滿足不同投遞需求。
  • 很小的傳輸消耗和協(xié)議數據交換,最大限度減少網絡流量。
  • 提供連接異常斷開時通知相關各方的機制。

1.3、MQTT 應用行業(yè)

MQTT 作為一種低開銷,低帶寬占用的即時通訊協(xié)議,可以用極少的代碼和帶寬為聯(lián)網設備提供實時可靠的消息服務,它適用于硬件資源有限的設備及帶寬有限的網絡環(huán)境。因此,MQTT 協(xié)議廣泛應用于物聯(lián)網、移動互聯(lián)網、智能硬件、車聯(lián)網、電力能源等行業(yè)。

1.4、MQTT 協(xié)議原理

基于發(fā)布/訂閱模式的 MQTT 協(xié)議中有三種角色:發(fā)布者(Publisher)、代理(Broker)、訂閱者(Subscriber)。發(fā)布者向代理發(fā)布消息,代理向訂閱者轉發(fā)這些消息。通常情況下,客戶端的角色是發(fā)布者和訂閱者,服務器的角色是代理,但實際上,服務器也可能主動發(fā)布消息或者訂閱主題,客串一下客戶端的角色。

在這里插入圖片描述

為了方便理解,MQTT 傳輸的消息可以簡化為:主題(Topic)和載荷(Payload)兩部分:

  • Topic,消息主題,訂閱者向代理訂閱主題后,一旦代理收到相應主題的消息,就會向訂閱者轉發(fā)該消息。
  • Payload,消息載荷(也可以理解為傳輸的數據),訂閱者在消息中真正關心的部分,通常是業(yè)務相關的。

1.5、MQTT 協(xié)議基礎概念

1.5.1、會話(Session)

每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態(tài)交互。會話可以存在于一個網絡連接之間,也可以跨越多個連續(xù)的網絡連接存在。

1.5.2、訂閱(Subscription)

訂閱包含一個主題過濾器(Topic Filter)和一個最大的服務質量(QoS)等級。訂閱與單個會話(Session)關聯(lián)。會話可以包含多于一個的訂閱。

1.5.3、主題名(Topic Name)

附加在應用消息上的一個標簽,被用于匹配服務端已存在的訂閱。服務端會向所有匹配訂閱的客戶端發(fā)送此應用消息。

1.5.4、主題過濾器(Topic Filter)

僅在訂閱時使用的主題表達式,可以包含通配符,以匹配多個主題名。就是可以通過通配符達到,發(fā)一條消息,多個主題能接受到消息的效果。

1.5.5、載荷(Payload)

對于 PUBLISH 報文來說載荷就是業(yè)務消息(就是指發(fā)送的消息內容),它可以是任意格式(二進制、十六進制、普通字符串、JSON 字符串、Base64)的數據。

1.6、MQTT 協(xié)議進階

1.6.1、消息服務質量(QoS)

MQTT 協(xié)議提供了 3 種消息服務質量等級(Quality of Service),它保證了在不同的網絡環(huán)境下消息傳遞的可靠性。這里有一點要明白,必須先訂閱,發(fā)布消息才會收到。假如沒訂閱,他發(fā)送消息了,我再訂閱,這時候不管QoS設置幾,都是收不到消息的。

1.6.1.1、QoS 0 - 最多分發(fā)一次

當 QoS 為 0 時,消息的分發(fā)依賴于底層網絡的能力。發(fā)布者只會發(fā)布一次消息,接收者不會應答消息,發(fā)布者也不會儲存和重發(fā)消息。消息在這個等級下具有最高的傳輸效率,但可能送達一次也可能根本沒送達。

1.6.1.2、Qos 1 - 至少分發(fā)一次

當 QoS 為 1 時,可以保證消息至少送達一次。MQTT 通過簡單的 ACK 機制來保證 QoS 1。

  • 發(fā)送者:發(fā)布消息,并等待接收者的 PUBACK 報文的應答,在規(guī)定的時間內沒有收到 PUBACK 的應答,發(fā)布者會將消息的 DUP 置為1 并重發(fā)消息。
  • 接受者:接收到 QoS 為 1 的消息時應該回應 PUBACK 報文,可能因為網絡延遲等原因沒有及時發(fā)出,這時接收者可能會多次接受同一個消息,無論 DUP標志如何,接收者都會將收到的消息當作一個新的消息并發(fā)送 PUBACK 報文應答。

核心:就是發(fā)送消息的時候,接受者需要確認一次,規(guī)定時間內沒有確認就會重新發(fā)。如果使用這種方式,寫業(yè)務的時候需要保證冪等性。

1.6.1.3、QoS 2 - 只分發(fā)一次

當 QoS 為 2 時,發(fā)布者和訂閱者通過兩次會話來保證消息只被傳遞一次,這是最高等級的服務質量,消息丟失和重復都是不可接受的。使用這個服務質量等級會有額外的開銷。

  • 發(fā)送者:發(fā)布 QoS 為 2 的消息之后,消息儲存起來并等待接收者回復 PUBREC 的消息。
  • 接受者:收到一條 QoS 為 2 的消息時,他會處理此消息并返回一條 PUBREC 進行應答。
  • 發(fā)送者:收到 PUBREC 消息后,丟棄掉之前的發(fā)布消息。保存 PUBREC 消息,并應答一個 PUBREL。等待接收者回復 PUBCOMP 消息
  • 接受者:當接收者收到 PUBREL 消息之后,它會丟棄掉所有已保存的狀態(tài),并回復 PUBCOMP。
  • 發(fā)送者:當發(fā)送者收到 PUBCOMP 消息之后會清空之前所保存的狀態(tài)。

核心:發(fā)送消息的時候,接受者需要確認兩次,來保證消息確實已經送到。

無論在傳輸過程中何時出現丟包,發(fā)送端都負責重發(fā)上一條消息。不管發(fā)送端是 Publisher(發(fā)送端) 還是 Broker(服務器),都是如此。因此,接收端也需要對每一條命令消息都進行應答。

1.6.2、QoS 在發(fā)布與訂閱中的區(qū)別

發(fā)布時的 QoS 表示消息發(fā)送到服務端時使用的 QoS
訂閱時的 QoS 表示服務端向自己轉發(fā)消息時可以使用的最大 QoS

  • 客戶端 A 的發(fā)布 QoS 大于客戶端 B 的訂閱 QoS 時,服務端向客戶端 B 轉發(fā)消息時使用的 QoS 為客戶端 B 的訂閱QoS。
  • 客戶端 A 的發(fā)布 QoS 小于客戶端 B 的訂閱 QoS 時,服務端向客戶端 B 轉發(fā)消息時使用的 QoS 為客戶端 A 的發(fā)布 QoS。

總結:接收端可以設置訂閱Qos為2,這樣就可以接所有qos等級消息。也就是發(fā)布消息qos為多少,那我這邊接受消息就是多少。主要以發(fā)布消息的qos為準。

1.6.3、如何選擇 MQTT QoS 等級

QoS 級別越高,流程越復雜,系統(tǒng)資源消耗越大。應用程序可以根據自己的網絡場景和業(yè)務需求,選擇合適的 QoS 級別。

以下情況下可以選擇 QoS 0

  • 可以接受消息偶爾丟失。
  • 在同一個子網內部的服務間的消息交互,或其他客戶端與服務端 網絡非常穩(wěn)定的場景。

以下情況下可以選擇 QoS 1

  • 對系統(tǒng)資源消耗較為關注,希望性能最優(yōu)化。
  • 消息不能丟失,但能接受并處理重復的消息。

以下情況下可以選擇 QoS 2

  • 不能忍受消息丟失(消息的丟失會造成生命或財產的損失),且不希望收到重復的消息。
  • 數據完整性與及時性要求較高的銀行、消防、航空等行業(yè)。

1.6.4、清除會話(Clean Session)

MQTT 客戶端向服務器發(fā)起 CONNECT 請求時,可以通過 Clean Session 標志設置是否創(chuàng)建全新的會話。

Clean Session 設置為 0 時:

  • 如果存在一個關聯(lián)此客戶標識符的會話,服務端必須基于此會話的狀態(tài)恢復與客戶端的通信。
  • 如果不存在任何關聯(lián)此客戶標識符的會話,服務端必須創(chuàng)建一個新的會話。

Clean Session 設置為 1:

  • 客戶端和服務端必須丟棄任何已存在的會話,并開始一個新的會話。

總結:監(jiān)聽端建議設置為0,一般監(jiān)聽端,我們都會配置單例,并且項目啟動就開始創(chuàng)建連接監(jiān)聽,設置為0,這樣可以保證連接的唯一性,和消息的安全性。

1.6.5、?;钚奶↘eep Alive)

MQTT 客戶端向服務器發(fā)起 CONNECT 請求時,通過 Keep Alive 參數設置?;钪芷?。

客戶端在無報文發(fā)送時,按 Keep Alive 周期定時發(fā)送 2 字節(jié)的 PINGREQ 心跳報文,服務端收到 PINGREQ 報文后,回復 2 字節(jié)的 PINGRESP 報文。

服務端在 1.5 個心跳周期內,既沒有收到客戶端發(fā)布訂閱報文,也沒有收到 PINGREQ 心跳報文時,將斷開客戶端連接。

1.6.6、保留消息(Retained Message)

MQTT 客戶端向服務器發(fā)布(PUBLISH)消息時,可以設置保留消息(Retained Message)標志。保留消息會駐留在消息服務器,后來的訂閱者訂閱主題時可以接收到最新一條(注意,是只有最近的一條)保留消息。

1.6.7、遺囑消息(Will Message)

MQTT 客戶端向服務端發(fā)送 CONNECT 請求時,可以攜帶遺囑消息。MQTT 客戶端異常下線時(客戶端斷開前未向服務器發(fā)送 DISCONNECT 消息),MQTT 消息服務器會發(fā)布遺囑消息。

在連接的時候通過調用 MqttConnectOptions 實例的 setWill 方法來設定。任何訂閱了下面的主題的客戶端都可以收到該遺囑消息。

//方法1MqttConnectOptions.setWill(MqttTopic topic, byte[] payload, int qos, boolean retained)
//方法2MqttConnectOptions.setWill(java.lang.String topic, byte[] payload, int qos, boolean retained)

以下情況下會發(fā)送 Will Message:

  • 服務端發(fā)生了I/O 錯誤或者網絡失敗;
  • 客戶端在定義的心跳時期失聯(lián);
  • 客戶端在發(fā)送下線包( DISCONNECT)之前關閉網絡連接;
  • 服務端在收到下線包之前關閉網絡連接。

總結:發(fā)送遺囑信息可以理解為,創(chuàng)建客戶端連接的時候,告訴服務器(mqtt服務器)我掛了之后,給哪些主題發(fā)這些消息。當訂閱到遺囑消息之后,他就知道監(jiān)聽端掛了,我不能給他發(fā)消息了,遺囑消息在客戶端正常調用 disconnect 方法之后并不會被發(fā)送。

高級使用場景:
這里介紹一下如何將 Retained(保留) 消息與Will (遺囑)消息結合起來進行使用。

  • 客戶端 A 遺囑消息設定為”offline“,該遺囑主題與一個普通發(fā)送狀態(tài)的主題設定成同一個 A/status;
  • 當客戶端 A 連接時,向主題 A/status 發(fā)送 “online” 的 Retained 消息,其它客戶端訂閱主題 A/status的時候,獲取 Retained 消息為 “online” ;
  • 當客戶端 A 異常斷開時,系統(tǒng)自動向主題 A/status 發(fā)送”offline“的消息,其它訂閱了此主題的客戶端會馬上收到”offline“消息;如果遺囑消息被設定了 Retained 的話,這時有新的訂閱A/status主題的客戶端上線的時候,獲取到的消息為“offline”。

2、EMQ X Cloud

2.1、EMQ X Cloud簡介

通過開放標準的物聯(lián)網協(xié)議 MQTT、MQTT over WebSocket、CoAP/LwM2M 將數以億計的物聯(lián)網設備可靠地連接到 EMQ X Cloud。通過 TLS/SSL 和基于 X.509 證書的認證確保安全的雙向通信。

在這里插入圖片描述

在該模型中,EMQ X Cloud 提供的 MQTT 服務不僅為設備與設備、設備與應用間架起橋梁,同時可將需要的數據進行持久化,以便非實時應用在后續(xù)對獲取的數據加以利用。

2.2、EMQ X Cloud優(yōu)勢

2.2.1、協(xié)議支持完整

支持 MQTT v3.1,v3.1.1 與 v5.0 協(xié)議版本,是全球首個支持 MQTT 5.0 的公有云服務,支持 MQTT WebSocket 服務,完整支持 QoS0, QoS1 與 QoS2 級別 MQTT 消息。

2.2.2、多種協(xié)議接入

支持包含 MQTT、MQTT-SN、CoAP、LwM2M、私有 TCP 協(xié)議在內的多種通信協(xié)議接入,覆蓋各類行業(yè)應用;可根據您的特殊使用場景定制私有化功能,充分契合業(yè)務需求。

2.2.3、容量預估與伸縮

通過連接數與消息吞吐量自動預估容量,通過緊密的監(jiān)控來制定伸縮計劃,集群大小可隨業(yè)務規(guī)模平滑調整。

2.3、EMQ X 和 RabbitMQ對比

EMQ X 是基于高并發(fā)的 Erlang/OTP 語言平臺開發(fā),支持百萬級連接、分布式集群架構、發(fā)布訂閱模式的開源 MQTT 消息服務器。開源至今,EMQ X 在全球物聯(lián)網市場得到了廣泛應用。在開源版基礎上,還陸續(xù)發(fā)展了商業(yè)版和提供云版本(cloud-hosting)(https://www.emqx.com/zh/cloud)。EMQ X 支持很多插件,具有強大拓展能力,用戶依靠插件可以實現更多的功能。

RabbitMQ 是實現了高級消息隊列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ 服務器也是基于 Erlang 語言開發(fā)的,現在可以通過插件配置的形式,使其支持 MQTT 協(xié)議。

2.3.1、測試場景

以下的測試均使用了 QoS 1 的消息。當發(fā)送 QoS 1 的消息時,這些消息每次都要作為可持久化的備份保存在硬盤上。所以隊列空間的使用也尤為重要。

這次評測使用了一個云主機 M5 large 的實例,每個 MQTT 消息服務器集群由 3 個節(jié)點組成,每個節(jié)點的配置是雙核,8GB 內存。需要強調的是,我們對于 EMQ X 和 RabbitMQ 的測試使用了完全一致的硬件資源以消除變量。

壓力測試將會有兩個場景,「多對一」 和 「一對多」。

多對一:
許多設備作為發(fā)布者,如溫度傳感器或者是壓力傳感器,發(fā)送數據給一個服務器。服務器再將這些數據發(fā)送給一個控制器(即訂閱者)處理這些數據。

在這里插入圖片描述

一對多:
一個控制器作為發(fā)布者將消息傳送給服務器,再由服務器將這些消息傳送給多個作為訂閱者的設備。

在這里插入圖片描述

在每個場景里,「多」的那一方的數量將會從 2000 個逐漸上升到 10000 個。每個場景里,每一秒會發(fā)送一條載荷為 256 字節(jié)的消息。這樣的發(fā)布并不會造成過大的吞吐量。僅僅使用 256 字節(jié)載荷是為了展示出這兩個服務器的工作原理,以及他們的集群模式如何對這些場景作出反應的。

2.3.2、測試結果

左側Y軸是指 CPU 占用,底部X軸是指「多」側的客戶端數量變化。

多對一:
從 「多對一」 的結果可以看出,EMQ X 和 RabbitMQ 相比并沒有太大差別。

在這里插入圖片描述

一對多:
但是從「一對多」的結果來看,RabbitMQ 相比于 EMQ X 確實有很明顯的差距。

在這里插入圖片描述

2.3.3、測試總結

結果表明:在「多對一」 場景中,EMQ X 和 RabbitMQ 相比并沒有太大差別;而在「一對多」場景中,RabbitMQ 則較 EMQ X 產生了較為明顯的差距。相比較而言,rabbitmq使用MQTT協(xié)議,和EMQX使用MQTT協(xié)議存在著一定的差距。

2.3.4、注意

使用MQTT的發(fā)布-訂閱模型不能滿足使用要求??梢赃x擇使用AMQP。

3、Eclipse Paho Java

Paho Java客戶端是用Java編寫的MQTT客戶端庫,用于開發(fā)在JVM或其他Java兼容平臺(例如Android)上運行的應用程序。
Paho不僅可以對接EMQ X Broker,還可以對接滿足符合MQTT協(xié)議規(guī)范的消息代理服務端,目前Paho可以支持到MQTT5.0以下版本。MQTT3.3.1協(xié)議版本基本能滿足百分之九十多的接入場景。

4、SpringBoot整合Eclipse Paho Java

EMQX是消息服務器,而我們java想要發(fā)送消息,和訂閱消息都是和服務器打交道,想要和服務器打交道就需要想辦法連上他,這時候就需要用到了Eclipse Paho Java客戶端,用來在java當中連接EMQX消息服務器。

下面案例是按照我的應用場景來寫的,監(jiān)聽單獨用了一個客戶端存入了內存,使用了static變量,啟動項目的時候初始化,發(fā)送客戶端并沒有存入內存,而是發(fā)送一條,創(chuàng)建一個客戶端。這里有一點需要注意,客戶端id一定不要重復,就是對于MQTT服務器來說,clientid一定要保持唯一。

4.1、導入依賴

我用的springboot版本是2.3.9.RELEASE

    <!-- mqtt -->
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <!--配置文件報錯問題-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.22</version>
		<scope>provided</scope>
	</dependency>

4.2、讀取配置

在application.yml當中添加

mqtt:
  hostUrl: tcp://192.168.56.103:1883
  username: admin
  password: public
  client-id: equipment_main
  cleanSession: true
  reconnect: true
  timeout: 100
  keepAlive: 100
  defaultTopic: client:report:1
  isOpen: true
  qos: 1

通過這個文件來讀取配置

@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {

    /**
     * 用戶名
     */
    private String username;

    /**
     * 密碼
     */
    private String password;

    /**
     * 連接地址
     */
    private String hostUrl;

    /**
     * 客戶端Id,同一臺服務器下,不允許出現重復的客戶端id
     */
    private String clientId;

    /**
     * 默認連接主題
     */
    private String defaultTopic;

    /**
     * 超時時間
     */
    private int timeout;

    /**
     * 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端
     * 發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制
     */
    private int keepAlive;

    /**
     * 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連
     * 接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
     */
    private Boolean cleanSession;

    /**
     * 是否斷線重連
     */
    private Boolean reconnect;

    /**
     * 啟動的時候是否關閉mqtt
     */
    private Boolean isOpen;

    /**
     * 連接方式
     */
    private Integer qos;
}

4.3、添加mqtt接受服務的客戶端

@Component
public class MqttAcceptClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);

    @Autowired
    private MqttAcceptCallback mqttAcceptCallback;

    @Autowired
    private MqttProperties mqttProperties;

    public static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttAcceptClient.client = client;
    }

    /**
     * 客戶端連接
     */
    public void connect() {
        MqttClient client;
        try {
            client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(mqttProperties.getCleanSession());
            MqttAcceptClient.setClient(client);
            try {
                // 設置回調
                client.setCallback(mqttAcceptCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新連接
     */
    public void reconnection() {
        try {
            client.connect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱某個主題
     *
     * @param topic 主題
     * @param qos   連接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("==============開始訂閱主題==============" + topic);
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 取消訂閱某個主題
     *
     * @param topic
     */
    public void unsubscribe(String topic) {
        logger.info("==============開始取消訂閱主題==============" + topic);
        try {
            client.unsubscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

4.4、添加mqtt接受服務的回調類

@Component
public class MqttAcceptCallback implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 客戶端斷開后觸發(fā)
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("連接斷開,可以做重連");
        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
            logger.info("emqx重新連接....................................................");
            mqttAcceptClient.reconnection();
        }
    }

    /**
     * 客戶端收到消息觸發(fā)
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主題 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息內容 : " + new String(mqttMessage.getPayload()));
//        int i = 1/0;
    }

    /**
     * 發(fā)布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主題:" + topic + "發(fā)送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            logger.info("消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq服務器后觸發(fā)
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        logger.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
        // 以/#結尾表示訂閱所有以test開頭的主題
        // 訂閱所有機構主題
        mqttAcceptClient.subscribe("client:report:1", 0);
    }
}

4.5、添加mqtt發(fā)送客戶端

@Component
public class MqttSendClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttSendClient.class);

    @Autowired
    private MqttSendCallBack mqttSendCallBack;

    @Autowired
    private MqttProperties mqttProperties;

    public MqttClient connect() {
        MqttClient client = null;
        try {
            String uuid = UUID.randomUUID().toString().replaceAll("-","");
            client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(true);
            options.setAutomaticReconnect(false);
            try {
                // 設置回調
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return client;
    }

    /**
     * 發(fā)布消息
     * 主題格式: server:report:$orgCode(參數實際使用機構代碼)
     *
     * @param retained    是否保留
     * @param orgCode     orgId
     * @param pushMessage 消息體
     */
    public void publish(boolean retained, String orgCode, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttDeliveryToken token;
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish("server:report:" + orgCode, message);
        } catch (MqttException e) {
            e.printStackTrace();
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

    /**
     * 關閉連接
     *
     * @param mqttClient
     */
    public static void disconnect(MqttClient mqttClient) {
        try {
            if (mqttClient != null) mqttClient.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 釋放資源
     *
     * @param mqttClient
     */
    public static void close(MqttClient mqttClient) {
        try {
            if (mqttClient != null) mqttClient.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

4.6、添加mqtt發(fā)送客戶端的回調類

@Component
public class MqttSendCallBack implements MqttCallbackExtended {

    private static final Logger logger = LoggerFactory.getLogger(MqttSendCallBack.class);

    /**
     * 客戶端斷開后觸發(fā)
     *
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("連接斷開,可以做重連");
    }

    /**
     * 客戶端收到消息觸發(fā)
     *
     * @param topic       主題
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主題 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息內容 : " + new String(mqttMessage.getPayload()));
    }

    /**
     * 發(fā)布消息成功
     *
     * @param token token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        String[] topics = token.getTopics();
        for (String topic : topics) {
            logger.info("向主題:" + topic + "發(fā)送消息成功!");
        }
        try {
            MqttMessage message = token.getMessage();
            byte[] payload = message.getPayload();
            String s = new String(payload, "UTF-8");
            logger.info("消息的內容是:" + s);
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 連接emq服務器后觸發(fā)
     *
     * @param b
     * @param s
     */
    @Override
    public void connectComplete(boolean b, String s) {
        logger.info("--------------------ClientId:"
                + MqttAcceptClient.client.getClientId() + "客戶端連接成功!--------------------");
    }
}

4.7、添加配置類

自定義配置,通過這個配置,來控制啟動項目的時候是否啟動mqtt

public class MqttCondition implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
        //1、能獲取到ioc使用的beanfactory
        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
        //2、獲取類加載器
        ClassLoader classLoader = context.getClassLoader();
        //3、獲取當前環(huán)境信息
        Environment environment = context.getEnvironment();
        String isOpen = environment.getProperty("mqtt.isOpen");
        return Boolean.valueOf(isOpen);
    }
}

4.8、啟動服務的時候開啟監(jiān)聽客戶端

@Configuration
public class MqttConfig {

    @Autowired
    private MqttAcceptClient mqttAcceptClient;

    /**
     * 訂閱mqtt
     *
     * @return
     */
    @Conditional(MqttCondition.class)
    @Bean
    public MqttAcceptClient getMqttPushClient() {
        mqttAcceptClient.connect();
        return mqttAcceptClient;
    }
}

4.9、測試類

@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttSendClient MqttSendClient;

    @GetMapping(value = "/publishTopic")
    public Object publishTopic(String sendMessage) {
        System.out.println("message:"+sendMessage);
        sendMessage=sendMessage+" : {\"name\":\"ljf\",\"age\":345}";
        MqttSendClient.publish(false,"client:report:2",sendMessage);
        return null;
    }

}

5、發(fā)送和監(jiān)聽消息測試

在這里插入圖片描述

測試發(fā)送消息:
訪問:http://localhost:8080/mqtt/publishTopic?sendMessage=222

在這里插入圖片描述

測試監(jiān)聽:

在這里插入圖片描述

6、總結

EMQX官網:https://www.emqx.com/zh/blog/introduction-to-mqtt-qos

到此這篇關于springboot使用EMQX(MQTT協(xié)議)的實現的文章就介紹到這了,更多相關springboot使用EMQX內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 解決Maven依賴沖突的方法

    解決Maven依賴沖突的方法

    本文主要介紹了解決Maven依賴沖突的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-05-05
  • Spring注解中@Autowired和@Bean的區(qū)別詳解

    Spring注解中@Autowired和@Bean的區(qū)別詳解

    這篇文章主要詳細介紹了Spring注解中@Autowired和@Bean二者有什么區(qū)別,文中通過兩個注解的使用場景介紹了二者的區(qū)別,感興趣的同學可以參考閱讀
    2023-06-06
  • Servlet+MyBatis項目轉Spring Cloud微服務,多數據源配置修改建議

    Servlet+MyBatis項目轉Spring Cloud微服務,多數據源配置修改建議

    今天小編就為大家分享一篇關于Servlet+MyBatis項目轉Spring Cloud微服務,多數據源配置修改建議,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧
    2019-01-01
  • javaweb servlet中使用請求轉發(fā)亂碼的實現

    javaweb servlet中使用請求轉發(fā)亂碼的實現

    下面小編就為大家?guī)硪黄猨avaweb servlet中使用請求轉發(fā)亂碼的實現。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2016-08-08
  • SpringBoot集成JWT實現Token登錄驗證的示例代碼

    SpringBoot集成JWT實現Token登錄驗證的示例代碼

    隨著技術的發(fā)展,分布式web應用的普及,通過session管理用戶登錄狀態(tài)成本越來越高,因此慢慢發(fā)展成為token的方式做登錄身份校驗,本文就來介紹一下SpringBoot集成JWT實現Token登錄驗證的示例代碼,感興趣的可以了解一下
    2023-12-12
  • Java實現復原IP地址的方法

    Java實現復原IP地址的方法

    這篇文章主要介紹了Java實現復原IP地址的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-02-02
  • java shiro實現退出登陸清空緩存

    java shiro實現退出登陸清空緩存

    本篇文章主要介紹了java shiro實現退出登陸清空緩存,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-02-02
  • Java中的runnable 和 callable 區(qū)別解析

    Java中的runnable 和 callable 區(qū)別解析

    Runnable接口用于定義不需要返回結果的任務,而Callable接口可以返回結果并拋出異常,通常與Future結合使用,Runnable適用于簡單的后臺任務和定時任務,而Callable適用于并行計算、異步操作和復雜任務,選擇使用哪個接口取決于具體的應用場景,感興趣的朋友一起看看吧
    2025-03-03
  • java中的session對象如何獲取

    java中的session對象如何獲取

    這篇文章主要介紹了java中的session對象如何獲取,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java中數組與集合的相互轉換實現解析

    Java中數組與集合的相互轉換實現解析

    這篇文章主要介紹了Java中數組與集合的相互轉換實現解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-08-08

最新評論