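Complete Example Code for Integrating Java with the MQTT Protocol
Preface
This article explains in detail how to implement MQTT integration with Java and the Spring Integration framework. The code covers MQTT client configuration, subscribing to and publishing messages, and the message-handling logic.
Prerequisite dependencies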
    <!-- MQTT dependency -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
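1. MQTT configuration class
Code walkthrough
The MqttConfig class is the core MQTT configuration class. It initializes the MQTT client factory, sets the connection options, and creates the message channels.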
    package com.ruoyi.framework.config;

    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;

    @Configuration
    public class MqttConfig {

        @Value("${mqtt.broker-url}")
        private String brokerUrl;

        @Value("${mqtt.client-id}")
        private String clientId;

        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            MqttConnectOptions options = new MqttConnectOptions();
            options.setServerURIs(new String[]{brokerUrl}); // broker address
            options.setAutomaticReconnect(true);            // automatic reconnect
            factory.setConnectionOptions(options);
            // No connection is opened here; it is established later by the adapters that use this factory.
            System.out.println("MQTT client factory configured for broker: " + brokerUrl);
            return factory;
        }

        // Inbound adapter (receiving)
        @Bean
        public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
            MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                    clientId + "-inbound",  // client ID (must be unique)
                    mqttClientFactory(),    // create the client through the factory
                    "testSub/#"             // subscribed topic filter
            );
            adapter.setOutputChannelName("mqttInputChannel"); // key step: received messages go to the input channel
            adapter.setQos(1);                                // QoS level
            return adapter;
        }

        // Outbound adapter (sending)
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                    clientId + "-outbound",
                    mqttClientFactory()
            );
            handler.setAsync(true);
            handler.setDefaultQos(1);
            return handler;
        }

        // Inbound channel (received messages)
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel(); // direct channel
        }

        // Outbound channel (messages to send)
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
    }
1.1 MQTT client factory
The mqttClientFactory method creates an MqttPahoClientFactory instance that holds the connection options for the MQTT clients.
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // MQTT broker address
        options.setAutomaticReconnect(true);            // enable automatic reconnect
        factory.setConnectionOptions(options);
        // No connection is opened here; it is established later by the adapters that use this factory.
        System.out.println("MQTT client factory configured for broker: " + brokerUrl);
        return factory;
    }
- brokerUrl: the address of the MQTT broker, usually tcp://<IP>:<port>.
- automaticReconnect: enables automatic reconnection, so the client restores its connection by itself after a network interruption.
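To make the effect of automaticReconnect more concrete, here is a minimal sketch of a doubling reconnect delay. The Paho Java client documentation describes a delay that starts at 1 second and doubles after every failed attempt until it reaches 2 minutes. The class and method names below (ReconnectBackoff, delaySeconds) are hypothetical and are not part of Paho; the sketch only illustrates the schedule, not the client's actual implementation.

```java
// Hypothetical helper that reproduces a "double until capped" reconnect schedule.
final class ReconnectBackoff {
    private static final int INITIAL_DELAY_SECONDS = 1;
    private static final int MAX_DELAY_SECONDS = 120;

    private ReconnectBackoff() {
    }

    /** Delay before the next attempt, given how many attempts have already failed. */
    static int delaySeconds(int failedAttempts) {
        if (failedAttempts < 0) {
            throw new IllegalArgumentException("failedAttempts must not be negative");
        }
        // 2^7 = 128 already exceeds the cap, so larger values need no shifting.
        if (failedAttempts >= 7) {
            return MAX_DELAY_SECONDS;
        }
        return Math.min(INITIAL_DELAY_SECONDS << failedAttempts, MAX_DELAY_SECONDS);
    }

    public static void main(String[] args) {
        for (int i = 0; i <= 8; i++) {
            System.out.println("after " + i + " failed attempts: " + delaySeconds(i) + "s");
        }
    }
}
```

In practice this means a broker outage of a few minutes is usually recovered within about two minutes of the broker coming back.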
1.2 MQTT message subscription adapter
The mqttInbound method creates an MqttPahoMessageDrivenChannelAdapter instance that subscribes to MQTT topics and receives messages.
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound", mqttClientFactory(), "testSub/#");
        adapter.setOutputChannelName("mqttInputChannel"); // bind to the input channel
        adapter.setQos(1);                                // QoS level
        return adapter;
    }
- clientId + "-inbound": the client ID, which must be unique.
- "testSub/#": the subscribed topic filter; # is the multi-level wildcard and matches every sub-topic.
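To show what a filter such as testSub/# accepts, the following sketch matches a topic name against a topic filter using the MQTT wildcards + (exactly one level) and # (the parent level and everything below it). TopicFilterMatcher is a hypothetical helper written for this article, not a Paho or Spring class. It assumes a well-formed filter and skips the special rule for topics that begin with $.

```java
// Hypothetical helper: checks whether a topic name matches an MQTT topic filter.
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    static boolean matches(String filter, String topic) {
        // A limit of -1 keeps trailing empty levels, e.g. "a/" -> ["a", ""].
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            String level = filterLevels[i];
            if (level.equals("#")) {
                // '#' is only valid as the last level; it matches the parent and all descendants.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // the topic has fewer levels than the filter requires
            }
            if (!level.equals("+") && !level.equals(topicLevels[i])) {
                return false; // a literal level differs
            }
        }
        // Without '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("testSub/#", "testSub/device1/status"));
        System.out.println(matches("testSub/#", "report/allpoints"));
    }
}
```

With the subscription used in this article, a message published to report/allpoints would not reach the inbound adapter, because it does not match testSub/#.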
1.3 MQTT message publishing adapter
The mqttOutbound method creates an MqttPahoMessageHandler instance that publishes messages to MQTT topics.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound", mqttClientFactory());
        handler.setAsync(true);   // send asynchronously
        handler.setDefaultQos(1); // default QoS level
        return handler;
    }
- clientId + "-outbound": the client ID, which must be unique.
- mqttOutboundChannel: the channel through which outgoing messages are sent.
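Client IDs must be unique because an MQTT broker keeps only one connection per client ID: when a second client connects with an ID that is already in use, the broker disconnects the first one. The "-inbound" and "-outbound" suffixes separate the two adapters inside one application, but two instances of the same application would still collide. The sketch below adds a per-instance suffix; ClientIds and its methods are hypothetical names, and a random suffix is only one possible choice (it changes on every restart, so it does not suit setups that rely on persistent sessions).

```java
import java.util.UUID;

// Hypothetical helper that derives distinct client IDs per adapter and per application instance.
final class ClientIds {

    private ClientIds() {
    }

    /** Short random token, e.g. for distinguishing several instances of the same service. */
    static String randomSuffix() {
        return UUID.randomUUID().toString().substring(0, 8);
    }

    static String inbound(String base, String instanceSuffix) {
        return base + "-" + instanceSuffix + "-inbound";
    }

    static String outbound(String base, String instanceSuffix) {
        return base + "-" + instanceSuffix + "-outbound";
    }

    public static void main(String[] args) {
        String suffix = randomSuffix();
        System.out.println(inbound("mqtt-client-123", suffix));
        System.out.println(outbound("mqtt-client-123", suffix));
    }
}
```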
1.4 Message channels
The mqttInputChannel and mqttOutboundChannel methods create the inbound and outbound channels through which messages are passed.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // direct channel
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
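A DirectChannel hands each message to its subscriber in the sender's own thread, so the handler finishes before send returns. The sketch below imitates that behaviour with plain Java to make the consequence visible: slow handling blocks whoever sends. SimpleDirectChannel is a hypothetical stand-in, not Spring's implementation, and it supports only a single subscriber.

```java
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a point-to-point channel with synchronous hand-off.
final class SimpleDirectChannel<T> {
    private Consumer<T> subscriber;

    void subscribe(Consumer<T> handler) {
        this.subscriber = handler;
    }

    /** Invokes the subscriber in the caller's thread and returns once it has finished. */
    boolean send(T message) {
        if (subscriber == null) {
            throw new IllegalStateException("no subscriber registered");
        }
        subscriber.accept(message);
        return true;
    }

    public static void main(String[] args) {
        SimpleDirectChannel<String> channel = new SimpleDirectChannel<>();
        channel.subscribe(m -> System.out.println(
                Thread.currentThread().getName() + " handled " + m));
        channel.send("hello"); // handled on the same thread that calls send
    }
}
```

If message handling involves slow work such as database writes, a channel backed by an executor or a queue may be worth considering so that the MQTT callback thread is not held up.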
2. MQTT message listener
Code walkthrough
The MqttMessageListener class handles the messages received from MQTT topics.
Variant I
    package com.ruoyi.framework.mqtt;

    import com.alibaba.fastjson2.JSON;
    import com.ruoyi.power.domain.protocol.HeartbeatMessage;
    import com.ruoyi.power.domain.protocol.ReportMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    import org.springframework.stereotype.Component;

    @Component
    public class MqttMessageListener {

        @Autowired
        private IMqttService mqttService;

        // Handle inbound messages
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                    String payload = message.getPayload().toString();
                    Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                    log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                    try {
                        // These prefixes are only received if mqttInbound() also subscribes to them;
                        // MqttConfig subscribes to testSub/# only.
                        if (topic.startsWith("heartbeat/")) {
                            // Heartbeat report; IMqttService expects a typed message, so parse the JSON first
                            mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                        } else if (topic.startsWith("report/")) {
                            // Data report
                            mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                        }
                    } catch (Exception e) {
                        // Pass the exception itself so that the stack trace is logged
                        log.error("[MQTT] Message handling failed", e);
                    }
                }
            };
        }
    }
or Variant II
    package com.ruoyi.framework.mqtt;

    import com.alibaba.fastjson2.JSON;
    import com.ruoyi.common.constant.MessageConstants;
    import com.ruoyi.power.domain.protocol.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.MessagingException;
    import org.springframework.stereotype.Component;

    @Component
    public class MqttMessageListener {

        @Autowired
        private IMqttService mqttService;

        // Handle inbound messages
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                    String payload = message.getPayload().toString();
                    Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                    log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                    try {
                        if (topic.startsWith("testSub/")) {
                            BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                            switch (baseMsg.getType()) {
                                case MessageConstants.HEART_BEAT:
                                    HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                    mqttService.handleHeartbeat(heartbeat);
                                    break;
                                // Comma-separated case labels require Java 14 or later
                                case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                    ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                    mqttService.handleReport(report);
                                    break;
                                case MessageConstants.ALARM:
                                    AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                                    mqttService.handleAlarm(alarm);
                                    break;
                                case MessageConstants.CALL_ACK:
                                    mqttService.handleCallReadAck(baseMsg);
                                    break;
                                case MessageConstants.CONTROL_ACK:
                                    ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                    mqttService.handleControlAck(controlAck);
                                    break;
                                default:
                                    log.warn("Unknown message type: {}", baseMsg.getType());
                            }
                        } else if (topic.startsWith("report/allpoints")) {
                            // Reached only if mqttInbound() also subscribes to this topic
                            BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                            switch (baseMsg.getType()) {
                                // If no callAck arrives, the collector did not receive the callRead
                                case MessageConstants.CALL_ACK:
                                    mqttService.handleCallReadAck(baseMsg);
                                    break;
                                case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                    ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                    mqttService.handleReport(report);
                                    break;
                                case MessageConstants.CONTROL_ACK:
                                    ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                    mqttService.handleControlAck(controlAck);
                                    break;
                                case MessageConstants.MULTIVALUESET_ACK:
                                    MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                    mqttService.handleMultiValueSet(multvaluesetAck);
                                    break;
                            }
                        }
                    } catch (Exception e) {
                        // Pass the exception itself so that the stack trace is logged
                        log.error("[MQTT] Message handling failed", e);
                    }
                }
            };
        }
    }
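Reading the mqtt_receivedTopic header deserves a null check, because a message placed on mqttInputChannel by anything other than the MQTT inbound adapter would not carry it. Spring's MessageHeaders implements Map<String, Object>, so the idea can be sketched with a plain map. TopicHeader and receivedTopic are hypothetical names introduced for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: null-safe access to the received-topic header.
final class TopicHeader {
    static final String RECEIVED_TOPIC = "mqtt_receivedTopic";

    private TopicHeader() {
    }

    /** Returns the topic if the header is present, otherwise an empty Optional. */
    static Optional<String> receivedTopic(Map<String, Object> headers) {
        Object value = headers.get(RECEIVED_TOPIC);
        return value == null ? Optional.empty() : Optional.of(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(RECEIVED_TOPIC, "testSub/device1");
        System.out.println(receivedTopic(headers).orElse("<no topic>"));
        System.out.println(receivedTopic(new HashMap<>()).orElse("<no topic>"));
    }
}
```

A handler could then log and skip messages without a topic instead of failing with a NullPointerException.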
2.1 Message-handling logic
The handler method returns a MessageHandler that processes the messages arriving on mqttInputChannel.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // topic
                String payload = message.getPayload().toString();                         // message content
                Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] Message received: topic={}, payload={}", topic, payload);
                try {
                    if (topic.startsWith("testSub/")) {
                        // Messages on topics under testSub
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                                break;
                            case MessageConstants.REPORT:
                                mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                                break;
                            // handling for the other message types
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        // Messages on report/allpoints (only received if that topic is subscribed)
                    }
                } catch (Exception e) {
                    // Pass the exception itself so that the stack trace is logged
                    log.error("[MQTT] Message handling failed", e);
                }
            }
        };
    }
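- mqtt_receivedTopic: the message header that carries the topic.
- payload: the message content, usually a JSON string.
- A switch statement calls a different handler method depending on the message type.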
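As the number of message types grows, the switch can be replaced by a lookup table that maps each type to its handler. The sketch below is one possible shape under stated assumptions: MessageDispatcher is a hypothetical class, the type strings "heartbeat" and "report" are placeholders (the real values of MessageConstants are not shown in this article), and handlers receive the raw JSON payload.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a payload to the handler registered for its message type.
final class MessageDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    void register(String type, Consumer<String> handler) {
        handlers.put(type, handler);
    }

    /** Returns true if a handler was found and invoked, false for an unknown type. */
    boolean dispatch(String type, String payload) {
        Consumer<String> handler = handlers.get(type);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        MessageDispatcher dispatcher = new MessageDispatcher();
        dispatcher.register("heartbeat", p -> System.out.println("heartbeat: " + p)); // placeholder type
        dispatcher.register("report", p -> System.out.println("report: " + p));       // placeholder type
        if (!dispatcher.dispatch("alarm", "{}")) {
            System.out.println("unknown type: alarm");
        }
    }
}
```

Adding a message type then becomes a one-line registration, and the unknown-type case is handled in a single place.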
3. MQTT message gateway
Code walkthrough
The MqttMessageGateway interface provides a simple method for sending messages.
    package com.ruoyi.framework.mqtt;

    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttMessageGateway {

        void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);
    }
- @MessagingGateway: declares a messaging gateway; defaultRequestChannel specifies the default channel to send to.
- @Header(MqttHeaders.TOPIC): sets the topic of the message.
Usage example:
    @Autowired
    private MqttMessageGateway mqttMessageGateway;

    public void publishMessage() {
        mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
    }
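Because the gateway is a plain interface, code that publishes through it can be exercised without a running broker by substituting a recording implementation. The sketch below uses a stand-in interface named MessageGateway with the same method shape, so that it compiles without Spring; MessageGateway and RecordingGateway are hypothetical names, not part of the article's code.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in with the same method shape as MqttMessageGateway (hypothetical name).
interface MessageGateway {
    void sendMessage(String topic, String payload);
}

// Test double that records what would have been published.
final class RecordingGateway implements MessageGateway {

    static final class Sent {
        final String topic;
        final String payload;

        Sent(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    private final List<Sent> sent = new ArrayList<>();

    @Override
    public void sendMessage(String topic, String payload) {
        sent.add(new Sent(topic, payload));
    }

    /** Returns a copy so callers cannot modify the recorded history. */
    List<Sent> sent() {
        return new ArrayList<>(sent);
    }

    public static void main(String[] args) {
        RecordingGateway gateway = new RecordingGateway();
        gateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
        System.out.println(gateway.sent().size() + " message(s) recorded");
    }
}
```

In a real test, a class like this would implement MqttMessageGateway directly and be injected into the service under test.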
4. MQTT service interface
Code walkthrough
The IMqttService interface defines the methods that handle the different message types.
    package com.ruoyi.framework.mqtt;

    import com.ruoyi.power.domain.protocol.*;

    public interface IMqttService {

        /**
         * Handle heartbeat data.
         * @param heartbeat heartbeat message content
         */
        void handleHeartbeat(HeartbeatMessage heartbeat);

        /**
         * Handle reported data.
         * @param report report message content
         */
        void handleReport(ReportMessage report);

        /**
         * Send a remote-control or remote-adjustment command from the server to the collector.
         * @param controlMessage the control command
         */
        void sendControl(ControlMessage controlMessage);

        /**
         * Handle an alarm reported by a meter.
         * @param alarm alarm content
         */
        void handleAlarm(AlarmMessage alarm);

        /**
         * Send a call-read command to the specified gateway.
         * @param saleid    substation ID
         * @param gateid    gateway ID
         * @param startTime start of the period to read
         * @param endTime   end of the period to read
         */
        void sendCallRead(String saleid, String gateid, String startTime, String endTime);

        /**
         * Handle the collector's acknowledgement of a call-read command. The acknowledgement does not
         * guarantee that data exists for the requested period: the collector replies with this packet
         * first, then looks up the historical data and sends it only if it exists.
         * @param baseMsg the collector's call-read acknowledgement
         */
        void handleCallReadAck(BaseMessage baseMsg);

        /**
         * Handle the execution result that the collector sends to the server.
         * @param controlAck the control acknowledgement
         */
        void handleControlAck(ControlMessage controlAck);

        /**
         * Publish a get-current-data command from the server to the collector.
         * @param baseMessage the command message
         */
        void getCurrentData(BaseMessage baseMessage);

        /**
         * @param multiValueSetMessage the multi-value set message
         */
        void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

        /**
         * Handle the collector's response to a command it received from the server.
         * @param multiValueSetMessage the multi-value set acknowledgement
         */
        void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);
    }
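- Each method corresponds to the handling logic for one message type.
- The class that implements this interface supplies the concrete business logic.
5. Usage
5.1 Configure the MQTT parameters
Configure the MQTT parameters in application.yml: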
    mqtt:
      broker-url: tcp://127.0.0.1:1883
      client-id: mqtt-client-123
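A malformed broker-url typically surfaces only when the client first tries to connect, so a small startup check can save debugging time. The sketch below validates the URL with java.net.URI. BrokerUrlCheck is a hypothetical helper; the accepted schemes (tcp, ssl, ws, wss) and the requirement of an explicit port are assumptions made for this example, and are stricter than what the Paho client itself accepts.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical startup check for the mqtt.broker-url property.
final class BrokerUrlCheck {
    // Assumed set of schemes for this example.
    private static final Set<String> SCHEMES =
            new HashSet<>(Arrays.asList("tcp", "ssl", "ws", "wss"));

    private BrokerUrlCheck() {
    }

    /** True if the URL has a supported scheme, a host, and an explicit port. */
    static boolean isValid(String url) {
        if (url == null) {
            return false;
        }
        try {
            URI uri = new URI(url);
            return uri.getScheme() != null
                    && SCHEMES.contains(uri.getScheme())
                    && uri.getHost() != null
                    && uri.getPort() > 0;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("tcp://127.0.0.1:1883"));
        System.out.println(isValid("http://127.0.0.1:1883"));
    }
}
```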
5.2 Implement the IMqttService interface
Create a class that implements the IMqttService interface and supplies the concrete business logic. For example:
    package com.ruoyi.framework.mqtt;

    import com.alibaba.fastjson2.JSON;
    import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
    import com.baomidou.mybatisplus.core.toolkit.Wrappers;
    import com.ruoyi.common.constant.MessageConstants;
    import com.ruoyi.common.constant.OperationConstants;
    import com.ruoyi.common.constant.ResultConstants;
    import com.ruoyi.common.utils.bean.BeanUtils;
    import com.ruoyi.power.config.CustomIdGenerator;
    import com.ruoyi.power.domain.*;
    import com.ruoyi.power.domain.protocol.*;
    import com.ruoyi.power.mapper.*;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import java.time.LocalDateTime;
    import java.util.*;

    @Slf4j
    @Service
    public class MqttServiceImpl implements IMqttService {

        // Fixed topic used for every outbound message in this example
        private static final String COMMAND_TOPIC = "report/allpoints";

        @Autowired
        private MqttMessageGateway mqttGateway;
        @Autowired
        private HeartBeatMapper heartbeatMapper;
        @Autowired
        private GatewayInfoMapper gatewayInfoMapper;
        @Autowired
        private ReportMapper reportMapper;
        @Autowired
        private ReportMeterMapper reportMeterMapper;
        @Autowired
        private AlarmMapper alarmMapper;
        @Autowired
        private AlarmDetailMapper alarmDetailMapper;
        @Autowired
        private ControlMapper controlMapper;

        // Handle heartbeat data
        @Override
        public void handleHeartbeat(HeartbeatMessage heartbeat) {
            try {
                // Store the heartbeat in the database
                HeartBeat heartBeat = new HeartBeat();
                heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                heartBeat.setGateId(heartbeat.getGateid());
                heartBeat.setType(heartbeat.getType());
                heartBeat.setSaleId(heartbeat.getSaleid());
                heartBeat.setTime(heartbeat.getTime());
                heartBeat.setOperation(heartbeat.getOperation());
                heartbeatMapper.insertHeartBeat(heartBeat);
                log.info("[Heartbeat] Stored: substationId={}, gatewayId={}", heartbeat.getSaleid(), heartbeat.getGateid());

                // Look up the gateway record and create it if it does not exist
                GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(
                        Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
                if (ObjectUtils.isNull(gatewayInfo)) {
                    createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
                } else {
                    gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
                    gatewayInfo.setUpdateTime(LocalDateTime.now());
                    int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                    if (updated == 0) {
                        log.warn("Heartbeat update conflict saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                    }
                }

                // Reply to the gateway's heartbeat
                sendHeartbeat(heartbeat);
            } catch (Exception e) {
                log.error("[Heartbeat] Handling failed", e);
            }
        }

        // Create a new gateway record
        private void createNewGateway(String saleid, String gateid) {
            GatewayInfo newGateway = new GatewayInfo();
            newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            newGateway.setSaleid(saleid);
            newGateway.setGateid(gateid);
            newGateway.setLastHeartbeatTime(LocalDateTime.now());
            newGateway.setStatus("0");
            newGateway.setCheckInterval(60L); // default interval
            newGateway.setCreateTime(LocalDateTime.now());
            gatewayInfoMapper.insertGatewayInfo(newGateway);
        }

        // Send the heartbeat reply
        private void sendHeartbeat(HeartbeatMessage heartbeat) {
            heartbeat.setOperation(OperationConstants.TIME);
            String payload = JSON.toJSONString(heartbeat);
            mqttGateway.sendMessage(COMMAND_TOPIC, payload);
            log.info("[Heartbeat reply] topic={}, payload={}", COMMAND_TOPIC, payload);
        }

        // Handle reported data
        @Override
        public void handleReport(ReportMessage report) {
            try {
                // Store the report record first
                String reportId = createReportData(report);

                // Convert each meter entry and store it
                List<ReportMeter> meterEntities = report.getMeter().stream()
                        .map(m -> {
                            ReportMeter entity = new ReportMeter();
                            entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                            entity.setReportId(reportId);
                            entity.setMeterId(m.getId());
                            entity.setStatus(m.getStatus());
                            entity.setName(m.getName());
                            entity.setValuesJson(JSON.toJSONString(m.getValues()));
                            return entity;
                        }).toList(); // Stream.toList() requires Java 16 or later
                for (ReportMeter meter : meterEntities) {
                    reportMeterMapper.insertReportMeter(meter);
                }
                log.info("[Report] Stored: substationId={}, gatewayId={}", report.getSaleid(), report.getGateid());
            } catch (Exception e) {
                log.error("[Report] Handling failed", e);
            }
        }

        // Create a new report record
        private String createReportData(ReportMessage report) {
            Report rep = new Report();
            rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            rep.setSaleid(report.getSaleid());
            rep.setGateid(report.getGateid());
            rep.setTime(report.getTime());
            rep.setType(report.getType());
            rep.setSequence(report.getSequence());
            rep.setSource(report.getSource());
            rep.setCreateTime(LocalDateTime.now());
            reportMapper.insert(rep);
            return rep.getId();
        }

        // Send a control command
        @Override
        public void sendControl(ControlMessage controlMessage) {
            ControlMessage message = new ControlMessage();
            message.setSaleid(controlMessage.getSaleid());
            message.setGateid(controlMessage.getGateid());
            message.setType(controlMessage.getType());
            message.setCuuid(LocalDateTime.now().toString());
            message.setTime(LocalDateTime.now());
            message.setMeterid(controlMessage.getMeterid());
            message.setName(controlMessage.getName());
            message.setFunctionid(controlMessage.getFunctionid());
            message.setValue(controlMessage.getValue());

            // Store the command that is actually sent, including its cuuid and time
            createControl(message);

            String payload = JSON.toJSONString(message);
            mqttGateway.sendMessage(COMMAND_TOPIC, payload);
            log.info("[Control command] topic={}, command={}", COMMAND_TOPIC, payload);
        }

        // Store a control record
        private void createControl(ControlMessage controlMessage) {
            Control control = new Control();
            control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            control.setSaleid(controlMessage.getSaleid());
            control.setGateid(controlMessage.getGateid());
            control.setType(controlMessage.getType());
            control.setCuuid(controlMessage.getCuuid());
            control.setTime(controlMessage.getTime());
            control.setMeterid(controlMessage.getMeterid());
            control.setName(controlMessage.getName());
            control.setFunctionid(controlMessage.getFunctionid());
            control.setValue(controlMessage.getValue());
            control.setResult(controlMessage.getResult());
            control.setErrordesc(controlMessage.getErrordesc());
            controlMapper.insertControl(control);
        }

        // Handle an alarm reported by a meter
        @Override
        public void handleAlarm(AlarmMessage alarmMessage) {
            try {
                // Store the alarm record first
                String alarmId = createAlarmData(alarmMessage);

                // Convert each alarm detail and store it
                List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                        .map(m -> {
                            AlarmDetail entity = new AlarmDetail();
                            entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                            entity.setAlarmId(alarmId);
                            entity.setPtId(m.getId());
                            entity.setAlarmType(m.getAlarmType());
                            entity.setLabel(m.getLabel());
                            entity.setCurrentValue(m.getCurrentValue());
                            entity.setSettingValue(m.getSettingValue());
                            entity.setLevel(m.getLevel());
                            return entity;
                        }).toList();
                for (AlarmDetail alarm : alarmEntities) {
                    alarmDetailMapper.insertAlarmDetail(alarm);
                }
                log.info("[Alarm] Stored: substationId={}, gatewayId={}", alarmMessage.getSaleid(), alarmMessage.getGateid());
            } catch (Exception e) {
                log.error("[Alarm] Handling failed", e);
            }
        }

        // Send a call-read command
        @Override
        public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
            HashMap<String, String> protocol = new HashMap<>();
            protocol.put("saleid", saleid);
            protocol.put("gateid", gateid);
            protocol.put("type", MessageConstants.CALL_READ);
            protocol.put("time", String.valueOf(LocalDateTime.now()));
            protocol.put("startTime", startTime);
            protocol.put("endTime", endTime);

            String payload = JSON.toJSONString(protocol);
            mqttGateway.sendMessage(COMMAND_TOPIC, payload);
            log.info("[Control command] topic={}, command={}", COMMAND_TOPIC, payload);
        }

        // Handle the collector's call-read acknowledgement
        @Override
        public void handleCallReadAck(BaseMessage baseMsg) {
            Report report = new Report();
            report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            report.setSaleid(baseMsg.getSaleid());
            report.setGateid(baseMsg.getGateid());
            report.setTime(baseMsg.getTime());
            report.setType(baseMsg.getType());
            reportMapper.insert(report);
        }

        // Handle the collector's control result
        @Override
        public void handleControlAck(ControlMessage controlAck) {
            if (ResultConstants.FAILURE.equals(controlAck.getResult())) {
                createControl(controlAck);
                // Configuration or device problem: record the error
                log.error("Control failed (not retryable): {}", controlAck.getErrordesc());
            } else if (ResultConstants.SUCCESS.equals(controlAck.getResult())) {
                createControl(controlAck);
                log.info("Control succeeded: {}", controlAck.getCuuid());
            }
        }

        // Publish a get-current-data command
        @Override
        public void getCurrentData(BaseMessage baseMessage) {
            String payload = JSON.toJSONString(baseMessage);
            mqttGateway.sendMessage(COMMAND_TOPIC, payload);
            log.info("[Control command] topic={}, command={}", COMMAND_TOPIC, payload);
        }

        // Not implemented in this example
        @Override
        public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        }

        @Override
        public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
            String payload = JSON.toJSONString(multiValueSetMessage);
            ControlMessage controlMessage = new ControlMessage();
            try {
                mqttGateway.sendMessage(COMMAND_TOPIC, payload);
                log.info("[Control command] topic={}, command={}", COMMAND_TOPIC, payload);
                if (ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
                    BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                    createControl(controlMessage);
                    log.info("[Control succeeded] topic={}, command={}", COMMAND_TOPIC, payload);
                } else if (ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())) {
                    BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                    createControl(controlMessage);
                    log.error("[Control failed] topic={}, command={}", COMMAND_TOPIC, payload);
                }
            } catch (Exception e) {
                log.error("[Control command] Handling failed: topic={}, command={}", COMMAND_TOPIC, payload, e);
            }
        }

        // Create a new alarm record
        private String createAlarmData(AlarmMessage alarmMessage) {
            Alarm alarm = new Alarm();
            alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            alarm.setSaleid(alarmMessage.getSaleid());
            alarm.setGateid(alarmMessage.getGateid());
            alarm.setTime(alarmMessage.getTime());
            alarm.setType(alarmMessage.getType());
            alarm.setSequence(alarmMessage.getSequence());
            alarm.setName(alarmMessage.getName());
            alarm.setMeterid(alarmMessage.getMeterid());
            alarm.setCreateTime(LocalDateTime.now());
            alarmMapper.insert(alarm);
            return alarm.getId();
        }
    }
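sendControl fills cuuid with LocalDateTime.now().toString(). Two commands issued within the same clock tick would receive the same value, so if cuuid is meant to match a command with its later acknowledgement (an assumption; the article does not say), a random UUID is a safer identifier. The sketch below tracks commands that are still waiting for an acknowledgement. PendingCommands and its methods are hypothetical names, not part of the article's code.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of commands that have been sent but not yet acknowledged.
final class PendingCommands {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Registers a command and returns the generated correlation ID (a candidate cuuid). */
    String register(String description) {
        String cuuid = UUID.randomUUID().toString();
        pending.put(cuuid, description);
        return cuuid;
    }

    /** Returns true if the ID belonged to a pending command; each ID can be acknowledged once. */
    boolean acknowledge(String cuuid) {
        return pending.remove(cuuid) != null;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingCommands commands = new PendingCommands();
        String id = commands.register("switch breaker 1 off");
        System.out.println("sent with cuuid " + id);
        System.out.println("ack accepted: " + commands.acknowledge(id));
    }
}
```

A handler for control acknowledgements could call acknowledge with the received cuuid and log a warning when it returns false, which would indicate a duplicate or unexpected reply.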
5.3 Send an MQTT message
Send a message through MqttMessageGateway:
    @Autowired
    private MqttMessageGateway mqttMessageGateway;

    public void sendTestMessage() {
        mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
    }
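6. Summary
This article showed how to integrate with MQTT using the Spring Integration framework, covering client configuration, message subscription and publishing, and the message-handling logic. With the code above you can integrate Java with MQTT quickly and extend the message handling to fit your business requirements.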