欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot集成mqtt客戶端詳解

 更新時間:2022年10月26日 08:31:14   作者:RedEric  
MQTT是一個基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實現(xiàn)的,這些特點使它適用范圍非常廣泛。本文為大家分享了Springboot整合mqtt服務(wù)的示例代碼,需要的可以參考一下

1. 前言

? 這里我們使用springboot搭建一個輕量級的mqtt客戶端,連接mqtt的Broker服務(wù)。

? 連接信息寫在配置文件里application.properties

spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url= tcp://127.0.0.1:1883
spring.mqtt.client-id= server_client_${random.value}
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
spring.mqtt.completionTimeout= 3000
spring.mqtt.keepAlive= 60

2. 引入依賴

       <!--mqtt -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

3. 配置文件

? 新建MqttProperties.java文件,初始化application里的mqtt配置項

@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {
    private String username;
    private String mqpassword;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private String completionTimeout;
    private Integer keepAlive;
}

? 新建MqttConfiguration.java文件,為mqtt做初始化配置

@Configuration
@Slf4j
public class MqttConfiguration {
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * 事件觸發(fā)
     */
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    @Bean
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
        mqttConnectOptions.setKeepAliveInterval(2);
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    /**
     * 配置client,監(jiān)聽的topic
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),
                        mqttProperties.getDefaultTopic().split(","));
        adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));
        adapter.setConverter(new DefaultPahoMessageConverter());
        //默認(rèn)添加TopicName中所有tipic
        adapter.addTopic("+/+/test");
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String qos = message.getHeaders().get("mqtt_receivedQos").toString();
                //觸發(fā)事件 這里不再做業(yè)務(wù)處理,包 listener中做處理
                eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));
            }
        };
    }
    /**
     * 發(fā)送消息和消費消息Channel可以使用相同MqttPahoClientFactory
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        // 在這里進(jìn)行mqttOutboundChannel的相關(guān)設(shè)置
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());
        // 如果設(shè)置成true,發(fā)送消息時將不會阻塞。
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

4. MQTT消息類

新建MqttEvent.java 消息類。用于發(fā)送mqtt的消息

@Getter
public class MqttEvent extends ApplicationEvent {
    private String topic;
    /**
     * 發(fā)送的消息
     */
    private String message;
    public MqttEvent(Object source,String topic,String message) {
        super(source);
        this.topic = topic;
        this.message = message;
    }
}

5. MQTT消息接收器

新建JobListener.java文件作為 mqtt的消息接收類

@Slf4j
@Component
public class JobListener {
    @Autowired
    DeviceDao deviceDao;
    /**
     * 監(jiān)聽topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.startsWith('pay')")
    public void onEmqttCall1(MqttEvent mqttEvent) throws Exception {
        String topic = mqttEvent.getTopic();
		//寫邏輯處理
    }
    /**
     * 監(jiān)聽topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.equals('device')")
    public void onEmqttCallT(MqttEvent mqttEvent){
        log.info("接收到消11111111111:"+mqttEvent.getMessage());
    }
}

6. MQTT消息發(fā)送器

新建MqttGateway.java 提供發(fā)送mqttt消息的接口服務(wù)

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    void sendToMqtt(String data);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

7. 測試MQTT發(fā)送消息

@SpringBootTest
public class Test3 {
    @Autowired
    MqttGateway mqttGateway;
    @Test
    public void mqttTest () {
      mqttGateway.sendToMqtt("111//222/33","消息內(nèi)容");
    }
}

到此這篇關(guān)于Springboot集成mqtt客戶端詳解的文章就介紹到這了,更多相關(guān)Springboot集成mqtt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法

    Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法

    這篇文章主要介紹了Mybatis中關(guān)于自定義mapper.xml時,參數(shù)傳遞的方式及寫法,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • mybatis中的一級緩存深入剖析

    mybatis中的一級緩存深入剖析

    這篇文章主要介紹了mybatis中的一級緩存深入剖析,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • java實現(xiàn)PDF轉(zhuǎn)圖片的方法

    java實現(xiàn)PDF轉(zhuǎn)圖片的方法

    這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)PDF轉(zhuǎn)圖片的方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-07-07
  • java 出現(xiàn)Zipexception 異常的解決辦法

    java 出現(xiàn)Zipexception 異常的解決辦法

    這篇文章主要介紹了java 出現(xiàn)Zipexception 異常的解決辦法的相關(guān)資料,出現(xiàn) java.util.zip.ZipException: error in opening zip file 異常的原因及解決方法,需要的朋友可以參考下
    2017-08-08
  • Java使用jmeter進(jìn)行壓力測試

    Java使用jmeter進(jìn)行壓力測試

    本篇文章簡單講一下使用jmeter進(jìn)行壓力測試。其壓測思想就是 通過創(chuàng)建指定數(shù)量的線程,同時請求指定接口,來模擬指定數(shù)量用戶同時進(jìn)行某個操作的場景,感興趣的小伙伴們可以參考一下
    2021-07-07
  • java編程實現(xiàn)獲取服務(wù)器IP地址及MAC地址的方法

    java編程實現(xiàn)獲取服務(wù)器IP地址及MAC地址的方法

    這篇文章主要介紹了java編程實現(xiàn)獲取機(jī)器IP地址及MAC地址的方法,實例分析了Java分別針對單網(wǎng)卡及多網(wǎng)卡的情況下獲取服務(wù)器IP地址與MAC地址的相關(guān)技巧,需要的朋友可以參考下
    2015-11-11
  • Java線上問題排查神器Arthas實戰(zhàn)原理解析

    Java線上問題排查神器Arthas實戰(zhàn)原理解析

    原先我們Java中我們常用分析問題一般是使用JDK自帶或第三方的分析工具如jstat、jmap、jstack、?jconsole、visualvm、Java?Mission?Control、MAT等,還有一款神器Arthas工具,可幫助程序員解決很多繁瑣的問題,感興趣的朋友一起看看吧
    2022-01-01
  • Java簡化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計模式

    Java簡化復(fù)雜系統(tǒng)調(diào)用的門面設(shè)計模式

    Java門面模式是一種結(jié)構(gòu)性設(shè)計模式,它為復(fù)雜系統(tǒng)提供了一個簡單的接口,使得系統(tǒng)的客戶端能夠更加方便地使用系統(tǒng)功能。門面模式通過封裝復(fù)雜的子系統(tǒng),隱藏系統(tǒng)的實現(xiàn)細(xì)節(jié),提高了系統(tǒng)的易用性和靈活性
    2023-04-04
  • SpringBoot2零基礎(chǔ)到精通之JUnit 5與指標(biāo)監(jiān)控

    SpringBoot2零基礎(chǔ)到精通之JUnit 5與指標(biāo)監(jiān)控

    SpringBoot是一種整合Spring技術(shù)棧的方式(或者說是框架),同時也是簡化Spring的一種快速開發(fā)的腳手架,本篇讓我們一起學(xué)習(xí)JUnit 5與指標(biāo)監(jiān)控
    2022-03-03
  • idea解決程序包不存在報錯的八種解決方法

    idea解決程序包不存在報錯的八種解決方法

    這篇文章主要介紹了idea解決程序包不存在報錯的八種解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2024-02-02

最新評論