欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java MQTT實(shí)戰(zhàn)應(yīng)用

 更新時(shí)間:2025年06月27日 11:23:43   作者:某代碼  
本文詳解MQTT協(xié)議,涵蓋其發(fā)布/訂閱機(jī)制、低功耗高效特性、三種服務(wù)質(zhì)量等級(jí)(QoS0/1/2),以及客戶端、代理、主題的核心概念,最后提供Linux部署教程、SpringBoot整合方法與MQTTX工具下載鏈接,對(duì)java mqtt相關(guān)知識(shí)感興趣的朋友一起看看吧

一、MQTT協(xié)議

MQTT(Message Queuing Telemetry Transport)是一種輕量級(jí)的發(fā)布/訂閱式消息傳遞協(xié)議,專為物聯(lián)網(wǎng)(IoT)和嵌入式設(shè)備設(shè)計(jì),它簡化了設(shè)備之間的通信,并優(yōu)化帶寬使用。 

在MQTT中,消息的發(fā)送者稱為“發(fā)布者”(Publisher)消息的接收者稱為“訂閱者”(Subscriber),而消息的中轉(zhuǎn)站是“代理”(Broker)。發(fā)布者將消息發(fā)布到特定的“主題”(Topic),代理負(fù)責(zé)將消息轉(zhuǎn)發(fā)給所有訂閱了該主題的訂閱者。這種模式解耦了消息的發(fā)送者和接收者,使得系統(tǒng)更加靈活和可擴(kuò)展。

二、MQTT優(yōu)點(diǎn)

  • 低功耗、高效、可靠。
  • 輕量級(jí):協(xié)議設(shè)計(jì)簡潔,消息頭部開銷小,適用于低帶寬和低功耗設(shè)備。
  • 支持發(fā)布/訂閱模式:設(shè)備可以發(fā)布消息到主題,其他設(shè)備可以訂閱對(duì)應(yīng)的主題接收消息。這一模式解耦了消息生產(chǎn)者和消費(fèi)者,簡化了系統(tǒng)架構(gòu),提高了靈活性和可擴(kuò)展性。
  • 可拓展性和兼容性:MQTT允許使用不同的傳輸協(xié)議,包括TCP、WebSocket等。它的簡單性使得它易于與其他協(xié)議和服務(wù)集成。
  • 持久化會(huì)話:MQTT支持消息持久化,允許設(shè)備在斷線后重新連接時(shí)恢復(fù)之前的會(huì)話狀態(tài),包括未完成的訂閱和未收到的消息隊(duì)列,這對(duì)于網(wǎng)絡(luò)不穩(wěn)定或經(jīng)常斷開的物聯(lián)網(wǎng)環(huán)境尤為重要。

三、三種服務(wù)質(zhì)量等級(jí)

  • QoS = 0(最多一次):消息最多被傳遞一次,可能丟失,但不會(huì)重復(fù)。此級(jí)別提供的可靠性最低,一旦消息被客戶端發(fā)送出去,它不會(huì)等待任何確認(rèn),即“Fire and Forget”模式。這意味著發(fā)布者不會(huì)確認(rèn)消息是否到達(dá)Broker,也不會(huì)嘗試重傳失敗的消息)
  • QoS = 1(至少一次):消息至少被傳遞一次,可能會(huì)重復(fù),但不會(huì)丟失。此級(jí)別保證消息至少被送達(dá)一次,但有可能被重復(fù)發(fā)送。在QoS 1下,Broker(消息隊(duì)列服務(wù)器)會(huì)發(fā)送PUBACK確認(rèn)消息給客戶端,如果客戶端沒有收到確認(rèn),則會(huì)重發(fā)消息,直到收到確認(rèn)為止。因此,雖然可以確保消息不會(huì)丟失,但也可能導(dǎo)致相同消息被多次接收
  • QoS = 2(恰好一次):消息保證被傳遞一次且僅一次,不會(huì)丟失也不會(huì)重復(fù)。這是MQTT提供的最高級(jí)別服務(wù)質(zhì)量,確保每條消息只會(huì)被接收一次,提供最嚴(yán)格的可靠性保證。該機(jī)制通過一個(gè)復(fù)雜的四次握手過程實(shí)現(xiàn),包括消息標(biāo)識(shí)符的確認(rèn)和釋放,確保消息既不丟失也不重復(fù)

四、客戶端、代理、主題

MQTT協(xié)議中,三個(gè)核心概念分別是客戶端(Client)、代理(Broker)和主題(Topic),它們共同構(gòu)成了MQTT通信的基礎(chǔ)框架,實(shí)現(xiàn)了消息的發(fā)布與訂閱機(jī)制。

1. 客戶端(Client):

作用:客戶端可以是消息的發(fā)布者(Publisher)或訂閱者(Subscriber),也可以同時(shí)具備這兩種角色。發(fā)布者負(fù)責(zé)向MQTT系統(tǒng)中的某個(gè)主題發(fā)布消息;訂閱者則訂閱感興趣的主題,以接收來自該主題的消息??蛻舳丝梢允莻鞲衅?、手機(jī)應(yīng)用、服務(wù)器程序等各種設(shè)備或應(yīng)用。

相互關(guān)系:客戶端不直接相互通信,而是通過Broker中轉(zhuǎn)消息。發(fā)布者客戶端向Broker發(fā)送消息,而訂閱者客戶端從Broker接收消息。

 2. 代理(Broker):

作用:Broker是MQTT通信的中心節(jié)點(diǎn),它接收來自發(fā)布者客戶端的消息,并根據(jù)消息中的主題分發(fā)給相應(yīng)的訂閱者客戶端。Broker負(fù)責(zé)維護(hù)客戶端的連接狀態(tài)、存儲(chǔ)消息(如果需要持久化)、管理主題的訂閱關(guān)系等。

相互關(guān)系:Broker是客戶端之間的中介,它管理著所有的消息流動(dòng)。每個(gè)客戶端都與Broker建立連接,無論發(fā)布還是訂閱操作,都必須通過Broker來完成。

3. 主題(Topic):

作用:主題是MQTT中消息的分類標(biāo)簽,類似于一個(gè)消息通道或者頻道。每個(gè)消息都會(huì)關(guān)聯(lián)一個(gè)主題,發(fā)布者通過指定主題來決定消息的去向,而訂閱者通過訂閱特定主題來接收相關(guān)消息。

相互關(guān)系:主題是連接發(fā)布者與訂閱者的橋梁。發(fā)布者向特定主題發(fā)布消息,而訂閱者則通過訂閱這些主題來接收消息。Broker根據(jù)主題匹配規(guī)則,確保消息被正確地路由到已訂閱該主題的所有客戶端。主題可以是靜態(tài)的字符串,也可以包含通配符(如"+“和”#”)來實(shí)現(xiàn)靈活的匹配規(guī)則。

五、實(shí)戰(zhàn)應(yīng)用

1. 安裝部署(linux)

 -- 拉取鏡像

docker pull emqx/emqx:5.0.26

-- 安裝容器

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.26

2. 訪問控制臺(tái)

訪問:ip:18083

默認(rèn)的用戶名密碼:admin/public

3. 客戶端認(rèn)證

4. 創(chuàng)建用戶

5. SpringBoot中整合

5.1 導(dǎo)入jar包
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-configuration-processor</artifactId>
  <optional>true</optional>
</dependency>
5.2 yml配置
mqtt:
  #MQTT-服務(wù)器連接地址,如果有多個(gè),用逗號(hào)隔開
  host: tcp://192.168.17.101:1883
  #MQTT-連接服務(wù)器默認(rèn)客戶端ID,可以隨便寫
  clientId: mqtt_test
  #MQTT-用戶名
  username: zhangsan
  #MQTT-密碼
  password: 123456
  #MQTT-指定消息的推送和訂閱主題
  topic: test
  #連接超時(shí)
  timeout: 100
  #設(shè)置會(huì)話心跳時(shí)間
  keepalive: 10
5.3 MqttConfig.java
@Slf4j
@Configuration
@ConfigurationProperties("mqtt")
@Data
public class MqttConfig {
    String host;
    String clientId;
    String topic;
    String username;
    String password;
    Integer timeout;
    Integer keepalive;
    // MQTT客戶端的配置類,可以設(shè)置mqtt服務(wù)器的賬號(hào)和密碼
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        // 設(shè)置是否自動(dòng)重連
        options.setAutomaticReconnect(true);
        // false 保持會(huì)話不被清理自動(dòng)重連后才能收到訂閱的主題消息(包括離線時(shí)發(fā)布的消息)
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        return options;
    }
    // MqttClient 類,MQTT的客戶端類,可以去連接MQTT服務(wù)器
    @Bean
    public MqttClient mqttClient(MqttConnectOptions mqttConnectOptions) {
        try {
            MqttClient client = new MqttClient(host, clientId);
            // 回調(diào)對(duì)象,監(jiān)聽消息的獲取,采用的接口回調(diào),可以獲取對(duì)應(yīng)訂閱到的消息
            client.setCallback(new MessageCallback(client, this.topic, mqttConnectOptions));
            // 連接
            client.connect(mqttConnectOptions());
            return client;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("mqtt 連接異常");
        }
    }
}
5.4 MessageCallback.java
/**
 * consumer 消費(fèi)者,對(duì)收到的消息進(jìn)行處理
 */
//@Component
@Slf4j
public class MessageCallback implements MqttCallbackExtended {
    private MqttClient client;
    private String topic;
    private MqttConnectOptions mqttConnectOptions;
    public MessageCallback() {
    }
    public MessageCallback(MqttClient mqttClient, String topic, MqttConnectOptions mqttConnectOptions) {
        this.client = mqttClient;
        this.topic = topic;
        this.mqttConnectOptions = mqttConnectOptions;
    }
    // 在客戶端連接斷開時(shí)觸發(fā)
    @Override
    public void connectionLost(Throwable throwable) {
        if (client != null && !client.isConnected()) {
            log.info("{}, 連接斷開,正在reconnect....", client.getClientId());
            try {
                client.reconnect();
                // client.connect(this.mqttConnectOptions);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            log.info("未知異常,連接斷開");
        }
    }
    // 在客戶端與服務(wù)器連接成功時(shí)觸發(fā)
    @Override
    public void connectComplete(boolean b, String url) {
        log.info("{} 上線了{(lán)} {}", client.getClientId(), b, url);
        try {
            client.subscribe(this.topic, 0);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    // 在客戶端收到訂閱的消息時(shí)觸發(fā)
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("接收消息主題 : " + topic);
        log.info("接收消息內(nèi)容 : " + new String(message.getPayload()));
        String msg = new String(message.getPayload());
        try {
            JSONObject jsonObject = JSON.parseObject(msg);
            String clientId = String.valueOf(jsonObject.get("clientid"));
            if (topic.endsWith("disconnected")) {
                log.info("設(shè)備{}已掉線", clientId);
            } else if (topic.endsWith("connected")) {
                log.info("設(shè)備{}已上線", clientId);
            } else {
                log.info("其他主題的消息");
            }
        } catch (JSONException e) {
            log.error("JSON Format Parsing Exception : {}", msg);
        }
    }
    // 在客戶端發(fā)送消息至服務(wù)器成功時(shí)觸發(fā)
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------" + token.isComplete());
    }
}
5.5 MqttUtil.java
@Component
@Slf4j
public class MqttUtil {
    @Autowired(required = false)
    private MqttClient client;
    /**
     * 訂閱主題
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 訂閱主題
     *
     * @param topic
     */
    public void subscribe(String topic) {
        try {
            client.subscribe(topic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 發(fā)布消息
     *
     * @param qos         連接方式 0,1,2 默認(rèn)0
     * @param retained    是否保留最新的消息
     * @param topic       訂閱主題
     * @param pushMessage 消息體
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic = client.getTopic(topic);
        if (null == mqttTopic) {
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            // 發(fā)送消息
            token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * 發(fā)布消息
     *
     * @param topic       主題
     * @param pushMessage 消息內(nèi)容
     */
    public void publish(String topic, String pushMessage) {
        publish(0, true, topic, pushMessage);
    }
}
5.6 MqttController.java
@RestController
@Slf4j
public class MqttController {
    @Autowired
    MqttClient client;
    @Autowired
    MqttUtil mqttUtil;
    @GetMapping("/send")
    public String send() {
        try {
            for (int i = 0; i < 3; i++) {
                mqttUtil.publish("test", "消息hello" + i);
                log.info("發(fā)送成功:{}", i);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "SUCCESS";
    }
}

六、MQTTX官網(wǎng)地址

MQTT客戶端工具M(jìn)QTTX下載地址 : MQTTX:全功能 MQTT 客戶端工具

到此這篇關(guān)于Java MQTT實(shí)戰(zhàn)應(yīng)用的文章就介紹到這了,更多相關(guān)Java MQTT內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java向上取整的幾種常見實(shí)現(xiàn)方法

    Java向上取整的幾種常見實(shí)現(xiàn)方法

    這篇文章主要介紹了Java向上取整的幾種常見實(shí)現(xiàn)方法,包括整數(shù)除法技巧、Math.ceil()函數(shù)、手動(dòng)檢查余數(shù)、位運(yùn)算和使用BigDecimal的setScale方法,每種方法都有其適用場景,選擇合適的方法可以提高代碼的性能和可讀性,需要的朋友可以參考下
    2024-12-12
  • MyBatis-Plus工具使用之EntityWrapper解析

    MyBatis-Plus工具使用之EntityWrapper解析

    這篇文章主要介紹了MyBatis-Plus工具使用之EntityWrapper解析,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • spark中使用groupByKey進(jìn)行分組排序的示例代碼

    spark中使用groupByKey進(jìn)行分組排序的示例代碼

    這篇文章主要介紹了spark中使用groupByKey進(jìn)行分組排序的實(shí)例代碼,本文通過實(shí)例代碼給大家講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-03-03
  • springmvc常用注解標(biāo)簽詳解

    springmvc常用注解標(biāo)簽詳解

    本篇文章主要介紹了springmvc常用注解標(biāo)簽詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-07-07
  • Java新手環(huán)境搭建 JDK8安裝配置教程

    Java新手環(huán)境搭建 JDK8安裝配置教程

    這篇文章主要為大家詳細(xì)介紹了Java新手環(huán)境搭建,JDK8安裝配置教程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-03-03
  • IDEA如何加載resources文件夾下文件相對(duì)路徑

    IDEA如何加載resources文件夾下文件相對(duì)路徑

    這篇文章主要介紹了IDEA如何加載resources文件夾下文件相對(duì)路徑問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • Java工廠模式優(yōu)雅地創(chuàng)建對(duì)象以及提高代碼復(fù)用率和靈活性

    Java工廠模式優(yōu)雅地創(chuàng)建對(duì)象以及提高代碼復(fù)用率和靈活性

    Java工廠模式是一種創(chuàng)建型設(shè)計(jì)模式,通過定義一個(gè)工廠類來封裝對(duì)象的創(chuàng)建過程,將對(duì)象的創(chuàng)建和使用分離,提高代碼的可維護(hù)性和可擴(kuò)展性,同時(shí)可以實(shí)現(xiàn)更好的代碼復(fù)用和靈活性
    2023-05-05
  • Java實(shí)現(xiàn)HTTPS連接的示例代碼

    Java實(shí)現(xiàn)HTTPS連接的示例代碼

    現(xiàn)在的網(wǎng)絡(luò)世界,安全性是大家都非常關(guān)注的問題,特別是對(duì)于咱們這些程序員來說,所以,理解并實(shí)現(xiàn)HTTPS連接,對(duì)于保護(hù)咱們的數(shù)據(jù)安全是極其重要的,下面我們就來學(xué)習(xí)一下在Java中如何實(shí)現(xiàn)HTTPS連接吧
    2023-12-12
  • Spring Cloud中關(guān)于Feign的常見問題總結(jié)

    Spring Cloud中關(guān)于Feign的常見問題總結(jié)

    這篇文章主要給大家介紹了Spring Cloud中關(guān)于Feign的常見問題,文中通過示例代碼介紹的很詳細(xì),需要的朋友可以參考借鑒,下面來一起看看吧。
    2017-02-02
  • Spring AOP攔截-三種方式實(shí)現(xiàn)自動(dòng)代理詳解

    Spring AOP攔截-三種方式實(shí)現(xiàn)自動(dòng)代理詳解

    這篇文章主要介紹了Spring AOP攔截-三種方式實(shí)現(xiàn)自動(dòng)代理詳解,還是比較不錯(cuò)的,這里分享給大家,供需要的朋友參考。
    2017-11-11

最新評(píng)論