SpringBoot整合MOTT動(dòng)態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端
MQTT介紹:
概述:
MQTT是一種輕量級(jí)的消息傳輸協(xié)議(Message Queuing Telemetry Transport),旨在實(shí)現(xiàn)設(shè)備之間的低帶寬和高延遲的通信。它是基于發(fā)布/訂閱模式(Publish/Subscribe)的消息協(xié)議,最初由IBM開發(fā),現(xiàn)在成為了一種開放的標(biāo)準(zhǔn),被廣泛應(yīng)用于物聯(lián)網(wǎng)(IoT)領(lǐng)域。
MQTT特點(diǎn)包括:
1、輕量級(jí):MQTT協(xié)議設(shè)計(jì)簡單,消息頭部輕量,適用于受限環(huán)境的設(shè)備,如傳感器、嵌入式設(shè)備等。
2、簡單易用:MQTT采用發(fā)布/訂閱模式,消息的發(fā)送者(發(fā)布者)和接收者(訂閱者)之間解耦,通信過程簡單易理解。
3、低帶寬、高延遲:MQTT協(xié)議設(shè)計(jì)考慮了網(wǎng)絡(luò)帶寬受限和延遲較高的情況,能夠在不理想的網(wǎng)絡(luò)環(huán)境下保持穩(wěn)定的消息傳輸。
4、可靠性:MQTT支持消息的持久化和確認(rèn)機(jī)制,確保消息的可靠傳輸,同時(shí)提供了QoS(Quality of Service)等級(jí),可以根據(jù)實(shí)際需求進(jìn)行靈活配置。
5、靈活性:MQTT支持多種消息格式和負(fù)載類型,可以傳輸文本、二進(jìn)制數(shù)據(jù)等多種類型的消息,同時(shí)支持SSL/TLS加密,保障通信安全。
6、適用于多種場景:由于其輕量級(jí)和靈活性,MQTT被廣泛應(yīng)用于物聯(lián)網(wǎng)、傳感器網(wǎng)絡(luò)、遠(yuǎn)程監(jiān)控、消息通知等場景,成為連接設(shè)備的重要通信協(xié)議之一。
話不多說,直接看代碼如何連接
因項(xiàng)目需求,本次做的是在項(xiàng)目啟動(dòng)時(shí),動(dòng)態(tài)讀取數(shù)據(jù)庫中已經(jīng)配置好的mqtt連接信息,并且根據(jù)這些信息動(dòng)態(tài)的循環(huán)連接服務(wù)端,在接收到消息后進(jìn)行持久化和相關(guān)邏輯處理。
一、首先加載依賴
<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
二、因?yàn)槭且陧?xiàng)目啟動(dòng)時(shí)候連接,但是又要等項(xiàng)目初始化后拿到要用的mapper,所以在這個(gè)類中需要實(shí)現(xiàn)ApplicationRunner接口,而沒有用其他的方法,有多種實(shí)現(xiàn)但是我用的這個(gè)
package com.ruoyi; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.ruoyi.system.domain.mqtt.MqttBean; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * @author 1097022316 * 啟動(dòng)后建立MQTT連接 并對(duì)數(shù)據(jù)持久化和相關(guān)邏輯處理 */ @Component public class StartInit implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); //這里是從數(shù)據(jù)庫中查詢出所有的mqtt連接相關(guān)信息,如ip、topic等 List<DeviceCollector> collectorList = collectorService.list(new QueryWrapper<DeviceCollector>().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic")); System.err.println("collectorList = " + collectorList); //如果沒有任何連接 直接結(jié)束 if (CollUtil.isEmpty(collectorList)) { return; } List<MqttBean> mqttBeans = new ArrayList<>(); //把從數(shù)據(jù)庫中查詢出來的信息組裝成mqttclient連接所需要的對(duì)象 //一般都是ip、port、username、password、topic、clientid 這里是簡單的用法,如有高級(jí)用法可自行摸索(順便在下面評(píng)論教一下) collectorList.forEach(mqtt -> { MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId()); mqttBeans.add(bean); }); //對(duì)我們組裝的mqtt連接對(duì)象信息進(jìn)行遍歷循環(huán)連接 mqttBeans.forEach(bean -> { try { MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); //設(shè)置相關(guān)連接參數(shù),有些是必要有些是非必要 可自行點(diǎn)進(jìn)去查看源碼 connOpts.setAutomaticReconnect(false); connOpts.setCleanSession(true); connOpts.setUserName(bean.getUserName()); connOpts.setPassword(bean.getPassword().toCharArray()); mqttClient.connect(connOpts); //把連接對(duì)象加入到全局 mqttClients.put(bean.getTopics(), mqttClient); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (reconnect) { System.out.println("Reconnected successfully. url = " + serverURI); } else { System.out.println("Connected successfully for the first time."); } } /** * 設(shè)置重連機(jī)制 */ @Override public void connectionLost(Throwable cause) { System.err.println(bean.getTopics() + "連接丟失" + cause.getMessage()); if (!mqttClient.isConnected()) { try { Thread.sleep(1000 * 60 * 5); //嘗試連接 System.out.println("bean = " + bean); boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId())); if (flag) { System.err.println(bean.getTopics() + "重新連接,重新訂閱!"); } } catch (InterruptedException e) { MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了" + e.getMessage(), "CLCW"); throw new RuntimeException(e); } } } @Override public void messageArrived(String topic, MqttMessage message) { //這里是當(dāng)消息推送時(shí)我們做的事情 System.out.println("dosomethings..."); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("消息發(fā)送完整"); } }); mqttClient.subscribe(bean.getTopics(), 2); } catch (Exception e) { e.printStackTrace(); System.err.println(bean.getTopics() + "MQTT連接出異常了"); MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了"+e.getMessage(),"LJCW"); try { Thread.sleep(1000 * 60 * 30); //嘗試連接 System.out.println("bean = " + bean); boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId())); if (flag) { System.err.println(bean.getTopics() + "重新連接,重新訂閱!"); } } catch (InterruptedException ex) { //可以在這里把報(bào)錯(cuò)信息存入本地看看 throw new RuntimeException(ex); } } }); } }
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author 1097022316 */ @Data @AllArgsConstructor @NoArgsConstructor public class MqttBean { private String url; private String userName; private String password; private String topics; private String clientId; }
重點(diǎn)是MqttClient中幾個(gè)參數(shù)和配置參數(shù),以及那幾個(gè)重寫的方法,看下源碼就好了。這里用的比較粗糙,只是簡單的實(shí)現(xiàn)了連接和重連,一些復(fù)雜的如心跳或者遺囑啥的都沒用,要研究可自行查看
以上就是SpringBoot整合MOTT動(dòng)態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot MOTT讀取數(shù)據(jù)庫的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接
這篇文章主要為大家詳細(xì)介紹了Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06Springboot整合hibernate validator 全局異常處理步驟詳解
本文分步驟給大家介紹Springboot整合hibernate validator 全局異常處理,補(bǔ)呢文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01Spring創(chuàng)建bean對(duì)象三種方式代碼實(shí)例
這篇文章主要介紹了Spring創(chuàng)建bean對(duì)象三種方式代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07Java遍歷字符串和統(tǒng)計(jì)字符個(gè)數(shù)的操作方法
這篇文章主要介紹了Java遍歷字符串和統(tǒng)計(jì)字符個(gè)數(shù)的操作方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12使用Jackson來實(shí)現(xiàn)Java對(duì)象與JSON的相互轉(zhuǎn)換的教程
這篇文章主要介紹了使用Jackson來實(shí)現(xiàn)Java對(duì)象與JSON的互相轉(zhuǎn)換的教程,文中羅列了3中Jackson的使用方式,需要的朋友可以參考下2016-01-01Java中線程組ThreadGroup與線程池的區(qū)別及示例
這篇文章主要介紹了Java中線程組與線程池的區(qū)別及示例,ThreadGroup是用來管理一組線程的,可以控制線程的執(zhí)行,查看線程的執(zhí)行狀態(tài)等操作,方便對(duì)于一組線程的統(tǒng)一管理,需要的朋友可以參考下2023-05-05