欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成MQTT示例詳解

 更新時(shí)間:2022年07月22日 14:07:00   作者:bluesbruce  
這篇文章主要為大家介紹了SpringBoot集成MQTT示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

特別提醒: 文中提到的MQTT服務(wù)器Apache-Apollo,現(xiàn)在已經(jīng)不維護(hù)。但是客戶端的寫(xiě)法是通用的。目前我常用的是RabbitMQ加mqtt插件。

MQTT

MQTT(消息隊(duì)列遙測(cè)傳輸)是ISO標(biāo)準(zhǔn)(ISO/IEC PRF 20922)下基于發(fā)布/訂閱范式的消息協(xié)議。它工作在 TCP/IP協(xié)議族上,是為硬件性能低下的遠(yuǎn)程設(shè)備以及網(wǎng)絡(luò)狀況糟糕的情況下而設(shè)計(jì)的發(fā)布/訂閱型消息協(xié)議。國(guó)內(nèi)很多企業(yè)都廣泛使用MQTT作為Android手機(jī)客戶端與服務(wù)器端推送消息的協(xié)議。

特點(diǎn)

MQTT協(xié)議是為大量計(jì)算能力有限,且工作在低帶寬、不可靠的網(wǎng)絡(luò)的遠(yuǎn)程傳感器和控制設(shè)備通訊而設(shè)計(jì)的協(xié)議,它具有以下主要的幾項(xiàng)特性:

  • 使用發(fā)布/訂閱消息模式,提供一對(duì)多的消息發(fā)布,解除應(yīng)用程序耦合;
  • 對(duì)負(fù)載內(nèi)容屏蔽的消息傳輸;
  • 使用TCP/IP提供網(wǎng)絡(luò)連接;
  • 有三種消息發(fā)布服務(wù)質(zhì)量;

至多一次:消息發(fā)布完全依賴底層 TCP/IP 網(wǎng)絡(luò)。會(huì)發(fā)生消息丟失或重復(fù)。這一級(jí)別可用于如下情況,環(huán)境傳感器數(shù)據(jù),丟失一次讀記錄無(wú)所謂,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送。

至少一次:確保消息到達(dá),但消息重復(fù)可能會(huì)發(fā)生。

只有一次:確保消息到達(dá)一次。這一級(jí)別可用于如下情況,在計(jì)費(fèi)系統(tǒng)中,消息重復(fù)或丟失會(huì)導(dǎo)致不正確的結(jié)果。

  • 小型傳輸,開(kāi)銷很小(固定長(zhǎng)度的頭部是 2 字節(jié)),協(xié)議交換最小化,以降低網(wǎng)絡(luò)流量;
  • 使用Last Will和Testament特性通知有關(guān)各方客戶端異常中斷的機(jī)制。

Apache-Apollo

Apache Apollo是一個(gè)代理服務(wù)器,其是在ActiveMQ基礎(chǔ)上發(fā)展而來(lái)的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多種協(xié)議。

原理:服務(wù)器端創(chuàng)建一個(gè)唯一訂閱號(hào),發(fā)送者可以向這個(gè)訂閱號(hào)中發(fā)東西,然后接受者(即訂閱了這個(gè)訂閱號(hào)的人)都會(huì)收到這個(gè)訂閱號(hào)發(fā)出來(lái)的消息。以此來(lái)完成消息的推送。服務(wù)器其實(shí)是一個(gè)消息中轉(zhuǎn)站。

下載

下載地址:http://archive.apache.org/dist/activemq/activemq-apollo/

配置與啟動(dòng)

  • 需要安裝JDK環(huán)境
  • 在命令行模式下進(jìn)入bin,執(zhí)行apollo create mybroker d:\apache-apollo\broker,創(chuàng)建一個(gè)名為mybroker虛擬主機(jī)(Virtual Host)。需要特別注意的是,生成的目錄就是以后真正啟動(dòng)程序的位置。
  • 在命令行模式下進(jìn)入d:\apache-apollo\broker\bin,執(zhí)行apollo-broker run,也可以用apollo-broker-service.exe配置服務(wù)。
  • 訪問(wèn)http://127.0.0.1:61680打開(kāi)web管理界面。(密碼查看broker/etc/users.properties)
  • 啟動(dòng)端口,看cmd輸出。

SpringBoot2的開(kāi)發(fā)

添加依賴

<!-- 
  spring-boot版本 2.1.0.RELEASE
-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定義配置

# src/main/resources/config/mqtt.properties
##################
#  MQTT 配置
##################
# 用戶名
mqtt.username=admin
# 密碼
mqtt.password=password
# 推送信息的連接地址,如果有多個(gè),用逗號(hào)隔開(kāi),如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:61613
##################
#  MQTT 生產(chǎn)者
##################
# 連接服務(wù)器默認(rèn)客戶端ID
mqtt.producer.clientId=mqttProducer
# 默認(rèn)的推送主題,實(shí)際可在調(diào)用接口時(shí)指定
mqtt.producer.defaultTopic=topic1
##################
#  MQTT 消費(fèi)者
##################
# 連接服務(wù)器默認(rèn)客戶端ID
mqtt.consumer.clientId=mqttConsumer
# 默認(rèn)的接收主題,可以訂閱多個(gè)Topic,逗號(hào)分隔
mqtt.consumer.defaultTopic=topic1

配置MQTT發(fā)布和訂閱

import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * MQTT配置,生產(chǎn)者
 *
 * @author BBF
 */
@Configuration
public class MqttConfig {
  private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
  private static final byte[] WILL_DATA;
  static {
    WILL_DATA = "offline".getBytes();
  }
  /**
   * 訂閱的bean名稱
   */
  public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
  /**
   * 發(fā)布的bean名稱
   */
  public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
  @Value("${mqtt.username}")
  private String username;
  @Value("${mqtt.password}")
  private String password;
  @Value("${mqtt.url}")
  private String url;
  @Value("${mqtt.producer.clientId}")
  private String producerClientId;
  @Value("${mqtt.producer.defaultTopic}")
  private String producerDefaultTopic;
  @Value("${mqtt.consumer.clientId}")
  private String consumerClientId;
  @Value("${mqtt.consumer.defaultTopic}")
  private String consumerDefaultTopic;
  /**
   * MQTT連接器選項(xiàng)
   *
   * @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
   */
  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions options = new MqttConnectOptions();
    // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,
    // 這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
    options.setCleanSession(true);
    // 設(shè)置連接的用戶名
    options.setUserName(username);
    // 設(shè)置連接的密碼
    options.setPassword(password.toCharArray());
    options.setServerURIs(StringUtils.split(url, ","));
    // 設(shè)置超時(shí)時(shí)間 單位為秒
    options.setConnectionTimeout(10);
    // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線,但這個(gè)方法并沒(méi)有重連的機(jī)制
    options.setKeepAliveInterval(20);
    // 設(shè)置“遺囑”消息的話題,若客戶端與服務(wù)器之間的連接意外中斷,服務(wù)器將發(fā)布客戶端的“遺囑”消息。
    options.setWill("willTopic", WILL_DATA, 2, false);
    return options;
  }
  /**
   * MQTT客戶端
   *
   * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }
  /**
   * MQTT信息通道(生產(chǎn)者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_OUT)
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息處理器(生產(chǎn)者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
        producerClientId,
        mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(producerDefaultTopic);
    return messageHandler;
  }
  /**
   * MQTT消息訂閱綁定(消費(fèi)者)
   *
   * @return {@link org.springframework.integration.core.MessageProducer}
   */
  @Bean
  public MessageProducer inbound() {
    // 可以同時(shí)消費(fèi)(訂閱)多個(gè)Topic
    MqttPahoMessageDrivenChannelAdapter adapter =
        new MqttPahoMessageDrivenChannelAdapter(
            consumerClientId, mqttClientFactory(),
            StringUtils.split(consumerDefaultTopic, ","));
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 設(shè)置訂閱通道
    adapter.setOutputChannel(mqttInboundChannel());
    return adapter;
  }
  /**
   * MQTT信息通道(消費(fèi)者)
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_IN)
  public MessageChannel mqttInboundChannel() {
    return new DirectChannel();
  }
  /**
   * MQTT消息處理器(消費(fèi)者)
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        LOGGER.error("===================={}============", message.getPayload());
      }
    };
  }
}

消息發(fā)布器

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * MQTT生產(chǎn)者消息發(fā)送接口
 * <p>MessagingGateway要指定生產(chǎn)者的通道名稱</p>
 * @author BBF
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {
  /**
   * 發(fā)送信息到MQTT服務(wù)器
   *
   * @param data 發(fā)送的文本
   */
  void sendToMqtt(String data);
  /**
   * 發(fā)送信息到MQTT服務(wù)器
   *
   * @param topic 主題
   * @param payload 消息主體
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      String payload);
  /**
   * 發(fā)送信息到MQTT服務(wù)器
   *
   * @param topic 主題
   * @param qos 對(duì)消息處理的幾種機(jī)制。
 0 表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。

   * 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。

   * 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。
   * @param payload 消息主體
   */
  void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
      @Header(MqttHeaders.QOS) int qos,
      String payload);
}

發(fā)送消息

/**
 * MQTT消息發(fā)送
 *
 * @author BBF
 */
@Controller
@RequestMapping(value = "/")
public class MqttController {
  /**
   * 注入發(fā)送MQTT的Bean
   */
  @Resource
  private IMqttSender iMqttSender;
  /**
   * 發(fā)送MQTT消息
   * @param message 消息內(nèi)容
   * @return 返回
   */
  @ResponseBody
  @GetMapping(value = "/mqtt", produces ="text/html")
  public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String message) {
    iMqttSender.sendToMqtt(message);
    return new ResponseEntity<>("OK", HttpStatus.OK);
  }
}

入口類

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;
/**
 * SpringBoot 入口類
 *
 * @author BBF
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
    HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = "UTF-8", value = {"classpath:config/mqtt.properties"})
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

代碼

https://gitee.com/bbfbbf/mqtt-test

以上就是SpringBoot集成MQTT示例詳解的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot集成MQTT的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 詳解Java運(yùn)算中的取余

    詳解Java運(yùn)算中的取余

    這篇文章主要介紹了java運(yùn)算中的取余,在java運(yùn)算中,取余符號(hào)是?%,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-05-05
  • spring boot測(cè)試打包部署的方法

    spring boot測(cè)試打包部署的方法

    spring boot項(xiàng)目如何測(cè)試,如何部署,在生產(chǎn)中有什么好的部署方案嗎?這篇文章就來(lái)介紹一下spring boot 如何開(kāi)發(fā)、調(diào)試、打包到最后的投產(chǎn)上線,感興趣的朋友一起看看吧
    2018-01-01
  • 詳解Java實(shí)踐之建造者模式

    詳解Java實(shí)踐之建造者模式

    建造者模式是將一個(gè)復(fù)雜的對(duì)象的構(gòu)建與它的表示分離,使得同樣的構(gòu)建過(guò)程可以創(chuàng)建不同的表示。它隱藏了復(fù)雜對(duì)象的創(chuàng)建過(guò)程,它把復(fù)雜對(duì)象的創(chuàng)建過(guò)程加以抽象,通過(guò)子類繼承或者重載的方式,動(dòng)態(tài)的創(chuàng)建具有復(fù)合屬性的對(duì)象
    2021-06-06
  • JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例

    JavaBean valication驗(yàn)證實(shí)現(xiàn)方法示例

    這篇文章主要介紹了JavaBean valication驗(yàn)證實(shí)現(xiàn)方法,結(jié)合實(shí)例形式分析了JavaBean valication驗(yàn)證相關(guān)概念、原理、用法及操作注意事項(xiàng),需要的朋友可以參考下
    2020-03-03
  • java 相交鏈表的實(shí)現(xiàn)示例

    java 相交鏈表的實(shí)現(xiàn)示例

    本文主要介紹了java 相交鏈表的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng)

    Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單通訊錄管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-07-07
  • 解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題

    解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題

    這篇文章主要介紹了解決運(yùn)行jar包出錯(cuò):ClassNotFoundException問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄

    Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄

    這篇文章主要介紹了Mybatis-Plus中Mapper的接口文件與xml文件相關(guān)的坑記錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法)

    swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法)

    這篇文章主要介紹了swagger添加權(quán)限驗(yàn)證保證API(接口)安全性(兩種方法),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-01-01
  • Spring?WebClient實(shí)戰(zhàn)示例

    Spring?WebClient實(shí)戰(zhàn)示例

    本文主要介紹了Spring?WebClient實(shí)戰(zhàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-01-01

最新評(píng)論