欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot實現(xiàn)MQTT通信的示例代碼

 更新時間:2025年01月24日 10:18:01   作者:就不告訴你嘿嘿  
本文主要介紹了Springboot實現(xiàn)MQTT通信的示例代碼,包含了MQTT協(xié)議的特點和工作原理等,具有一定的參考價值,感興趣的可以了解一下

MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模型的輕量級消息傳輸協(xié)議,常用于物聯(lián)網(IoT)場景中。它設計簡潔、帶寬占用少,非常適合資源受限的設備和網絡環(huán)境。

一、MQ協(xié)議

MQTT 特點

  • 輕量級協(xié)議

    • 設計簡單,占用帶寬少,特別適合嵌入式設備和不穩(wěn)定的網絡環(huán)境。
  • 發(fā)布/訂閱模型

    • 客戶端通過主題(Topic)發(fā)布消息,訂閱者通過主題接收消息,彼此不直接通信。
  • 可靠性保障

    • 提供三種服務質量(QoS)等級,確保消息可靠傳輸:
      • QoS 0:至多一次(不確認,可能丟失)。
      • QoS 1:至少一次(需要確認,但可能重復)。
      • QoS 2:僅一次(確保消息不丟失且不重復)。
  • 持續(xù)連接

    • 使用 TCP/IP 連接,通過心跳包(Keep-Alive)保持連接穩(wěn)定。
  • 支持離線消息

    • 使用“保留消息”和“持久會話”功能,實現(xiàn)離線設備接收消息。
  • 安全性

    • 支持 SSL/TLS 加密,結合用戶名和密碼進行身份驗證。

MQTT 工作原理

  • 連接

    • 客戶端通過 CONNECT 消息向服務器建立連接,服務器返回 CONNACK 消息。
  • 發(fā)布

    • 客戶端通過 PUBLISH 消息向服務器發(fā)布消息,指定消息的主題。
  • 訂閱

    • 客戶端通過 SUBSCRIBE 消息訂閱一個或多個主題,服務器將匹配主題的消息推送給客戶端。
  • 心跳

    • 客戶端和服務器定期發(fā)送心跳包(PINGREQ 和 PINGRESP),確保連接有效。
  • 斷開

    • 客戶端通過 DISCONNECT 消息通知服務器主動斷開連接。

MQTT 主要應用場景

  • 物聯(lián)網(IoT)

    • 設備狀態(tài)監(jiān)控、數(shù)據(jù)收集和遠程控制。
  • 智能家居

    • 控制家電、監(jiān)控傳感器數(shù)據(jù)。
  • 車聯(lián)網

    • 實時車輛數(shù)據(jù)傳輸、位置追蹤。
  • 移動應用

    • 消息推送、實時聊天。
  • 工業(yè)領域

    • 設備數(shù)據(jù)采集和分析。

MQTT 配置與注意事項

  • 主題命名

    • 使用層級結構(如 /iot/device/status),便于管理。
    • 避免過于復雜的主題結構。
  • QoS 選擇

    • 根據(jù)應用需求選擇適合的 QoS 等級,平衡可靠性和性能。
  • 安全措施

    • 啟用 SSL/TLS 加密。
    • 配置用戶名和密碼,限制匿名連接。
    • 控制主題的訪問權限。
  • 性能優(yōu)化

    • 控制消息大小,減少帶寬占用。
    • 調整心跳時間,優(yōu)化連接穩(wěn)定性。

二、MQTT服務器搭建

1、在springboot項目工程pom文件下引入相關依賴

        <!--mqtt相關依賴-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、修改application.yml配置文件

spring:
  application:
    name: provider
    #MQTT配置信息
  mqtt:
     #MQTT服務地址,端口號默認1883,如果有多個,用逗號隔開
    url: tcp://127.0.0.1:1883
     #用戶名
    username: guest
     #密碼
    password: guest
     #客戶端id(不能重復)
    client:
        id: provider-id
    #MQTT默認的消息推送主題,實際可在調用接口是指定
    default: 
        topic: topic
server:
  port: 8080

3、消息發(fā)布者客戶端配置

?
package com.three.demo.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;

@Configuration
@Slf4j
public class MqttClientConfig {

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 客戶端對象
     */
    private MqttAsyncClient client;

    /**
     * 在bean初始化后連接到服務器
     */
    @PostConstruct
    public void init() {
        connect();
    }

    /**
     * 客戶端連接服務端
     */
    public void connect() {
        //連接設置
        MqttConnectOptions options = new MqttConnectOptions();
        //是否清空session,設置false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
        //設置為true表示每次連接服務器都是以新的身份
        options.setCleanSession(false);
        //設置連接用戶名
        options.setUserName(username);
        //設置連接密碼
        options.setPassword(password.toCharArray());
        //設置超時時間,單位為秒
        options.setConnectionTimeout(60);
        //設置心跳時間 單位為秒,表示服務器每隔 1.5*10秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(20);
        // 開啟自動重連
        options.setAutomaticReconnect(true);
        // 設置最大重連時間間隔 (可選),單位是毫秒,設置為 5000 表示最多等待 5 秒再嘗試重連
        options.setMaxReconnectDelay(5000);
        //設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
        options.setWill("willTopic", (clientId + "與服務器斷開連接").getBytes(), 0, false);
        try {
            //創(chuàng)建MQTT客戶端對象
            client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
            //設置回調
            client.setCallback(mqttClientCallBack);
            // 使用異步連接
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("MQTT連接成功");
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("MQTT連接失?。? + exception.getMessage());
                }
            });
        } catch (MqttException e) {
            log.error("mqtt連接失敗。。" + e.getMessage());
        }
    }

    public void publish(int qos, boolean retained) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(pushLog.getData().getBytes());
        try {
            // 使用異步客戶端發(fā)布消息,并處理結果
            client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    System.out.println("發(fā)送成功");
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("發(fā)送失?。? + exception.getMessage());
                }
            });
        } catch (MqttException e) {
            log.error("發(fā)送失?。? + e.getMessage());
        }
    }

    /**
     * 斷開連接
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

?

 4、消息發(fā)布客戶端回調

package com.three.demo.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    /**
     * 與服務器斷開的回調
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.error(clientId + "與服務器斷開連接!!" + cause.getMessage());
    }

	/**
     * 消息發(fā)布成功的回調
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        System.out.println(client.getClientId()+"發(fā)布消息成功!");
        
    }

}

5、創(chuàng)建控制器測試發(fā)布信息

package com.three.demo.mqtt.controller;
 
import com.three.demo.mqtt.config.MqttClientConfig;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
@Controller
public class SendController {
    @Autowired
    private MqttClientConfig client;
 
    @RequestMapping("/sendMessage")
    @ResponseBody
    public String sendMessage(int qos,boolean retained,String topic,String message){
        try {
            client.publish(qos, retained, topic, message);
            return "發(fā)送成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "發(fā)送失敗";
        }
    }
}

6、消息接收者配置

這里我對之前的代碼進行改造

/**
     * 客戶端連接服務端
     */
    public void connect() {
        //連接設置
        MqttConnectOptions options = new MqttConnectOptions();
        //是否清空session,設置false表示服務器會保留客戶端的連接記錄(訂閱主題,qos),客戶端重連之后能獲取到服務器在客戶端斷開連接期間推送的消息
        //設置為true表示每次連接服務器都是以新的身份
        options.setCleanSession(false);
        //設置連接用戶名
        options.setUserName(username);
        //設置連接密碼
        options.setPassword(password.toCharArray());
        //設置超時時間,單位為秒
        options.setConnectionTimeout(60);
        //設置心跳時間 單位為秒,表示服務器每隔 1.5*10秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
        options.setKeepAliveInterval(20);
        // 開啟自動重連
        options.setAutomaticReconnect(true);
        // 設置最大重連時間間隔 (可選),單位是毫秒,設置為 5000 表示最多等待 5 秒再嘗試重連
        options.setMaxReconnectDelay(5000);
        //設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
        options.setWill("willTopic", (clientId + "與服務器斷開連接").getBytes(), 0, false);
        try {
            //創(chuàng)建MQTT客戶端對象
            client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());
            //設置回調
            client.setCallback(mqttClientCallBack);
            // 使用異步連接
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("MQTT連接成功");
                    // 連接成功后訂閱主題
                    try {
                        //訂閱主題
                        //消息等級,和主題數(shù)組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
                        int[] qos = {2, 2};
                        String[] topics = {
                                "/iot/msg/topic1",
                                "/iot/msg/topic2"
                        };
                        client.subscribe(topics, qos);
                        log.info("訂閱主題成功");
                    } catch (MqttException e) {
                        log.error("訂閱主題失?。? + e.getMessage());
                    }
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("MQTT連接失?。? + exception.getMessage());
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
            log.error("mqtt連接失敗。。" + e.getMessage());
        }
    }

然后在消息客戶端回調類這里

package com.ruoyi.yyt.mqtt.config;
 
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
 
@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {

    @Value("${spring.mqtt.client.id}")
    private String clientId;
 
   /**
     * 客戶端斷開連接的回調
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error(clientId + "與服務器斷開連接?。? + cause.getMessage());
    }
 
    /**
     * 消息到達的回調
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主題 : %s",topic));
        System.out.println(String.format("接收消息Qos : %d",message.getQos()));
        System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b",message.isRetained()));
    }
 
    /**
     * 消息發(fā)布成功的回調
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        System.out.println(client.getClientId() + "發(fā)布消息成功!");
    }
    
}

這個時候我們啟動服務,調用測試接口

就可以看到接口返回發(fā)布成功,并且能看到后臺服務的打印日志了

 至此大功告成了!

到此這篇關于Springboot實現(xiàn)MQTT通信的示例代碼的文章就介紹到這了,更多相關Springboot MQTT通信內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • SpringBoot整合weixin-java-pay實現(xiàn)微信小程序支付的示例代碼

    SpringBoot整合weixin-java-pay實現(xiàn)微信小程序支付的示例代碼

    微信小程序支付是常見的一種功能,本文主要介紹了SpringBoot整合weixin-java-pay實現(xiàn)微信小程序支付的示例代碼,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學習學習吧
    2024-05-05
  • Java語言實現(xiàn)簡單FTP軟件 FTP軟件主界面(4)

    Java語言實現(xiàn)簡單FTP軟件 FTP軟件主界面(4)

    這篇文章主要為大家詳細介紹了Java語言實現(xiàn)簡單FTP軟件,F(xiàn)TP軟件主界面編寫的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-03-03
  • 圖解Java經典算法冒泡排序的原理與實現(xiàn)

    圖解Java經典算法冒泡排序的原理與實現(xiàn)

    冒泡排序是一種簡單的排序算法,它也是一種穩(wěn)定排序算法。其實現(xiàn)原理是重復掃描待排序序列,并比較每一對相鄰的元素,當該對元素順序不正確時進行交換。一直重復這個過程,直到沒有任何兩個相鄰元素可以交換,就表明完成了排序
    2022-09-09
  • Spring使用Configuration注解管理bean的方式詳解

    Spring使用Configuration注解管理bean的方式詳解

    在Spring的世界里,Configuration注解就像是一位細心的園丁,它的主要職責是在這個繁花似錦的園子里,幫助我們聲明和管理各種各樣的bean,本文給大家介紹了在Spring中如何優(yōu)雅地管理你的bean,需要的朋友可以參考下
    2024-05-05
  • 怎樣提高mybatis-plus中saveBatch方法的效率

    怎樣提高mybatis-plus中saveBatch方法的效率

    這篇文章主要介紹了怎樣提高mybatis-plus中saveBatch方法的效率問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • 如何將DeepSeek 集成到 Java 的 Spring Boot 項目中

    如何將DeepSeek 集成到 Java 的 Spring Boot&

    本文介紹了如何將DeepSeek集成到Java的SpringBoot項目中,包括準備工作、集成步驟和示例說明,感興趣的朋友一起看看吧
    2025-02-02
  • 基于JavaMail API收發(fā)郵件的方法

    基于JavaMail API收發(fā)郵件的方法

    這篇文章主要介紹了基于JavaMail API收發(fā)郵件的方法,實例分析了javamail的使用方法與相關注意事項,非常具有實用價值,需要的朋友可以參考下
    2015-07-07
  • JAVA實現(xiàn)Excel和PDF上下標的操作代碼

    JAVA實現(xiàn)Excel和PDF上下標的操作代碼

    這篇文章主要介紹了JAVA實現(xiàn)Excel和PDF上下標,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-09-09
  • Java并發(fā)編程之synchronized底層實現(xiàn)原理分析

    Java并發(fā)編程之synchronized底層實現(xiàn)原理分析

    這篇文章主要介紹了Java并發(fā)編程之synchronized底層實現(xiàn)原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • 淺談用SpringBoot實現(xiàn)策略模式

    淺談用SpringBoot實現(xiàn)策略模式

    本文主要介紹了SpringBoot實現(xiàn)策略模式,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-10-10

最新評論