欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java連接Emqx實現(xiàn)訂閱發(fā)布消息的步驟記錄

 更新時間:2025年09月22日 10:34:31   作者:一杯冰美式_丶  
這篇文章主要介紹了Java連接Emqx實現(xiàn)訂閱發(fā)布消息的步驟記錄,EMQX是大規(guī)模分布式MQTT消息服務(wù)器,可以高效可靠連接海量物聯(lián)網(wǎng)設(shè)備,實時處理分發(fā)消息與事件流數(shù)據(jù),助力構(gòu)建關(guān)鍵業(yè)務(wù)的物聯(lián)網(wǎng)與云應(yīng)用,需要的朋友可以參考下

一:前提

安裝了Emqx開源版、MQTTX客戶端

二:訂閱發(fā)布實現(xiàn)步驟

1.引入依賴

<!--MQTT客戶端-->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

2.編輯配置文件

mqtt:
  broker:
    uri: tcp://127.0.0.1:31883
  client:
    id: mqtt-am-client-${random.uuid}
  # 訂閱主題配置(支持多個)
  inTopics:
    - topic: test/topic1
      qos: 0
    - topic: test/topic2
      qos: 1
    - topic: test/topic3
      qos: 2
  # 發(fā)布主題配置(支持多個)
  outTopics:
    - topic: out/topic1
      qos: 0
  username: am
  password: LGyPtuAB4th5p
  keepAliveInterval: 60

3.讀取配置文件

package com.wtzn.web.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {
    private Broker broker;
    private Client client;
    private List<TopicConfig> inTopics;
    private List<TopicConfig> outTopics;
    private String userName;
    private String password;
    private int KeepAliveInterval;

    @Data
    public static class Broker {
        private String uri;
    }

    @Data
    public static class Client {
        private String id;
    }
    @Data
    public static class TopicConfig {
        private String topic;
        private int qos;
    }

}

4.創(chuàng)建Mqtt客戶端

package com.wtzn.web.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Autowired
    private MqttProperties mqttProperties;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 此客戶端的用戶名和密碼
        options.setUserName(mqttProperties.getUserName());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        options.setCleanSession(true);
        // 設(shè)置遺囑消息
      //  options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下線,這是我的遺囑".getBytes(), 2, true);
        // 連接超時重試
        options.setConnectionTimeout(5000); //毫秒
        options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(true);//網(wǎng)絡(luò)中斷重連
        client.connect(options);
        return client;
    }
}

5.controller層

package com.wtzn.web.controller;

import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.LinkedList;


@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttService mqttService;

    @SaIgnore
    @PostMapping("/mqtt")
    public void publish() {
        try {
          //  LinkedList<Payload> payloadLinkedList=new LinkedList<>();
            for(int i=1; i<=10000; i++){
                Payload payload=new Payload();
                payload.setTemperature(i);
              //  payloadLinkedList.add(payload);
                mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
            }

        } catch (MqttException e) {
            log.error("發(fā)布消息失敗{}", e.getMessage());
        }
        log.info("發(fā)布消息成功");
    }


}

6.service層

package com.wtzn.web.service;

import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;


@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {

    @Autowired
    private MqttClient mqttClient;

    @Autowired
    private MqttProperties mqttProperties;
    
    @PostConstruct
    public void init() throws MqttException {
        mqttClient.setCallback(this);
 /*       mqttClient.subscribe(mqttProperties.getInTopic());
        log.info("訂閱主題{}", mqttProperties.getInTopic());
*/
        mqttProperties.getInTopics().forEach(x -> {
            try {
                mqttClient.subscribe(x.getTopic(), x.getQos());
                log.info("訂閱主題{}", x.getTopic());
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        });

    }

    @PreDestroy
    public void destroy() throws MqttException {
        mqttClient.disconnect();
        log.info("與服務(wù)器斷開連接");
    }

    /**
     * @description: 發(fā)送消息
     * @param: [message]
     * @return: void
     **/
    public void publish(String topic,int qos,String message) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        mqttClient.publish(topic, mqttMessage);
        log.info("向主題【{}】發(fā)布消息:【{}】", topic, message);
    }


    /**
     * @description: 接收消息
     * @param: [topic, message]
     * @return: void
     **/
    @Override
    public void messageArrived(String topic, MqttMessage message) throws MqttException {
        Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);
        log.info("接收到來自【{}】的消息【{}】", topic, payload.getTemperature());
      /*  if (payload.getTemperature() > 37) {
            publish("發(fā)燒");
        }*/


    }


    @Override
    public void connectionLost(Throwable cause) {
        log.error("連接丟失:{}", cause.getMessage());
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = null;
            try {
                message = token.getMessage();
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }

        log.info("消息已送達(dá)");
    }

    @Override
    public void connectComplete(boolean b, String s) {

            mqttProperties.getInTopics().forEach(x -> {
                try {
                    mqttClient.subscribe(x.getTopic(), x.getQos());
                    log.info("訂閱主題{}", x.getTopic());
                } catch (MqttException e) {
                    throw new RuntimeException(e);
                }
            });
    }
}

7.dao層

package com.wtzn.web.domain.bo;

import lombok.Data;

@Data
public class Payload {
    private Integer temperature;
}

三:測試

1.PostMan直接調(diào)用測試

2、下載MQTTX客戶端進(jìn)行測試

總結(jié) 

到此這篇關(guān)于Java連接Emqx實現(xiàn)訂閱發(fā)布消息的文章就介紹到這了,更多相關(guān)Java Emqx訂閱發(fā)布消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 解析SpringBoot中@Autowire注解的實現(xiàn)原理

    解析SpringBoot中@Autowire注解的實現(xiàn)原理

    在開發(fā)Java項目時,依賴注入是一種常見的實現(xiàn)方式,SpringBoot框架通過@Autowired注解來實現(xiàn)依賴注入的功能,本文將介紹SpringBoot中 Autowired注解實現(xiàn)的原理
    2023-06-06
  • Eclipse中自動添加注釋(兩種)

    Eclipse中自動添加注釋(兩種)

    本文主要介紹了Eclipse中自動添加注釋的兩種方法。具有很好的參考價值,下面跟著小編一起來看下吧
    2017-02-02
  • idea打開項目后無法顯示目錄結(jié)構(gòu),只能顯示.iml文件問題

    idea打開項目后無法顯示目錄結(jié)構(gòu),只能顯示.iml文件問題

    這篇文章主要介紹了idea打開項目后無法顯示目錄結(jié)構(gòu),只能顯示.iml文件問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • 這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作

    這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作

    這篇文章主要介紹了這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-08-08
  • 使用Java servlet實現(xiàn)自動登錄退出功能

    使用Java servlet實現(xiàn)自動登錄退出功能

    這篇文章主要介紹了使用Java servlet實現(xiàn)自動登錄退出功能,,本文通過實例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-11-11
  • IDEA Project不顯示/缺失文件問題及解決

    IDEA Project不顯示/缺失文件問題及解決

    在側(cè)邊欄的project模式下,如果發(fā)現(xiàn)缺少部分文件,可以嘗試關(guān)閉項目,打開項目所在目錄,刪除目錄下的.idea文件夾,然后重新打開項目即可解決
    2024-11-11
  • MapStruct對象映射轉(zhuǎn)換解決Bean屬性拷貝性能問題

    MapStruct對象映射轉(zhuǎn)換解決Bean屬性拷貝性能問題

    無意間看到項目中有小伙伴用到了 MapStruct 來做對象映射轉(zhuǎn)換當(dāng)時我就很好奇,這個是什么框架,能夠解決什么問題,帶著這兩個疑問就有了下面的文章
    2022-02-02
  • Java 精煉解讀方法的定義與使用

    Java 精煉解讀方法的定義與使用

    Java語言中的“方法”(Method)在其他語言當(dāng)中也可能被稱為“函數(shù)”(Function)。對于一些復(fù)雜的代碼邏輯,如果希望重復(fù)使用這些代碼,并且做到“隨時任意使用”,那么就可以將這些代碼放在一個大括號“{}”當(dāng)中,并且起一個名字。使用的時候,直接找到名字調(diào)用即可
    2022-03-03
  • Springboot異常錯誤處理解決方案詳解

    Springboot異常錯誤處理解決方案詳解

    這篇文章主要介紹了Springboot異常錯誤處理解決方案詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08
  • JDK動態(tài)代理接口和接口實現(xiàn)類深入詳解

    JDK動態(tài)代理接口和接口實現(xiàn)類深入詳解

    這篇文章主要介紹了JDK動態(tài)代理接口和接口實現(xiàn)類,JDK動態(tài)代理是代理模式的一種實現(xiàn)方式,因為它是基于接口來做代理的,所以也常被稱為接口代理,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-06-06

最新評論