SpringBoot 集成MQTT實現(xiàn)消息訂閱的詳細代碼
1、引入依賴
<!--MQTT start--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.4.4</version> </dependency> <!--MQTT end--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
2、增加yml配置
spring: mqtt: username: test password: test url: tcp://127.0.0.1:8080 subClientId: singo_sub_client_id_888 #訂閱 客戶端id pubClientId: singo_pub_client_id_888 #發(fā)布 客戶端id connectionTimeout: 30 keepAlive: 60
3、資源配置類
@Data @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfigurationProperties { private String username; private String password; private String url; private String subClientId; private String pubClientId; private int connectionTimeout; private int keepAlive; }
注意啟動類需要增加注解
@EnableConfigurationProperties(MqttConfigurationProperties.class)
4、MQTT配置類
@Configuration public class MqttConfig { @Autowired private MqttConfigurationProperties mqttConfigurationProperties; /** * 連接參數(shù) * * @return */ @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttConfigurationProperties.getUsername()); options.setPassword(mqttConfigurationProperties.getPassword().toCharArray()); options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()}); options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout()); options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive()); options.setCleanSession(true); // 設(shè)置為false以便斷線重連后恢復會話 options.setAutomaticReconnect(true); return options; } /** * 連接工廠 * * @param options * @return */ @Bean public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(options); return factory; } /** * 消息輸入通道 * 每次只有一個消息處理器可以消費消息。 * 當前消息的處理完成之前,新消息需要排隊等待,無法并行處理。 * 默認是:單線程、順序執(zhí)行的 * @return */ // @Bean // public DirectChannel mqttInputChannel() { // return new DirectChannel(); // } /** * 支持多線程并發(fā)處理消息的輸入通道 * * @return */ @Bean public ExecutorChannel mqttInputChannel() { return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 線程池大小可以調(diào)整 } /** * 配置入站適配器 * * @param mqttClientFactory * @return */ @Bean public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory); // adapter.addTopic("pub/300119110099"); 訂閱主題,也可以放在初始化動態(tài)配置 adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 配置消息處理器 * * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道 public MessageHandler messageHandler() { return new MqttReceiverMessageHandler(); } }
5、消息處理器配置
@Slf4j @Component public class MqttReceiverMessageHandler implements MessageHandler { @Autowired private MqttMessageProcessingService mqttMessageProcessingService; @Override public void handleMessage(Message<?> message) throws MessagingException { MessageHeaders headers = message.getHeaders(); log.info("線程名稱:{},收到消息,主題:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload()); // log.info("收到消息主題:{}", headers.get("mqtt_receivedTopic").toString()); // log.info("收到消息:{}", message.getPayload()); // 消息保存到內(nèi)存隊列里面,定時批量入庫,也可以在這里直接入庫 mqttMessageProcessingService.addMessage(message.getPayload().toString()); } }
6、消息主題緩存對象
@Component public class MqttTopicStore { private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>(); public ConcurrentHashMap<String, String> getTopics() { return topics; } }
7、動態(tài)訂閱數(shù)據(jù)庫主題配置
@Slf4j @Component public class MqttInit { @Autowired private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter; @Autowired private MqttTopicStore mqttTopicStore; @PostConstruct public void init() { subscribeAllTopics(); } public void subscribeAllTopics() { // List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled(); // for (MqttTopicConfig topic : topics) { // subscribeTopic(topic); // } log.info("===================>從數(shù)據(jù)庫里獲取并初始化訂閱所有主題"); List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100"); topics.stream().forEach(t -> { messageDrivenChannelAdapter.addTopic(t); // 同時往MqttTopicStore.topics中增加一條記錄用于緩存 }); } }
8、消息處理服務
@Service public class MqttMessageProcessingService { @Autowired private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter; @Autowired private MqttTopicStore mqttTopicStore; // 內(nèi)存隊列,用于暫存消息 private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(); // 添加消息到隊列 public void addMessage(String message) { messageQueue.add(message); } /** * 可以放到定時任務里面去,注入后取隊列方便維護 * 定時任務,每5秒執(zhí)行一次 ,建議2分鐘一次 理想的觸發(fā)間隔應略小于數(shù)據(jù)到達間隔,以確保及時處理和插入 * 如果每 5 分鐘收到一條數(shù)據(jù),可以設(shè)置任務執(zhí)行周期為4 分鐘或更短,以便任務有足夠的時間處理數(shù)據(jù),同時減少積壓的可能性。 */ @Scheduled(fixedRate = 1 * 60 * 1000) public void batchInsertToDatabase() { System.out.println("定時任務執(zhí)行中,當前隊列大?。? + messageQueue.size()); List<String> batch = new ArrayList<>(); messageQueue.drainTo(batch, 500); // 一次性取最多500條消息 if (!batch.isEmpty()) { // 批量插入數(shù)據(jù)庫 saveMessagesToDatabase(batch); } } private void saveMessagesToDatabase(List<String> messages) { // 假設(shè)這是批量插入邏輯 System.out.println("批量插入數(shù)據(jù)庫,條數(shù):" + messages.size()); for (String message : messages) { System.out.println("插入消息:" + message); } // 實際數(shù)據(jù)庫操作代碼 } /** * 訂閱與取消訂閱定時任務 */ public void subscribeAndUnsubscribeTask() { // 從數(shù)據(jù)庫獲取所有主題,正常狀態(tài)、刪除狀態(tài) // 正常狀態(tài):判斷mqttTopicStore.topics中是否存在,不存在則訂閱,并在mqttTopicStore.topics中增加 // 刪除狀態(tài): 判斷mqttTopicStore.topics中是否存在,存在則取消訂閱,并在mqttTopicStore.topics中刪除 // messageDrivenChannelAdapter.addTopic(t); } }
以上是簡單的對接步驟,部分類、方法可以根據(jù)實際情況進行合并處理!?。?!
9、定時任務
@Slf4j @Configuration @EnableScheduling public class MqttJob { @Value("${schedule.enable}") private boolean enable; @Autowired private MqttMessageProcessingService mqttMessageProcessingService; /** * 定時訂閱與取消訂閱主題,從共享主題對象MqttTopicStore里面取出主題列表,然后進行訂閱或取消訂閱 * 每分鐘一次 */ public void subscribeAndUnsubscribe() { if (!enable) return; mqttMessageProcessingService.subscribeAndUnsubscribeTask(); } /** * 定時處理隊列里面的訂閱消息,會有丟失風險,宕機時會丟失隊列里面的消息 * 每分鐘一次 要考慮一次消息處理的時間;也可先不使用隊列,每次收到消息直接實時入庫,有性能問題時在啟用 */ public void batchSaveSubscribeMessage() { } }
到此這篇關(guān)于SpringBoot 集成MQTT實現(xiàn)消息訂閱的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java中申請不定長度數(shù)組ArrayList的方法
今天小編就為大家分享一篇java中申請不定長度數(shù)組ArrayList的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作
這篇文章主要介紹了java 分轉(zhuǎn)元與元轉(zhuǎn)分實現(xiàn)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02redisson.tryLock()參數(shù)的使用及理解
這篇文章主要介紹了redisson.tryLock()參數(shù)的使用,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04