SpringBoot實(shí)現(xiàn)MQTT消息發(fā)送和接收方式
Spring integration交互邏輯
對(duì)于發(fā)布者:
1.消息通過消息網(wǎng)關(guān)發(fā)送出去,由 MessageChannel
的實(shí)例 DirectChannel
處理發(fā)送的細(xì)節(jié)。
2.DirectChannel
收到消息后,內(nèi)部通過 MessageHandler
的實(shí)例 MqttPahoMessageHandler
發(fā)送到指定的 Topic。
對(duì)于訂閱者:
1.通過注入 MessageProducerSupport
的實(shí)例 MqttPahoMessageDrivenChannelAdapter
,實(shí)現(xiàn)訂閱 Topic 和綁定消息消費(fèi)的 MessageChannel
。
2.同樣由 MessageChannel
的實(shí)例 DirectChannel
處理消費(fèi)細(xì)節(jié)。
Channel 消息后會(huì)發(fā)送給我們自定義的 MqttInboundMessageHandler
實(shí)例進(jìn)行消費(fèi)。
可以看到整個(gè)處理的流程和前面將的基本一致。Spring Integration 就是抽象出了這么一套消息通信的機(jī)制,具體的通信細(xì)節(jié)由它集成的中間件來決定
1、maven依賴
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <version>2.5.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>5.5.5</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.5</version> </dependency>
2、yaml配置文件
#mqtt配置 mqtt: username: 123 password: 123 #MQTT-服務(wù)器連接地址,如果有多個(gè),用逗號(hào)隔開 url: tcp://127.0.0.1:1883 #MQTT-連接服務(wù)器默認(rèn)客戶端ID client: id: ${random.value} default: #MQTT-默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口時(shí)指定 topic: topic,mqtt/test/# #連接超時(shí) completionTimeout: 3000
3、mqtt生產(chǎn)者消費(fèi)者配置類
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; /** * mqtt 推送and接收 消息類 **/ @Configuration @IntegrationComponentScan @Slf4j public class MqttSenderAndReceiveConfig { private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Autowired private MqttReceiveHandle mqttReceiveHandle; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String hostUrl; @Value("${mqtt.client.id}") private String clientId; @Value("${mqtt.default.topic}") private String defaultTopic; @Value("${mqtt.completionTimeout}") private int completionTimeout; //連接超時(shí) /** * MQTT連接器選項(xiàng) **/ @Bean(value = "getMqttConnectOptions") public MqttConnectOptions getMqttConnectOptions1() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接 mqttConnectOptions.setCleanSession(true); // 設(shè)置超時(shí)時(shí)間 單位為秒 mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線,但這個(gè)方法并沒有重連的機(jī)制 mqttConnectOptions.setKeepAliveInterval(10); // 設(shè)置“遺囑”消息的話題,若客戶端與服務(wù)器之間的連接意外中斷,服務(wù)器將發(fā)布客戶端的“遺囑”消息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工廠 **/ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT信息通道(生產(chǎn)者) **/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT消息處理器(生產(chǎn)者) **/ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setAsyncEvents(true); // 消息發(fā)送和傳輸完成會(huì)有異步的通知回調(diào) //設(shè)置轉(zhuǎn)換器 發(fā)送bytes數(shù)據(jù) DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); return messageHandler; } /** * 配置client,監(jiān)聽的topic * MQTT消息訂閱綁定(消費(fèi)者) **/ @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); adapter.setConverter(converter); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * MQTT信息通道(消費(fèi)者) **/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * MQTT消息處理器(消費(fèi)者) **/ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //處理接收消息 mqttReceiveHandle.handle(message); } }; } }
4、消息處理類
/** * mqtt客戶端消息處理類 **/ @Slf4j @Component public class MqttReceiveHandle { public void handle(Message<?> message) { log.info("收到訂閱消息: {}", message); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); log.info("消息主題:{}", topic); Object payLoad = message.getPayload(); byte[] data = (byte[]) payLoad; Packet packet = Packet.parse(data); log.info("發(fā)送的Packet數(shù)據(jù){}", JSON.toJSONString(packet)); } }
5、mqtt發(fā)送接口
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; /** * mqtt發(fā)送消息 * (defaultRequestChannel = "mqttOutboundChannel" 對(duì)應(yīng)config配置) * **/ @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 發(fā)送信息到MQTT服務(wù)器 * * @param */ void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); /** * 發(fā)送信息到MQTT服務(wù)器 * * @param topic 主題 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 發(fā)送信息到MQTT服務(wù)器 * * @param topic 主題 * @param qos 對(duì)消息處理的幾種機(jī)制。 * 0 表示的是訂閱者沒收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。 * 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。 * 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 發(fā)送信息到MQTT服務(wù)器 * * @param topic 主題 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload); /** * 發(fā)送信息到MQTT服務(wù)器 * * @param topic 主題 * @param payload 消息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); }
6、mqtt事件監(jiān)聽類
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttListener { /** * 連接失敗的事件通知 * @param mqttConnectionFailedEvent */ @EventListener(classes = MqttConnectionFailedEvent.class) public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) { log.info("連接失敗的事件通知"); } /** * 已發(fā)送的事件通知 * @param mqttMessageSentEvent */ @EventListener(classes = MqttMessageSentEvent.class) public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) { log.info("已發(fā)送的事件通知"); } /** * 已傳輸完成的事件通知 * 1.QOS == 0,發(fā)送消息后會(huì)即可進(jìn)行此事件回調(diào),因?yàn)椴恍枰却貓?zhí) * 2.QOS == 1,發(fā)送消息后會(huì)等待ACK回執(zhí),ACK回執(zhí)后會(huì)進(jìn)行此事件通知 * 3.QOS == 2,發(fā)送消息后會(huì)等待PubRECV回執(zhí),知道收到PubCOMP后會(huì)進(jìn)行此事件通知 * @param mqttMessageDeliveredEvent */ @EventListener(classes = MqttMessageDeliveredEvent.class) public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) { log.info("已傳輸完成的事件通知"); } /** * 消息訂閱的事件通知 * @param mqttSubscribedEvent */ @EventListener(classes = MqttSubscribedEvent.class) public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) { log.info("消息訂閱的事件通知"); } }
7、接口測(cè)試
@Resource private MqttGateway mqttGateway; /** * sendData 消息 * topic 訂閱主題 **/ @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST) public String sendMqtt(String sendData, String topic) { MqttMessage mqttMessage = new MqttMessage(); mqttGateway.sendToMqtt(topic, sendData); //mqttGateway.sendToMqttObject(topic, sendData.getBytes()); return "OK"; }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- springboot集成mqtt的實(shí)踐開發(fā)
- springboot 實(shí)現(xiàn)mqtt物聯(lián)網(wǎng)的示例代碼
- springboot整合netty-mqtt-client實(shí)現(xiàn)Mqtt消息的訂閱和發(fā)布示例
- SpringBoot+MQTT+apollo實(shí)現(xiàn)訂閱發(fā)布功能的示例
- SpringBoot整合MQTT并實(shí)現(xiàn)異步線程調(diào)用的問題
- SpringBoot集成mqtt的多模塊項(xiàng)目配置詳解
- SpringBoot2.0集成MQTT消息推送功能實(shí)現(xiàn)
- Springboot整合mqtt服務(wù)的示例代碼
- springboot整合mqtt的詳細(xì)圖文教程
- springboot整合mqtt客戶端示例分享
相關(guān)文章
IDEA搭建Maven模塊化項(xiàng)目的實(shí)現(xiàn)
本文主要介紹了IDEA搭建Maven模塊化項(xiàng)目的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05java面向?qū)ο缶幊讨匾拍罾^承和多態(tài)示例解析
這篇文章主要為大家介紹了java面向?qū)ο缶幊痰膬蓚€(gè)重要概念繼承和多態(tài)示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05Java學(xué)習(xí)基礎(chǔ)之安裝JDK/配置JDK環(huán)境&IEDA工具安裝
這篇文章主要介紹了Java學(xué)習(xí)基礎(chǔ)系列文章的第一篇,主要內(nèi)容是安裝JDK/配置JDK環(huán)境&IEDA工具安裝的相關(guān)資料,需要的朋友可以參考下2020-02-02利用Spring Validation實(shí)現(xiàn)輸入驗(yàn)證功能
這篇文章主要給大家介紹了如何利用Spring Validation完美的實(shí)現(xiàn)輸入驗(yàn)證功能,文中有詳細(xì)的代碼示例,具有一定的參考價(jià)值,感興趣的朋友可以借鑒一下2023-06-06Java二維數(shù)組與動(dòng)態(tài)數(shù)組ArrayList類詳解
這篇文章主要給大家介紹了關(guān)于Java二維數(shù)組與動(dòng)態(tài)數(shù)組ArrayList類的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Java基礎(chǔ)之switch分支結(jié)構(gòu)詳解
這篇文章主要介紹了Java基礎(chǔ)之switch分支結(jié)構(gòu)詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很大的幫助,需要的朋友可以參考下2021-05-05Mybatis plus中使用in查詢出錯(cuò)如何解決
這篇文章主要介紹了Mybatis plus中使用in查詢出錯(cuò)的問題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版)
這篇文章主要介紹了SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05java數(shù)學(xué)歸納法非遞歸求斐波那契數(shù)列的方法
這篇文章主要介紹了java數(shù)學(xué)歸納法非遞歸求斐波那契數(shù)列的方法,涉及java非遞歸算法的使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07