欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java對接MQTT協(xié)議的完整實現(xiàn)示例代碼

 更新時間:2025年08月09日 14:15:24   作者:最業(yè)余的程序猿  
MQTT是一個基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議,MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛,這篇文章主要介紹了Java對接MQTT協(xié)議的完整實現(xiàn),需要的朋友可以參考下

前言

本文將詳細介紹如何使用Java和Spring Integration框架實現(xiàn)MQTT協(xié)議的對接。代碼包括MQTT客戶端的配置、消息的訂閱與發(fā)布、以及消息的處理邏輯。

前置依賴

<!-- MQTT 依賴 -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

1. MQTT配置類

代碼解析

MqttConfig類是MQTT的核心配置類,負(fù)責(zé)MQTT客戶端的初始化、連接選項的設(shè)置以及消息通道的創(chuàng)建。

package com.ruoyi.framework.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.io.IOException;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl}); // Broker 地址
        options.setAutomaticReconnect(true); // 自動重連
        factory.setConnectionOptions(options);
        System.out.println("Connecting to broker: " + brokerUrl + " OK.");
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + "-inbound",  // 客戶端ID(唯一)
                mqttClientFactory(),   // 使用工廠創(chuàng)建客戶端
                "testSub/#" // 訂閱的主題
        );
        adapter.setOutputChannelName("mqttInputChannel"); // 關(guān)鍵:綁定到輸入通道 消息輸出通道
        adapter.setQos(1); // 設(shè)置 QoS 級別
        return adapter;
    }

    // 出站適配器(發(fā)送)
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
                clientId + "-outbound",
                mqttClientFactory()
        );
        handler.setAsync(true);
        handler.setDefaultQos(1);
        return handler;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel(); // 使用直連通道
    }

    // 出站通道(發(fā)送消息)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

}

1.1 MQTT客戶端工廠

mqttClientFactory方法創(chuàng)建了一個MqttPahoClientFactory實例,用于配置MQTT客戶端的連接選項。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{brokerUrl}); // 設(shè)置MQTT服務(wù)端地址
    options.setAutomaticReconnect(true); // 開啟自動重連
    factory.setConnectionOptions(options);
    System.out.println("Connecting to broker: " + brokerUrl + " OK.");
    return factory;
}
  • brokerUrl:MQTT服務(wù)端的地址,通常為tcp://<IP>:<端口>
  • automaticReconnect:開啟自動重連功能,確保網(wǎng)絡(luò)波動時客戶端能夠自動恢復(fù)連接。

1.2 MQTT消息訂閱適配器

mqttInbound方法創(chuàng)建了一個MqttPahoMessageDrivenChannelAdapter實例,用于訂閱MQTT主題并接收消息。

@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
            clientId + "-inbound", mqttClientFactory(), "testSub/#");
    adapter.setOutputChannelName("mqttInputChannel"); // 綁定到輸入通道
    adapter.setQos(1); // 設(shè)置QoS級別
    return adapter;
}
  • clientId + "-inbound":客戶端ID,需保證唯一性。
  • "testSub/#":訂閱的主題,#表示匹配所有子主題。

1.3 MQTT消息發(fā)布適配器

mqttOutbound方法創(chuàng)建了一個MqttPahoMessageHandler實例,用于發(fā)布消息到MQTT主題。

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(
            clientId + "-outbound", mqttClientFactory());
    handler.setAsync(true); // 異步發(fā)送
    handler.setDefaultQos(1); // 設(shè)置QoS級別
    return handler;
}
  • clientId + "-outbound":客戶端ID,需保證唯一性。
  • mqttOutboundChannel:消息發(fā)送通道。

1.4 消息通道

mqttInputChannelmqttOutboundChannel方法分別創(chuàng)建了輸入和輸出通道,用于消息的傳遞。

@Bean
public MessageChannel mqttInputChannel() {
    return new DirectChannel(); // 直連通道
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}

2. MQTT消息監(jiān)聽器

代碼解析

MqttMessageListener類負(fù)責(zé)處理從MQTT主題接收到的消息

package com.ruoyi.framework.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    @Autowired
    private IMqttService mqttService;

    // 處理入站消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

                try {
                    if (topic.startsWith("heartbeat/")) {  //心跳上報
                        mqttService.handleHeartbeat(payload);
                    } else if (topic.startsWith("report/")) {  //數(shù)據(jù)上報
                        mqttService.handleReport(payload);
                    }
                } catch (Exception e) {
                    log.error("[MQTT] 消息處理失敗: {}", e.getMessage());
                }
            }
        };
    }
}

 或Ⅱ

package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.power.domain.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

@Component
public class MqttMessageListener {

    @Autowired
    private IMqttService mqttService;

    // 處理入站消息
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String payload = message.getPayload().toString();
                Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
                log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

                try {
                    if (topic.startsWith("testSub/")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            case MessageConstants.HEART_BEAT:
                                HeartbeatMessage heartbeat = JSON.parseObject(payload, HeartbeatMessage.class);
                                mqttService.handleHeartbeat(heartbeat);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.ALARM:
                                AlarmMessage alarm = JSON.parseObject(payload, AlarmMessage.class);
                                mqttService.handleAlarm(alarm);
                                break;
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            default:
                                System.err.println("Unknown message type: " + baseMsg.getType());
                        }
                    } else if (topic.startsWith("report/allpoints")) {
                        BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                        switch (baseMsg.getType()) {
                            // 如果沒收到callAck 則代表采集器沒收到callRead
                            case MessageConstants.CALL_ACK:
                                mqttService.handleCallReadAck(baseMsg);
                                break;
                            case MessageConstants.REPORT, MessageConstants.GET_CURRENT_DATA:
                                ReportMessage report = JSON.parseObject(payload, ReportMessage.class);
                                mqttService.handleReport(report);
                                break;
                            case MessageConstants.CONTROL_ACK:
                                ControlMessage controlAck = JSON.parseObject(payload, ControlMessage.class);
                                mqttService.handleControlAck(controlAck);
                                break;
                            case MessageConstants.MULTIVALUESET_ACK:
                                MultiValueSetMessage multvaluesetAck = JSON.parseObject(payload, MultiValueSetMessage.class);
                                mqttService.handleMultiValueSet(multvaluesetAck);
                                break;
                        }
                    }
                } catch (Exception e) {
                    log.error("[MQTT] 消息處理失敗: {}", e.getMessage());
                }
            }
        };
    }
}

2.1 消息處理邏輯

handler方法是一個MessageHandler,用于處理從mqttInputChannel接收到的消息。

@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // 獲取主題
            String payload = message.getPayload().toString(); // 獲取消息內(nèi)容
            Logger log = LoggerFactory.getLogger(MqttMessageListener.class);
            log.info("[MQTT] 收到消息: topic={}, payload={}", topic, payload);

            try {
                if (topic.startsWith("testSub/")) {
                    // 處理訂閱主題為testSub的消息
                    BaseMessage baseMsg = JSON.parseObject(payload, BaseMessage.class);
                    switch (baseMsg.getType()) {
                        case MessageConstants.HEART_BEAT:
                            mqttService.handleHeartbeat(JSON.parseObject(payload, HeartbeatMessage.class));
                            break;
                        case MessageConstants.REPORT:
                            mqttService.handleReport(JSON.parseObject(payload, ReportMessage.class));
                            break;
                        // 其他消息類型的處理邏輯
                    }
                } else if (topic.startsWith("report/allpoints")) {
                    // 處理訂閱主題為report/allpoints的消息
                }
            } catch (Exception e) {
                log.error("[MQTT] 消息處理失敗: {}", e.getMessage());
            }
        }
    };
}
  • mqtt_receivedTopic:從消息頭中獲取主題。
  • payload:消息內(nèi)容,通常是JSON格式的字符串。
  • 使用switch語句根據(jù)消息類型調(diào)用不同的處理方法。

3. MQTT消息網(wǎng)關(guān)

代碼解析

MqttMessageGateway接口提供了一個簡單的發(fā)送消息的方法。

package com.ruoyi.framework.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMessageGateway {

    void sendMessage(@Header(MqttHeaders.TOPIC) String topic, String payload);

}
  • @MessagingGateway:聲明一個消息網(wǎng)關(guān),defaultRequestChannel指定默認(rèn)的發(fā)送通道。
  • @Header(MqttHeaders.TOPIC):指定消息的主題。

使用示例:

@Autowired
private MqttMessageGateway mqttMessageGateway;

public void publishMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}

4. MQTT服務(wù)接口

代碼解析

IMqttService接口定義了處理不同類型消息的方法。

package com.ruoyi.framework.mqtt;

import com.ruoyi.power.domain.protocol.*;

public interface IMqttService {

    /**
     * 處理心跳數(shù)據(jù)
     * @param heartbeat MQTT 消息內(nèi)容
     */
    void handleHeartbeat(HeartbeatMessage heartbeat);

    /**
     * 處理上報數(shù)據(jù)
     * @param report MQTT 消息內(nèi)容
     */
    void handleReport(ReportMessage report);

    /**
     * 服務(wù)器發(fā)送遙控命令到采集器
     * 服務(wù)器發(fā)送遙調(diào)命令到采集器
     * @param controlMessage 遙控命令
     */
    void sendControl(ControlMessage controlMessage);

    /**
     * 處理上報儀表報警
     * @param alarm 報警內(nèi)容
     * @return String 配置內(nèi)容
     */
    void handleAlarm(AlarmMessage alarm);

    /**
     * 下發(fā)控制命令到指定網(wǎng)關(guān)
     * @param saleid 配電站ID
     * @param gateid 網(wǎng)關(guān)ID
     */
    void sendCallRead(String saleid, String gateid, String startTime, String endTime);

    /**
     * 采集器響應(yīng)召讀命令(響應(yīng)召讀命令回復(fù)包,不代表召讀時間段的數(shù)據(jù)一定存在,采集器收到召讀命令后首先回復(fù)
     * 此數(shù)據(jù)包,下一不再查找相應(yīng)歷史數(shù)據(jù), 存在即發(fā)送,不存在不發(fā)送 )
     * @param baseMsg 采集器響應(yīng)召讀命令(
     */
    void handleCallReadAck(BaseMessage baseMsg);

    /**
     * 采集器發(fā)送執(zhí)行結(jié)果到服務(wù)器
     * @param controlAck
     */
    void handleControlAck(ControlMessage controlAck);

    /**
     * 由服務(wù)器發(fā)布獲取數(shù)據(jù)命令到采集器
     * @param baseMessage
     */
    void getCurrentData(BaseMessage baseMessage);

    /**
     *
     * @param multiValueSetMessage
     */
    void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage);

    /**
     * 處理相應(yīng)采集器接收到服務(wù)器的命令
     * @param multiValueSetMessage
     */
    void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage);

}
  • 每個方法對應(yīng)一種消息類型的處理邏輯。
  • 實現(xiàn)該接口的類需要提供具體的業(yè)務(wù)邏輯。

5. 使用說明

5.1 配置MQTT參數(shù)

application.yml中配置MQTT的相關(guān)參數(shù):

mqtt:
  broker-url: tcp://127.0.0.1:1883
  client-id: mqtt-client-123

5.2 實現(xiàn)IMqttService接口

創(chuàng)建一個類實現(xiàn)IMqttService接口,并提供具體的業(yè)務(wù)邏輯。例如:

package com.ruoyi.framework.mqtt;

import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.constant.MessageConstants;
import com.ruoyi.common.constant.OperationConstants;
import com.ruoyi.common.constant.ResultConstants;
import com.ruoyi.common.utils.bean.BeanUtils;
import com.ruoyi.power.config.CustomIdGenerator;
import com.ruoyi.power.domain.*;
import com.ruoyi.power.domain.protocol.*;
import com.ruoyi.power.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;

@Slf4j
@Service
public class MqttServiceImpl implements IMqttService {

    @Autowired
    private MqttMessageGateway mqttGateway;

    @Autowired
    private HeartBeatMapper heartbeatMapper;

    @Autowired
    private GatewayInfoMapper gatewayInfoMapper;

    @Autowired
    private ReportMapper reportMapper;

    @Autowired
    private ReportMeterMapper reportMeterMapper;

    @Autowired
    private AlarmMapper alarmMapper;

    @Autowired
    private AlarmDetailMapper alarmDetailMapper;

    @Autowired
    private ControlMapper controlMapper;

    // 處理心跳數(shù)據(jù)
    @Override
    public void handleHeartbeat(HeartbeatMessage heartbeat) {
        try {
            // 心跳存儲到數(shù)據(jù)庫
            HeartBeat heartBeat = new HeartBeat();
            heartBeat.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
            heartBeat.setGateId(heartbeat.getGateid());
            heartBeat.setType(heartbeat.getType());
            heartBeat.setSaleId(heartbeat.getSaleid());
            heartBeat.setTime(heartbeat.getTime());
            heartBeat.setOperation(heartbeat.getOperation());
            heartbeatMapper.insertHeartBeat(heartBeat);
            log.info("[心跳數(shù)據(jù)] 存儲成功: substationId={}, gatewayId={}",
                    heartbeat.getSaleid(), heartbeat.getGateid());

            // 查詢或創(chuàng)建網(wǎng)關(guān)記錄
            GatewayInfo gatewayInfo = gatewayInfoMapper.selectOne(Wrappers.<GatewayInfo>lambdaQuery().eq(GatewayInfo::getGateid, heartbeat.getGateid()));
            if(ObjectUtils.isNull(gatewayInfo)) {
                createNewGateway(heartbeat.getSaleid(), heartbeat.getGateid());
            } else {
                gatewayInfo.setLastHeartbeatTime(LocalDateTime.now());
                gatewayInfo.setUpdateTime(LocalDateTime.now());
                int updated = gatewayInfoMapper.updateGatewayInfo(gatewayInfo);
                if(updated == 0) {
                    log.warn("心跳更新沖突 saleid:{}, gateid:{}", heartbeat.getSaleid(), heartbeat.getGateid());
                }
            }

            // 如果網(wǎng)關(guān)請求心跳,響應(yīng)心跳
            sendHeartbeat(heartbeat.getSaleid(), heartbeat.getGateid(), heartbeat);
        } catch (Exception e) {
            log.error("[心跳數(shù)據(jù)] 處理失敗: {}", e.getMessage());
        }
    }

    // 創(chuàng)建新網(wǎng)關(guān)記錄
    private void createNewGateway(String saleid, String gateid) {
        GatewayInfo newGateway = new GatewayInfo();
        newGateway.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        newGateway.setSaleid(saleid);
        newGateway.setGateid(gateid);
        newGateway.setLastHeartbeatTime(LocalDateTime.now());
        newGateway.setStatus("0");
        newGateway.setCheckInterval(60L); // 默認(rèn)間隔
        newGateway.setCreateTime(LocalDateTime.now());
        gatewayInfoMapper.insertGatewayInfo(newGateway);
    }

    // 下發(fā)心跳
    private void sendHeartbeat(String saleid, String gateid, HeartbeatMessage heartbeat) {
        String topic = String.format("report/allpoints", saleid, gateid);
        heartbeat.setOperation(OperationConstants.TIME);
        mqttGateway.sendMessage(topic, JSON.toJSONString(heartbeat));
        log.info("[配置下發(fā)] topic={}, config={}", topic, JSON.toJSONString(heartbeat));
    }

    // 處理上報數(shù)據(jù)
    @Override
    public void handleReport(ReportMessage report) {
        try {
            // 存儲到儀表信息表 轉(zhuǎn)換為儀表信息表(meterMapper)
            String reportId = createReportData(report);
            // 批量存儲儀表數(shù)據(jù)
            List<ReportMeter> meterEntities = report.getMeter().stream()
                    .map(m -> {
                        ReportMeter entity = new ReportMeter();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setReportId(reportId);
                        entity.setMeterId(m.getId());
                        entity.setStatus(m.getStatus());
                        entity.setName(m.getName());
                        entity.setValuesJson(JSON.toJSONString(m.getValues()));
                        return entity;
                    }).toList();
            for (ReportMeter meter : meterEntities) {
                reportMeterMapper.insertReportMeter(meter);
            }
            log.info("[上報數(shù)據(jù)] 存儲成功: substationId={}, gatewayId={}",
                    report.getSaleid(), report.getGateid());
        } catch (Exception e) {
            log.error("[上報數(shù)據(jù)] 處理失敗: {}", e.getMessage());
        }
    }

    // 創(chuàng)建新數(shù)據(jù)記錄
    private String createReportData(ReportMessage report) {
        Report rep = new Report();
        rep.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        rep.setSaleid(report.getSaleid());
        rep.setGateid(report.getGateid());
        rep.setTime(report.getTime());
        rep.setType(report.getType());
        rep.setSequence(report.getSequence());
        rep.setSource(report.getSource());
        rep.setCreateTime(LocalDateTime.now());
        reportMapper.insert(rep);
        return rep.getId();
    }

    // 下發(fā)控制命令
    @Override
    public void sendControl(ControlMessage controlMessage) {
        ControlMessage message = new ControlMessage();
        message.setSaleid(controlMessage.getSaleid());
        message.setGateid(controlMessage.getGateid());
        message.setType(controlMessage.getType());
        message.setCuuid(LocalDateTime.now().toString());
        message.setTime(LocalDateTime.now());
        message.setMeterid(controlMessage.getMeterid());
        message.setName(controlMessage.getName());
        message.setFunctionid(controlMessage.getFunctionid());
        message.setValue(controlMessage.getValue());
        // 存儲到控制記錄表
        createControl(controlMessage);
        String topic = String.format("report/allpoints", message);
        mqttGateway.sendMessage(topic, JSON.toJSONString(message));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(message));
    }

    private void createControl(ControlMessage controlMessage) {
        Control control = new Control();
        control.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        control.setSaleid(controlMessage.getSaleid());
        control.setGateid(controlMessage.getGateid());
        control.setType(controlMessage.getType());
        control.setCuuid(controlMessage.getCuuid());
        control.setTime(controlMessage.getTime());
        control.setMeterid(controlMessage.getMeterid());
        control.setName(controlMessage.getName());
        control.setFunctionid(controlMessage.getFunctionid());
        control.setValue(controlMessage.getValue());
        control.setResult(controlMessage.getResult());
        control.setErrordesc(controlMessage.getErrordesc());
        controlMapper.insertControl(control);
    }

    @Override
    public void handleAlarm(AlarmMessage alarmMessage) {
        try {
            // 存儲報警信息表 轉(zhuǎn)換為報警信息表(alarmMapper)
            String alarmId = createAlarmData(alarmMessage);
            // 批量存儲儀表數(shù)據(jù)
            List<AlarmDetail> alarmEntities = alarmMessage.getFunction().stream()
                    .map(m -> {
                        AlarmDetail entity = new AlarmDetail();
                        entity.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
                        entity.setAlarmId(alarmId);
                        entity.setPtId(m.getId());
                        entity.setAlarmType(m.getAlarmType());
                        entity.setLabel(m.getLabel());
                        entity.setCurrentValue(m.getCurrentValue());
                        entity.setSettingValue(m.getSettingValue());
                        entity.setLevel(m.getLevel());
                        return entity;
                    }).toList();
            for (AlarmDetail alarm : alarmEntities) {
                alarmDetailMapper.insertAlarmDetail(alarm);
            }
            log.info("[上報數(shù)據(jù)] 存儲成功: substationId={}, gatewayId={}",
                    alarmMessage.getSaleid(), alarmMessage.getGateid());

        } catch (Exception e) {
            log.error("[上報數(shù)據(jù)] 處理失敗: {}", e.getMessage());
        }

    }

    @Override
    public void sendCallRead(String saleid, String gateid, String startTime, String endTime) {
        HashMap<String, String> protocol = new HashMap<>();
        protocol.put("saleid", saleid);
        protocol.put("gateid", gateid);
        protocol.put("type", MessageConstants.CALL_READ);
        protocol.put("time", String.valueOf(LocalDateTime.now()));
        protocol.put("startTime", startTime);
        protocol.put("endTime", endTime);
        String topic = String.format("report/allpoints", saleid, gateid);
        mqttGateway.sendMessage(topic, JSON.toJSONString(protocol));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(protocol));
    }

    @Override
    public void handleCallReadAck(BaseMessage baseMsg) {
        Report report = new Report();
        report.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        report.setSaleid(baseMsg.getSaleid());
        report.setGateid(baseMsg.getGateid());
        report.setTime(baseMsg.getTime());
        report.setType(baseMsg.getType());
        reportMapper.insert(report);
    }

    @Override
    public void handleControlAck(ControlMessage controlAck) {
        if(ResultConstants.FAILURE.equals(controlAck.getResult())) {
            createControl(controlAck);
            // 配置或設(shè)備問題,記錄錯誤并報警
            log.error("控制失?。ú豢芍卦嚕? {}", controlAck.getErrordesc());
        } else if(ResultConstants.SUCCESS.equals(controlAck.getResult())) {
            createControl(controlAck);
            log.info("控制成功: {}", controlAck.getCuuid());
        }
    }

    @Override
    public void getCurrentData(BaseMessage baseMessage) {
        String topic = String.format("report/allpoints", baseMessage.getSaleid(), baseMessage.getGateid());
        mqttGateway.sendMessage(topic, JSON.toJSONString(baseMessage));
        log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(baseMessage));
    }

    @Override
    public void sendMultiValueSet(MultiValueSetMessage multiValueSetMessage) {

    }

    @Override
    public void handleMultiValueSet(MultiValueSetMessage multiValueSetMessage) {
        String topic = String.format("report/allpoints", multiValueSetMessage.getSaleid(), multiValueSetMessage.getGateid());
        ControlMessage controlMessage = new ControlMessage();
        try {
            mqttGateway.sendMessage(topic, JSON.toJSONString(multiValueSetMessage));
            log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            if(ResultConstants.SUCCESS.equals(multiValueSetMessage.getResult())) {
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.info("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            } else if(ResultConstants.FAILURE.equals(multiValueSetMessage.getResult())){
                BeanUtils.copyProperties(multiValueSetMessage, controlMessage);
                createControl(controlMessage);
                log.error("[控制失敗] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
            }
        } catch (Exception e) {
            log.error("[控制命令] topic={}, command={}", topic, JSON.toJSONString(multiValueSetMessage));
        }

    }

    private String createAlarmData(AlarmMessage alarmMessage) {
        Alarm alarm = new Alarm();
        alarm.setId(String.valueOf(CustomIdGenerator.SEQUENCE.nextId()));
        alarm.setSaleid(alarmMessage.getSaleid());
        alarm.setGateid(alarmMessage.getGateid());
        alarm.setTime(alarmMessage.getTime());
        alarm.setType(alarmMessage.getType());
        alarm.setSequence(alarmMessage.getSequence());
        alarm.setName(alarmMessage.getName());
        alarm.setMeterid(alarmMessage.getMeterid());
        alarm.setCreateTime(LocalDateTime.now());
        alarmMapper.insert(alarm);
        return alarm.getId();
    }

}

5.3 發(fā)送MQTT消息

通過MqttMessageGateway發(fā)送消息:

@Autowired
private MqttMessageGateway mqttMessageGateway;

public void sendTestMessage() {
    mqttMessageGateway.sendMessage("testSub/topic", "{\"key\":\"value\"}");
}

6. 總結(jié)

本文介紹了如何使用Spring Integration框架實現(xiàn)MQTT協(xié)議的對接,包括客戶端的配置、消息的訂閱與發(fā)布、以及消息的處理邏輯。通過上述代碼,您可以快速實現(xiàn)Java與MQTT的集成,并根據(jù)業(yè)務(wù)需求擴展消息的處理邏輯。

到此這篇關(guān)于Java對接MQTT協(xié)議的文章就介紹到這了,更多相關(guān)Java對接MQTT協(xié)議內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中ArrayList刪除的常用操作及方法

    Java中ArrayList刪除的常用操作及方法

    ArrayList是最常用的一種java集合,在開發(fā)中我們常常需要從ArrayList中刪除特定元素,本文主要介紹了Java中ArrayList刪除的常用操作及方法,感興趣的可以了解一下
    2023-11-11
  • 詳解JAVA Stream流

    詳解JAVA Stream流

    這篇文章主要介紹了JAVA Stream流的相關(guān)資料,文中講解非常細致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下
    2020-07-07
  • Java反射通過Getter方法獲取對象VO的屬性值過程解析

    Java反射通過Getter方法獲取對象VO的屬性值過程解析

    這篇文章主要介紹了Java反射通過Getter方法獲取對象VO的屬性值過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-02-02
  • Java 基礎(chǔ)之事務(wù)詳細介紹

    Java 基礎(chǔ)之事務(wù)詳細介紹

    這篇文章主要介紹了Java 基礎(chǔ)之事務(wù)詳細介紹的相關(guān)資料,需要的朋友可以參考下
    2017-03-03
  • 如何解決遇到的錯誤信息?java:?找不到符號?符號:?變量?log

    如何解決遇到的錯誤信息?java:?找不到符號?符號:?變量?log

    使用Lombok的@Slf4j注解時出現(xiàn)log變量找不到問題,這篇文章主要介紹了如何解決遇到的錯誤信息?java:?找不到符號?符號:?變量?log的相關(guān)資料,文中將解決的辦法介紹的非常詳細,需要的朋友可以參考下
    2025-05-05
  • Java中的方法內(nèi)聯(lián)介紹

    Java中的方法內(nèi)聯(lián)介紹

    大家好,本篇文章主要講的是Java中的方法內(nèi)聯(lián)介紹,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下
    2022-01-01
  • JavaWeb開發(fā)實現(xiàn)備忘錄

    JavaWeb開發(fā)實現(xiàn)備忘錄

    這篇文章主要為大家詳細介紹了JavaWeb開發(fā)實現(xiàn)備忘錄,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • Java數(shù)據(jù)結(jié)構(gòu)之樹和二叉樹的相關(guān)資料

    Java數(shù)據(jù)結(jié)構(gòu)之樹和二叉樹的相關(guān)資料

    這篇文章主要介紹了Java?數(shù)據(jù)結(jié)構(gòu)之樹和二叉樹相關(guān)資料,文中通過示例代碼和一些相關(guān)題目來做介紹,非常詳細。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下!
    2023-01-01
  • Mybatis中自定義TypeHandler處理枚舉詳解

    Mybatis中自定義TypeHandler處理枚舉詳解

    本文主要介紹了Mybatis中自定義TypeHandler處理枚舉的相關(guān)知識。具有很好的參考價值,下面跟著小編一起來看下吧
    2017-02-02
  • intellij idea tomcat熱部署配置教程

    intellij idea tomcat熱部署配置教程

    這篇文章主要介紹了intellij idea tomcat熱部署配置教程圖解,非常不錯,具有一定的參考借鑒價值,需要的朋友參考下吧
    2018-07-07

最新評論