欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合MOTT動(dòng)態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端

 更新時(shí)間:2024年04月18日 08:47:47   作者:captain_wind  
MQTT是一種輕量級(jí)的消息傳輸協(xié)議(Message Queuing Telemetry Transport),旨在實(shí)現(xiàn)設(shè)備之間的低帶寬和高延遲的通信,本文給大家介紹了SpringBoot整合MOTT動(dòng)態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端,需要的朋友可以參考下

MQTT介紹:

概述:

MQTT是一種輕量級(jí)的消息傳輸協(xié)議(Message Queuing Telemetry Transport),旨在實(shí)現(xiàn)設(shè)備之間的低帶寬和高延遲的通信。它是基于發(fā)布/訂閱模式(Publish/Subscribe)的消息協(xié)議,最初由IBM開發(fā),現(xiàn)在成為了一種開放的標(biāo)準(zhǔn),被廣泛應(yīng)用于物聯(lián)網(wǎng)(IoT)領(lǐng)域。

MQTT特點(diǎn)包括:

1、輕量級(jí):MQTT協(xié)議設(shè)計(jì)簡單,消息頭部輕量,適用于受限環(huán)境的設(shè)備,如傳感器、嵌入式設(shè)備等。

2、簡單易用:MQTT采用發(fā)布/訂閱模式,消息的發(fā)送者(發(fā)布者)和接收者(訂閱者)之間解耦,通信過程簡單易理解。

3、低帶寬、高延遲:MQTT協(xié)議設(shè)計(jì)考慮了網(wǎng)絡(luò)帶寬受限和延遲較高的情況,能夠在不理想的網(wǎng)絡(luò)環(huán)境下保持穩(wěn)定的消息傳輸。

4、可靠性:MQTT支持消息的持久化和確認(rèn)機(jī)制,確保消息的可靠傳輸,同時(shí)提供了QoS(Quality of Service)等級(jí),可以根據(jù)實(shí)際需求進(jìn)行靈活配置。

5、靈活性:MQTT支持多種消息格式和負(fù)載類型,可以傳輸文本、二進(jìn)制數(shù)據(jù)等多種類型的消息,同時(shí)支持SSL/TLS加密,保障通信安全。

6、適用于多種場景:由于其輕量級(jí)和靈活性,MQTT被廣泛應(yīng)用于物聯(lián)網(wǎng)、傳感器網(wǎng)絡(luò)、遠(yuǎn)程監(jiān)控、消息通知等場景,成為連接設(shè)備的重要通信協(xié)議之一。

話不多說,直接看代碼如何連接

因項(xiàng)目需求,本次做的是在項(xiàng)目啟動(dòng)時(shí),動(dòng)態(tài)讀取數(shù)據(jù)庫中已經(jīng)配置好的mqtt連接信息,并且根據(jù)這些信息動(dòng)態(tài)的循環(huán)連接服務(wù)端,在接收到消息后進(jìn)行持久化和相關(guān)邏輯處理。

一、首先加載依賴

<!-- mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

二、因?yàn)槭且陧?xiàng)目啟動(dòng)時(shí)候連接,但是又要等項(xiàng)目初始化后拿到要用的mapper,所以在這個(gè)類中需要實(shí)現(xiàn)ApplicationRunner接口,而沒有用其他的方法,有多種實(shí)現(xiàn)但是我用的這個(gè)

package com.ruoyi;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.system.domain.mqtt.MqttBean;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author 1097022316
 * 啟動(dòng)后建立MQTT連接 并對(duì)數(shù)據(jù)持久化和相關(guān)邏輯處理
 */
@Component
public class StartInit implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MemoryPersistence persistence = new MemoryPersistence();
        
        //這里是從數(shù)據(jù)庫中查詢出所有的mqtt連接相關(guān)信息,如ip、topic等
        List<DeviceCollector> collectorList = collectorService.list(new QueryWrapper<DeviceCollector>().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic"));
        
        System.err.println("collectorList = " + collectorList);
        
        //如果沒有任何連接 直接結(jié)束
        if (CollUtil.isEmpty(collectorList)) {
            return;
        }
        
        List<MqttBean> mqttBeans = new ArrayList<>();
        
        //把從數(shù)據(jù)庫中查詢出來的信息組裝成mqttclient連接所需要的對(duì)象
        //一般都是ip、port、username、password、topic、clientid 這里是簡單的用法,如有高級(jí)用法可自行摸索(順便在下面評(píng)論教一下)
        
        collectorList.forEach(mqtt -> {
            MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId());
            mqttBeans.add(bean);
        });

        //對(duì)我們組裝的mqtt連接對(duì)象信息進(jìn)行遍歷循環(huán)連接
        mqttBeans.forEach(bean -> {
            try {
                    MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence);
                    MqttConnectOptions connOpts = new MqttConnectOptions();
                	//設(shè)置相關(guān)連接參數(shù),有些是必要有些是非必要 可自行點(diǎn)進(jìn)去查看源碼
                    connOpts.setAutomaticReconnect(false);
                    connOpts.setCleanSession(true);
                    connOpts.setUserName(bean.getUserName());
                    connOpts.setPassword(bean.getPassword().toCharArray());
                    mqttClient.connect(connOpts);
                    //把連接對(duì)象加入到全局
                    mqttClients.put(bean.getTopics(), mqttClient);

                    mqttClient.setCallback(new MqttCallbackExtended() {

                        @Override
                        public void connectComplete(boolean reconnect, String serverURI) {
                            if (reconnect) {
                                System.out.println("Reconnected successfully.  url = " + serverURI);
                            } else {
                                System.out.println("Connected successfully for the first time.");
                            }
                        }

                        /**
                         * 設(shè)置重連機(jī)制
                         */
                        @Override
                        public void connectionLost(Throwable cause) {
                            System.err.println(bean.getTopics() + "連接丟失" + cause.getMessage());
                            if (!mqttClient.isConnected()) {
                                try {
                                    Thread.sleep(1000 * 60 * 5);
                                    //嘗試連接
                                    System.out.println("bean = " + bean);
                                    boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
                                    if (flag) {
                                        System.err.println(bean.getTopics() + "重新連接,重新訂閱!");
                                    }
                                } catch (InterruptedException e) {
                                    MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了" + e.getMessage(), "CLCW");
                                    throw new RuntimeException(e);
                                }
                            }
                        }

                        @Override
                        public void messageArrived(String topic, MqttMessage message) {
							//這里是當(dāng)消息推送時(shí)我們做的事情
                            System.out.println("dosomethings...");
                        }

                        @Override
                        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                            System.out.println("消息發(fā)送完整");
                        }
                    });
                    mqttClient.subscribe(bean.getTopics(), 2);

            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(bean.getTopics() + "MQTT連接出異常了");
                MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了"+e.getMessage(),"LJCW");
                try {
                    Thread.sleep(1000 * 60 * 30);
                    //嘗試連接
                    System.out.println("bean = " + bean);
                    boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
                    if (flag) {
                        System.err.println(bean.getTopics() + "重新連接,重新訂閱!");
                    }
                } catch (InterruptedException ex) {
                    //可以在這里把報(bào)錯(cuò)信息存入本地看看
                    throw new RuntimeException(ex);
                }
            }
        });
    }
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author 1097022316
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttBean {

    private String url;

    private String userName;

    private String password;

    private String topics;

    private String clientId;

}

重點(diǎn)是MqttClient中幾個(gè)參數(shù)和配置參數(shù),以及那幾個(gè)重寫的方法,看下源碼就好了。這里用的比較粗糙,只是簡單的實(shí)現(xiàn)了連接和重連,一些復(fù)雜的如心跳或者遺囑啥的都沒用,要研究可自行查看

以上就是SpringBoot整合MOTT動(dòng)態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot MOTT讀取數(shù)據(jù)庫的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接

    Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接

    這篇文章主要為大家詳細(xì)介紹了Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-06-06
  • java實(shí)現(xiàn)登錄案例

    java實(shí)現(xiàn)登錄案例

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)登錄案例的相關(guān)代碼,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-08-08
  • Spring框架中@PostConstruct注解詳解

    Spring框架中@PostConstruct注解詳解

    在Spring項(xiàng)目經(jīng)常遇到@PostConstruct注解,下面這篇文章主要給大家介紹了關(guān)于Spring框架中@PostConstruct注解的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-07-07
  • Springboot整合hibernate validator 全局異常處理步驟詳解

    Springboot整合hibernate validator 全局異常處理步驟詳解

    本文分步驟給大家介紹Springboot整合hibernate validator 全局異常處理,補(bǔ)呢文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2024-01-01
  • Spring創(chuàng)建bean對(duì)象三種方式代碼實(shí)例

    Spring創(chuàng)建bean對(duì)象三種方式代碼實(shí)例

    這篇文章主要介紹了Spring創(chuàng)建bean對(duì)象三種方式代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • Java遍歷字符串和統(tǒng)計(jì)字符個(gè)數(shù)的操作方法

    Java遍歷字符串和統(tǒng)計(jì)字符個(gè)數(shù)的操作方法

    這篇文章主要介紹了Java遍歷字符串和統(tǒng)計(jì)字符個(gè)數(shù)的操作方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-12-12
  • Java實(shí)現(xiàn)簡單的貪吃蛇游戲

    Java實(shí)現(xiàn)簡單的貪吃蛇游戲

    這篇文章主要介紹了Java實(shí)現(xiàn)簡單的貪吃蛇游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • 使用Jackson來實(shí)現(xiàn)Java對(duì)象與JSON的相互轉(zhuǎn)換的教程

    使用Jackson來實(shí)現(xiàn)Java對(duì)象與JSON的相互轉(zhuǎn)換的教程

    這篇文章主要介紹了使用Jackson來實(shí)現(xiàn)Java對(duì)象與JSON的互相轉(zhuǎn)換的教程,文中羅列了3中Jackson的使用方式,需要的朋友可以參考下
    2016-01-01
  • Java的內(nèi)存機(jī)制詳解

    Java的內(nèi)存機(jī)制詳解

    本文主要介紹了Java的內(nèi)存機(jī)制的相關(guān)知識(shí),具有很好的參考價(jià)值,下面跟著小編一起來看下吧
    2017-03-03
  • Java中線程組ThreadGroup與線程池的區(qū)別及示例

    Java中線程組ThreadGroup與線程池的區(qū)別及示例

    這篇文章主要介紹了Java中線程組與線程池的區(qū)別及示例,ThreadGroup是用來管理一組線程的,可以控制線程的執(zhí)行,查看線程的執(zhí)行狀態(tài)等操作,方便對(duì)于一組線程的統(tǒng)一管理,需要的朋友可以參考下
    2023-05-05

最新評(píng)論