Springboot集成mqtt客戶端詳解
1. 前言
? 這里我們使用springboot搭建一個輕量級的mqtt客戶端,連接mqtt的Broker服務(wù)。
? 連接信息寫在配置文件里application.properties
spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url= tcp://127.0.0.1:1883
spring.mqtt.client-id= server_client_${random.value}
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
spring.mqtt.completionTimeout= 3000
spring.mqtt.keepAlive= 60
2. 引入依賴
<!--mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
3. 配置文件
? 新建MqttProperties.java文件,初始化application里的mqtt配置項
@ConfigurationProperties("spring.mqtt") @Component @Getter @Setter public class MqttProperties { private String username; private String mqpassword; private String hostUrl; private String clientId; private String defaultTopic; private String completionTimeout; private Integer keepAlive; }
? 新建MqttConfiguration.java文件,為mqtt做初始化配置
@Configuration @Slf4j public class MqttConfiguration { @Autowired private MqttProperties mqttProperties; /** * 事件觸發(fā) */ @Autowired private ApplicationEventPublisher eventPublisher; @Bean public MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray()); mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()}); mqttConnectOptions.setKeepAliveInterval(2); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,監(jiān)聽的topic */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(), mqttProperties.getDefaultTopic().split(",")); adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout())); adapter.setConverter(new DefaultPahoMessageConverter()); //默認(rèn)添加TopicName中所有tipic adapter.addTopic("+/+/test"); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String qos = message.getHeaders().get("mqtt_receivedQos").toString(); //觸發(fā)事件 這里不再做業(yè)務(wù)處理,包 listener中做處理 eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString())); } }; } /** * 發(fā)送消息和消費消息Channel可以使用相同MqttPahoClientFactory * * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { // 在這里進(jìn)行mqttOutboundChannel的相關(guān)設(shè)置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory()); // 如果設(shè)置成true,發(fā)送消息時將不會阻塞。 messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
4. MQTT消息類
新建MqttEvent.java 消息類。用于發(fā)送mqtt的消息
@Getter public class MqttEvent extends ApplicationEvent { private String topic; /** * 發(fā)送的消息 */ private String message; public MqttEvent(Object source,String topic,String message) { super(source); this.topic = topic; this.message = message; } }
5. MQTT消息接收器
新建JobListener.java文件作為 mqtt的消息接收類
@Slf4j @Component public class JobListener { @Autowired DeviceDao deviceDao; /** * 監(jiān)聽topic * @param mqttEvent */ @EventListener(condition = "#mqttEvent.topic.startsWith('pay')") public void onEmqttCall1(MqttEvent mqttEvent) throws Exception { String topic = mqttEvent.getTopic(); //寫邏輯處理 } /** * 監(jiān)聽topic * @param mqttEvent */ @EventListener(condition = "#mqttEvent.topic.equals('device')") public void onEmqttCallT(MqttEvent mqttEvent){ log.info("接收到消11111111111:"+mqttEvent.getMessage()); } }
6. MQTT消息發(fā)送器
新建MqttGateway.java 提供發(fā)送mqttt消息的接口服務(wù)
@Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
7. 測試MQTT發(fā)送消息
@SpringBootTest public class Test3 { @Autowired MqttGateway mqttGateway; @Test public void mqttTest () { mqttGateway.sendToMqtt("111//222/33","消息內(nèi)容"); } }
到此這篇關(guān)于Springboot集成mqtt客戶端詳解的文章就介紹到這了,更多相關(guān)Springboot集成mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法
這篇文章主要介紹了Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12java 出現(xiàn)Zipexception 異常的解決辦法
這篇文章主要介紹了java 出現(xiàn)Zipexception 異常的解決辦法的相關(guān)資料,出現(xiàn) java.util.zip.ZipException: error in opening zip file 異常的原因及解決方法,需要的朋友可以參考下2017-08-08java編程實現(xiàn)獲取服務(wù)器IP地址及MAC地址的方法
這篇文章主要介紹了java編程實現(xiàn)獲取機(jī)器IP地址及MAC地址的方法,實例分析了Java分別針對單網(wǎng)卡及多網(wǎng)卡的情況下獲取服務(wù)器IP地址與MAC地址的相關(guān)技巧,需要的朋友可以參考下2015-11-11Java線上問題排查神器Arthas實戰(zhàn)原理解析
原先我們Java中我們常用分析問題一般是使用JDK自帶或第三方的分析工具如jstat、jmap、jstack、?jconsole、visualvm、Java?Mission?Control、MAT等,還有一款神器Arthas工具,可幫助程序員解決很多繁瑣的問題,感興趣的朋友一起看看吧2022-01-01Java簡化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計模式
Java門面模式是一種結(jié)構(gòu)性設(shè)計模式,它為復(fù)雜系統(tǒng)提供了一個簡單的接口,使得系統(tǒng)的客戶端能夠更加方便地使用系統(tǒng)功能。門面模式通過封裝復(fù)雜的子系統(tǒng),隱藏系統(tǒng)的實現(xiàn)細(xì)節(jié),提高了系統(tǒng)的易用性和靈活性2023-04-04SpringBoot2零基礎(chǔ)到精通之JUnit 5與指標(biāo)監(jiān)控
SpringBoot是一種整合Spring技術(shù)棧的方式(或者說是框架),同時也是簡化Spring的一種快速開發(fā)的腳手架,本篇讓我們一起學(xué)習(xí)JUnit 5與指標(biāo)監(jiān)控2022-03-03