Java連接Emqx實現(xiàn)訂閱發(fā)布消息的步驟記錄
一:前提
安裝了Emqx開源版、MQTTX客戶端
二:訂閱發(fā)布實現(xiàn)步驟
1.引入依賴
<!--MQTT客戶端-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>2.編輯配置文件
mqtt:
broker:
uri: tcp://127.0.0.1:31883
client:
id: mqtt-am-client-${random.uuid}
# 訂閱主題配置(支持多個)
inTopics:
- topic: test/topic1
qos: 0
- topic: test/topic2
qos: 1
- topic: test/topic3
qos: 2
# 發(fā)布主題配置(支持多個)
outTopics:
- topic: out/topic1
qos: 0
username: am
password: LGyPtuAB4th5p
keepAliveInterval: 603.讀取配置文件
package com.wtzn.web.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Configuration
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttProperties {
private Broker broker;
private Client client;
private List<TopicConfig> inTopics;
private List<TopicConfig> outTopics;
private String userName;
private String password;
private int KeepAliveInterval;
@Data
public static class Broker {
private String uri;
}
@Data
public static class Client {
private String id;
}
@Data
public static class TopicConfig {
private String topic;
private int qos;
}
}4.創(chuàng)建Mqtt客戶端
package com.wtzn.web.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqttConfig {
@Autowired
private MqttProperties mqttProperties;
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient client = new MqttClient(mqttProperties.getBroker().getUri(), mqttProperties.getClient().getId(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
// 此客戶端的用戶名和密碼
options.setUserName(mqttProperties.getUserName());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setCleanSession(true);
// 設(shè)置遺囑消息
// options.setWill(mqttProperties.getOutTopic(), "我是mqtt-am-client,我已下線,這是我的遺囑".getBytes(), 2, true);
// 連接超時重試
options.setConnectionTimeout(5000); //毫秒
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
options.setAutomaticReconnect(true);//網(wǎng)絡(luò)中斷重連
client.connect(options);
return client;
}
}5.controller層
package com.wtzn.web.controller;
import cn.dev33.satoken.annotation.SaIgnore;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.domain.bo.Payload;
import com.wtzn.web.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.LinkedList;
@RestController
@Slf4j
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttService mqttService;
@SaIgnore
@PostMapping("/mqtt")
public void publish() {
try {
// LinkedList<Payload> payloadLinkedList=new LinkedList<>();
for(int i=1; i<=10000; i++){
Payload payload=new Payload();
payload.setTemperature(i);
// payloadLinkedList.add(payload);
mqttService.publish("test/topic1",0,JsonUtils.toJsonString(payload));
}
} catch (MqttException e) {
log.error("發(fā)布消息失敗{}", e.getMessage());
}
log.info("發(fā)布消息成功");
}
}6.service層
package com.wtzn.web.service;
import com.wtzn.common.json.utils.JsonUtils;
import com.wtzn.web.config.MqttProperties;
import com.wtzn.web.domain.bo.Payload;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
@Service
@Slf4j
public class MqttService implements MqttCallbackExtended {
@Autowired
private MqttClient mqttClient;
@Autowired
private MqttProperties mqttProperties;
@PostConstruct
public void init() throws MqttException {
mqttClient.setCallback(this);
/* mqttClient.subscribe(mqttProperties.getInTopic());
log.info("訂閱主題{}", mqttProperties.getInTopic());
*/
mqttProperties.getInTopics().forEach(x -> {
try {
mqttClient.subscribe(x.getTopic(), x.getQos());
log.info("訂閱主題{}", x.getTopic());
} catch (MqttException e) {
throw new RuntimeException(e);
}
});
}
@PreDestroy
public void destroy() throws MqttException {
mqttClient.disconnect();
log.info("與服務(wù)器斷開連接");
}
/**
* @description: 發(fā)送消息
* @param: [message]
* @return: void
**/
public void publish(String topic,int qos,String message) throws MqttException {
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttClient.publish(topic, mqttMessage);
log.info("向主題【{}】發(fā)布消息:【{}】", topic, message);
}
/**
* @description: 接收消息
* @param: [topic, message]
* @return: void
**/
@Override
public void messageArrived(String topic, MqttMessage message) throws MqttException {
Payload payload = JsonUtils.parseObject(new String(message.getPayload()), Payload.class);
log.info("接收到來自【{}】的消息【{}】", topic, payload.getTemperature());
/* if (payload.getTemperature() > 37) {
publish("發(fā)燒");
}*/
}
@Override
public void connectionLost(Throwable cause) {
log.error("連接丟失:{}", cause.getMessage());
}
@SneakyThrows
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
if( token!=null ){
MqttMessage message = null;
try {
message = token.getMessage();
} catch (MqttException e) {
throw new RuntimeException(e);
}
String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
String str = message==null ? null : new String(message.getPayload());
log.info("deliveryComplete: topic={}, message={}", topic, str);
} else {
log.info("deliveryComplete: null");
}
log.info("消息已送達(dá)");
}
@Override
public void connectComplete(boolean b, String s) {
mqttProperties.getInTopics().forEach(x -> {
try {
mqttClient.subscribe(x.getTopic(), x.getQos());
log.info("訂閱主題{}", x.getTopic());
} catch (MqttException e) {
throw new RuntimeException(e);
}
});
}
}7.dao層
package com.wtzn.web.domain.bo;
import lombok.Data;
@Data
public class Payload {
private Integer temperature;
}三:測試
1.PostMan直接調(diào)用測試



2、下載MQTTX客戶端進(jìn)行測試



總結(jié)
到此這篇關(guān)于Java連接Emqx實現(xiàn)訂閱發(fā)布消息的文章就介紹到這了,更多相關(guān)Java Emqx訂閱發(fā)布消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解析SpringBoot中@Autowire注解的實現(xiàn)原理
在開發(fā)Java項目時,依賴注入是一種常見的實現(xiàn)方式,SpringBoot框架通過@Autowired注解來實現(xiàn)依賴注入的功能,本文將介紹SpringBoot中 Autowired注解實現(xiàn)的原理2023-06-06
idea打開項目后無法顯示目錄結(jié)構(gòu),只能顯示.iml文件問題
這篇文章主要介紹了idea打開項目后無法顯示目錄結(jié)構(gòu),只能顯示.iml文件問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08
這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作
這篇文章主要介紹了這一次搞懂Spring代理創(chuàng)建及AOP鏈?zhǔn)秸{(diào)用過程操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08
使用Java servlet實現(xiàn)自動登錄退出功能
這篇文章主要介紹了使用Java servlet實現(xiàn)自動登錄退出功能,,本文通過實例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2019-11-11
MapStruct對象映射轉(zhuǎn)換解決Bean屬性拷貝性能問題
無意間看到項目中有小伙伴用到了 MapStruct 來做對象映射轉(zhuǎn)換當(dāng)時我就很好奇,這個是什么框架,能夠解決什么問題,帶著這兩個疑問就有了下面的文章2022-02-02
JDK動態(tài)代理接口和接口實現(xiàn)類深入詳解
這篇文章主要介紹了JDK動態(tài)代理接口和接口實現(xiàn)類,JDK動態(tài)代理是代理模式的一種實現(xiàn)方式,因為它是基于接口來做代理的,所以也常被稱為接口代理,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06

