欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

spring-integration連接MQTT全過程

 更新時間:2023年03月11日 10:53:25   作者:xiefangjian  
這篇文章主要介紹了spring-integration連接MQTT全過程,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

MQTT一種物聯(lián)網(wǎng)數(shù)據(jù)傳輸協(xié)議,構(gòu)建在TCP之上,采用發(fā)布與訂閱的模式進行數(shù)據(jù)交互,發(fā)布與訂閱是兩個獨立的連接通道,這里采用spring-integration-mqt來實現(xiàn)發(fā)布與訂閱MQTT,與直接采用MQTT的SDK相對要簡單許多,服務(wù)端采用ActiveMQ來支持MQTT的消息服務(wù)并實現(xiàn)消息轉(zhuǎn)發(fā)。

首先需要引入spring-integration-mqt的包

這里只需要引入這一個包即可。

<dependency>
? ? ?<groupId>org.springframework.integration</groupId>
? ? ?<artifactId>spring-integration-mqtt</artifactId>
? ? ?<version>5.3.1.RELEASE</version>
</dependency>

MQTT的配置比較簡單

和spring-integration集成一樣,需要配置相對應(yīng)的入站、出站就可以了

具體配置如下:

package org.noka.serialservice.config;
?
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.noka.serialservice.service.MsgSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.support.MessageBuilder;
?
/**--------------------------------------------------------------
?* MQTT 數(shù)據(jù)轉(zhuǎn)發(fā)服務(wù)
?* mqtt.services MQTT服務(wù)地址不配置時,不會啟用該服務(wù)
?* 檢測mqtt.services這個參數(shù)是否配置,以確定是否啟用MQTT服務(wù)
?* @author ?xiefangjian@163.com
?* @version 1.0.0
?**------------------------------------------------------------*/
@EnableIntegration
@Configuration
@ConditionalOnProperty("mqtt.services")
public class MQTTConfig implements ApplicationListener<ApplicationEvent> {
? ? private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);
?
? ? private final MsgSendService msgSendService;//發(fā)布消息到消息中間件接口
?
? ? @Value("${mqtt.appid:mqtt_id}")
? ? private String appid;//客戶端ID
?
? ? @Value("${mqtt.input.topic:mqtt_input_topic}")
? ? private String[] inputTopic;//訂閱主題,可以是多個主題
?
? ? @Value("${mqtt.out.topic:mqtt_out_topic}")
? ? private String[] outTopic;//發(fā)布主題,可以是多個主題
?
? ? @Value("${mqtt.services:#{null}}")
? ? private String[] mqttServices;//服務(wù)器地址以及端口
?
? ? @Value("${mqtt.user:#{null}}")
? ? private String user;//用戶名
?
? ? @Value("${mqtt.password:#{null}}")
? ? private String password;//密碼
?
? ? @Value("${mqtt.KeepAliveInterval:300}")
? ? private Integer KeepAliveInterval;//心跳時間,默認為5分鐘
?
? ? @Value("${mqtt.CleanSession:false}")
? ? private Boolean CleanSession;//是否不保持session,默認為session保持
?
? ? @Value("${mqtt.AutomaticReconnect:true}")
? ? private Boolean AutomaticReconnect;//是否自動重聯(lián),默認為開啟自動重聯(lián)
?
? ? @Value("${mqtt.CompletionTimeout:30000}")
? ? private Long CompletionTimeout;//連接超時,默認為30秒
?
? ? @Value("${mqtt.Qos:1}")
? ? private Integer Qos;//通信質(zhì)量,詳見MQTT協(xié)議
?
?
? ? public MQTTConfig(MsgSendService msgSendService) {
? ? ? ? this.msgSendService = msgSendService;
? ? }
?
? ? /**
? ? ?* MQTT連接配置
? ? ?* @return 連接工廠
? ? ?*/
? ? @Bean
? ? public MqttPahoClientFactory mqttClientFactory() {
? ? ? ? DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//連接工廠類
? ? ? ? MqttConnectOptions options = new MqttConnectOptions();//連接參數(shù)
? ? ? ? options.setServerURIs(mqttServices);//連接地址
? ? ? ? if(null!=user) {
? ? ? ? ? ? options.setUserName(user);//用戶名
? ? ? ? }
? ? ? ? if(null!=password) {
? ? ? ? ? ? options.setPassword(password.toCharArray());//密碼
? ? ? ? }
? ? ? ? options.setKeepAliveInterval(KeepAliveInterval);//心跳時間
? ? ? ? options.setAutomaticReconnect(AutomaticReconnect);//斷開是否自動重聯(lián)
? ? ? ? options.setCleanSession(CleanSession);//保持session
? ? ? ? factory.setConnectionOptions(options);
? ? ? ? return factory;
? ? }
?
? ? /**
? ? ?* 入站管道
? ? ?* @param mqttPahoClientFactory
? ? ?* @return
? ? ?*/
? ? @Bean
? ? public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){
? ? ? ? MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立訂閱連接
? ? ? ? DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
? ? ? ? converter.setPayloadAsBytes(true);//bytes類型接收
? ? ? ? adapter.setCompletionTimeout(CompletionTimeout);//連接超時的時間
? ? ? ? adapter.setConverter(converter);
? ? ? ? adapter.setQos(Qos);//消息質(zhì)量
? ? ? ? adapter.setOutputChannelName(ChannelName.INPUT_DATA);//輸入管道名稱
? ? ? ? return adapter;
? ? }
? ? /**
? ? ?* 向服務(wù)器發(fā)送數(shù)據(jù)管道綁定
? ? ?* @param connectionFactory tcp連接工廠類
? ? ?* @return 消息管道對象
? ? ?*/
? ? @Bean
? ? @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)
? ? public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {
? ? ? ? //創(chuàng)建一個新的出站管道,由于MQTT的發(fā)布與訂閱是兩個獨立的連接,因此客戶端的ID(即APPID)不能與訂閱時所使用的ID一樣,否則在服務(wù)端會認為是同一個客戶端,而造成連接失敗
? ? ? ? MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);
? ? ? ? DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
? ? ? ? converter.setPayloadAsBytes(true);//bytes類型接收
? ? ? ? outGate.setAsync(true);
? ? ? ? outGate.setCompletionTimeout(CompletionTimeout);//設(shè)置連接超時時時
? ? ? ? outGate.setDefaultQos(Qos);//設(shè)置通信質(zhì)量
? ? ? ? outGate.setConverter(converter);
? ? ? ? return outGate;
? ? }
?
? ? /**
? ? ?* MQTT連接時調(diào)用的方法
? ? ?* @param event
? ? ?*/
? ? @Override
? ? public void onApplicationEvent(ApplicationEvent event) {
? ? ? ? if (event instanceof MqttSubscribedEvent) {
? ? ? ? ? ? String msg = "OK";
? ? ? ? ? ? /**------------------連接時需要發(fā)送起始消息,寫在這里-------------**/
? ? ? ? ? ? msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());
? ? ? ? }
? ? }
}

其中ChanneName是一個常量類

來標識入站、出站管道的名稱,以便在其它需要的地方使用,實現(xiàn)方法如下:

/** -----------------------------------------
?* 管道名稱常量類
?* @author ?xiefangjian@163.com
?* @version 1.0.0
?** ---------------------------------------**/
public class ChannelName {
? ? public final static String INPUT_DATA="input_data";//入站管道
? ? public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道
? ? public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名稱
}

此時所有配置完成,接下來需要做的就是處理接收到的數(shù)據(jù)和發(fā)布數(shù)據(jù),以上配置完成以后,接收和發(fā)送數(shù)據(jù)都是通過數(shù)據(jù)管道來完成,配置的是數(shù)據(jù)管道名稱。

數(shù)據(jù)發(fā)送網(wǎng)關(guān)只是一個接口

用于向指定的數(shù)據(jù)管道里面發(fā)送數(shù)據(jù),實現(xiàn)如下:

package org.noka.serialservice.service;
?
import org.noka.serialservice.config.ChannelName;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
?
/**----------------------------------------------------------------
?* 發(fā)送消息網(wǎng)關(guān),其它需要發(fā)向服務(wù)器發(fā)送消息時,調(diào)用該接口
?* @author ?xiefangjian@163.com
?* @version ?1.0.0
?**--------------------------------------------------------------**/
@MessagingGateway
@Component
public interface MsgGateway {
? ? /**
? ? ?* MQTT 發(fā)送網(wǎng)關(guān)
? ? ?* @param a 主題,可以指定不同的數(shù)據(jù)發(fā)布主題,在消息中間件里面體現(xiàn)為不同的消息隊列
? ? ?* @param out 消息內(nèi)容
? ? ?*/
? ? @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)
? ? void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);
}

在需要的地方,可以向下面這樣調(diào)用這個接口,向MQTT服務(wù)器發(fā)送消息

//topic為主題名稱,out為消息內(nèi)容
msgGateway.send(topic, out);

MQTT服務(wù)器有數(shù)據(jù)下發(fā)時

會自動調(diào)將數(shù)據(jù)放入配置的入站數(shù)據(jù)管道中,在需要接收數(shù)據(jù)的地方,向下面這樣配置即可

? ? /**
? ? ?* 服務(wù)器有數(shù)據(jù)下發(fā)
? ? ?* 用ServiceActivator配置需要接收的數(shù)據(jù)管道名稱,當該管道里面的數(shù)據(jù)時,會自動調(diào)用該方法
? ? ?* @param in 服務(wù)器有數(shù)據(jù)下發(fā)時,序列化后的對象,這里使用byte數(shù)組
? ? ?*/
? ? @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)
? ? public void upCase(Message<byte[]> in) {
? ? ? ? logger.info("[net service data]========================================");
? ? ? ? logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服務(wù)器下發(fā)的數(shù)據(jù)
? ? ? ? logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16進制方式打印服務(wù)器下發(fā)的數(shù)據(jù)
? ? ? ? serialService.send(in.getPayload());//將服務(wù)器下發(fā)的數(shù)據(jù)轉(zhuǎn)發(fā)給串口
? ? }

最后是參數(shù)配置文件

#--------MQTT---------------------------
#設(shè)備ID,唯一標識
mqtt.appid=mqtt_id
#訂閱主題,多個主題用逗號分隔
mqtt.input.topic=mqtt_input_topic
#發(fā)布主題
mqtt.out.topic=mqtt_out_topic,aac
#MQTT服務(wù)器地址,可以是多個地址
mqtt.services=tcp://47.244.191.41:1883
#mqtt用戶名,默認無
#mqtt.user=guest
#mqtt密碼,默認無
#mqtt.password=guest
#心跳間隔時間,默認3000
#mqtt.KeepAliveInterval=3000
#是否不保持session,默認false
#mqtt.CleanSession=false
#是否自動連接,默認true
#mqtt.AutomaticReconnect=true
#連接超時,默認30000
#mqtt.CompletionTimeout=30000
#傳輸質(zhì)量,默認1
#mqtt.Qos=1

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • java實現(xiàn)選中刪除功能的實例代碼

    java實現(xiàn)選中刪除功能的實例代碼

    這篇文章主要介紹了java實現(xiàn)選中刪除功能,本文通過實例代碼給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-02-02
  • Java單線程程序?qū)崿F(xiàn)實現(xiàn)簡單聊天功能

    Java單線程程序?qū)崿F(xiàn)實現(xiàn)簡單聊天功能

    這篇文章主要介紹了Java單線程程序?qū)崿F(xiàn)實現(xiàn)簡單聊天功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-10-10
  • SpringMvc響應(yīng)數(shù)據(jù)及結(jié)果視圖實現(xiàn)代碼

    SpringMvc響應(yīng)數(shù)據(jù)及結(jié)果視圖實現(xiàn)代碼

    這篇文章主要介紹了SpringMvc響應(yīng)數(shù)據(jù)及結(jié)果視圖實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-08-08
  • Java實現(xiàn)精準Excel數(shù)據(jù)排序的方法詳解

    Java實現(xiàn)精準Excel數(shù)據(jù)排序的方法詳解

    在數(shù)據(jù)處理或者數(shù)據(jù)分析的場景中,需要對已有的數(shù)據(jù)進行排序,在Excel中可以通過排序功能進行整理數(shù)據(jù),而在Java中,則可以借助Excel表格插件對數(shù)據(jù)進行批量排序,下面我們就來學(xué)習(xí)一下常見的數(shù)據(jù)排序方法吧
    2023-10-10
  • JAVA中ArrayList和數(shù)組的轉(zhuǎn)換與遇到的問題解決

    JAVA中ArrayList和數(shù)組的轉(zhuǎn)換與遇到的問題解決

    做研發(fā)的朋友都知道,在項目開發(fā)中經(jīng)常會碰到ArrayList與數(shù)組類型之間的相互轉(zhuǎn)換,這篇文章主要給大家介紹了關(guān)于JAVA中ArrayList和數(shù)組的轉(zhuǎn)換與遇到的問題解決,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2023-05-05
  • springboot Jpa多數(shù)據(jù)源(不同庫)配置過程

    springboot Jpa多數(shù)據(jù)源(不同庫)配置過程

    這篇文章主要介紹了springboot Jpa多數(shù)據(jù)源(不同庫)配置過程,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • 分析JVM源碼之Thread.interrupt系統(tǒng)級別線程打斷

    分析JVM源碼之Thread.interrupt系統(tǒng)級別線程打斷

    在java編程中,我們經(jīng)常會調(diào)用Thread.sleep()方法使得線程停止運行一段時間,而Thread類中也提供了interrupt方法供我們?nèi)ブ鲃哟驍嘁粋€線程。那么線程掛起和打斷的本質(zhì)究竟是什么,本文就此問題作一個探究
    2021-06-06
  • RabbitMQ交換機使用場景和消息可靠性總結(jié)分析

    RabbitMQ交換機使用場景和消息可靠性總結(jié)分析

    這篇文章主要為大家介紹了RabbitMQ交換機使用場景和消息可靠性總結(jié)分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-01-01
  • springboot 熱啟動的過程圖解

    springboot 熱啟動的過程圖解

    這篇文章主要介紹了springboot 熱啟動的過程圖解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-10-10
  • SpringCloud Feign參數(shù)問題及解決方法

    SpringCloud Feign參數(shù)問題及解決方法

    這篇文章主要介紹了SpringCloud Feign參數(shù)問題及解決方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-12-12

最新評論