SpringBoot 集成MQTT實(shí)現(xiàn)消息訂閱的詳細(xì)代碼
1、引入依賴
<!--MQTT start--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.4.4</version> </dependency> <!--MQTT end--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
2、增加yml配置
spring: mqtt: username: test password: test url: tcp://127.0.0.1:8080 subClientId: singo_sub_client_id_888 #訂閱 客戶端id pubClientId: singo_pub_client_id_888 #發(fā)布 客戶端id connectionTimeout: 30 keepAlive: 60
3、資源配置類
@Data @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfigurationProperties { private String username; private String password; private String url; private String subClientId; private String pubClientId; private int connectionTimeout; private int keepAlive; }
注意啟動(dòng)類需要增加注解
@EnableConfigurationProperties(MqttConfigurationProperties.class)
4、MQTT配置類
@Configuration public class MqttConfig { @Autowired private MqttConfigurationProperties mqttConfigurationProperties; /** * 連接參數(shù) * * @return */ @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttConfigurationProperties.getUsername()); options.setPassword(mqttConfigurationProperties.getPassword().toCharArray()); options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()}); options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout()); options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive()); options.setCleanSession(true); // 設(shè)置為false以便斷線重連后恢復(fù)會(huì)話 options.setAutomaticReconnect(true); return options; } /** * 連接工廠 * * @param options * @return */ @Bean public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(options); return factory; } /** * 消息輸入通道 * 每次只有一個(gè)消息處理器可以消費(fèi)消息。 * 當(dāng)前消息的處理完成之前,新消息需要排隊(duì)等待,無(wú)法并行處理。 * 默認(rèn)是:?jiǎn)尉€程、順序執(zhí)行的 * @return */ // @Bean // public DirectChannel mqttInputChannel() { // return new DirectChannel(); // } /** * 支持多線程并發(fā)處理消息的輸入通道 * * @return */ @Bean public ExecutorChannel mqttInputChannel() { return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 線程池大小可以調(diào)整 } /** * 配置入站適配器 * * @param mqttClientFactory * @return */ @Bean public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory); // adapter.addTopic("pub/300119110099"); 訂閱主題,也可以放在初始化動(dòng)態(tài)配置 adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 配置消息處理器 * * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道 public MessageHandler messageHandler() { return new MqttReceiverMessageHandler(); } }
5、消息處理器配置
@Slf4j @Component public class MqttReceiverMessageHandler implements MessageHandler { @Autowired private MqttMessageProcessingService mqttMessageProcessingService; @Override public void handleMessage(Message<?> message) throws MessagingException { MessageHeaders headers = message.getHeaders(); log.info("線程名稱:{},收到消息,主題:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload()); // log.info("收到消息主題:{}", headers.get("mqtt_receivedTopic").toString()); // log.info("收到消息:{}", message.getPayload()); // 消息保存到內(nèi)存隊(duì)列里面,定時(shí)批量入庫(kù),也可以在這里直接入庫(kù) mqttMessageProcessingService.addMessage(message.getPayload().toString()); } }
6、消息主題緩存對(duì)象
@Component public class MqttTopicStore { private final ConcurrentHashMap<String, String> topics = new ConcurrentHashMap<>(); public ConcurrentHashMap<String, String> getTopics() { return topics; } }
7、動(dòng)態(tài)訂閱數(shù)據(jù)庫(kù)主題配置
@Slf4j @Component public class MqttInit { @Autowired private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter; @Autowired private MqttTopicStore mqttTopicStore; @PostConstruct public void init() { subscribeAllTopics(); } public void subscribeAllTopics() { // List<MqttTopicConfig> topics = topicConfigMapper.findAllEnabled(); // for (MqttTopicConfig topic : topics) { // subscribeTopic(topic); // } log.info("===================>從數(shù)據(jù)庫(kù)里獲取并初始化訂閱所有主題"); List<String> topics = ListUtil.list(false, "pub/300119110099", "pub1/3010230209810018992", "pub1/30102302098100"); topics.stream().forEach(t -> { messageDrivenChannelAdapter.addTopic(t); // 同時(shí)往MqttTopicStore.topics中增加一條記錄用于緩存 }); } }
8、消息處理服務(wù)
@Service public class MqttMessageProcessingService { @Autowired private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter; @Autowired private MqttTopicStore mqttTopicStore; // 內(nèi)存隊(duì)列,用于暫存消息 private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(); // 添加消息到隊(duì)列 public void addMessage(String message) { messageQueue.add(message); } /** * 可以放到定時(shí)任務(wù)里面去,注入后取隊(duì)列方便維護(hù) * 定時(shí)任務(wù),每5秒執(zhí)行一次 ,建議2分鐘一次 理想的觸發(fā)間隔應(yīng)略小于數(shù)據(jù)到達(dá)間隔,以確保及時(shí)處理和插入 * 如果每 5 分鐘收到一條數(shù)據(jù),可以設(shè)置任務(wù)執(zhí)行周期為4 分鐘或更短,以便任務(wù)有足夠的時(shí)間處理數(shù)據(jù),同時(shí)減少積壓的可能性。 */ @Scheduled(fixedRate = 1 * 60 * 1000) public void batchInsertToDatabase() { System.out.println("定時(shí)任務(wù)執(zhí)行中,當(dāng)前隊(duì)列大?。? + messageQueue.size()); List<String> batch = new ArrayList<>(); messageQueue.drainTo(batch, 500); // 一次性取最多500條消息 if (!batch.isEmpty()) { // 批量插入數(shù)據(jù)庫(kù) saveMessagesToDatabase(batch); } } private void saveMessagesToDatabase(List<String> messages) { // 假設(shè)這是批量插入邏輯 System.out.println("批量插入數(shù)據(jù)庫(kù),條數(shù):" + messages.size()); for (String message : messages) { System.out.println("插入消息:" + message); } // 實(shí)際數(shù)據(jù)庫(kù)操作代碼 } /** * 訂閱與取消訂閱定時(shí)任務(wù) */ public void subscribeAndUnsubscribeTask() { // 從數(shù)據(jù)庫(kù)獲取所有主題,正常狀態(tài)、刪除狀態(tài) // 正常狀態(tài):判斷mqttTopicStore.topics中是否存在,不存在則訂閱,并在mqttTopicStore.topics中增加 // 刪除狀態(tài): 判斷mqttTopicStore.topics中是否存在,存在則取消訂閱,并在mqttTopicStore.topics中刪除 // messageDrivenChannelAdapter.addTopic(t); } }
以上是簡(jiǎn)單的對(duì)接步驟,部分類、方法可以根據(jù)實(shí)際情況進(jìn)行合并處理?。。?!
9、定時(shí)任務(wù)
@Slf4j @Configuration @EnableScheduling public class MqttJob { @Value("${schedule.enable}") private boolean enable; @Autowired private MqttMessageProcessingService mqttMessageProcessingService; /** * 定時(shí)訂閱與取消訂閱主題,從共享主題對(duì)象MqttTopicStore里面取出主題列表,然后進(jìn)行訂閱或取消訂閱 * 每分鐘一次 */ public void subscribeAndUnsubscribe() { if (!enable) return; mqttMessageProcessingService.subscribeAndUnsubscribeTask(); } /** * 定時(shí)處理隊(duì)列里面的訂閱消息,會(huì)有丟失風(fēng)險(xiǎn),宕機(jī)時(shí)會(huì)丟失隊(duì)列里面的消息 * 每分鐘一次 要考慮一次消息處理的時(shí)間;也可先不使用隊(duì)列,每次收到消息直接實(shí)時(shí)入庫(kù),有性能問(wèn)題時(shí)在啟用 */ public void batchSaveSubscribeMessage() { } }
到此這篇關(guān)于SpringBoot 集成MQTT實(shí)現(xiàn)消息訂閱的文章就介紹到這了,更多相關(guān)SpringBoot MQTT消息訂閱內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java中申請(qǐng)不定長(zhǎng)度數(shù)組ArrayList的方法
今天小編就為大家分享一篇java中申請(qǐng)不定長(zhǎng)度數(shù)組ArrayList的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-07-07java 分轉(zhuǎn)元與元轉(zhuǎn)分實(shí)現(xiàn)操作
這篇文章主要介紹了java 分轉(zhuǎn)元與元轉(zhuǎn)分實(shí)現(xiàn)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02java語(yǔ)言自行實(shí)現(xiàn)ULID過(guò)程底層原理詳解
這篇文章主要為大家介紹了java語(yǔ)言自行實(shí)現(xiàn)ULID過(guò)程底層原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10Java多線程Thread基礎(chǔ)學(xué)習(xí)
每一個(gè)正在執(zhí)行的程序都是一個(gè)進(jìn)程,資源只有一塊,所以在同一時(shí)間段會(huì)有多個(gè)程序同時(shí)執(zhí)行,但是在一個(gè)時(shí)間點(diǎn)上,只能由一個(gè)程序執(zhí)行,多線程是在一個(gè)進(jìn)程的基礎(chǔ)之上的進(jìn)一步劃分,需要的朋友可以參考下2023-04-04redisson.tryLock()參數(shù)的使用及理解
這篇文章主要介紹了redisson.tryLock()參數(shù)的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04多線程計(jì)數(shù),怎么保持計(jì)數(shù)準(zhǔn)確的方法
這篇文章主要介紹了多線程計(jì)數(shù)的方法,有需要的朋友可以參考一下2014-01-01