SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例
一.Redis Stream 消息隊(duì)列模版配置類
/** * Redis Stream 消息隊(duì)列配置 */ @Configuration @RequiredArgsConstructor public class RedisStreamConfiguration { private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class); private final RedisConnectionFactory redisConnectionFactory; private final Consumer1 Consumer1; private final Consumer2 Consumer2; // 定義需要自定義的配置常量 private static final int BATCH_SIZE = 10; // 每次批量拉取的消息數(shù)量 private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3); // 拉取消息的阻塞超時(shí)時(shí)間 private static final String THREAD_NAME_PREFIX = "your-business"; // 線程名稱前綴 private static final String GROUP_NAME_1 = "group1"; // 第一個(gè)消費(fèi)者組名稱 private static final String GROUP_NAME_2 = "group2"; // 第二個(gè)消費(fèi)者組名稱 private static final String CONSUMER_NAME_1 = "consumer1"; // 第一個(gè)消費(fèi)者名稱 private static final String CONSUMER_NAME_2 = "consumer2"; // 第二個(gè)消費(fèi)者名稱 private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY; // Stream的主題鍵 @Bean public ExecutorService asyncStreamConsumer() { log.info("Redis Stream 消息隊(duì)列配置線程池"); AtomicInteger index = new AtomicInteger(); int processors = Runtime.getRuntime().availableProcessors(); // 創(chuàng)建一個(gè)自定義線程池 return new ThreadPoolExecutor( processors, processors + (processors >> 1), 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), runnable -> { Thread thread = new Thread(runnable); thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet()); thread.setDaemon(true); return thread; } ); } @Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer( ExecutorService asyncStreamConsumer) { // 配置 StreamMessageListenerContainer 容器選項(xiàng) StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(BATCH_SIZE) // 批量拉取消息數(shù)量 .executor(asyncStreamConsumer) // 使用配置好的線程池 .pollTimeout(POLL_TIMEOUT) // 拉取消息的超時(shí)時(shí)間 .build(); // 創(chuàng)建 StreamMessageListenerContainer 實(shí)例 StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(redisConnectionFactory, options); // 配置第一個(gè)消息監(jiān)聽器 container.receiveAutoAck( Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1), // 指定第一個(gè)消費(fèi)者組和消費(fèi)者名稱 StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主題和偏移量 Consumer1 // 指定第一個(gè)消息處理邏輯 ); // 配置第二個(gè)消息監(jiān)聽器 container.receiveAutoAck( Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2), // 指定第二個(gè)消費(fèi)者組和消費(fèi)者名稱 StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主題和偏移量 Consumer2 // 指定第二個(gè)消息處理邏輯 ); return container; } }
1. 介紹
RedisStreamConfiguration
是一個(gè)用于配置 Redis Stream 消息隊(duì)列的 Spring 配置類。它通過 Redis Stream 實(shí)現(xiàn)消息的異步處理和多消費(fèi)者消費(fèi),適用于需要高吞吐量、低延遲的業(yè)務(wù)場景。
2. 關(guān)鍵組件和自定義參數(shù)
此類主要配置了 Redis Stream 消息監(jiān)聽容器 StreamMessageListenerContainer
,包括線程池配置、消費(fèi)批次和超時(shí)時(shí)間等,方便用戶根據(jù)業(yè)務(wù)需求自定義。
核心參數(shù)
BATCH_SIZE
:定義每次批量拉取的消息數(shù)量。通過設(shè)定合適的批量大小,可以減少消費(fèi)請求次數(shù),提升處理效率。POLL_TIMEOUT
:設(shè)置從 Redis Stream 拉取消息的超時(shí)時(shí)間。超時(shí)控制允許程序在無消息時(shí)保持阻塞,等待消息到達(dá)。THREAD_NAME_PREFIX
:設(shè)置線程名稱前綴,幫助識(shí)別不同業(yè)務(wù)模塊的線程。GROUP_NAME_1
和GROUP_NAME_2
:定義兩個(gè)不同的消費(fèi)者組,適用于同一 Stream 多個(gè)消費(fèi)者并行處理消息的場景。CONSUMER_NAME_1
和CONSUMER_NAME_2
:為每個(gè)消費(fèi)者組指定獨(dú)立的消費(fèi)者名稱,有助于實(shí)現(xiàn)消費(fèi)任務(wù)的分配和管理。
代碼實(shí)現(xiàn)
配置了 StreamMessageListenerContainer
來處理 Stream 消息,并分別為兩個(gè)消費(fèi)者組和消費(fèi)者注冊不同的監(jiān)聽器。
3. 主要方法說明
ExecutorService(線程池配置)
@Bean public ExecutorService asyncStreamConsumer() { ... }
用于創(chuàng)建一個(gè)自定義線程池,為 Redis Stream 的消息消費(fèi)提供異步執(zhí)行環(huán)境。processors
設(shè)置了核心線程數(shù)為 CPU 核心數(shù),最大線程數(shù)為 processors + (processors >> 1)
,即核心數(shù)的 1.5 倍。線程命名使用 THREAD_NAME_PREFIX
前綴,方便日志記錄和排查問題。
StreamMessageListenerContainer(消息監(jiān)聽容器)
@Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }
該方法創(chuàng)建并配置了 Redis Stream 的監(jiān)聽容器。關(guān)鍵步驟如下:
構(gòu)建容器選項(xiàng):包括批次大小、線程池、拉取超時(shí)時(shí)間等參數(shù)。
容器實(shí)例化:通過
StreamMessageListenerContainer.create()
創(chuàng)建容器,初始化時(shí)自動(dòng)啟動(dòng)。消息監(jiān)聽器配置
- 為第一個(gè)消費(fèi)者組
GROUP_NAME_1
和消費(fèi)者CONSUMER_NAME_1
配置了消息監(jiān)聽器Consumer1
,實(shí)現(xiàn)自動(dòng)確認(rèn)并消費(fèi)消息。 - 為第二個(gè)消費(fèi)者組
GROUP_NAME_2
和消費(fèi)者CONSUMER_NAME_2
配置了另一組消息監(jiān)聽器Consumer2
,以便多消費(fèi)者處理。
- 為第一個(gè)消費(fèi)者組
4. 應(yīng)用場景
此配置適用于 Redis Stream 在大規(guī)模并發(fā)場景下的消息隊(duì)列管理。通過靈活配置多個(gè)消費(fèi)者組和消費(fèi)者,可以實(shí)現(xiàn)負(fù)載均衡的多線程消費(fèi)邏輯。
二.消費(fèi)者模版
/** * 消息隊(duì)列消費(fèi)者 */ @RequiredArgsConstructor @Slf4j @Component public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> { private final RedissonClient redissonClient; private final StringRedisTemplate stringRedisTemplate; private final MessageQueueIdempotentHandler messageQueueIdempotentHandler; @Override public void onMessage(MapRecord<String, String, String> message) { String stream = message.getStream(); RecordId id = message.getId(); if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) { // 判斷當(dāng)前的這個(gè)消息流程是否執(zhí)行完成 if (messageQueueIdempotentHandler.isAccomplish(id.toString())) { return; } throw new ServiceException("消息未完成流程,需要消息隊(duì)列重試"); } try { Map<String, String> producerMap = message.getValue(); //你自己的業(yè)務(wù)邏輯 } // 刪除消息 stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue()); } catch (Throwable ex) { messageQueueIdempotentHandler.delMessageProcessed(id.toString()); log.error("消費(fèi)異常", ex); throw ex; } //消費(fèi)完刪除 messageQueueIdempotentHandler.setAccomplish(id.toString()); } }
本模板實(shí)現(xiàn)了一個(gè) Redis Stream
消息隊(duì)列消費(fèi)者的基礎(chǔ)結(jié)構(gòu)。該模板主要圍繞冪等性檢查、消息解析與處理以及消費(fèi)狀態(tài)管理三個(gè)核心功能,確保消息在高并發(fā)環(huán)境下的安全性與一致性。
三.生產(chǎn)者模版
/** * 短鏈接監(jiān)控狀態(tài)保存消息隊(duì)列生產(chǎn)者 */ @Component @RequiredArgsConstructor public class Producer implements MessageQueueProducer{ private final StringRedisTemplate stringRedisTemplate; /** * 發(fā)送消息 */ public void send(Map<String, String> producerMap) { stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap); } }
注意YOUR_KEY
替換成你自己的即可
四.總結(jié)
1. Redis Stream 消息隊(duì)列的優(yōu)勢
Redis Stream 是 Redis 提供的一種強(qiáng)大的消息隊(duì)列解決方案,適用于高吞吐量、低延遲的業(yè)務(wù)場景。與傳統(tǒng)的消息隊(duì)列系統(tǒng)(如 RabbitMQ 或 Kafka)相比,Redis Stream 在集成與配置方面更加簡單,尤其適合基于 Redis 的應(yīng)用程序。Redis Stream 提供了以下優(yōu)勢:
- 高吞吐量:支持高并發(fā)和快速消息消費(fèi),能夠在瞬間處理大量的消息。
- 順序消費(fèi):保證消息的順序消費(fèi),適用于需要順序處理的業(yè)務(wù)場景。
- 消費(fèi)組機(jī)制:通過消費(fèi)組管理消息消費(fèi),可以通過多個(gè)消費(fèi)者并行消費(fèi),提高處理能力。
- 持久化與備份:可以將消息存儲(chǔ)在 Redis 中,具備一定的持久化能力,防止數(shù)據(jù)丟失。
2. Redis Stream 配置與應(yīng)用
本文介紹了如何在 Spring Boot 中集成 Redis Stream 消息隊(duì)列的配置與消費(fèi)邏輯,主要包括:
- 消息消費(fèi)配置:通過
StreamMessageListenerContainer
實(shí)現(xiàn)消息的異步消費(fèi)。配置了批量拉取的數(shù)量、阻塞超時(shí)、線程池等自定義參數(shù),幫助提升系統(tǒng)的并發(fā)處理能力。 - 多消費(fèi)者并行處理:通過消費(fèi)者組(Consumer Group)機(jī)制,實(shí)現(xiàn)多消費(fèi)者并行消費(fèi)同一個(gè) Stream,提高消息處理的吞吐量和效率。
- 冪等性與消費(fèi)確認(rèn):通過
MessageQueueIdempotentHandler
來保證消息的冪等性,避免重復(fù)消費(fèi)的問題。處理邏輯保證每條消息只會(huì)被消費(fèi)一次,且在消費(fèi)失敗時(shí)能夠適當(dāng)回滾,確保系統(tǒng)的可靠性。
3. 消費(fèi)者與生產(chǎn)者模板
- 消費(fèi)者模板:消費(fèi)者通過實(shí)現(xiàn)
StreamListener
接口來處理從 Redis Stream 拉取的消息。為了保證冪等性,消費(fèi)者首先檢查消息是否已經(jīng)處理過,未完成的消息會(huì)被標(biāo)記并重試,確保消息處理的安全性。 - 生產(chǎn)者模板:生產(chǎn)者通過
StringRedisTemplate
將消息發(fā)送到 Redis Stream。當(dāng)業(yè)務(wù)中有新的消息需要處理時(shí),生產(chǎn)者將消息添加到 Redis Stream 進(jìn)行后續(xù)處理。
4. 應(yīng)用場景
Redis Stream 適用于許多場景,特別是需要高并發(fā)、高吞吐量且保證順序消費(fèi)的業(yè)務(wù)需求。例如,短鏈接生成與訪問統(tǒng)計(jì)、訂單處理、日志收集等業(yè)務(wù)場景,都能通過 Redis Stream 實(shí)現(xiàn)高效、可靠的消息隊(duì)列。
到此這篇關(guān)于SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot Redis消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- springboot整合redis之消息隊(duì)列
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot利用redis集成消息隊(duì)列的方法
相關(guān)文章
詳解Java如何判斷ResultSet結(jié)果集是否為空
ResultSet 表示 select 語句的查詢結(jié)果集。這篇文章主要為大家詳細(xì)介紹了Java如何判斷ResultSet結(jié)果集是否為空,感興趣的可以了解一下2023-02-02IDEA關(guān)于.properties資源文件的編碼調(diào)整問題
這篇文章主要介紹了IDEA關(guān)于.properties資源文件的編碼調(diào)整問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06Java實(shí)現(xiàn)ArrayList排序的方法詳解
Java中常見的ArrayList排序方法主要為三種:JDK8的stream、Comparator#compare()和Comparable#compareTo(),本文將詳解這三者的使用,需要的可以參考一下2022-05-05JPA @Query時(shí),無法使用limit函數(shù)的問題及解決
這篇文章主要介紹了JPA @Query時(shí),無法使用limit函數(shù)的問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03解決idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException的問題
這篇文章主要介紹了idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException,解決打不開idea問題,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09