欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成Eclipse Mosquitto的實(shí)現(xiàn)示例

 更新時(shí)間:2025年08月26日 08:28:47   作者:墨鴉_Cormorant  
本文主要介紹了SpringBoot集成Eclipse Mosquitto的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

添加 MQTT 客戶端依賴

在 Spring Boot 項(xiàng)目的 pom.xml 中添加 Eclipse Paho MQTT 客戶端依賴(主流的 MQTT Java 客戶端):

<!-- MQTT 客戶端 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

配置 MQTT 連接參數(shù)

application.yml(或 application.properties)中配置 Mosquitto 連接信息:

mqtt:
  # 是否啟用
  enable: true
  # Mosquitto 服務(wù)地址(非加密端口),若啟用 TLS 加密,使用 ssl://localhost:8883
  broker: tcp://localhost:1883
  # 客戶端唯一標(biāo)識(shí)(建議加隨機(jī)數(shù)避免沖突)
  client-id: springboot-mqtt-client
  # 認(rèn)證用戶名(Mosquitto 啟用認(rèn)證時(shí)必填)
  username: user1
  # 認(rèn)證密碼
  password: 123456
  # 默認(rèn) QoS 等級(jí)(0/1/2)
  defalut-qos: 1
  # 心跳間隔(秒)
  keep-alive: 60

實(shí)現(xiàn) MQTT 客戶端(發(fā)布 + 訂閱)

MQTT 客戶端配置類

配置實(shí)體類

import lombok.Data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "mqtt")
@ConditionalOnProperty(name = "mqtt.enable", havingValue = "true")
public class MqttProperties {
    /**
     * 是否啟用
     */
    private boolean enable;
    /**
     * Mosquitto 服務(wù)地址(非加密端口)。若啟用 TLS 加密,使用 ssl://localhost:8883
     */
    private String broker;
    /**
     * 客戶端唯一標(biāo)識(shí)(建議加隨機(jī)數(shù)避免沖突)
     */
    private String clientId;
    /**
     * 認(rèn)證用戶名(Mosquitto 啟用認(rèn)證時(shí)必填)
     */
    private String username;
    /**
     * 認(rèn)證密碼
     */
    private String password;
    /**
     * 默認(rèn) QoS 等級(jí)(0/1/2),非關(guān)鍵數(shù)據(jù)用 QoS 0,重要狀態(tài)用 QoS 1,核心控制指令用 QoS 2
     */
    private int defaultQos;
    /**
     * 心跳間隔(秒)
     */
    private int keepAlive = 60;
}

配置類

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

@Slf4j
@Configuration
@ConditionalOnBean(MqttProperties.class)
public class MqttConfig {
    private final MqttProperties mqttProp;

    public MqttConfig(MqttProperties mqttProp) {
        this.mqttProp = mqttProp;
    }

    /**
     * 創(chuàng)建 MQTT 客戶端實(shí)例
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        // 客戶端 ID 建議添加隨機(jī)數(shù),避免重復(fù)連接
        String clientIdWithRandom = mqttProp.getClientId() + "_" + System.currentTimeMillis();
        MqttClient client = new MqttClient(mqttProp.getBroker(), clientIdWithRandom, new MemoryPersistence());

        // 配置連接參數(shù)
        MqttConnectOptions options = new MqttConnectOptions();
        if (StringUtils.hasText((mqttProp.getUsername())))
            options.setUserName(mqttProp.getUsername());
        if (StringUtils.hasText((mqttProp.getPassword())))
            options.setPassword(mqttProp.getPassword().toCharArray());
        options.setKeepAliveInterval(mqttProp.getKeepAlive());
        // 自動(dòng)重連
        options.setAutomaticReconnect(true);
        // 不清除會(huì)話(保留訂閱關(guān)系和未確認(rèn)消息)
        options.setCleanSession(false);

        // 連接回調(diào)(處理連接狀態(tài))
        client.setCallback(new MqttCallback() {
            /**
             * 連接斷開(kāi)時(shí)觸發(fā),可在此實(shí)現(xiàn)重連邏輯
             */
            @Override
            public void connectionLost(Throwable cause) {
                log.error("MQTT 連接斷開(kāi),原因:{}", cause.getMessage());
            }

            /**
             * 收到訂閱的消息時(shí)觸發(fā),用于處理業(yè)務(wù)邏輯(如存儲(chǔ)數(shù)據(jù)到數(shù)據(jù)庫(kù))
             */
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // 接收消息回調(diào)(訂閱的主題有消息時(shí)觸發(fā))
                String content = new String(message.getPayload());
                log.debug("收到消息 - 主題:{},內(nèi)容:{}", topic, content);
                // TODO 業(yè)務(wù)邏輯
            }

            /**
             * 消息發(fā)布完成后觸發(fā),可用于確認(rèn)消息已送達(dá)
             */
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // 消息發(fā)布完成回調(diào)
                try {
                    log.debug("消息發(fā)布成功,主題:{}", token.getTopics()[0]);
                } catch (Exception e) {
                    log.error("", e);
                }
            }
        });

        // 連接到 Mosquitto
        client.connect(options);
        log.info("MQTT 連接成功:{}", mqttProp.getClientId());
        return client;
    }
}

發(fā)布和訂閱工具類

消息發(fā)布工具類

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

/**
 * MQTT 消息發(fā)布工具類
 */
@Slf4j
@Component
@ConditionalOnBean(MqttProperties.class)
public class MqttPublisher {
    private final MqttClient mqttClient;
    private final MqttProperties mqttProp;

    public MqttPublisher(MqttClient mqttClient, MqttProperties mqttProp) {
        this.mqttClient = mqttClient;
        this.mqttProp = mqttProp;
    }

    /**
     * 發(fā)布消息到指定主題
     * @param topic 主題
     * @param content 消息內(nèi)容
     */
    public void publish(String topic, String content) throws MqttException {
        publish(topic, content, mqttProp.getDefaultQos());
    }

    /**
     * 發(fā)布消息到指定主題(自定義QoS)
     * @param topic 主題
     * @param content 消息內(nèi)容
     * @param qos QoS等級(jí)
     */
    public void publish(String topic, String content, int qos) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect(); // 若斷開(kāi)連接,嘗試重連
        }
        log.debug("發(fā)布消息,主題:{},內(nèi)容:{}, QoS等級(jí):{}", topic, content, qos);
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        mqttClient.publish(topic, message);
    }
}

消息訂閱工具類

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

/**
 * MQTT 消息訂閱工具類
 */
@Slf4j
@Component
@ConditionalOnBean(MqttProperties.class)
public class MqttSubscriber {
    private final MqttClient mqttClient;
    private final MqttProperties mqttProp;

    public MqttSubscriber(MqttClient mqttClient, MqttProperties mqttProp) {
        this.mqttClient = mqttClient;
        this.mqttProp = mqttProp;
    }

    /**
     * 訂閱指定主題
     * @param topic 主題(支持通配符,如 sensor/+)
     */
    public void subscribe(String topic) throws MqttException {
        subscribe(topic, mqttProp.getDefaultQos());
    }

    /**
     * 訂閱指定主題(自定義QoS)
     * @param topic 主題
     * @param qos QoS等級(jí)
     */
    public void subscribe(String topic, int qos) throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        mqttClient.subscribe(topic, qos);
        log.info("已訂閱主題:{},QoS等級(jí):{}", topic, qos);
    }

    /**
     * 取消訂閱主題
     * @param topic 主題
     */
    public void unsubscribe(String topic) throws MqttException {
        mqttClient.unsubscribe(topic);
        log.info("已取消訂閱主題:{}", topic);
    }
}

測(cè)試 MQTT 功能

創(chuàng)建一個(gè)測(cè)試控制器,驗(yàn)證消息發(fā)布和訂閱:

import com.blackcrow.test.mqtt.config.MqttPublisher;
import com.blackcrow.test.mqtt.config.MqttSubscriber;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MqttTestController {
    @Autowired
    private MqttPublisher mqttPublisher;
    @Autowired
    private MqttSubscriber mqttSubscriber;

    @Value("${mqtt.default-topic:topic/temp}")
    private String defaultTopic;

    /**
     * 訂閱主題
     */
    @GetMapping("/subscribe")
    public String subscribe(@RequestParam(required = false) String topic) {
        try {
            String targetTopic = topic != null ? topic : defaultTopic;
            mqttSubscriber.subscribe(targetTopic);
            return "訂閱成功:" + targetTopic;
        } catch (MqttException e) {
            return "訂閱失敗:" + e.getMessage();
        }
    }

    /**
     * 發(fā)布消息
      */
    @GetMapping("/publish")
    public String publish(@RequestParam(required = false) String topic, @RequestParam String message) {
        try {
            String targetTopic = topic != null ? topic : defaultTopic;
            mqttPublisher.publish(targetTopic, message);
            return "發(fā)布成功:主題=" + targetTopic + ",消息=" + message;
        } catch (MqttException e) {
            return "發(fā)布失?。? + e.getMessage();
        }
    }
}

到此這篇關(guān)于SpringBoot集成Eclipse Mosquitto的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot集成Eclipse Mosquitto內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

最新評(píng)論