Springboot集成mqtt客戶端詳解
1. 前言
? 這里我們使用springboot搭建一個輕量級的mqtt客戶端,連接mqtt的Broker服務(wù)。
? 連接信息寫在配置文件里application.properties
spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url= tcp://127.0.0.1:1883
spring.mqtt.client-id= server_client_${random.value}
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
spring.mqtt.completionTimeout= 3000
spring.mqtt.keepAlive= 60
2. 引入依賴
<!--mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
3. 配置文件
? 新建MqttProperties.java文件,初始化application里的mqtt配置項
@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {
private String username;
private String mqpassword;
private String hostUrl;
private String clientId;
private String defaultTopic;
private String completionTimeout;
private Integer keepAlive;
}? 新建MqttConfiguration.java文件,為mqtt做初始化配置
@Configuration
@Slf4j
public class MqttConfiguration {
@Autowired
private MqttProperties mqttProperties;
/**
* 事件觸發(fā)
*/
@Autowired
private ApplicationEventPublisher eventPublisher;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttProperties.getUsername());
mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 配置client,監(jiān)聽的topic
*/
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),
mqttProperties.getDefaultTopic().split(","));
adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));
adapter.setConverter(new DefaultPahoMessageConverter());
//默認添加TopicName中所有tipic
adapter.addTopic("+/+/test");
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String qos = message.getHeaders().get("mqtt_receivedQos").toString();
//觸發(fā)事件 這里不再做業(yè)務(wù)處理,包 listener中做處理
eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));
}
};
}
/**
* 發(fā)送消息和消費消息Channel可以使用相同MqttPahoClientFactory
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
// 在這里進行mqttOutboundChannel的相關(guān)設(shè)置
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());
// 如果設(shè)置成true,發(fā)送消息時將不會阻塞。
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}4. MQTT消息類
新建MqttEvent.java 消息類。用于發(fā)送mqtt的消息
@Getter
public class MqttEvent extends ApplicationEvent {
private String topic;
/**
* 發(fā)送的消息
*/
private String message;
public MqttEvent(Object source,String topic,String message) {
super(source);
this.topic = topic;
this.message = message;
}
}5. MQTT消息接收器
新建JobListener.java文件作為 mqtt的消息接收類
@Slf4j
@Component
public class JobListener {
@Autowired
DeviceDao deviceDao;
/**
* 監(jiān)聽topic
* @param mqttEvent
*/
@EventListener(condition = "#mqttEvent.topic.startsWith('pay')")
public void onEmqttCall1(MqttEvent mqttEvent) throws Exception {
String topic = mqttEvent.getTopic();
//寫邏輯處理
}
/**
* 監(jiān)聽topic
* @param mqttEvent
*/
@EventListener(condition = "#mqttEvent.topic.equals('device')")
public void onEmqttCallT(MqttEvent mqttEvent){
log.info("接收到消11111111111:"+mqttEvent.getMessage());
}
}6. MQTT消息發(fā)送器
新建MqttGateway.java 提供發(fā)送mqttt消息的接口服務(wù)
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}7. 測試MQTT發(fā)送消息
@SpringBootTest
public class Test3 {
@Autowired
MqttGateway mqttGateway;
@Test
public void mqttTest () {
mqttGateway.sendToMqtt("111//222/33","消息內(nèi)容");
}
}到此這篇關(guān)于Springboot集成mqtt客戶端詳解的文章就介紹到這了,更多相關(guān)Springboot集成mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法
這篇文章主要介紹了Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
java 出現(xiàn)Zipexception 異常的解決辦法
這篇文章主要介紹了java 出現(xiàn)Zipexception 異常的解決辦法的相關(guān)資料,出現(xiàn) java.util.zip.ZipException: error in opening zip file 異常的原因及解決方法,需要的朋友可以參考下2017-08-08
java編程實現(xiàn)獲取服務(wù)器IP地址及MAC地址的方法
這篇文章主要介紹了java編程實現(xiàn)獲取機器IP地址及MAC地址的方法,實例分析了Java分別針對單網(wǎng)卡及多網(wǎng)卡的情況下獲取服務(wù)器IP地址與MAC地址的相關(guān)技巧,需要的朋友可以參考下2015-11-11
Java線上問題排查神器Arthas實戰(zhàn)原理解析
原先我們Java中我們常用分析問題一般是使用JDK自帶或第三方的分析工具如jstat、jmap、jstack、?jconsole、visualvm、Java?Mission?Control、MAT等,還有一款神器Arthas工具,可幫助程序員解決很多繁瑣的問題,感興趣的朋友一起看看吧2022-01-01
Java簡化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計模式
Java門面模式是一種結(jié)構(gòu)性設(shè)計模式,它為復(fù)雜系統(tǒng)提供了一個簡單的接口,使得系統(tǒng)的客戶端能夠更加方便地使用系統(tǒng)功能。門面模式通過封裝復(fù)雜的子系統(tǒng),隱藏系統(tǒng)的實現(xiàn)細節(jié),提高了系統(tǒng)的易用性和靈活性2023-04-04
SpringBoot2零基礎(chǔ)到精通之JUnit 5與指標(biāo)監(jiān)控
SpringBoot是一種整合Spring技術(shù)棧的方式(或者說是框架),同時也是簡化Spring的一種快速開發(fā)的腳手架,本篇讓我們一起學(xué)習(xí)JUnit 5與指標(biāo)監(jiān)控2022-03-03

