欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot 集成MQTT實現(xiàn)消息訂閱的詳細代碼

 更新時間:2024年11月28日 14:35:49   作者:不甘平凡--liang  
本文介紹了如何在SpringBoot中集成MQTT并實現(xiàn)消息訂閱,主要步驟包括添加依賴、配置文件設(shè)置、啟動類注解、MQTT配置類、消息處理器配置、主題緩存、動態(tài)數(shù)據(jù)庫主題配置以及消息處理服務,感興趣的朋友跟隨小編一起看看吧

1、引入依賴

  <!--MQTT start-->
 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
  </dependency>
  <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
     <version>5.4.4</version>
 </dependency>
 <!--MQTT end-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
  </dependency>

2、增加yml配置

  spring:
    mqtt:
      username: test
      password: test
      url: tcp://127.0.0.1:8080
      subClientId: singo_sub_client_id_888 #訂閱 客戶端id
      pubClientId: singo_pub_client_id_888 #發(fā)布 客戶端id
      connectionTimeout: 30
      keepAlive: 60

3、資源配置類

@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {
    private String username;
    private String password;
    private String url;
    private String subClientId;
    private String pubClientId;
    private int connectionTimeout;
    private int keepAlive;
}

注意啟動類需要增加注解

@EnableConfigurationProperties(MqttConfigurationProperties.class)

4、MQTT配置類

@Configuration
public class MqttConfig {
    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties;
    /**
     * 連接參數(shù)
     *
     * @return
     */
    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigurationProperties.getUsername());
        options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
        options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());
        options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());
        options.setCleanSession(true); // 設(shè)置為false以便斷線重連后恢復會話
        options.setAutomaticReconnect(true);
        return options;
    }
    /**
     * 連接工廠
     *
     * @param options
     * @return
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }
    /**
     * 消息輸入通道
     * 每次只有一個消息處理器可以消費消息。
     * 當前消息的處理完成之前,新消息需要排隊等待,無法并行處理。
     * 默認是:單線程、順序執(zhí)行的
     * @return
     */
    // @Bean
    // public DirectChannel mqttInputChannel() {
    //     return new DirectChannel();
    // }
    /**
     * 支持多線程并發(fā)處理消息的輸入通道
     *
     * @return
     */
    @Bean
    public ExecutorChannel mqttInputChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 線程池大小可以調(diào)整
    }
    /**
     * 配置入站適配器
     *
     * @param mqttClientFactory
     * @return
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);
        // adapter.addTopic("pub/300119110099"); 訂閱主題,也可以放在初始化動態(tài)配置
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    /**
     * 配置消息處理器
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道
    public MessageHandler messageHandler() {
        return new MqttReceiverMessageHandler();
    }
}

5、消息處理器配置

@Slf4j
@Component
public class MqttReceiverMessageHandler implements MessageHandler {
    @Autowired
    private MqttMessageProcessingService mqttMessageProcessingService;
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        MessageHeaders headers = message.getHeaders();
        log.info("線程名稱:{},收到消息,主題:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());
        // log.info("收到消息主題:{}", headers.get("mqtt_receivedTopic").toString());
        // log.info("收到消息:{}", message.getPayload());
        // 消息保存到內(nèi)存隊列里面,定時批量入庫,也可以在這里直接入庫
        mqttMessageProcessingService.addMessage(message.getPayload().toString());
    }
}

6、消息主題緩存對象

@Component
public class MqttTopicStore {
    private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>();
    public ConcurrentHashMap<String, String> getTopics() {
        return topics;
    }
}

7、動態(tài)訂閱數(shù)據(jù)庫主題配置

@Slf4j
@Component
public class MqttInit {
    @Autowired
    private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;
    @Autowired
    private MqttTopicStore mqttTopicStore;
    @PostConstruct
    public void init() {
        subscribeAllTopics();
    }
    public void subscribeAllTopics() {
        // List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled();
        // for (MqttTopicConfig topic : topics) {
        //     subscribeTopic(topic);
        // }
        log.info("===================>從數(shù)據(jù)庫里獲取并初始化訂閱所有主題");
        List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100");
        topics.stream().forEach(t -> {
            messageDrivenChannelAdapter.addTopic(t);
            // 同時往MqttTopicStore.topics中增加一條記錄用于緩存
        });
    }
}

8、消息處理服務

@Service
public class MqttMessageProcessingService {
    @Autowired
    private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter;
    @Autowired
    private MqttTopicStore mqttTopicStore;
    // 內(nèi)存隊列,用于暫存消息
    private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
    // 添加消息到隊列
    public void addMessage(String message) {
        messageQueue.add(message);
    }
    /**
     * 可以放到定時任務里面去,注入后取隊列方便維護
     * 定時任務,每5秒執(zhí)行一次 ,建議2分鐘一次 理想的觸發(fā)間隔應略小于數(shù)據(jù)到達間隔,以確保及時處理和插入
     * 如果每 5 分鐘收到一條數(shù)據(jù),可以設(shè)置任務執(zhí)行周期為4 分鐘或更短,以便任務有足夠的時間處理數(shù)據(jù),同時減少積壓的可能性。
     */
    @Scheduled(fixedRate = 1 * 60 * 1000)
    public void batchInsertToDatabase() {
        System.out.println("定時任務執(zhí)行中,當前隊列大?。? + messageQueue.size());
        List<String> batch = new ArrayList<>();
        messageQueue.drainTo(batch, 500); // 一次性取最多500條消息
        if (!batch.isEmpty()) {
            // 批量插入數(shù)據(jù)庫
            saveMessagesToDatabase(batch);
        }
    }
    private void saveMessagesToDatabase(List<String> messages) {
        // 假設(shè)這是批量插入邏輯
        System.out.println("批量插入數(shù)據(jù)庫,條數(shù):" + messages.size());
        for (String message : messages) {
            System.out.println("插入消息:" + message);
        }
        // 實際數(shù)據(jù)庫操作代碼
    }
    /**
     * 訂閱與取消訂閱定時任務
     */
    public void subscribeAndUnsubscribeTask() {
        // 從數(shù)據(jù)庫獲取所有主題,正常狀態(tài)、刪除狀態(tài)
        // 正常狀態(tài):判斷mqttTopicStore.topics中是否存在,不存在則訂閱,并在mqttTopicStore.topics中增加
        // 刪除狀態(tài): 判斷mqttTopicStore.topics中是否存在,存在則取消訂閱,并在mqttTopicStore.topics中刪除
        // messageDrivenChannelAdapter.addTopic(t);
    }
}

以上是簡單的對接步驟,部分類、方法可以根據(jù)實際情況進行合并處理!?。?!

9、定時任務

@Slf4j
@Configuration
@EnableScheduling
public class MqttJob {
    @Value("${schedule.enable}")
    private boolean enable;
    @Autowired
    private MqttMessageProcessingService mqttMessageProcessingService;
    /**
     * 定時訂閱與取消訂閱主題,從共享主題對象MqttTopicStore里面取出主題列表,然后進行訂閱或取消訂閱
     * 每分鐘一次
     */
    public void subscribeAndUnsubscribe() {
        if (!enable) return;
        mqttMessageProcessingService.subscribeAndUnsubscribeTask();
    }
    /**
     * 定時處理隊列里面的訂閱消息,會有丟失風險,宕機時會丟失隊列里面的消息
     * 每分鐘一次 要考慮一次消息處理的時間;也可先不使用隊列,每次收到消息直接實時入庫,有性能問題時在啟用
     */
    public void batchSaveSubscribeMessage() {
    }
}

到此這篇關(guān)于SpringBoot 集成MQTT實現(xiàn)消息訂閱的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Mybatis攔截器打印sql問題

    Mybatis攔截器打印sql問題

    這篇文章主要介紹了Mybatis攔截器打印sql問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • java中申請不定長度數(shù)組ArrayList的方法

    java中申請不定長度數(shù)組ArrayList的方法

    今天小編就為大家分享一篇java中申請不定長度數(shù)組ArrayList的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-07-07
  • java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作

    java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作

    這篇文章主要介紹了java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • Mybatis 如何在配置文件中給實體類起別名

    Mybatis 如何在配置文件中給實體類起別名

    這篇文章主要介紹了Mybatis 如何在配置文件中給實體類起別名的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • java語言自行實現(xiàn)ULID過程底層原理詳解

    java語言自行實現(xiàn)ULID過程底層原理詳解

    這篇文章主要為大家介紹了java語言自行實現(xiàn)ULID過程底層原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • Java多線程Thread基礎(chǔ)學習

    Java多線程Thread基礎(chǔ)學習

    每一個正在執(zhí)行的程序都是一個進程,資源只有一塊,所以在同一時間段會有多個程序同時執(zhí)行,但是在一個時間點上,只能由一個程序執(zhí)行,多線程是在一個進程的基礎(chǔ)之上的進一步劃分,需要的朋友可以參考下
    2023-04-04
  • Java RMI機制講解

    Java RMI機制講解

    這篇文章主要介紹了Java RMI機制講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下
    2021-07-07
  • redisson.tryLock()參數(shù)的使用及理解

    redisson.tryLock()參數(shù)的使用及理解

    這篇文章主要介紹了redisson.tryLock()參數(shù)的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • SpringBoot與JWT整合方式

    SpringBoot與JWT整合方式

    文章介紹了如何在Spring?Boot項目中整合JWT(JSON?Web?Token),包括JWT的結(jié)構(gòu)、使用方法、測試以及配置,主要內(nèi)容涵蓋了依賴配置、數(shù)據(jù)庫表設(shè)計、實體類、數(shù)據(jù)訪問層、服務層、JWT工具類、攔截器配置和控制器測試等多個方面
    2024-11-11
  • 多線程計數(shù),怎么保持計數(shù)準確的方法

    多線程計數(shù),怎么保持計數(shù)準確的方法

    這篇文章主要介紹了多線程計數(shù)的方法,有需要的朋友可以參考一下
    2014-01-01

最新評論