spring-integration連接MQTT全過程
MQTT一種物聯(lián)網(wǎng)數(shù)據(jù)傳輸協(xié)議,構(gòu)建在TCP之上,采用發(fā)布與訂閱的模式進行數(shù)據(jù)交互,發(fā)布與訂閱是兩個獨立的連接通道,這里采用spring-integration-mqt來實現(xiàn)發(fā)布與訂閱MQTT,與直接采用MQTT的SDK相對要簡單許多,服務(wù)端采用ActiveMQ來支持MQTT的消息服務(wù)并實現(xiàn)消息轉(zhuǎn)發(fā)。
首先需要引入spring-integration-mqt的包
這里只需要引入這一個包即可。
<dependency> ? ? ?<groupId>org.springframework.integration</groupId> ? ? ?<artifactId>spring-integration-mqtt</artifactId> ? ? ?<version>5.3.1.RELEASE</version> </dependency>
MQTT的配置比較簡單
和spring-integration集成一樣,需要配置相對應(yīng)的入站、出站就可以了
具體配置如下:
package org.noka.serialservice.config; ? import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.noka.serialservice.service.MsgSendService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.support.MessageBuilder; ? /**-------------------------------------------------------------- ?* MQTT 數(shù)據(jù)轉(zhuǎn)發(fā)服務(wù) ?* mqtt.services MQTT服務(wù)地址不配置時,不會啟用該服務(wù) ?* 檢測mqtt.services這個參數(shù)是否配置,以確定是否啟用MQTT服務(wù) ?* @author ?xiefangjian@163.com ?* @version 1.0.0 ?**------------------------------------------------------------*/ @EnableIntegration @Configuration @ConditionalOnProperty("mqtt.services") public class MQTTConfig implements ApplicationListener<ApplicationEvent> { ? ? private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class); ? ? ? private final MsgSendService msgSendService;//發(fā)布消息到消息中間件接口 ? ? ? @Value("${mqtt.appid:mqtt_id}") ? ? private String appid;//客戶端ID ? ? ? @Value("${mqtt.input.topic:mqtt_input_topic}") ? ? private String[] inputTopic;//訂閱主題,可以是多個主題 ? ? ? @Value("${mqtt.out.topic:mqtt_out_topic}") ? ? private String[] outTopic;//發(fā)布主題,可以是多個主題 ? ? ? @Value("${mqtt.services:#{null}}") ? ? private String[] mqttServices;//服務(wù)器地址以及端口 ? ? ? @Value("${mqtt.user:#{null}}") ? ? private String user;//用戶名 ? ? ? @Value("${mqtt.password:#{null}}") ? ? private String password;//密碼 ? ? ? @Value("${mqtt.KeepAliveInterval:300}") ? ? private Integer KeepAliveInterval;//心跳時間,默認為5分鐘 ? ? ? @Value("${mqtt.CleanSession:false}") ? ? private Boolean CleanSession;//是否不保持session,默認為session保持 ? ? ? @Value("${mqtt.AutomaticReconnect:true}") ? ? private Boolean AutomaticReconnect;//是否自動重聯(lián),默認為開啟自動重聯(lián) ? ? ? @Value("${mqtt.CompletionTimeout:30000}") ? ? private Long CompletionTimeout;//連接超時,默認為30秒 ? ? ? @Value("${mqtt.Qos:1}") ? ? private Integer Qos;//通信質(zhì)量,詳見MQTT協(xié)議 ? ? ? ? public MQTTConfig(MsgSendService msgSendService) { ? ? ? ? this.msgSendService = msgSendService; ? ? } ? ? ? /** ? ? ?* MQTT連接配置 ? ? ?* @return 連接工廠 ? ? ?*/ ? ? @Bean ? ? public MqttPahoClientFactory mqttClientFactory() { ? ? ? ? DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//連接工廠類 ? ? ? ? MqttConnectOptions options = new MqttConnectOptions();//連接參數(shù) ? ? ? ? options.setServerURIs(mqttServices);//連接地址 ? ? ? ? if(null!=user) { ? ? ? ? ? ? options.setUserName(user);//用戶名 ? ? ? ? } ? ? ? ? if(null!=password) { ? ? ? ? ? ? options.setPassword(password.toCharArray());//密碼 ? ? ? ? } ? ? ? ? options.setKeepAliveInterval(KeepAliveInterval);//心跳時間 ? ? ? ? options.setAutomaticReconnect(AutomaticReconnect);//斷開是否自動重聯(lián) ? ? ? ? options.setCleanSession(CleanSession);//保持session ? ? ? ? factory.setConnectionOptions(options); ? ? ? ? return factory; ? ? } ? ? ? /** ? ? ?* 入站管道 ? ? ?* @param mqttPahoClientFactory ? ? ?* @return ? ? ?*/ ? ? @Bean ? ? public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){ ? ? ? ? MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立訂閱連接 ? ? ? ? DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); ? ? ? ? converter.setPayloadAsBytes(true);//bytes類型接收 ? ? ? ? adapter.setCompletionTimeout(CompletionTimeout);//連接超時的時間 ? ? ? ? adapter.setConverter(converter); ? ? ? ? adapter.setQos(Qos);//消息質(zhì)量 ? ? ? ? adapter.setOutputChannelName(ChannelName.INPUT_DATA);//輸入管道名稱 ? ? ? ? return adapter; ? ? } ? ? /** ? ? ?* 向服務(wù)器發(fā)送數(shù)據(jù)管道綁定 ? ? ?* @param connectionFactory tcp連接工廠類 ? ? ?* @return 消息管道對象 ? ? ?*/ ? ? @Bean ? ? @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT) ? ? public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) { ? ? ? ? //創(chuàng)建一個新的出站管道,由于MQTT的發(fā)布與訂閱是兩個獨立的連接,因此客戶端的ID(即APPID)不能與訂閱時所使用的ID一樣,否則在服務(wù)端會認為是同一個客戶端,而造成連接失敗 ? ? ? ? MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory); ? ? ? ? DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); ? ? ? ? converter.setPayloadAsBytes(true);//bytes類型接收 ? ? ? ? outGate.setAsync(true); ? ? ? ? outGate.setCompletionTimeout(CompletionTimeout);//設(shè)置連接超時時時 ? ? ? ? outGate.setDefaultQos(Qos);//設(shè)置通信質(zhì)量 ? ? ? ? outGate.setConverter(converter); ? ? ? ? return outGate; ? ? } ? ? ? /** ? ? ?* MQTT連接時調(diào)用的方法 ? ? ?* @param event ? ? ?*/ ? ? @Override ? ? public void onApplicationEvent(ApplicationEvent event) { ? ? ? ? if (event instanceof MqttSubscribedEvent) { ? ? ? ? ? ? String msg = "OK"; ? ? ? ? ? ? /**------------------連接時需要發(fā)送起始消息,寫在這里-------------**/ ? ? ? ? ? ? msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build()); ? ? ? ? } ? ? } }
其中ChanneName是一個常量類
來標識入站、出站管道的名稱,以便在其它需要的地方使用,實現(xiàn)方法如下:
/** ----------------------------------------- ?* 管道名稱常量類 ?* @author ?xiefangjian@163.com ?* @version 1.0.0 ?** ---------------------------------------**/ public class ChannelName { ? ? public final static String INPUT_DATA="input_data";//入站管道 ? ? public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道 ? ? public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名稱 }
此時所有配置完成,接下來需要做的就是處理接收到的數(shù)據(jù)和發(fā)布數(shù)據(jù),以上配置完成以后,接收和發(fā)送數(shù)據(jù)都是通過數(shù)據(jù)管道來完成,配置的是數(shù)據(jù)管道名稱。
數(shù)據(jù)發(fā)送網(wǎng)關(guān)只是一個接口
用于向指定的數(shù)據(jù)管道里面發(fā)送數(shù)據(jù),實現(xiàn)如下:
package org.noka.serialservice.service; ? import org.noka.serialservice.config.ChannelName; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; ? /**---------------------------------------------------------------- ?* 發(fā)送消息網(wǎng)關(guān),其它需要發(fā)向服務(wù)器發(fā)送消息時,調(diào)用該接口 ?* @author ?xiefangjian@163.com ?* @version ?1.0.0 ?**--------------------------------------------------------------**/ @MessagingGateway @Component public interface MsgGateway { ? ? /** ? ? ?* MQTT 發(fā)送網(wǎng)關(guān) ? ? ?* @param a 主題,可以指定不同的數(shù)據(jù)發(fā)布主題,在消息中間件里面體現(xiàn)為不同的消息隊列 ? ? ?* @param out 消息內(nèi)容 ? ? ?*/ ? ? @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT) ? ? void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out); }
在需要的地方,可以向下面這樣調(diào)用這個接口,向MQTT服務(wù)器發(fā)送消息
//topic為主題名稱,out為消息內(nèi)容 msgGateway.send(topic, out);
MQTT服務(wù)器有數(shù)據(jù)下發(fā)時
會自動調(diào)將數(shù)據(jù)放入配置的入站數(shù)據(jù)管道中,在需要接收數(shù)據(jù)的地方,向下面這樣配置即可
? ? /** ? ? ?* 服務(wù)器有數(shù)據(jù)下發(fā) ? ? ?* 用ServiceActivator配置需要接收的數(shù)據(jù)管道名稱,當該管道里面的數(shù)據(jù)時,會自動調(diào)用該方法 ? ? ?* @param in 服務(wù)器有數(shù)據(jù)下發(fā)時,序列化后的對象,這里使用byte數(shù)組 ? ? ?*/ ? ? @ServiceActivator(inputChannel = ChannelName.INPUT_DATA) ? ? public void upCase(Message<byte[]> in) { ? ? ? ? logger.info("[net service data]========================================"); ? ? ? ? logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服務(wù)器下發(fā)的數(shù)據(jù) ? ? ? ? logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16進制方式打印服務(wù)器下發(fā)的數(shù)據(jù) ? ? ? ? serialService.send(in.getPayload());//將服務(wù)器下發(fā)的數(shù)據(jù)轉(zhuǎn)發(fā)給串口 ? ? }
最后是參數(shù)配置文件
#--------MQTT--------------------------- #設(shè)備ID,唯一標識 mqtt.appid=mqtt_id #訂閱主題,多個主題用逗號分隔 mqtt.input.topic=mqtt_input_topic #發(fā)布主題 mqtt.out.topic=mqtt_out_topic,aac #MQTT服務(wù)器地址,可以是多個地址 mqtt.services=tcp://47.244.191.41:1883 #mqtt用戶名,默認無 #mqtt.user=guest #mqtt密碼,默認無 #mqtt.password=guest #心跳間隔時間,默認3000 #mqtt.KeepAliveInterval=3000 #是否不保持session,默認false #mqtt.CleanSession=false #是否自動連接,默認true #mqtt.AutomaticReconnect=true #連接超時,默認30000 #mqtt.CompletionTimeout=30000 #傳輸質(zhì)量,默認1 #mqtt.Qos=1
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- Spring?Integration概述與怎么使用詳解
- 如何使用Spring?integration在Springboot中集成Mqtt詳解
- 源碼解讀Spring-Integration執(zhí)行過程
- 最新SpringCloud?Stream消息驅(qū)動講解
- Springcloud Stream消息驅(qū)動工具使用介紹
- Spring?Cloud?Stream消息驅(qū)動組件使用方法介紹
- Springcloud整合stream,rabbitmq實現(xiàn)消息驅(qū)動功能
- SpringCloud Stream消息驅(qū)動實例詳解
- Spring Integration 實現(xiàn)消息驅(qū)動的詳細步驟
相關(guān)文章
Java單線程程序?qū)崿F(xiàn)實現(xiàn)簡單聊天功能
這篇文章主要介紹了Java單線程程序?qū)崿F(xiàn)實現(xiàn)簡單聊天功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-10-10SpringMvc響應(yīng)數(shù)據(jù)及結(jié)果視圖實現(xiàn)代碼
這篇文章主要介紹了SpringMvc響應(yīng)數(shù)據(jù)及結(jié)果視圖實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-08-08Java實現(xiàn)精準Excel數(shù)據(jù)排序的方法詳解
在數(shù)據(jù)處理或者數(shù)據(jù)分析的場景中,需要對已有的數(shù)據(jù)進行排序,在Excel中可以通過排序功能進行整理數(shù)據(jù),而在Java中,則可以借助Excel表格插件對數(shù)據(jù)進行批量排序,下面我們就來學(xué)習(xí)一下常見的數(shù)據(jù)排序方法吧2023-10-10JAVA中ArrayList和數(shù)組的轉(zhuǎn)換與遇到的問題解決
做研發(fā)的朋友都知道,在項目開發(fā)中經(jīng)常會碰到ArrayList與數(shù)組類型之間的相互轉(zhuǎn)換,這篇文章主要給大家介紹了關(guān)于JAVA中ArrayList和數(shù)組的轉(zhuǎn)換與遇到的問題解決,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-05-05springboot Jpa多數(shù)據(jù)源(不同庫)配置過程
這篇文章主要介紹了springboot Jpa多數(shù)據(jù)源(不同庫)配置過程,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05分析JVM源碼之Thread.interrupt系統(tǒng)級別線程打斷
在java編程中,我們經(jīng)常會調(diào)用Thread.sleep()方法使得線程停止運行一段時間,而Thread類中也提供了interrupt方法供我們?nèi)ブ鲃哟驍嘁粋€線程。那么線程掛起和打斷的本質(zhì)究竟是什么,本文就此問題作一個探究2021-06-06RabbitMQ交換機使用場景和消息可靠性總結(jié)分析
這篇文章主要為大家介紹了RabbitMQ交換機使用場景和消息可靠性總結(jié)分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01SpringCloud Feign參數(shù)問題及解決方法
這篇文章主要介紹了SpringCloud Feign參數(shù)問題及解決方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12