SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例
一.Redis Stream 消息隊(duì)列模版配置類
/**
* Redis Stream 消息隊(duì)列配置
*/
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {
private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class);
private final RedisConnectionFactory redisConnectionFactory;
private final Consumer1 Consumer1;
private final Consumer2 Consumer2;
// 定義需要自定義的配置常量
private static final int BATCH_SIZE = 10; // 每次批量拉取的消息數(shù)量
private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3); // 拉取消息的阻塞超時(shí)時(shí)間
private static final String THREAD_NAME_PREFIX = "your-business"; // 線程名稱前綴
private static final String GROUP_NAME_1 = "group1"; // 第一個(gè)消費(fèi)者組名稱
private static final String GROUP_NAME_2 = "group2"; // 第二個(gè)消費(fèi)者組名稱
private static final String CONSUMER_NAME_1 = "consumer1"; // 第一個(gè)消費(fèi)者名稱
private static final String CONSUMER_NAME_2 = "consumer2"; // 第二個(gè)消費(fèi)者名稱
private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY; // Stream的主題鍵
@Bean
public ExecutorService asyncStreamConsumer() {
log.info("Redis Stream 消息隊(duì)列配置線程池");
AtomicInteger index = new AtomicInteger();
int processors = Runtime.getRuntime().availableProcessors();
// 創(chuàng)建一個(gè)自定義線程池
return new ThreadPoolExecutor(
processors,
processors + (processors >> 1),
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet());
thread.setDaemon(true);
return thread;
}
);
}
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
ExecutorService asyncStreamConsumer) {
// 配置 StreamMessageListenerContainer 容器選項(xiàng)
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(BATCH_SIZE) // 批量拉取消息數(shù)量
.executor(asyncStreamConsumer) // 使用配置好的線程池
.pollTimeout(POLL_TIMEOUT) // 拉取消息的超時(shí)時(shí)間
.build();
// 創(chuàng)建 StreamMessageListenerContainer 實(shí)例
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 配置第一個(gè)消息監(jiān)聽器
container.receiveAutoAck(
Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1), // 指定第一個(gè)消費(fèi)者組和消費(fèi)者名稱
StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主題和偏移量
Consumer1 // 指定第一個(gè)消息處理邏輯
);
// 配置第二個(gè)消息監(jiān)聽器
container.receiveAutoAck(
Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2), // 指定第二個(gè)消費(fèi)者組和消費(fèi)者名稱
StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主題和偏移量
Consumer2 // 指定第二個(gè)消息處理邏輯
);
return container;
}
}
1. 介紹
RedisStreamConfiguration 是一個(gè)用于配置 Redis Stream 消息隊(duì)列的 Spring 配置類。它通過 Redis Stream 實(shí)現(xiàn)消息的異步處理和多消費(fèi)者消費(fèi),適用于需要高吞吐量、低延遲的業(yè)務(wù)場(chǎng)景。
2. 關(guān)鍵組件和自定義參數(shù)
此類主要配置了 Redis Stream 消息監(jiān)聽容器 StreamMessageListenerContainer,包括線程池配置、消費(fèi)批次和超時(shí)時(shí)間等,方便用戶根據(jù)業(yè)務(wù)需求自定義。
核心參數(shù)
BATCH_SIZE:定義每次批量拉取的消息數(shù)量。通過設(shè)定合適的批量大小,可以減少消費(fèi)請(qǐng)求次數(shù),提升處理效率。POLL_TIMEOUT:設(shè)置從 Redis Stream 拉取消息的超時(shí)時(shí)間。超時(shí)控制允許程序在無消息時(shí)保持阻塞,等待消息到達(dá)。THREAD_NAME_PREFIX:設(shè)置線程名稱前綴,幫助識(shí)別不同業(yè)務(wù)模塊的線程。GROUP_NAME_1和GROUP_NAME_2:定義兩個(gè)不同的消費(fèi)者組,適用于同一 Stream 多個(gè)消費(fèi)者并行處理消息的場(chǎng)景。CONSUMER_NAME_1和CONSUMER_NAME_2:為每個(gè)消費(fèi)者組指定獨(dú)立的消費(fèi)者名稱,有助于實(shí)現(xiàn)消費(fèi)任務(wù)的分配和管理。
代碼實(shí)現(xiàn)
配置了 StreamMessageListenerContainer 來處理 Stream 消息,并分別為兩個(gè)消費(fèi)者組和消費(fèi)者注冊(cè)不同的監(jiān)聽器。
3. 主要方法說明
ExecutorService(線程池配置)
@Bean
public ExecutorService asyncStreamConsumer() { ... }
用于創(chuàng)建一個(gè)自定義線程池,為 Redis Stream 的消息消費(fèi)提供異步執(zhí)行環(huán)境。processors 設(shè)置了核心線程數(shù)為 CPU 核心數(shù),最大線程數(shù)為 processors + (processors >> 1),即核心數(shù)的 1.5 倍。線程命名使用 THREAD_NAME_PREFIX 前綴,方便日志記錄和排查問題。
StreamMessageListenerContainer(消息監(jiān)聽容器)
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }
該方法創(chuàng)建并配置了 Redis Stream 的監(jiān)聽容器。關(guān)鍵步驟如下:
構(gòu)建容器選項(xiàng):包括批次大小、線程池、拉取超時(shí)時(shí)間等參數(shù)。
容器實(shí)例化:通過
StreamMessageListenerContainer.create()創(chuàng)建容器,初始化時(shí)自動(dòng)啟動(dòng)。消息監(jiān)聽器配置
- 為第一個(gè)消費(fèi)者組
GROUP_NAME_1和消費(fèi)者CONSUMER_NAME_1配置了消息監(jiān)聽器Consumer1,實(shí)現(xiàn)自動(dòng)確認(rèn)并消費(fèi)消息。 - 為第二個(gè)消費(fèi)者組
GROUP_NAME_2和消費(fèi)者CONSUMER_NAME_2配置了另一組消息監(jiān)聽器Consumer2,以便多消費(fèi)者處理。
- 為第一個(gè)消費(fèi)者組
4. 應(yīng)用場(chǎng)景
此配置適用于 Redis Stream 在大規(guī)模并發(fā)場(chǎng)景下的消息隊(duì)列管理。通過靈活配置多個(gè)消費(fèi)者組和消費(fèi)者,可以實(shí)現(xiàn)負(fù)載均衡的多線程消費(fèi)邏輯。
二.消費(fèi)者模版
/**
* 消息隊(duì)列消費(fèi)者
*/
@RequiredArgsConstructor
@Slf4j
@Component
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {
private final RedissonClient redissonClient;
private final StringRedisTemplate stringRedisTemplate;
private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
RecordId id = message.getId();
if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
// 判斷當(dāng)前的這個(gè)消息流程是否執(zhí)行完成
if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
return;
}
throw new ServiceException("消息未完成流程,需要消息隊(duì)列重試");
}
try {
Map<String, String> producerMap = message.getValue();
//你自己的業(yè)務(wù)邏輯
}
// 刪除消息
stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
} catch (Throwable ex) {
messageQueueIdempotentHandler.delMessageProcessed(id.toString());
log.error("消費(fèi)異常", ex);
throw ex;
}
//消費(fèi)完刪除
messageQueueIdempotentHandler.setAccomplish(id.toString());
}
}
本模板實(shí)現(xiàn)了一個(gè) Redis Stream 消息隊(duì)列消費(fèi)者的基礎(chǔ)結(jié)構(gòu)。該模板主要圍繞冪等性檢查、消息解析與處理以及消費(fèi)狀態(tài)管理三個(gè)核心功能,確保消息在高并發(fā)環(huán)境下的安全性與一致性。
三.生產(chǎn)者模版
/**
* 短鏈接監(jiān)控狀態(tài)保存消息隊(duì)列生產(chǎn)者
*/
@Component
@RequiredArgsConstructor
public class Producer implements MessageQueueProducer{
private final StringRedisTemplate stringRedisTemplate;
/**
* 發(fā)送消息
*/
public void send(Map<String, String> producerMap) {
stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap);
}
}
注意YOUR_KEY 替換成你自己的即可
四.總結(jié)
1. Redis Stream 消息隊(duì)列的優(yōu)勢(shì)
Redis Stream 是 Redis 提供的一種強(qiáng)大的消息隊(duì)列解決方案,適用于高吞吐量、低延遲的業(yè)務(wù)場(chǎng)景。與傳統(tǒng)的消息隊(duì)列系統(tǒng)(如 RabbitMQ 或 Kafka)相比,Redis Stream 在集成與配置方面更加簡(jiǎn)單,尤其適合基于 Redis 的應(yīng)用程序。Redis Stream 提供了以下優(yōu)勢(shì):
- 高吞吐量:支持高并發(fā)和快速消息消費(fèi),能夠在瞬間處理大量的消息。
- 順序消費(fèi):保證消息的順序消費(fèi),適用于需要順序處理的業(yè)務(wù)場(chǎng)景。
- 消費(fèi)組機(jī)制:通過消費(fèi)組管理消息消費(fèi),可以通過多個(gè)消費(fèi)者并行消費(fèi),提高處理能力。
- 持久化與備份:可以將消息存儲(chǔ)在 Redis 中,具備一定的持久化能力,防止數(shù)據(jù)丟失。
2. Redis Stream 配置與應(yīng)用
本文介紹了如何在 Spring Boot 中集成 Redis Stream 消息隊(duì)列的配置與消費(fèi)邏輯,主要包括:
- 消息消費(fèi)配置:通過
StreamMessageListenerContainer實(shí)現(xiàn)消息的異步消費(fèi)。配置了批量拉取的數(shù)量、阻塞超時(shí)、線程池等自定義參數(shù),幫助提升系統(tǒng)的并發(fā)處理能力。 - 多消費(fèi)者并行處理:通過消費(fèi)者組(Consumer Group)機(jī)制,實(shí)現(xiàn)多消費(fèi)者并行消費(fèi)同一個(gè) Stream,提高消息處理的吞吐量和效率。
- 冪等性與消費(fèi)確認(rèn):通過
MessageQueueIdempotentHandler來保證消息的冪等性,避免重復(fù)消費(fèi)的問題。處理邏輯保證每條消息只會(huì)被消費(fèi)一次,且在消費(fèi)失敗時(shí)能夠適當(dāng)回滾,確保系統(tǒng)的可靠性。
3. 消費(fèi)者與生產(chǎn)者模板
- 消費(fèi)者模板:消費(fèi)者通過實(shí)現(xiàn)
StreamListener接口來處理從 Redis Stream 拉取的消息。為了保證冪等性,消費(fèi)者首先檢查消息是否已經(jīng)處理過,未完成的消息會(huì)被標(biāo)記并重試,確保消息處理的安全性。 - 生產(chǎn)者模板:生產(chǎn)者通過
StringRedisTemplate將消息發(fā)送到 Redis Stream。當(dāng)業(yè)務(wù)中有新的消息需要處理時(shí),生產(chǎn)者將消息添加到 Redis Stream 進(jìn)行后續(xù)處理。
4. 應(yīng)用場(chǎng)景
Redis Stream 適用于許多場(chǎng)景,特別是需要高并發(fā)、高吞吐量且保證順序消費(fèi)的業(yè)務(wù)需求。例如,短鏈接生成與訪問統(tǒng)計(jì)、訂單處理、日志收集等業(yè)務(wù)場(chǎng)景,都能通過 Redis Stream 實(shí)現(xiàn)高效、可靠的消息隊(duì)列。
到此這篇關(guān)于SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot Redis消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- springboot整合redis之消息隊(duì)列
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot利用redis集成消息隊(duì)列的方法
相關(guān)文章
詳解Java如何判斷ResultSet結(jié)果集是否為空
ResultSet 表示 select 語句的查詢結(jié)果集。這篇文章主要為大家詳細(xì)介紹了Java如何判斷ResultSet結(jié)果集是否為空,感興趣的可以了解一下2023-02-02
IDEA關(guān)于.properties資源文件的編碼調(diào)整問題
這篇文章主要介紹了IDEA關(guān)于.properties資源文件的編碼調(diào)整問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
Java實(shí)現(xiàn)ArrayList排序的方法詳解
Java中常見的ArrayList排序方法主要為三種:JDK8的stream、Comparator#compare()和Comparable#compareTo(),本文將詳解這三者的使用,需要的可以參考一下2022-05-05
JPA @Query時(shí),無法使用limit函數(shù)的問題及解決
這篇文章主要介紹了JPA @Query時(shí),無法使用limit函數(shù)的問題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
解決idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException的問題
這篇文章主要介紹了idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException,解決打不開idea問題,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09

