欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot實(shí)現(xiàn)MQTT通信的示例代碼

 更新時(shí)間:2025年01月24日 10:18:01   作者:就不告訴你嘿嘿  
本文主要介紹了Springboot實(shí)現(xiàn)MQTT通信的示例代碼,包含了MQTT協(xié)議的特點(diǎn)和工作原理等,具有一定的參考價(jià)值,感興趣的可以了解一下

MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模型的輕量級(jí)消息傳輸協(xié)議,常用于物聯(lián)網(wǎng)(IoT)場(chǎng)景中。它設(shè)計(jì)簡(jiǎn)潔、帶寬占用少,非常適合資源受限的設(shè)備和網(wǎng)絡(luò)環(huán)境。

一、MQ協(xié)議

MQTT 特點(diǎn)

  • 輕量級(jí)協(xié)議

    • 設(shè)計(jì)簡(jiǎn)單,占用帶寬少,特別適合嵌入式設(shè)備和不穩(wěn)定的網(wǎng)絡(luò)環(huán)境。
  • 發(fā)布/訂閱模型

    • 客戶端通過(guò)主題(Topic)發(fā)布消息,訂閱者通過(guò)主題接收消息,彼此不直接通信。
  • 可靠性保障

    • 提供三種服務(wù)質(zhì)量(QoS)等級(jí),確保消息可靠傳輸:
      • QoS 0:至多一次(不確認(rèn),可能丟失)。
      • QoS 1:至少一次(需要確認(rèn),但可能重復(fù))。
      • QoS 2:僅一次(確保消息不丟失且不重復(fù))。
  • 持續(xù)連接

    • 使用 TCP/IP 連接,通過(guò)心跳包(Keep-Alive)保持連接穩(wěn)定。
  • 支持離線消息

    • 使用“保留消息”和“持久會(huì)話”功能,實(shí)現(xiàn)離線設(shè)備接收消息。
  • 安全性

    • 支持 SSL/TLS 加密,結(jié)合用戶名和密碼進(jìn)行身份驗(yàn)證。

MQTT 工作原理

  • 連接

    • 客戶端通過(guò) CONNECT 消息向服務(wù)器建立連接,服務(wù)器返回 CONNACK 消息。
  • 發(fā)布

    • 客戶端通過(guò) PUBLISH 消息向服務(wù)器發(fā)布消息,指定消息的主題。
  • 訂閱

    • 客戶端通過(guò) SUBSCRIBE 消息訂閱一個(gè)或多個(gè)主題,服務(wù)器將匹配主題的消息推送給客戶端。
  • 心跳

    • 客戶端和服務(wù)器定期發(fā)送心跳包(PINGREQ 和 PINGRESP),確保連接有效。
  • 斷開(kāi)

    • 客戶端通過(guò) DISCONNECT 消息通知服務(wù)器主動(dòng)斷開(kāi)連接。

MQTT 主要應(yīng)用場(chǎng)景

  • 物聯(lián)網(wǎng)(IoT)

    • 設(shè)備狀態(tài)監(jiān)控、數(shù)據(jù)收集和遠(yuǎn)程控制。
  • 智能家居

    • 控制家電、監(jiān)控傳感器數(shù)據(jù)。
  • 車聯(lián)網(wǎng)

    • 實(shí)時(shí)車輛數(shù)據(jù)傳輸、位置追蹤。
  • 移動(dòng)應(yīng)用

    • 消息推送、實(shí)時(shí)聊天。
  • 工業(yè)領(lǐng)域

    • 設(shè)備數(shù)據(jù)采集和分析。

MQTT 配置與注意事項(xiàng)

  • 主題命名

    • 使用層級(jí)結(jié)構(gòu)(如 /iot/device/status),便于管理。
    • 避免過(guò)于復(fù)雜的主題結(jié)構(gòu)。
  • QoS 選擇

    • 根據(jù)應(yīng)用需求選擇適合的 QoS 等級(jí),平衡可靠性和性能。
  • 安全措施

    • 啟用 SSL/TLS 加密。
    • 配置用戶名和密碼,限制匿名連接。
    • 控制主題的訪問(wèn)權(quán)限。
  • 性能優(yōu)化

    • 控制消息大小,減少帶寬占用。
    • 調(diào)整心跳時(shí)間,優(yōu)化連接穩(wěn)定性。

二、MQTT服務(wù)器搭建

1、在springboot項(xiàng)目工程pom文件下引入相關(guān)依賴

        <!--mqtt相關(guān)依賴-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、修改application.yml配置文件

spring:
  application:
    name: provider
    #MQTT配置信息
  mqtt:
     #MQTT服務(wù)地址,端口號(hào)默認(rèn)1883,如果有多個(gè),用逗號(hào)隔開(kāi)
    url: tcp://127.0.0.1:1883
     #用戶名
    username: guest
     #密碼
    password: guest
     #客戶端id(不能重復(fù))
    client:
        id: provider-id
    #MQTT默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口是指定
    default: 
        topic: topic
server:
  port: 8080

3、消息發(fā)布者客戶端配置

?
package com.three.demo.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;

@Configuration
@Slf4j
public class MqttClientConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 客戶端對(duì)象
     */
    private MqttAsyncClient client;

    /**
     * 在bean初始化后連接到服務(wù)器
     */
    @PostConstruct
    public void init() {
        connect();
    }

    /**
     * 客戶端連接服務(wù)端
     */
    public void connect() {
        //連接設(shè)置
        MqttConnectOptions options = new MqttConnectOptions();
        //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開(kāi)連接期間推送的消息
        //設(shè)置為true表示每次連接服務(wù)器都是以新的身份
        options.setCleanSession(false);
        //設(shè)置連接用戶名
        options.setUserName(username);
        //設(shè)置連接密碼
        options.setPassword(password.toCharArray());
        //設(shè)置超時(shí)時(shí)間,單位為秒
        options.setConnectionTimeout(60);
        //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*10秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(20);
        // 開(kāi)啟自動(dòng)重連
        options.setAutomaticReconnect(true);
        // 設(shè)置最大重連時(shí)間間隔 (可選),單位是毫秒,設(shè)置為 5000 表示最多等待 5 秒再嘗試重連
        options.setMaxReconnectDelay(5000);
        //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開(kāi),服務(wù)器將發(fā)布客戶端的遺囑信息
        options.setWill("willTopic", (clientId + "與服務(wù)器斷開(kāi)連接").getBytes(), 0, false);
        try {
            //創(chuàng)建MQTT客戶端對(duì)象
            client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
            //設(shè)置回調(diào)
            client.setCallback(mqttClientCallBack);
            // 使用異步連接
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("MQTT連接成功");
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("MQTT連接失敗:" + exception.getMessage());
                }
            });
        } catch (MqttException e) {
            log.error("mqtt連接失敗。。" + e.getMessage());
        }
    }

    public void publish(int qos, boolean retained) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(pushLog.getData().getBytes());
        try {
            // 使用異步客戶端發(fā)布消息,并處理結(jié)果
            client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    System.out.println("發(fā)送成功");
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("發(fā)送失?。? + exception.getMessage());
                }
            });
        } catch (MqttException e) {
            log.error("發(fā)送失?。? + e.getMessage());
        }
    }

    /**
     * 斷開(kāi)連接
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

?

 4、消息發(fā)布客戶端回調(diào)

package com.three.demo.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 與服務(wù)器斷開(kāi)的回調(diào)
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.error(clientId + "與服務(wù)器斷開(kāi)連接!!" + cause.getMessage());
    }

	/**
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        System.out.println(client.getClientId()+"發(fā)布消息成功!");
        
    }

}

5、創(chuàng)建控制器測(cè)試發(fā)布信息

package com.three.demo.mqtt.controller;
 
import com.three.demo.mqtt.config.MqttClientConfig;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
@Controller
public class SendController {
    @Autowired
    private MqttClientConfig client;
 
    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(int qos,boolean retained,String topic,String message){
        try {
            client.publish(qos, retained, topic, message);
            return "發(fā)送成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "發(fā)送失敗";
        }
    }
}

6、消息接收者配置

這里我對(duì)之前的代碼進(jìn)行改造

/**
     * 客戶端連接服務(wù)端
     */
    public void connect() {
        //連接設(shè)置
        MqttConnectOptions options = new MqttConnectOptions();
        //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務(wù)器在客戶端斷開(kāi)連接期間推送的消息
        //設(shè)置為true表示每次連接服務(wù)器都是以新的身份
        options.setCleanSession(false);
        //設(shè)置連接用戶名
        options.setUserName(username);
        //設(shè)置連接密碼
        options.setPassword(password.toCharArray());
        //設(shè)置超時(shí)時(shí)間,單位為秒
        options.setConnectionTimeout(60);
        //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*10秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(20);
        // 開(kāi)啟自動(dòng)重連
        options.setAutomaticReconnect(true);
        // 設(shè)置最大重連時(shí)間間隔 (可選),單位是毫秒,設(shè)置為 5000 表示最多等待 5 秒再嘗試重連
        options.setMaxReconnectDelay(5000);
        //設(shè)置遺囑消息的話題,若客戶端和服務(wù)器之間的連接意外斷開(kāi),服務(wù)器將發(fā)布客戶端的遺囑信息
        options.setWill("willTopic", (clientId + "與服務(wù)器斷開(kāi)連接").getBytes(), 0, false);
        try {
            //創(chuàng)建MQTT客戶端對(duì)象
            client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
            //設(shè)置回調(diào)
            client.setCallback(mqttClientCallBack);
            // 使用異步連接
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("MQTT連接成功");
                    // 連接成功后訂閱主題
                    try {
                        //訂閱主題
                        //消息等級(jí),和主題數(shù)組一一對(duì)應(yīng),服務(wù)端將按照指定等級(jí)給訂閱了主題的客戶端推送消息
                        int[] qos = {2, 2};
                        String[] topics = {
                                "/iot/msg/topic1",
                                "/iot/msg/topic2"
                        };
                        client.subscribe(topics, qos);
                        log.info("訂閱主題成功");
                    } catch (MqttException e) {
                        log.error("訂閱主題失敗:" + e.getMessage());
                    }
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("MQTT連接失?。? + exception.getMessage());
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
            log.error("mqtt連接失敗。。" + e.getMessage());
        }
    }

然后在消息客戶端回調(diào)類這里

package com.ruoyi.yyt.mqtt.config;
 
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
 
@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;
 
   /**
     * 客戶端斷開(kāi)連接的回調(diào)
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error(clientId + "與服務(wù)器斷開(kāi)連接??!" + cause.getMessage());
    }
 
    /**
     * 消息到達(dá)的回調(diào)
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主題 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息內(nèi)容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }
 
    /**
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        System.out.println(client.getClientId() + "發(fā)布消息成功!");
    }
    
}

這個(gè)時(shí)候我們啟動(dòng)服務(wù),調(diào)用測(cè)試接口

就可以看到接口返回發(fā)布成功,并且能看到后臺(tái)服務(wù)的打印日志了

 至此大功告成了!

到此這篇關(guān)于Springboot實(shí)現(xiàn)MQTT通信的示例代碼的文章就介紹到這了,更多相關(guān)Springboot MQTT通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼

    SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼

    微信小程序支付是常見(jiàn)的一種功能,本文主要介紹了SpringBoot整合weixin-java-pay實(shí)現(xiàn)微信小程序支付的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2024-05-05
  • Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單FTP軟件 FTP軟件主界面(4)

    Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單FTP軟件 FTP軟件主界面(4)

    這篇文章主要為大家詳細(xì)介紹了Java語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單FTP軟件,F(xiàn)TP軟件主界面編寫的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-03-03
  • 圖解Java經(jīng)典算法冒泡排序的原理與實(shí)現(xiàn)

    圖解Java經(jīng)典算法冒泡排序的原理與實(shí)現(xiàn)

    冒泡排序是一種簡(jiǎn)單的排序算法,它也是一種穩(wěn)定排序算法。其實(shí)現(xiàn)原理是重復(fù)掃描待排序序列,并比較每一對(duì)相鄰的元素,當(dāng)該對(duì)元素順序不正確時(shí)進(jìn)行交換。一直重復(fù)這個(gè)過(guò)程,直到?jīng)]有任何兩個(gè)相鄰元素可以交換,就表明完成了排序
    2022-09-09
  • Spring使用Configuration注解管理bean的方式詳解

    Spring使用Configuration注解管理bean的方式詳解

    在Spring的世界里,Configuration注解就像是一位細(xì)心的園丁,它的主要職責(zé)是在這個(gè)繁花似錦的園子里,幫助我們聲明和管理各種各樣的bean,本文給大家介紹了在Spring中如何優(yōu)雅地管理你的bean,需要的朋友可以參考下
    2024-05-05
  • 怎樣提高mybatis-plus中saveBatch方法的效率

    怎樣提高mybatis-plus中saveBatch方法的效率

    這篇文章主要介紹了怎樣提高mybatis-plus中saveBatch方法的效率問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • 如何將DeepSeek 集成到 Java 的 Spring Boot 項(xiàng)目中

    如何將DeepSeek 集成到 Java 的 Spring Boot&

    本文介紹了如何將DeepSeek集成到Java的SpringBoot項(xiàng)目中,包括準(zhǔn)備工作、集成步驟和示例說(shuō)明,感興趣的朋友一起看看吧
    2025-02-02
  • 基于JavaMail API收發(fā)郵件的方法

    基于JavaMail API收發(fā)郵件的方法

    這篇文章主要介紹了基于JavaMail API收發(fā)郵件的方法,實(shí)例分析了javamail的使用方法與相關(guān)注意事項(xiàng),非常具有實(shí)用價(jià)值,需要的朋友可以參考下
    2015-07-07
  • JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo)的操作代碼

    JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo)的操作代碼

    這篇文章主要介紹了JAVA實(shí)現(xiàn)Excel和PDF上下標(biāo),本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-09-09
  • Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理分析

    Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理分析

    這篇文章主要介紹了Java并發(fā)編程之synchronized底層實(shí)現(xiàn)原理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • 淺談?dòng)肧pringBoot實(shí)現(xiàn)策略模式

    淺談?dòng)肧pringBoot實(shí)現(xiàn)策略模式

    本文主要介紹了SpringBoot實(shí)現(xiàn)策略模式,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-10-10

最新評(píng)論