欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java MQTT實(shí)戰(zhàn)應(yīng)用

 更新時間:2025年06月27日 11:23:43   作者:某代碼  
本文詳解MQTT協(xié)議,涵蓋其發(fā)布/訂閱機(jī)制、低功耗高效特性、三種服務(wù)質(zhì)量等級(QoS0/1/2),以及客戶端、代理、主題的核心概念,最后提供Linux部署教程、SpringBoot整合方法與MQTTX工具下載鏈接,對java mqtt相關(guān)知識感興趣的朋友一起看看吧

一、MQTT協(xié)議

MQTT(Message Queuing Telemetry Transport)是一種輕量級的發(fā)布/訂閱式消息傳遞協(xié)議,專為物聯(lián)網(wǎng)(IoT)和嵌入式設(shè)備設(shè)計,它簡化了設(shè)備之間的通信,并優(yōu)化帶寬使用。 

在MQTT中,消息的發(fā)送者稱為“發(fā)布者”(Publisher)消息的接收者稱為“訂閱者”(Subscriber),而消息的中轉(zhuǎn)站是“代理”(Broker)。發(fā)布者將消息發(fā)布到特定的“主題”(Topic),代理負(fù)責(zé)將消息轉(zhuǎn)發(fā)給所有訂閱了該主題的訂閱者。這種模式解耦了消息的發(fā)送者和接收者,使得系統(tǒng)更加靈活和可擴(kuò)展。

二、MQTT優(yōu)點(diǎn)

  • 低功耗、高效、可靠。
  • 輕量級:協(xié)議設(shè)計簡潔,消息頭部開銷小,適用于低帶寬和低功耗設(shè)備。
  • 支持發(fā)布/訂閱模式:設(shè)備可以發(fā)布消息到主題,其他設(shè)備可以訂閱對應(yīng)的主題接收消息。這一模式解耦了消息生產(chǎn)者和消費(fèi)者,簡化了系統(tǒng)架構(gòu),提高了靈活性和可擴(kuò)展性。
  • 可拓展性和兼容性:MQTT允許使用不同的傳輸協(xié)議,包括TCP、WebSocket等。它的簡單性使得它易于與其他協(xié)議和服務(wù)集成。
  • 持久化會話:MQTT支持消息持久化,允許設(shè)備在斷線后重新連接時恢復(fù)之前的會話狀態(tài),包括未完成的訂閱和未收到的消息隊(duì)列,這對于網(wǎng)絡(luò)不穩(wěn)定或經(jīng)常斷開的物聯(lián)網(wǎng)環(huán)境尤為重要。

三、三種服務(wù)質(zhì)量等級

  • QoS = 0(最多一次):消息最多被傳遞一次,可能丟失,但不會重復(fù)。此級別提供的可靠性最低,一旦消息被客戶端發(fā)送出去,它不會等待任何確認(rèn),即“Fire and Forget”模式。這意味著發(fā)布者不會確認(rèn)消息是否到達(dá)Broker,也不會嘗試重傳失敗的消息)
  • QoS = 1(至少一次):消息至少被傳遞一次,可能會重復(fù),但不會丟失。此級別保證消息至少被送達(dá)一次,但有可能被重復(fù)發(fā)送。在QoS 1下,Broker(消息隊(duì)列服務(wù)器)會發(fā)送PUBACK確認(rèn)消息給客戶端,如果客戶端沒有收到確認(rèn),則會重發(fā)消息,直到收到確認(rèn)為止。因此,雖然可以確保消息不會丟失,但也可能導(dǎo)致相同消息被多次接收
  • QoS = 2(恰好一次):消息保證被傳遞一次且僅一次,不會丟失也不會重復(fù)。這是MQTT提供的最高級別服務(wù)質(zhì)量,確保每條消息只會被接收一次,提供最嚴(yán)格的可靠性保證。該機(jī)制通過一個復(fù)雜的四次握手過程實(shí)現(xiàn),包括消息標(biāo)識符的確認(rèn)和釋放,確保消息既不丟失也不重復(fù)

四、客戶端、代理、主題

MQTT協(xié)議中,三個核心概念分別是客戶端(Client)、代理(Broker)和主題(Topic),它們共同構(gòu)成了MQTT通信的基礎(chǔ)框架,實(shí)現(xiàn)了消息的發(fā)布與訂閱機(jī)制。

1. 客戶端(Client):

作用:客戶端可以是消息的發(fā)布者(Publisher)或訂閱者(Subscriber),也可以同時具備這兩種角色。發(fā)布者負(fù)責(zé)向MQTT系統(tǒng)中的某個主題發(fā)布消息;訂閱者則訂閱感興趣的主題,以接收來自該主題的消息??蛻舳丝梢允莻鞲衅?、手機(jī)應(yīng)用、服務(wù)器程序等各種設(shè)備或應(yīng)用。

相互關(guān)系:客戶端不直接相互通信,而是通過Broker中轉(zhuǎn)消息。發(fā)布者客戶端向Broker發(fā)送消息,而訂閱者客戶端從Broker接收消息。

 2. 代理(Broker):

作用:Broker是MQTT通信的中心節(jié)點(diǎn),它接收來自發(fā)布者客戶端的消息,并根據(jù)消息中的主題分發(fā)給相應(yīng)的訂閱者客戶端。Broker負(fù)責(zé)維護(hù)客戶端的連接狀態(tài)、存儲消息(如果需要持久化)、管理主題的訂閱關(guān)系等。

相互關(guān)系:Broker是客戶端之間的中介,它管理著所有的消息流動。每個客戶端都與Broker建立連接,無論發(fā)布還是訂閱操作,都必須通過Broker來完成。

3. 主題(Topic):

作用:主題是MQTT中消息的分類標(biāo)簽,類似于一個消息通道或者頻道。每個消息都會關(guān)聯(lián)一個主題,發(fā)布者通過指定主題來決定消息的去向,而訂閱者通過訂閱特定主題來接收相關(guān)消息。

相互關(guān)系:主題是連接發(fā)布者與訂閱者的橋梁。發(fā)布者向特定主題發(fā)布消息,而訂閱者則通過訂閱這些主題來接收消息。Broker根據(jù)主題匹配規(guī)則,確保消息被正確地路由到已訂閱該主題的所有客戶端。主題可以是靜態(tài)的字符串,也可以包含通配符(如"+“和”#”)來實(shí)現(xiàn)靈活的匹配規(guī)則。

五、實(shí)戰(zhàn)應(yīng)用

1. 安裝部署(linux)

 -- 拉取鏡像

docker pull emqx/emqx:5.0.26

-- 安裝容器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26

2. 訪問控制臺

訪問:ip:18083

默認(rèn)的用戶名密碼:admin/public

3. 客戶端認(rèn)證

4. 創(chuàng)建用戶

5. SpringBoot中整合

5.1 導(dǎo)入jar包
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-configuration-processor</artifactId>
  <optional>true</optional>
</dependency>
5.2 yml配置
mqtt:
  #MQTT-服務(wù)器連接地址,如果有多個,用逗號隔開
  host: tcp://192.168.17.101:1883
  #MQTT-連接服務(wù)器默認(rèn)客戶端ID,可以隨便寫
  clientId: mqtt_test
  #MQTT-用戶名
  username: zhangsan
  #MQTT-密碼
  password: 123456
  #MQTT-指定消息的推送和訂閱主題
  topic: test
  #連接超時
  timeout: 100
  #設(shè)置會話心跳時間
  keepalive: 10
5.3 MqttConfig.java
@Slf4j
@Configuration
@ConfigurationProperties("mqtt")
@Data
public class MqttConfig {
    String host;
    String clientId;
    String topic;
    String username;
    String password;
    Integer timeout;
    Integer keepalive;
    // MQTT客戶端的配置類,可以設(shè)置mqtt服務(wù)器的賬號和密碼
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 設(shè)置是否自動重連
        options.setAutomaticReconnect(true);
        // false 保持會話不被清理自動重連后才能收到訂閱的主題消息(包括離線時發(fā)布的消息)
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        return options;
    }
    // MqttClient 類,MQTT的客戶端類,可以去連接MQTT服務(wù)器
    @Bean
    public MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) {
        try {
            MqttClient client = new MqttClient(host, clientId);
            // 回調(diào)對象,監(jiān)聽消息的獲取,采用的接口回調(diào),可以獲取對應(yīng)訂閱到的消息
            client.setCallback(new MessageCallback(client, this.topic, mqttConnectOptions));
            // 連接
            client.connect(mqttConnectOptions());
            return client;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("mqtt 連接異常");
        }
    }
}
5.4 MessageCallback.java
/**
 * consumer 消費(fèi)者,對收到的消息進(jìn)行處理
 */
//@Component
@Slf4j
public class MessageCallback implements MqttCallbackExtended {
    private MqttClient client;
    private String topic;
    private MqttConnectOptions mqttConnectOptions;
    public MessageCallback() {
    }
    public MessageCallback(MqttClient mqttClient, String topic, MqttConnectOptions mqttConnectOptions) {
        this.client = mqttClient;
        this.topic = topic;
        this.mqttConnectOptions = mqttConnectOptions;
    }
    // 在客戶端連接斷開時觸發(fā)
    @Override
    public void connectionLost(Throwable throwable) {
        if (client != null && !client.isConnected()) {
            log.info("{}, 連接斷開,正在reconnect....", client.getClientId());
            try {
                client.reconnect();
                // client.connect(this.mqttConnectOptions);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            log.info("未知異常,連接斷開");
        }
    }
    // 在客戶端與服務(wù)器連接成功時觸發(fā)
    @Override
    public void connectComplete(boolean b, String url) {
        log.info("{} 上線了{(lán)} {}", client.getClientId(), b, url);
        try {
            client.subscribe(this.topic, 0);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    // 在客戶端收到訂閱的消息時觸發(fā)
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("接收消息主題 : " + topic);
        log.info("接收消息內(nèi)容 : " + new String(message.getPayload()));
        String msg = new String(message.getPayload());
        try {
            JSONObject jsonObject = JSON.parseObject(msg);
            String clientId = String.valueOf(jsonObject.get("clientid"));
            if (topic.endsWith("disconnected")) {
                log.info("設(shè)備{}已掉線", clientId);
            } else if (topic.endsWith("connected")) {
                log.info("設(shè)備{}已上線", clientId);
            } else {
                log.info("其他主題的消息");
            }
        } catch (JSONException e) {
            log.error("JSON Format Parsing Exception : {}", msg);
        }
    }
    // 在客戶端發(fā)送消息至服務(wù)器成功時觸發(fā)
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------" + token.isComplete());
    }
}
5.5 MqttUtil.java
@Component
@Slf4j
public class MqttUtil {
    @Autowired(required = false)
    private MqttClient client;
    /**
     * 訂閱主題
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 訂閱主題
     *
     * @param topic
     */
    public void subscribe(String topic) {
        try {
            client.subscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 發(fā)布消息
     *
     * @param qos         連接方式 0,1,2 默認(rèn)0
     * @param retained    是否保留最新的消息
     * @param topic       訂閱主題
     * @param pushMessage 消息體
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic = client.getTopic(topic);
        if (null == mqttTopic) {
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            // 發(fā)送消息
            token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 發(fā)布消息
     *
     * @param topic       主題
     * @param pushMessage 消息內(nèi)容
     */
    public void publish(String topic, String pushMessage) {
        publish(0, true, topic, pushMessage);
    }
}
5.6 MqttController.java
@RestController
@Slf4j
public class MqttController {
    @Autowired
    MqttClient client;
    @Autowired
    MqttUtil mqttUtil;
    @GetMapping("/send")
    public String send() {
        try {
            for (int i = 0; i < 3; i++) {
                mqttUtil.publish("test", "消息hello" + i);
                log.info("發(fā)送成功:{}", i);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "SUCCESS";
    }
}

六、MQTTX官網(wǎng)地址

MQTT客戶端工具M(jìn)QTTX下載地址 : MQTTX:全功能 MQTT 客戶端工具

到此這篇關(guān)于Java MQTT實(shí)戰(zhàn)應(yīng)用的文章就介紹到這了,更多相關(guān)Java MQTT內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié)

    hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié)

    這篇文章主要介紹了hibernate一對多關(guān)聯(lián)映射學(xué)習(xí)小結(jié),需要的朋友可以參考下
    2017-09-09
  • SpringBoot WebService服務(wù)端&客戶端使用案例教程

    SpringBoot WebService服務(wù)端&客戶端使用案例教程

    這篇文章主要介紹了SpringBoot WebService服務(wù)端&客戶端使用案例教程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2023-10-10
  • Java?String類的理解及字符串常量池介紹

    Java?String類的理解及字符串常量池介紹

    這篇文章主要介紹了Java?String類的理解及字符串常量池介紹,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-09-09
  • 利用logback filter過濾某個類 屏蔽某個類

    利用logback filter過濾某個類 屏蔽某個類

    這篇文章主要介紹了利用logback filter過濾某個類 屏蔽某個類的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Java運(yùn)算符>、>>、>>>三者的區(qū)別

    Java運(yùn)算符>、>>、>>>三者的區(qū)別

    這篇文章主要介紹了Java運(yùn)算符>、>>、>>>三者的區(qū)別,做了一個簡單的對比,并用實(shí)例說明,需要的朋友可以參考下
    2014-06-06
  • 方法參數(shù)屬性params,@PathVariable和@RequestParam用法及區(qū)別

    方法參數(shù)屬性params,@PathVariable和@RequestParam用法及區(qū)別

    這篇文章主要介紹了方法參數(shù)屬性params,@PathVariable和@RequestParam用法及區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • springboot整合mongodb使用詳解

    springboot整合mongodb使用詳解

    MongoDB是一個文檔數(shù)據(jù)庫(以?JSON?為數(shù)據(jù)模型),由C++語言編寫,旨在為WEB應(yīng)用提供可擴(kuò)展的高性能數(shù)據(jù)存儲解決方案,本文就給大家介紹一下詳細(xì)介紹一下springboot整合mongodb使用,需要的朋友可以參考下
    2023-07-07
  • 解決Aop @AfterReturning因返回類型不一致導(dǎo)致無法執(zhí)行切面代碼

    解決Aop @AfterReturning因返回類型不一致導(dǎo)致無法執(zhí)行切面代碼

    這篇文章主要介紹了解決Aop @AfterReturning因返回類型不一致導(dǎo)致無法執(zhí)行切面代碼問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • Spring Boot如何動態(tài)創(chuàng)建Bean示例代碼

    Spring Boot如何動態(tài)創(chuàng)建Bean示例代碼

    這篇文章主要給大家介紹了關(guān)于Spring Boot如何動態(tài)創(chuàng)建Bean的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-09-09
  • Java 實(shí)現(xiàn)Excel文檔添加超鏈接的代碼

    Java 實(shí)現(xiàn)Excel文檔添加超鏈接的代碼

    超鏈接即內(nèi)容鏈接,通過給特定對象設(shè)置超鏈接,可實(shí)現(xiàn)載體與特定網(wǎng)頁、文件、郵件、網(wǎng)絡(luò)等的鏈接,點(diǎn)擊鏈接載體可打開鏈接目標(biāo),在文檔處理中是一種比較常用的功能,本文將介紹通過Java程序給Excel文檔添加超鏈接的方法,感興趣的朋友一起看看吧
    2020-02-02

最新評論