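Complete Example Code for Integrating Java with the MQTT Protocol
Preface
This article walks through integrating with MQTT from Java using the Spring Integration framework. The code covers the MQTT client configuration, subscribing to and publishing messages, and the message-handling logic.
Prerequisite dependencies
<!-- MQTT dependency. When the project inherits Spring Boot's dependency management the version is resolved automatically; otherwise add a version element. -->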
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
1. MQTT configuration class
Code walkthrough
The MqttConfig class is the core MQTT configuration class. It sets up the MQTT client factory, sets the connection options, and creates the message channels.
package com.ruoyi.framework.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    // Client factory: holds the connection options shared by both adapters
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // broker address
        options.setAutomaticReconnect(true);            // reconnect automatically
        factory.setConnectionOptions(options);
        // No connection is opened here; the adapters connect when they start
        System.out.println("MQTT client factory configured for broker: " + brokerUrl);
        return factory;
    }

    // Inbound adapter (receiving)
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound", // client ID (must be unique)
                mqttClientFactory(),   // create the client through the factory
                "testSub/#"            // subscribed topic filter
        );
        adapter.setOutputChannelName("mqttInputChannel"); // key step: received messages go to this channel
        adapter.setQos(1);                                // QoS 1 (at-least-once delivery)
        return adapter;
    }

    // Outbound adapter (sending)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound",
                mqttClientFactory()
        );
        handler.setAsync(true);   // do not block while waiting for delivery confirmation
        handler.setDefaultQos(1); // QoS 1 unless a message header overrides it
        return handler;
    }

    // Inbound channel (received messages)
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // direct channel: the handler runs in the sender's thread
    }

    // Outbound channel (messages to publish)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
1.1 MQTT client factory
The mqttClientFactory method creates an MqttPahoClientFactory instance that carries the connection options for the MQTT clients.
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{brokerUrl}); // address of the MQTT broker
    options.setAutomaticReconnect(true);            // enable automatic reconnection
    factory.setConnectionOptions(options);
    // No connection is opened here; the adapters connect when they start
    System.out.println("MQTT client factory configured for broker: " + brokerUrl);
    return factory;
}
- brokerUrl: the address of the MQTT broker, usually tcp://<IP>:<port>.
- automaticReconnect: enables automatic reconnection, so the client can restore the connection after a network interruption.
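As a quick sanity check on the tcp://<IP>:<port> form described above, the broker URL can be parsed with java.net.URI before it is handed to the connection options. The following is a minimal sketch, not part of the original project: the class name BrokerUrlCheck and the accepted scheme set (tcp, ssl, ws, wss) are assumptions made for illustration, so check which schemes your Paho client version actually accepts.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: validates the shape of an MQTT broker URL.
final class BrokerUrlCheck {

    // Assumed scheme set for this sketch; adjust it to what your client supports.
    private static final Set<String> SCHEMES = Set.of("tcp", "ssl", "ws", "wss");

    /** Returns true when the URL has an accepted scheme, a host, and a port in 1..65535. */
    static boolean isValid(String brokerUrl) {
        if (brokerUrl == null) {
            return false;
        }
        try {
            URI uri = new URI(brokerUrl.trim());
            String scheme = uri.getScheme();
            return scheme != null
                    && SCHEMES.contains(scheme.toLowerCase(Locale.ROOT))
                    && uri.getHost() != null
                    && uri.getPort() >= 1
                    && uri.getPort() <= 65535;
        } catch (URISyntaxException e) {
            // Not parseable as a URI at all, e.g. a bare "host:port"
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("tcp://127.0.0.1:1883"));
        System.out.println(isValid("tcp://localhost")); // port missing
    }
}
```

Running such a check at startup turns a typo in mqtt.broker-url into an immediate, readable failure instead of a connection error at the first publish.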
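1.2 MQTT message subscription adapter
The mqttInbound method creates an MqttPahoMessageDrivenChannelAdapter instance, which subscribes to MQTT topics and receives messages.
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            clientId + "-inbound", mqttClientFactory(), "testSub/#");
    adapter.setOutputChannelName("mqttInputChannel"); // bind to the inbound channel
    adapter.setQos(1);                                // QoS 1 (at-least-once delivery)
    return adapter;
}
- clientId + "-inbound": the client ID, which must be unique among the clients connected to the broker.
- "testSub/#": the subscribed topic filter; # is the multi-level wildcard, so it matches every topic under testSub.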
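To make the wildcard behavior concrete, here is a small sketch of MQTT topic-filter matching, covering the multi-level wildcard # and the single-level wildcard +. It is an illustration written for this article, not code from Paho or Spring Integration; the class name TopicFilter is hypothetical, and the sketch ignores the special handling of topics that start with $.

```java
// Hypothetical helper: checks whether a topic name matches an MQTT topic filter.
final class TopicFilter {

    /** Returns true when the topic name matches the filter ('+' = one level, '#' = all remaining levels). */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent level and everything below it
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        return f.length == t.length; // no wildcard left, so the level counts must agree
    }

    public static void main(String[] args) {
        System.out.println(matches("testSub/#", "testSub/device1/status"));
        System.out.println(matches("testSub/#", "report/allpoints"));
    }
}
```

Note that report/allpoints does not match testSub/#. A handler branch for that topic, like the one in the listener later in this article, only receives messages if the adapter subscribes to that topic as well.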
1.3 MQTT message publishing adapter
The mqttOutbound method creates an MqttPahoMessageHandler instance, which publishes messages to MQTT topics.
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
            clientId + "-outbound", mqttClientFactory());
    handler.setAsync(true);   // send asynchronously
    handler.setDefaultQos(1); // default QoS level
    return handler;
}
- clientId + "-outbound": the client ID, which must be unique among the clients connected to the broker.
- mqttOutboundChannel: the channel that carries the messages to publish.
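The uniqueness requirement matters as soon as more than one instance of the application runs: under MQTT, a broker disconnects the existing client when a new one connects with the same client ID. One common workaround is to append a short random suffix per instance. The sketch below assumes that approach; ClientIds and unique are hypothetical names, not part of the original code.

```java
import java.util.UUID;

// Hypothetical helper: derives a per-instance client ID from the configured base ID.
final class ClientIds {

    /** Builds "<base>-<role>-<8 hex characters>", where role is e.g. "inbound" or "outbound". */
    static String unique(String base, String role) {
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        return base + "-" + role + "-" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(unique("mqtt-client-123", "inbound"));
        System.out.println(unique("mqtt-client-123", "outbound"));
    }
}
```

A random suffix changes the ID on every restart, so it does not suit persistent sessions (cleanSession=false), which the broker keys by client ID. Also keep the total length in mind: MQTT 3.1.1 only guarantees that brokers accept IDs of up to 23 characters, although many accept longer ones.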
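1.4 Message channels
The mqttInputChannel and mqttOutboundChannel methods create the inbound and outbound channels that carry messages between the adapters and the application code.
@Bean
public MessageChannel mqttInputChannel() {
    return new DirectChannel(); // direct channel: the handler runs in the sender's thread
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}
2. MQTT message listener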
Code walkthrough
The MqttMessageListener class handles the messages received from the MQTT topics. Two alternative versions are shown, Ⅰ and Ⅱ.
Ⅰ
package com.ruoyi.framework.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);

    @Autowired
    private IMqttService mqttService;

    // Handles inbound messages.
    // This version passes the raw payload string, so it assumes IMqttService declares
    // String-based handlers; version Ⅱ uses the typed handlers defined in section 4.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // MqttHeaders.RECEIVED_TOPIC is the "mqtt_receivedTopic" header
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = message.getPayload().toString();
                log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                try {
                    // These prefixes are only delivered if the inbound adapter subscribes to them;
                    // the "testSub/#" filter in MqttConfig does not cover them.
                    if (topic.startsWith("heartbeat/")) {      // heartbeat report
                        mqttService.handleHeartbeat(payload);
                    } else if (topic.startsWith("report/")) {  // data report
                        mqttService.handleReport(payload);
                    }
                } catch (Exception e) {
                    // Pass the exception itself so the stack trace is logged
                    log.error("[MQTT] Message handling failed", e);
                }
            }
        };
    }
}
Or Ⅱ
package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    private static final Logger log = LoggerFactory.getLogger(MqttMessageListener.class);

    @Autowired
    private IMqttService mqttService;

    // Handles inbound messages
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // MqttHeaders.RECEIVED_TOPIC is the "mqtt_receivedTopic" header
                String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
                String payload = message.getPayload().toString();
                log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("testSub/")) {
                        // Parse the common fields first, then re-parse into the concrete type
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                mqttService.handleHeartbeat(heartbeat);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.ALARM:
                                AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                                mqttService.handleAlarm(alarm);
                                break;
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            default:
                                log.warn("[MQTT] Unknown message type: {}", baseMsg.getType());
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        // Only reached if the inbound adapter also subscribes to report/allpoints;
                        // the "testSub/#" filter in MqttConfig does not cover it.
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            // If no callAck arrives, the collector did not receive the callRead
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            case MessageConstants.MULTIVALUESET_ACK:
                                MultiValueSetMessage multiValueSetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                mqttService.handleMultiValueSet(multiValueSetAck);
                                break;
                            default:
                                log.warn("[MQTT] Unknown message type: {}", baseMsg.getType());
                        }
                    }
                } catch (Exception e) {
                    // Pass the exception itself so the stack trace is logged
                    log.error("[MQTT] Message handling failed", e);
                }
            }
        };
    }
}
2.1 Message-handling logic
The handler method returns a MessageHandler that processes the messages arriving on mqttInputChannel. The excerpt below assumes the logger is declared once as a static field of the class.
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            String topic = String.valueOf(message.getHeaders().get("mqtt_receivedTopic")); // topic
            String payload = message.getPayload().toString();                              // message body
            log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
            try {
                if (topic.startsWith("testSub/")) {
                    // Messages on the testSub topics
                    BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                    switch (baseMsg.getType()) {
                        case MessageConstants.HEART_BEAT:
                            mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                            break;
                        case MessageConstants.REPORT:
                            mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                            break;
                        // handling for the other message types
                    }
                } else if (topic.startsWith("report/allpoints")) {
                    // Messages on the report/allpoints topic
                }
            } catch (Exception e) {
                log.error("[MQTT] Message handling failed", e);
            }
        }
    };
}
- mqtt_receivedTopic: the message header that holds the topic the message arrived on.
- payload: the message body, usually a JSON string.
- A switch statement calls a different handler method depending on the message type.
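When the number of message types grows, the switch can be replaced by a lookup table that maps each type to its handler. The sketch below shows that alternative in plain Java. TypeDispatcher is a hypothetical class, and the type strings "heartbeat" and "report" are placeholders: the real values live in MessageConstants, which this article does not show.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical alternative to the switch statement: a registry of handlers keyed by message type.
final class TypeDispatcher {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> fallback;

    /** The fallback receives payloads whose type has no registered handler. */
    TypeDispatcher(Consumer<String> fallback) {
        this.fallback = fallback;
    }

    /** Registers a handler for one message type and returns this dispatcher for chaining. */
    TypeDispatcher on(String type, Consumer<String> handler) {
        handlers.put(type, handler);
        return this;
    }

    /** Routes the payload to the handler registered for its type, or to the fallback. */
    void dispatch(String type, String payload) {
        handlers.getOrDefault(type, fallback).accept(payload);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        TypeDispatcher dispatcher = new TypeDispatcher(p -> seen.add("unknown:" + p))
                .on("heartbeat", p -> seen.add("heartbeat:" + p)) // placeholder type name
                .on("report", p -> seen.add("report:" + p));      // placeholder type name
        dispatcher.dispatch("heartbeat", "{}");
        dispatcher.dispatch("alarm", "{}"); // no handler registered, goes to the fallback
        System.out.println(seen);
    }
}
```

The explicit fallback plays the role of the default branch, so an unexpected message type is never dropped silently.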
3. MQTT message gateway
Code walkthrough
The MqttMessageGateway interface exposes a single, simple method for sending messages.
package com.ruoyi.framework.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {
    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
}
- @MessagingGateway: declares a messaging gateway; defaultRequestChannel names the channel that messages are sent to by default.
- @Header(MqttHeaders.TOPIC): marks the parameter that supplies the topic of the message.
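Conceptually, Spring Integration generates a proxy for the gateway interface: each call becomes a message whose headers come from the annotated parameters and whose payload is the remaining argument. The sketch below imitates that idea with a JDK dynamic proxy. It is a simplified model for illustration, not Spring Integration's implementation; Header, Envelope, Sender, and GatewaySketch are local stand-ins defined here.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Simplified model of a messaging gateway; all types here are local stand-ins.
final class GatewaySketch {

    /** Stand-in for a header annotation: names the header a parameter is copied into. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface Header {
        String value();
    }

    /** Stand-in for a message: headers plus a payload. */
    record Envelope(Map<String, Object> headers, Object payload) {}

    /** Stand-in for the gateway interface from the article. */
    interface Sender {
        void sendMessage(@Header("mqtt_topic") String topic, String payload);
    }

    /** Creates a proxy that turns each method call into an Envelope and hands it to the channel. */
    static <T> T create(Class<T> type, Consumer<Envelope> channel) {
        InvocationHandler handler = (proxy, method, args) -> {
            Map<String, Object> headers = new LinkedHashMap<>();
            Object payload = null;
            Annotation[][] annotations = method.getParameterAnnotations();
            for (int i = 0; i < args.length; i++) {
                String headerName = null;
                for (Annotation a : annotations[i]) {
                    if (a instanceof Header h) {
                        headerName = h.value();
                    }
                }
                if (headerName != null) {
                    headers.put(headerName, args[i]); // annotated argument becomes a header
                } else {
                    payload = args[i];                // unannotated argument becomes the payload
                }
            }
            channel.accept(new Envelope(headers, payload));
            return null; // the gateway method returns void
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    public static void main(String[] args) {
        Sender gateway = create(Sender.class, System.out::println);
        gateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
    }
}
```

In the real setup the outbound adapter reads the topic header from the message and publishes the payload to that topic, which is why callers only need the interface.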
Usage example for MqttMessageGateway:
@Autowired
private MqttMessageGateway mqttMessageGateway;

public void publishMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}
4. MQTT service interface
Code walkthrough
The IMqttService interface defines the methods that handle the different message types.
package com.ruoyi.framework.mqtt;

import com.ruoyi.power.domain.protocol.*;

public interface IMqttService {

    /**
     * Handles heartbeat data.
     * @param heartbeat the MQTT message content
     */
    void handleHeartbeat(HeartbeatMessage heartbeat);

    /**
     * Handles reported data.
     * @param report the MQTT message content
     */
    void handleReport(ReportMessage report);

    /**
     * Sends a remote-control or remote-adjustment command from the server to the collector.
     * @param controlMessage the control command
     */
    void sendControl(ControlMessage controlMessage);

    /**
     * Handles a meter alarm report.
     * @param alarm the alarm content
     */
    void handleAlarm(AlarmMessage alarm);

    /**
     * Sends a call-read command to the given gateway.
     * @param saleid the distribution substation ID
     * @param gateid the gateway ID
     * @param startTime start of the period to read
     * @param endTime end of the period to read
     */
    void sendCallRead(String saleid, String gateid, String startTime, String endTime);

    /**
     * Handles the collector's acknowledgement of a call-read command. The acknowledgement
     * does not guarantee that data exists for the requested period: the collector replies
     * with this packet first and then looks up the historical data, sending it if it exists
     * and sending nothing otherwise.
     * @param baseMsg the collector's call-read acknowledgement
     */
    void handleCallReadAck(BaseMessage baseMsg);

    /**
     * Handles the execution result that the collector sends back to the server.
     * @param controlAck the control acknowledgement
     */
    void handleControlAck(ControlMessage controlAck);

    /**
     * Publishes a get-data command from the server to the collector.
     * @param baseMessage the command message
     */
    void getCurrentData(BaseMessage baseMessage);

    /**
     * Sends a multi-value set command to the collector.
     * @param multiValueSetMessage the command message
     */
    void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

    /**
     * Handles the collector's response to a multi-value set command received from the server.
     * @param multiValueSetMessage the response message
     */
    void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);
}
- Each method corresponds to the handling logic for one message type.
- A class that implements this interface supplies the concrete business logic.
5. Usage
5.1 Configure the MQTT parameters
Set the MQTT parameters in application.yml:
mqtt:
  broker-url: tcp://127.0.0.1:1883
  client-id: mqtt-client-123
5.2 Implement the IMqttService interface
Create a class that implements IMqttService and supplies the business logic. For example:
package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.*;

@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {

    @Autowired
    private MqttMessageGateway mqttGateway;
    @Autowired
    private HeartBeatMapper heartbeatMapper;
    @Autowired
    private GatewayInfoMapper gatewayInfoMapper;
    @Autowired
    private ReportMapper reportMapper;
    @Autowired
    private ReportMeterMapper reportMeterMapper;
    @Autowired
    private AlarmMapper alarmMapper;
    @Autowired
    private AlarmDetailMapper alarmDetailMapper;
    @Autowired
    private ControlMapper controlMapper;

    // Handle heartbeat data
    @Override
    public void handleHeartbeat(HeartbeatMessage heartbeat) {
        try {
            // Store the heartbeat in the database
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            heartBeat.setGateId(heartbeat.getGateid());
            heartBeat.setType(heartbeat.getType());
            heartBeat.setSaleId(heartbeat.getSaleid());
            heartBeat.setTime(heartbeat.getTime());
            heartBeat.setOperation(heartbeat.getOperation());
            heartbeatMapper.insertHeartBeat(heartBeat);
            log.info("[Heartbeat] Stored: substationId={}, gatewayId={}",
                    heartbeat.getSaleid(), heartbeat.getGateid());
            // Look up the gateway record, creating it if it does not exist yet
            GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(
                    Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
            if (ObjectUtils.isNull(gatewayInfo)) {
                createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
            } else {
                LocalDateTime now = LocalDateTime.now();
                gatewayInfo.setLastHeartbeatTime(now);
                gatewayInfo.setUpdateTime(now);
                int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                if (updated == 0) {
                    log.warn("Heartbeat update conflict saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                }
            }
            // Reply to the gateway's heartbeat
            sendHeartbeat(heartbeat.getSaleid(), heartbeat.getGateid(), heartbeat);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged
            log.error("[Heartbeat] Processing failed", e);
        }
    }

    // Create a new gateway record
    private void createNewGateway(String saleid, String gateid) {
        LocalDateTime now = LocalDateTime.now();
        GatewayInfo newGateway = new GatewayInfo();
        newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        newGateway.setSaleid(saleid);
        newGateway.setGateid(gateid);
        newGateway.setLastHeartbeatTime(now);
        newGateway.setStatus("0");
        newGateway.setCheckInterval(60L); // default interval
        newGateway.setCreateTime(now);
        gatewayInfoMapper.insertGatewayInfo(newGateway);
    }

    // Send the heartbeat reply
    private void sendHeartbeat(String saleid, String gateid, HeartbeatMessage heartbeat) {
        // The topic is fixed. The original String.format call had no placeholders,
        // so saleid and gateid never became part of the topic.
        String topic = "report/allpoints";
        heartbeat.setOperation(OperationConstants.TIME);
        String body = JSON.toJSONString(heartbeat);
        mqttGateway.sendMessage(topic, body);
        log.info("[Heartbeat reply] topic={}, message={}", topic, body);
    }

    // Handle reported data
    @Override
    public void handleReport(ReportMessage report) {
        try {
            // Store the report record and keep its ID for the meter rows
            String reportId = createReportData(report);
            // Map each reported meter to a ReportMeter row
            List<ReportMeter> meterEntities = report.getMeter().stream()
                    .map(m -> {
                        ReportMeter entity = new ReportMeter();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setReportId(reportId);
                        entity.setMeterId(m.getId());
                        entity.setStatus(m.getStatus());
                        entity.setName(m.getName());
                        entity.setValuesJson(JSON.toJSONString(m.getValues()));
                        return entity;
                    }).toList();
            // Insert the rows one at a time
            for (ReportMeter meter : meterEntities) {
                reportMeterMapper.insertReportMeter(meter);
            }
            log.info("[Report] Stored: substationId={}, gatewayId={}",
                    report.getSaleid(), report.getGateid());
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged
            log.error("[Report] Processing failed", e);
        }
    }

    // Create a new report record and return its ID
    private String createReportData(ReportMessage report) {
        Report rep = new Report();
        rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        rep.setSaleid(report.getSaleid());
        rep.setGateid(report.getGateid());
        rep.setTime(report.getTime());
        rep.setType(report.getType());
        rep.setSequence(report.getSequence());
        rep.setSource(report.getSource());
        rep.setCreateTime(LocalDateTime.now());
        reportMapper.insert(rep);
        return rep.getId();
    }

    // Send a control command
    @Override
    public void sendControl(ControlMessage controlMessage) {
        LocalDateTime now = LocalDateTime.now();
        ControlMessage message = new ControlMessage();
        message.setSaleid(controlMessage.getSaleid());
        message.setGateid(controlMessage.getGateid());
        message.setType(controlMessage.getType());
        message.setCuuid(now.toString());
        message.setTime(now);
        message.setMeterid(controlMessage.getMeterid());
        message.setName(controlMessage.getName());
        message.setFunctionid(controlMessage.getFunctionid());
        message.setValue(controlMessage.getValue());
        // Store the command that is actually sent, including its cuuid and time
        createControl(message);
        // The topic is fixed; the original String.format call had no placeholders
        String topic = "report/allpoints";
        String body = JSON.toJSONString(message);
        mqttGateway.sendMessage(topic, body);
        log.info("[Control command] topic={}, command={}", topic, body);
    }

    // Store a control record
    private void createControl(ControlMessage controlMessage) {
        Control control = new Control();
        control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        control.setSaleid(controlMessage.getSaleid());
        control.setGateid(controlMessage.getGateid());
        control.setType(controlMessage.getType());
        control.setCuuid(controlMessage.getCuuid());
        control.setTime(controlMessage.getTime());
        control.setMeterid(controlMessage.getMeterid());
        control.setName(controlMessage.getName());
        control.setFunctionid(controlMessage.getFunctionid());
        control.setValue(controlMessage.getValue());
        control.setResult(controlMessage.getResult());
        control.setErrordesc(controlMessage.getErrordesc());
        controlMapper.insertControl(control);
    }

    // Handle a meter alarm report
    @Override
    public void handleAlarm(AlarmMessage alarmMessage) {
        try {
            // Store the alarm record and keep its ID for the detail rows
            String alarmId = createAlarmData(alarmMessage);
            // Map each alarm function to an AlarmDetail row
            List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                    .map(m -> {
                        AlarmDetail entity = new AlarmDetail();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setAlarmId(alarmId);
                        entity.setPtId(m.getId());
                        entity.setAlarmType(m.getAlarmType());
                        entity.setLabel(m.getLabel());
                        entity.setCurrentValue(m.getCurrentValue());
                        entity.setSettingValue(m.getSettingValue());
                        entity.setLevel(m.getLevel());
                        return entity;
                    }).toList();
            // Insert the rows one at a time
            for (AlarmDetail alarm : alarmEntities) {
                alarmDetailMapper.insertAlarmDetail(alarm);
            }
            log.info("[Alarm] Stored: substationId={}, gatewayId={}",
                    alarmMessage.getSaleid(), alarmMessage.getGateid());
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged
            log.error("[Alarm] Processing failed", e);
        }
    }

    // Send a call-read command for the given period
    @Override
    public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
        HashMap<String, String> protocol = new HashMap<>();
        protocol.put("saleid", saleid);
        protocol.put("gateid", gateid);
        protocol.put("type", MessageConstants.CALL_READ);
        protocol.put("time", String.valueOf(LocalDateTime.now()));
        protocol.put("startTime", startTime);
        protocol.put("endTime", endTime);
        // The topic is fixed; the original String.format call had no placeholders
        String topic = "report/allpoints";
        String body = JSON.toJSONString(protocol);
        mqttGateway.sendMessage(topic, body);
        log.info("[Call-read command] topic={}, command={}", topic, body);
    }

    // Record the collector's call-read acknowledgement
    @Override
    public void handleCallReadAck(BaseMessage baseMsg) {
        Report report = new Report();
        report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        report.setSaleid(baseMsg.getSaleid());
        report.setGateid(baseMsg.getGateid());
        report.setTime(baseMsg.getTime());
        report.setType(baseMsg.getType());
        reportMapper.insert(report);
    }

    // Record the execution result returned by the collector
    @Override
    public void handleControlAck(ControlMessage controlAck) {
        if (ResultConstants.FAILURE.equals(controlAck.getResult())) {
            createControl(controlAck);
            // Configuration or device problem: record the error and raise an alert
            log.error("Control failed (not retryable): {}", controlAck.getErrordesc());
        } else if (ResultConstants.SUCCESS.equals(controlAck.getResult())) {
            createControl(controlAck);
            log.info("Control succeeded: {}", controlAck.getCuuid());
        }
    }

    // Publish a get-data command to the collector
    @Override
    public void getCurrentData(BaseMessage baseMessage) {
        // The topic is fixed; the original String.format call had no placeholders
        String topic = "report/allpoints";
        String body = JSON.toJSONString(baseMessage);
        mqttGateway.sendMessage(topic, body);
        log.info("[Get-data command] topic={}, command={}", topic, body);
    }

    // Publish a multi-value set command to the collector
    @Override
    public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String topic = "report/allpoints";
        String body = JSON.toJSONString(multiValueSetMessage);
        mqttGateway.sendMessage(topic, body);
        log.info("[Multi-value set command] topic={}, command={}", topic, body);
    }

    // Record the collector's response to a multi-value set command.
    // Publishing happens in sendMultiValueSet; this handler only stores the outcome,
    // which also avoids echoing acknowledgements back onto the topic.
    @Override
    public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String body = JSON.toJSONString(multiValueSetMessage);
        try {
            boolean success = ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult());
            boolean failure = ResultConstants.FAILURE.equals(multiValueSetMessage.getResult());
            if (success || failure) {
                ControlMessage controlMessage = new ControlMessage();
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
            }
            if (success) {
                log.info("[Multi-value set] Succeeded: {}", body);
            } else if (failure) {
                log.error("[Multi-value set] Failed: {}", body);
            }
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged
            log.error("[Multi-value set] Processing failed: {}", body, e);
        }
    }

    // Create a new alarm record and return its ID
    private String createAlarmData(AlarmMessage alarmMessage) {
        Alarm alarm = new Alarm();
        alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        alarm.setSaleid(alarmMessage.getSaleid());
        alarm.setGateid(alarmMessage.getGateid());
        alarm.setTime(alarmMessage.getTime());
        alarm.setType(alarmMessage.getType());
        alarm.setSequence(alarmMessage.getSequence());
        alarm.setName(alarmMessage.getName());
        alarm.setMeterid(alarmMessage.getMeterid());
        alarm.setCreateTime(LocalDateTime.now());
        alarmMapper.insert(alarm);
        return alarm.getId();
    }
}
5.3 Send MQTT messages
Send messages through MqttMessageGateway:
@Autowired
private MqttMessageGateway mqttMessageGateway;

public void sendTestMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}
6. Summary
This article showed how to integrate with MQTT using the Spring Integration framework, covering the client configuration, subscribing to and publishing messages, and the message-handling logic. With the code above as a starting point, you can connect a Java application to an MQTT broker and extend the message handling to fit your business requirements.