欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布(示例代碼)

 更新時(shí)間:2024年06月21日 11:02:45   作者:Json  
這篇文章主要介紹了SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布的相關(guān)知識(shí),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧

SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布

背景 java框架SpringBoot通過(guò)mQTT通信 控制物聯(lián)網(wǎng)設(shè)備

還是直接上代碼

第一步依賴(lài):

      <!--mqtt相關(guān)依賴(lài)-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.14</version>
        </dependency>

第二步配置文件

#mqtt
mqtt:
  mqttUrl: tcp://127.0.0.1
  mqttPort: 1883
  mqttUsername: admin
  mqttPassword: public
  mqttClientId: aaa
# MQTT回調(diào)類(lèi)型 按一個(gè)MQTT服務(wù)區(qū)分
# 如果MQTT服務(wù)端換了 回調(diào)處理的是新的業(yè)務(wù)需求  就把這個(gè)換了
#  然后在MQTT配置文件中擴(kuò)展新的回調(diào)類(lèi)
mqttTypeCallback: breakerCallback  

第三步 config類(lèi)

package com.xxx.iotjava.mqtt.config;
import com.xxx.iotjava.mqtt.callback.BreakerCallback;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
/**
 * User:Json
 * Date: 2024/6/17
 **/
@Configuration
@Slf4j
public class MqttConfig {
    @Value("${mqtt.mqttUsername}")
    private String mqttUsername;
    @Value("${mqtt.mqttPassword}")
    private String mqttPassword;
    @Value("${mqtt.mqttUrl}")
    private String mqttUrl;
    @Value("${mqtt.mqttPort}")
    private Integer mqttPort;
    @Value("${mqtt.mqttClientId}")
    private String mqttClientId;
    @Value("${mqttTypeCallback}")
    private String mqttTypeCallback;
    private static String breakerCallback = "breakerCallback";
    /**
     * 客戶(hù)端對(duì)象
     */
    private MqttClient client;
    /**
     * 客戶(hù)端連接服務(wù)端
     * 目前只支持一個(gè) MQTT服務(wù)端 如果后續(xù)一個(gè)項(xiàng)目有多個(gè)MQTT服務(wù)端那就設(shè)計(jì)成工廠模式
     */
    public boolean connect() {
        if (isMqtt()){
            return false;
        }
        try {
            //new MemoryPersistence() 使用內(nèi)存持久化
            // 優(yōu)點(diǎn):不會(huì)在文件系統(tǒng)中創(chuàng)建任何文件(如 .lck 文件),適合對(duì)會(huì)話持久性沒(méi)有要求的場(chǎng)景。
            // 缺點(diǎn):  客戶(hù)端斷開(kāi)連接或重啟后,會(huì)話數(shù)據(jù)會(huì)丟失,無(wú)法保留訂閱信息和未發(fā)送的消息
           // String persistenceDirectory = "/path/to/your/mqtt/persistence";
            //new MqttDefaultFilePersistence(persistenceDirectory) 使用文件持久化
             //如果persistenceDirectory 不寫(xiě) 他默認(rèn)創(chuàng)建 根目錄 linux要給權(quán)限
            // 優(yōu)點(diǎn): 客戶(hù)端斷開(kāi)連接或重啟后,能夠保留訂閱信息和未發(fā)送的消息。這對(duì)于需要保持會(huì)話狀態(tài)的應(yīng)用非常重要
            // 缺點(diǎn) 會(huì)在指定的目錄中創(chuàng)建文件(如 .lck 文件),需要確保指定的目錄是有效的,并且應(yīng)用有權(quán)限訪問(wèn)該目錄
            //創(chuàng)建MQTT客戶(hù)端對(duì)象
            client = new MqttClient(mqttUrl + ":" + mqttPort, mqttClientId,new MemoryPersistence());
            //連接設(shè)置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,設(shè)置false表示服務(wù)器會(huì)保留客戶(hù)端的連接記錄(訂閱主題,qos),客戶(hù)端重連之后能獲取到服務(wù)器在客戶(hù)端斷開(kāi)連接期間推送的消息
            //設(shè)置為true表示每次連接服務(wù)器都是以新的身份
            //如果他為true 會(huì)出現(xiàn)一個(gè)問(wèn)題 
            //當(dāng)網(wǎng)絡(luò)斷開(kāi)后,客戶(hù)端會(huì)進(jìn)行重連,但是重連之前訂閱的主題就失效了,不再接受之前訂閱主題的消息。
             //因?yàn)榕渲美飳leanSession 設(shè)為 true ,當(dāng)客戶(hù)端掉線時(shí) ,
            //服務(wù)器端會(huì)清除 客戶(hù)端 session 。 重連后 客戶(hù)端會(huì)有一個(gè)新的session。 
            // 所以如果大家把他為true 重新連接mqtt后,要注意需要手動(dòng)再訂閱一下主題
            // 推薦文檔:https://www.cnblogs.com/A-yes/p/9894144.html
            options.setCleanSession(true);
            //設(shè)置連接用戶(hù)名
            options.setUserName(mqttUsername);
            //設(shè)置連接密碼
            options.setPassword(mqttPassword.toCharArray());
            options.setAutomaticReconnect(true);  // 啟用自動(dòng)重連
            //設(shè)置超時(shí)時(shí)間,單位為秒  如果在指定的時(shí)間內(nèi)未能建立連接,客戶(hù)端會(huì)放棄連接嘗試并拋出異常。
            options.setConnectionTimeout(100);
            //設(shè)置心跳時(shí)間 單位為秒,表示服務(wù)器每隔 1.5*20秒的時(shí)間向客戶(hù)端發(fā)送心跳判斷客戶(hù)端是否在線
            options.setKeepAliveInterval(20);
            //設(shè)置遺囑消息的話題,若客戶(hù)端和服務(wù)器之間的連接意外斷開(kāi),服務(wù)器將發(fā)布客戶(hù)端的遺囑信息
            //  options.setWill("willTopic",(mqttClientId + ":與服務(wù)器斷開(kāi)連接").getBytes(),0,false);
            if (StringUtils.isEmpty(mqttTypeCallback)) {
                log.error("MQTT回調(diào)類(lèi)型為空,請(qǐng)去java_config配置文件配置!");
            }
            //設(shè)置回調(diào)
            if (breakerCallback.equals(mqttTypeCallback)) {
                //斷路器回調(diào)
                client.setCallback(new BreakerCallback());
            }
            client.connect(options);
            return true;
        } catch (MqttException e) {
            log.error("MQTT啟動(dòng)報(bào)錯(cuò):" + e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
    /**
     * qos
     * 0  最多一次傳遞【適用于對(duì)消息丟失不敏感的場(chǎng)景,如傳感器數(shù)據(jù)頻繁發(fā)送,可以接受偶爾的數(shù)據(jù)丟失】
     * 1 至少一次傳遞  【消息至少傳遞一次,但可能會(huì)重復(fù)(即重復(fù)消息)】
     * 2 僅一次傳遞 【消息確保僅傳遞一次,既不會(huì)丟失也不會(huì)重復(fù)?!?
     * retained
     * 保留消息:如果 retained 參數(shù)設(shè)置為 true,消息會(huì)被代理保留。代理將記住這個(gè)消息,并在新客戶(hù)端訂閱該主題時(shí)立即發(fā)送這個(gè)消息。
     * 非保留消息:如果 retained 參數(shù)設(shè)置為 false,消息不會(huì)被保留,只會(huì)發(fā)送給當(dāng)前在線并訂閱該主題的客戶(hù)端。
     * topic    主題
     * message  內(nèi)容
     */
    public void publish(int qos, boolean retained, String topic, String message) {
        log.info("topic為:【"+topic+"】,qos為:【"+qos+"】 mqtt 發(fā)布數(shù)據(jù)為:"+message);
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained); //代理將記住這個(gè)消息,并在新客戶(hù)端訂閱該主題時(shí)立即發(fā)送這個(gè)消息。
        mqttMessage.setPayload(message.getBytes());
        //主題的目的地,用于發(fā)布信息
        MqttTopic mqttTopic = client.getTopic(topic);
        MqttDeliveryToken token;
        try {
            //將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
            token = mqttTopic.publish(mqttMessage);
            //token.waitForCompletion(); // 等待完成 會(huì)堵塞
        } catch (MqttException e) {
            log.warn("ClientId【" + mqttClientId + "】發(fā)布失敗!主題【" + topic + "】,發(fā)布數(shù)據(jù)為:" + message);
            e.printStackTrace();
        }
    }
    /**
     * 斷開(kāi)連接
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /***
     *  手動(dòng)連接
     *  可用于斷線后 手動(dòng)重連
     * ***/
    public boolean againConnect() {
        try {
            if (client != null && !client.isConnected()) {
                client.connect();
            }
            return true;
        } catch (MqttException e) {
            e.printStackTrace();
            return false;
        }
    }
    //驗(yàn)證是否啟動(dòng)mqtt連接
    private boolean isMqtt(){
        if (StringUtils.isEmpty(mqttUrl) || StringUtils.isEmpty(mqttPort)
                || StringUtils.isEmpty(mqttUsername) || StringUtils.isEmpty(mqttPassword)
                || StringUtils.isEmpty(mqttClientId)
        ) {
            log.info("==========mqtt 參數(shù)不全,無(wú)需啟動(dòng)MQTT連接==================");
            return true;
        }
        return false;
    }
    /**
     * 訂閱指定主題
     * @param topic 訂閱的主題
     * @param qos   訂閱的服務(wù)質(zhì)量
     */
    public boolean subscribe(String topic, int qos) {
        if (isMqtt()){
            return false;
        }
        try {
            if (client != null && client.isConnected()) {
                client.subscribe(topic, qos);
                log.info("訂閱主題 {} 成功!", topic);
            } else {
                log.error("MQTT客戶(hù)端尚未連接,無(wú)法訂閱主題 {}!", topic);
            }
            return true;
        } catch (MqttException e) {
            log.error("訂閱主題 {} 失?。簕}", topic, e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 批量訂閱主題
     *  消息等級(jí),和主題數(shù)組一一對(duì)應(yīng),服務(wù)端將按照指定等級(jí)給訂閱了主題的客戶(hù)端推送消息
     * @param topic 訂閱的主題集合
     * @param qos   訂閱的服務(wù)質(zhì)量集合
     */
    public boolean subscribe(String[] topic, int[] qos) {
        if (isMqtt()){
            return false;
        }
        try {
            if (client != null && client.isConnected()) {
                client.subscribe(topic, qos);
                log.info("訂閱主題 {} 成功!", topic);
            } else {
                log.error("MQTT客戶(hù)端尚未連接,無(wú)法訂閱主題 {}!", topic);
            }
            return true;
        } catch (MqttException e) {
            log.error("訂閱主題 {} 失?。簕}", topic, e.getMessage());
            e.printStackTrace();
            return false;
        }
    }
}

第四步 回調(diào)類(lèi)

package com.xxx.iotjava.mqtt.callback;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxx.iotjava.entities.BreakerData;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsEnum;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsValueEnum;
import com.xxx.iotjava.enums.breaker.BreakerOperationEnum;
import com.xxx.iotjava.service.inteface.IBreakerDataService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import com.alibaba.fastjson.JSONArray;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
 * User:Json
 * Date: 2024/6/17
 **/
@Component
@Slf4j
public class BreakerCallback implements MqttCallback {
    @Autowired
    IBreakerDataService iBreakerDataService;
    /**
     * 與服務(wù)器斷開(kāi)的回調(diào) 
     *  這里可以做手動(dòng)連接 但是配置config類(lèi) 配置了 自動(dòng)檢測(cè)異常 true 這里可以也不做
     *      options.setAutomaticReconnect(true);  // 啟用自動(dòng)重連
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("MQTT連接有異常:" + throwable.getMessage());
    }
    /**
     * 訂閱的回調(diào)
     * 消息到達(dá)的回調(diào)
     * 注意 如果這個(gè)回調(diào)方法 如果有異常 報(bào)錯(cuò) ,mqtt會(huì)重新連接 
     * 因?yàn)榕渲梦募?設(shè)置了  options.setAutomaticReconnect(true);  // 啟用自動(dòng)重連
     * 如果自動(dòng)重連了 如果是開(kāi)啟新的會(huì)話 以前的訂閱會(huì)消失  具體操作 再上面的配置文件類(lèi)說(shuō)明過(guò)了
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//        System.out.println("上報(bào)時(shí)間:"+ LocalDateTime.now());
//        System.out.println(String.format("接收消息主題 : %s",topic));
//        System.out.println(String.format("接收消息Qos : %d",mqttMessage.getQos()));
//        System.out.println(String.format("接收消息內(nèi)容 : %s",new String(mqttMessage.getPayload())));
//        System.out.println(String.format("接收消息retained : %b",mqttMessage.isRetained()));
    }
    /**
     * 發(fā)布的回調(diào)
     * 消息發(fā)布成功的回調(diào)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        IMqttAsyncClient client = token.getClient();
        log.info(client.getClientId() + "發(fā)布消息成功!");
    }
}

第五步 mqtt工具類(lèi)

package com.xxx.iotjava.utils;
import com.xxx.init.utils.AppContextUtil;
import com.xxx.iotjava.enums.breaker.BreakerOperationTopicEnum;
import com.xxx.iotjava.mqtt.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
/**
 * User:Json
 * Date: 2024/6/17
 **/
@Slf4j
public class MqttUtils {
    private static MqttConfig mqttConfig;
    public static MqttConfig getMqttConfig() {
        if (mqttConfig == null)
            mqttConfig = AppContextUtil.getBean(MqttConfig.class);
        return mqttConfig;
    }
    //初始化 訂閱
    public  static boolean subscribeInit(){
        return getMqttConfig().subscribe(BreakerOperationTopicEnum.REPORTING_API.getTopic(), 0);
    }
    /**
     * 發(fā)送消息
     * qos 0 最多一次傳遞  1 至少一次傳遞  2 僅一次傳遞
     * retained  true 保留消息  false 非保留消息
     * topic    主題
     * message  內(nèi)容
     */
    public static boolean sendMqttMsg(int qos, boolean retained, String topic, String message) {
        try {
            getMqttConfig().publish(qos, retained, topic, message);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("MQtt發(fā)送消息報(bào)錯(cuò):" + e.getMessage());
            return false;
        }
    }
    /*
     * topic 主題
     * message 內(nèi)容
     * */
    public static boolean sendMqttMsg(String topic, String message) {
        return sendMqttMsg(1, false, topic, message);
    }
}

第六步 調(diào)用測(cè)試

發(fā)布
MqttUtils.sendMqttMsg(topic, data)
//訂閱 我做的是啟動(dòng)的時(shí)候 初始化訂閱 所以 直接根據(jù)定義的 topic 常量進(jìn)行初始化訂閱
//BreakerOperationTopicEnum.REPORTING_API.getTopic() 我 定義的topic 枚舉類(lèi) 常量
// 這里就不分享了
MqttUtils.subscribeInit();

完成

到此這篇關(guān)于SpringBoot中使用MQTT實(shí)現(xiàn)消息的訂閱和發(fā)布功能的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息的訂閱和發(fā)布內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Spring Cloud Hystrix入門(mén)和Hystrix命令原理分析

    Spring Cloud Hystrix入門(mén)和Hystrix命令原理分析

    這篇文章主要介紹了Spring Cloud Hystrix入門(mén)和Hystrix命令原理分析,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-08-08
  • 淺談java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制

    淺談java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制

    今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著java如何實(shí)現(xiàn)Redis的LRU緩存機(jī)制展開(kāi),文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • SpringBoot JPA出現(xiàn)錯(cuò)誤:No identifier specified for en解決方案

    SpringBoot JPA出現(xiàn)錯(cuò)誤:No identifier specified&nb

    這篇文章主要介紹了SpringBoot JPA出現(xiàn)錯(cuò)誤:No identifier specified for en解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • springboot如何靜態(tài)加載@configurationProperties

    springboot如何靜態(tài)加載@configurationProperties

    這篇文章主要介紹了springboot如何靜態(tài)加載@configurationProperties,本文一個(gè)錯(cuò)誤案例和成功案例結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-07-07
  • Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析

    Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析

    這篇文章主要介紹了Spring mvc AJAX技術(shù)實(shí)現(xiàn)原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • springboot清除字符串前后空格與防xss攻擊方法

    springboot清除字符串前后空格與防xss攻擊方法

    這篇文章主要介紹了springboot清除字符串前后空格與防xss攻擊方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • JAVA獲取Image的三種方式

    JAVA獲取Image的三種方式

    這篇文章主要介紹了JAVA獲取Image的三種方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-11-11
  • 解決idea找不到或無(wú)法加載主類(lèi)的錯(cuò)誤處理

    解決idea找不到或無(wú)法加載主類(lèi)的錯(cuò)誤處理

    這篇文章主要介紹了解決idea找不到或無(wú)法加載主類(lèi)的錯(cuò)誤處理,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-04-04
  • Spring MVC攔截器的基本使用方法

    Spring MVC攔截器的基本使用方法

    這篇文章主要給大家介紹了關(guān)于Spring MVC攔截器的基本使用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring MVC具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-07-07
  • java猜數(shù)字小游戲案例

    java猜數(shù)字小游戲案例

    這篇文章主要為大家詳細(xì)介紹了java猜數(shù)字小游戲案例,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-10-10

最新評(píng)論