SpringBoot整合MOTT動態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端
MQTT介紹:
概述:
MQTT是一種輕量級的消息傳輸協(xié)議(Message Queuing Telemetry Transport),旨在實(shí)現(xiàn)設(shè)備之間的低帶寬和高延遲的通信。它是基于發(fā)布/訂閱模式(Publish/Subscribe)的消息協(xié)議,最初由IBM開發(fā),現(xiàn)在成為了一種開放的標(biāo)準(zhǔn),被廣泛應(yīng)用于物聯(lián)網(wǎng)(IoT)領(lǐng)域。
MQTT特點(diǎn)包括:
1、輕量級:MQTT協(xié)議設(shè)計簡單,消息頭部輕量,適用于受限環(huán)境的設(shè)備,如傳感器、嵌入式設(shè)備等。
2、簡單易用:MQTT采用發(fā)布/訂閱模式,消息的發(fā)送者(發(fā)布者)和接收者(訂閱者)之間解耦,通信過程簡單易理解。
3、低帶寬、高延遲:MQTT協(xié)議設(shè)計考慮了網(wǎng)絡(luò)帶寬受限和延遲較高的情況,能夠在不理想的網(wǎng)絡(luò)環(huán)境下保持穩(wěn)定的消息傳輸。
4、可靠性:MQTT支持消息的持久化和確認(rèn)機(jī)制,確保消息的可靠傳輸,同時提供了QoS(Quality of Service)等級,可以根據(jù)實(shí)際需求進(jìn)行靈活配置。
5、靈活性:MQTT支持多種消息格式和負(fù)載類型,可以傳輸文本、二進(jìn)制數(shù)據(jù)等多種類型的消息,同時支持SSL/TLS加密,保障通信安全。
6、適用于多種場景:由于其輕量級和靈活性,MQTT被廣泛應(yīng)用于物聯(lián)網(wǎng)、傳感器網(wǎng)絡(luò)、遠(yuǎn)程監(jiān)控、消息通知等場景,成為連接設(shè)備的重要通信協(xié)議之一。
話不多說,直接看代碼如何連接
因項(xiàng)目需求,本次做的是在項(xiàng)目啟動時,動態(tài)讀取數(shù)據(jù)庫中已經(jīng)配置好的mqtt連接信息,并且根據(jù)這些信息動態(tài)的循環(huán)連接服務(wù)端,在接收到消息后進(jìn)行持久化和相關(guān)邏輯處理。
一、首先加載依賴
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
二、因?yàn)槭且陧?xiàng)目啟動時候連接,但是又要等項(xiàng)目初始化后拿到要用的mapper,所以在這個類中需要實(shí)現(xiàn)ApplicationRunner接口,而沒有用其他的方法,有多種實(shí)現(xiàn)但是我用的這個
package com.ruoyi;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.system.domain.mqtt.MqttBean;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 1097022316
* 啟動后建立MQTT連接 并對數(shù)據(jù)持久化和相關(guān)邏輯處理
*/
@Component
public class StartInit implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
//這里是從數(shù)據(jù)庫中查詢出所有的mqtt連接相關(guān)信息,如ip、topic等
List<DeviceCollector> collectorList = collectorService.list(new QueryWrapper<DeviceCollector>().isNotNull("collector_ip").ne("collector_ip", "").eq("is_del", "0").groupBy("collector_topic"));
System.err.println("collectorList = " + collectorList);
//如果沒有任何連接 直接結(jié)束
if (CollUtil.isEmpty(collectorList)) {
return;
}
List<MqttBean> mqttBeans = new ArrayList<>();
//把從數(shù)據(jù)庫中查詢出來的信息組裝成mqttclient連接所需要的對象
//一般都是ip、port、username、password、topic、clientid 這里是簡單的用法,如有高級用法可自行摸索(順便在下面評論教一下)
collectorList.forEach(mqtt -> {
MqttBean bean = new MqttBean("tcp://" + mqtt.getCollectorIp() + ":" + mqtt.getCollectorPort(), mqtt.getCollectorUsername(), mqtt.getCollectorPassword(), mqtt.getCollectorTopic(), mqtt.getCollectorClientId());
mqttBeans.add(bean);
});
//對我們組裝的mqtt連接對象信息進(jìn)行遍歷循環(huán)連接
mqttBeans.forEach(bean -> {
try {
MqttClient mqttClient = new MqttClient(bean.getUrl(), bean.getClientId(), persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
//設(shè)置相關(guān)連接參數(shù),有些是必要有些是非必要 可自行點(diǎn)進(jìn)去查看源碼
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(true);
connOpts.setUserName(bean.getUserName());
connOpts.setPassword(bean.getPassword().toCharArray());
mqttClient.connect(connOpts);
//把連接對象加入到全局
mqttClients.put(bean.getTopics(), mqttClient);
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
System.out.println("Reconnected successfully. url = " + serverURI);
} else {
System.out.println("Connected successfully for the first time.");
}
}
/**
* 設(shè)置重連機(jī)制
*/
@Override
public void connectionLost(Throwable cause) {
System.err.println(bean.getTopics() + "連接丟失" + cause.getMessage());
if (!mqttClient.isConnected()) {
try {
Thread.sleep(1000 * 60 * 5);
//嘗試連接
System.out.println("bean = " + bean);
boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
if (flag) {
System.err.println(bean.getTopics() + "重新連接,重新訂閱!");
}
} catch (InterruptedException e) {
MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了" + e.getMessage(), "CLCW");
throw new RuntimeException(e);
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) {
//這里是當(dāng)消息推送時我們做的事情
System.out.println("dosomethings...");
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("消息發(fā)送完整");
}
});
mqttClient.subscribe(bean.getTopics(), 2);
} catch (Exception e) {
e.printStackTrace();
System.err.println(bean.getTopics() + "MQTT連接出異常了");
MyMqttUtils.saveToTxt(bean.getTopics() + "MQTT連接出異常了"+e.getMessage(),"LJCW");
try {
Thread.sleep(1000 * 60 * 30);
//嘗試連接
System.out.println("bean = " + bean);
boolean flag = MyMqttUtils.connectMqtt(new MqttBean(bean.getUrl(), bean.getUserName(), bean.getPassword(), bean.getTopics(), bean.getClientId()));
if (flag) {
System.err.println(bean.getTopics() + "重新連接,重新訂閱!");
}
} catch (InterruptedException ex) {
//可以在這里把報錯信息存入本地看看
throw new RuntimeException(ex);
}
}
});
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 1097022316
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MqttBean {
private String url;
private String userName;
private String password;
private String topics;
private String clientId;
}
重點(diǎn)是MqttClient中幾個參數(shù)和配置參數(shù),以及那幾個重寫的方法,看下源碼就好了。這里用的比較粗糙,只是簡單的實(shí)現(xiàn)了連接和重連,一些復(fù)雜的如心跳或者遺囑啥的都沒用,要研究可自行查看
以上就是SpringBoot整合MOTT動態(tài)讀取數(shù)據(jù)庫連接信息并連接MQTT服務(wù)端的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot MOTT讀取數(shù)據(jù)庫的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接
這篇文章主要為大家詳細(xì)介紹了Java使用IO流實(shí)現(xiàn)音頻的剪切和拼接,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-06-06
Springboot整合hibernate validator 全局異常處理步驟詳解
本文分步驟給大家介紹Springboot整合hibernate validator 全局異常處理,補(bǔ)呢文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01
Spring創(chuàng)建bean對象三種方式代碼實(shí)例
這篇文章主要介紹了Spring創(chuàng)建bean對象三種方式代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07
Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法
這篇文章主要介紹了Java遍歷字符串和統(tǒng)計字符個數(shù)的操作方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-12-12
使用Jackson來實(shí)現(xiàn)Java對象與JSON的相互轉(zhuǎn)換的教程
這篇文章主要介紹了使用Jackson來實(shí)現(xiàn)Java對象與JSON的互相轉(zhuǎn)換的教程,文中羅列了3中Jackson的使用方式,需要的朋友可以參考下2016-01-01
Java中線程組ThreadGroup與線程池的區(qū)別及示例
這篇文章主要介紹了Java中線程組與線程池的區(qū)別及示例,ThreadGroup是用來管理一組線程的,可以控制線程的執(zhí)行,查看線程的執(zhí)行狀態(tài)等操作,方便對于一組線程的統(tǒng)一管理,需要的朋友可以參考下2023-05-05

