欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例

 更新時(shí)間:2025年05月16日 11:08:47   作者:sjsjsbbsbsn  
本文主要介紹了SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例,包括配置和消費(fèi)邏輯,RedisStream提供了高吞吐量、順序消費(fèi)和消費(fèi)組機(jī)制等優(yōu)勢,具有一定的參考價(jià)值,感興趣的可以了解一下

一.Redis Stream 消息隊(duì)列模版配置類

/**
 * Redis Stream 消息隊(duì)列配置
 */
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class);
    private final RedisConnectionFactory redisConnectionFactory;
    private final Consumer1 Consumer1;
    private final Consumer2 Consumer2;

    // 定義需要自定義的配置常量
    private static final int BATCH_SIZE = 10; // 每次批量拉取的消息數(shù)量
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3); // 拉取消息的阻塞超時(shí)時(shí)間
    private static final String THREAD_NAME_PREFIX = "your-business"; // 線程名稱前綴
    private static final String GROUP_NAME_1 = "group1"; // 第一個(gè)消費(fèi)者組名稱
    private static final String GROUP_NAME_2 = "group2"; // 第二個(gè)消費(fèi)者組名稱
    private static final String CONSUMER_NAME_1 = "consumer1"; // 第一個(gè)消費(fèi)者名稱
    private static final String CONSUMER_NAME_2 = "consumer2"; // 第二個(gè)消費(fèi)者名稱
    private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY; // Stream的主題鍵

    @Bean
    public ExecutorService asyncStreamConsumer() {
        log.info("Redis Stream 消息隊(duì)列配置線程池");
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();

        // 創(chuàng)建一個(gè)自定義線程池
        return new ThreadPoolExecutor(
                processors,
                processors + (processors >> 1),
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
            ExecutorService asyncStreamConsumer) {

        // 配置 StreamMessageListenerContainer 容器選項(xiàng)
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(BATCH_SIZE) // 批量拉取消息數(shù)量
                        .executor(asyncStreamConsumer) // 使用配置好的線程池
                        .pollTimeout(POLL_TIMEOUT) // 拉取消息的超時(shí)時(shí)間
                        .build();

        // 創(chuàng)建 StreamMessageListenerContainer 實(shí)例
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // 配置第一個(gè)消息監(jiān)聽器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1), // 指定第一個(gè)消費(fèi)者組和消費(fèi)者名稱
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主題和偏移量
                Consumer1 // 指定第一個(gè)消息處理邏輯
        );

        // 配置第二個(gè)消息監(jiān)聽器
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2), // 指定第二個(gè)消費(fèi)者組和消費(fèi)者名稱
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // 指定主題和偏移量
                Consumer2 // 指定第二個(gè)消息處理邏輯
        );

        return container;
    }
}

1. 介紹

RedisStreamConfiguration 是一個(gè)用于配置 Redis Stream 消息隊(duì)列的 Spring 配置類。它通過 Redis Stream 實(shí)現(xiàn)消息的異步處理和多消費(fèi)者消費(fèi),適用于需要高吞吐量、低延遲的業(yè)務(wù)場景。

2. 關(guān)鍵組件和自定義參數(shù)

此類主要配置了 Redis Stream 消息監(jiān)聽容器 StreamMessageListenerContainer,包括線程池配置、消費(fèi)批次和超時(shí)時(shí)間等,方便用戶根據(jù)業(yè)務(wù)需求自定義。

核心參數(shù)

  • BATCH_SIZE:定義每次批量拉取的消息數(shù)量。通過設(shè)定合適的批量大小,可以減少消費(fèi)請求次數(shù),提升處理效率。
  • POLL_TIMEOUT:設(shè)置從 Redis Stream 拉取消息的超時(shí)時(shí)間。超時(shí)控制允許程序在無消息時(shí)保持阻塞,等待消息到達(dá)。
  • THREAD_NAME_PREFIX:設(shè)置線程名稱前綴,幫助識(shí)別不同業(yè)務(wù)模塊的線程。
  • GROUP_NAME_1 和 GROUP_NAME_2:定義兩個(gè)不同的消費(fèi)者組,適用于同一 Stream 多個(gè)消費(fèi)者并行處理消息的場景。
  • CONSUMER_NAME_1 和 CONSUMER_NAME_2:為每個(gè)消費(fèi)者組指定獨(dú)立的消費(fèi)者名稱,有助于實(shí)現(xiàn)消費(fèi)任務(wù)的分配和管理。

代碼實(shí)現(xiàn)

配置了 StreamMessageListenerContainer 來處理 Stream 消息,并分別為兩個(gè)消費(fèi)者組和消費(fèi)者注冊不同的監(jiān)聽器。

3. 主要方法說明

ExecutorService(線程池配置)

@Bean
public ExecutorService asyncStreamConsumer() { ... }

用于創(chuàng)建一個(gè)自定義線程池,為 Redis Stream 的消息消費(fèi)提供異步執(zhí)行環(huán)境。processors 設(shè)置了核心線程數(shù)為 CPU 核心數(shù),最大線程數(shù)為 processors + (processors >> 1),即核心數(shù)的 1.5 倍。線程命名使用 THREAD_NAME_PREFIX 前綴,方便日志記錄和排查問題。

StreamMessageListenerContainer(消息監(jiān)聽容器)

@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }

該方法創(chuàng)建并配置了 Redis Stream 的監(jiān)聽容器。關(guān)鍵步驟如下:

  • 構(gòu)建容器選項(xiàng):包括批次大小、線程池、拉取超時(shí)時(shí)間等參數(shù)。

  • 容器實(shí)例化:通過 StreamMessageListenerContainer.create() 創(chuàng)建容器,初始化時(shí)自動(dòng)啟動(dòng)。

  • 消息監(jiān)聽器配置

    • 為第一個(gè)消費(fèi)者組 GROUP_NAME_1 和消費(fèi)者 CONSUMER_NAME_1 配置了消息監(jiān)聽器 Consumer1,實(shí)現(xiàn)自動(dòng)確認(rèn)并消費(fèi)消息。
    • 為第二個(gè)消費(fèi)者組 GROUP_NAME_2 和消費(fèi)者 CONSUMER_NAME_2 配置了另一組消息監(jiān)聽器 Consumer2,以便多消費(fèi)者處理。

4. 應(yīng)用場景

此配置適用于 Redis Stream 在大規(guī)模并發(fā)場景下的消息隊(duì)列管理。通過靈活配置多個(gè)消費(fèi)者組和消費(fèi)者,可以實(shí)現(xiàn)負(fù)載均衡的多線程消費(fèi)邏輯。

二.消費(fèi)者模版

/**
 * 消息隊(duì)列消費(fèi)者
 */
@RequiredArgsConstructor
@Slf4j
@Component
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {
   
    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;
    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;


    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
            // 判斷當(dāng)前的這個(gè)消息流程是否執(zhí)行完成
            if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
                return;
            }
            throw new ServiceException("消息未完成流程,需要消息隊(duì)列重試");
        }
        try {
            Map<String, String> producerMap = message.getValue();
         	//你自己的業(yè)務(wù)邏輯
            }
            // 刪除消息
            stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
        } catch (Throwable ex) {
            messageQueueIdempotentHandler.delMessageProcessed(id.toString());
            log.error("消費(fèi)異常", ex);
            throw ex;
        }
        //消費(fèi)完刪除
        messageQueueIdempotentHandler.setAccomplish(id.toString());
    }

   
}

本模板實(shí)現(xiàn)了一個(gè) Redis Stream 消息隊(duì)列消費(fèi)者的基礎(chǔ)結(jié)構(gòu)。該模板主要圍繞冪等性檢查、消息解析與處理以及消費(fèi)狀態(tài)管理三個(gè)核心功能,確保消息在高并發(fā)環(huán)境下的安全性與一致性。 

具體的冪等校驗(yàn)看我另一篇文章

三.生產(chǎn)者模版

/**
 * 短鏈接監(jiān)控狀態(tài)保存消息隊(duì)列生產(chǎn)者
 */
@Component
@RequiredArgsConstructor
public class Producer implements MessageQueueProducer{

    private final StringRedisTemplate stringRedisTemplate;


    /**
     * 發(fā)送消息
     */
    public void send(Map<String, String> producerMap) {
        stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap);
    }
}

注意YOUR_KEY 替換成你自己的即可

四.總結(jié)

1. Redis Stream 消息隊(duì)列的優(yōu)勢

Redis Stream 是 Redis 提供的一種強(qiáng)大的消息隊(duì)列解決方案,適用于高吞吐量、低延遲的業(yè)務(wù)場景。與傳統(tǒng)的消息隊(duì)列系統(tǒng)(如 RabbitMQ 或 Kafka)相比,Redis Stream 在集成與配置方面更加簡單,尤其適合基于 Redis 的應(yīng)用程序。Redis Stream 提供了以下優(yōu)勢:

  • 高吞吐量:支持高并發(fā)和快速消息消費(fèi),能夠在瞬間處理大量的消息。
  • 順序消費(fèi):保證消息的順序消費(fèi),適用于需要順序處理的業(yè)務(wù)場景。
  • 消費(fèi)組機(jī)制:通過消費(fèi)組管理消息消費(fèi),可以通過多個(gè)消費(fèi)者并行消費(fèi),提高處理能力。
  • 持久化與備份:可以將消息存儲(chǔ)在 Redis 中,具備一定的持久化能力,防止數(shù)據(jù)丟失。

2. Redis Stream 配置與應(yīng)用

本文介紹了如何在 Spring Boot 中集成 Redis Stream 消息隊(duì)列的配置與消費(fèi)邏輯,主要包括:

  • 消息消費(fèi)配置:通過 StreamMessageListenerContainer 實(shí)現(xiàn)消息的異步消費(fèi)。配置了批量拉取的數(shù)量、阻塞超時(shí)、線程池等自定義參數(shù),幫助提升系統(tǒng)的并發(fā)處理能力。
  • 多消費(fèi)者并行處理:通過消費(fèi)者組(Consumer Group)機(jī)制,實(shí)現(xiàn)多消費(fèi)者并行消費(fèi)同一個(gè) Stream,提高消息處理的吞吐量和效率。
  • 冪等性與消費(fèi)確認(rèn):通過 MessageQueueIdempotentHandler 來保證消息的冪等性,避免重復(fù)消費(fèi)的問題。處理邏輯保證每條消息只會(huì)被消費(fèi)一次,且在消費(fèi)失敗時(shí)能夠適當(dāng)回滾,確保系統(tǒng)的可靠性。

3. 消費(fèi)者與生產(chǎn)者模板

  • 消費(fèi)者模板:消費(fèi)者通過實(shí)現(xiàn) StreamListener 接口來處理從 Redis Stream 拉取的消息。為了保證冪等性,消費(fèi)者首先檢查消息是否已經(jīng)處理過,未完成的消息會(huì)被標(biāo)記并重試,確保消息處理的安全性。
  • 生產(chǎn)者模板:生產(chǎn)者通過 StringRedisTemplate 將消息發(fā)送到 Redis Stream。當(dāng)業(yè)務(wù)中有新的消息需要處理時(shí),生產(chǎn)者將消息添加到 Redis Stream 進(jìn)行后續(xù)處理。

4. 應(yīng)用場景

Redis Stream 適用于許多場景,特別是需要高并發(fā)、高吞吐量且保證順序消費(fèi)的業(yè)務(wù)需求。例如,短鏈接生成與訪問統(tǒng)計(jì)、訂單處理、日志收集等業(yè)務(wù)場景,都能通過 Redis Stream 實(shí)現(xiàn)高效、可靠的消息隊(duì)列。

到此這篇關(guān)于SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot Redis消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解Java如何判斷ResultSet結(jié)果集是否為空

    詳解Java如何判斷ResultSet結(jié)果集是否為空

    ResultSet 表示 select 語句的查詢結(jié)果集。這篇文章主要為大家詳細(xì)介紹了Java如何判斷ResultSet結(jié)果集是否為空,感興趣的可以了解一下
    2023-02-02
  • IDEA關(guān)于.properties資源文件的編碼調(diào)整問題

    IDEA關(guān)于.properties資源文件的編碼調(diào)整問題

    這篇文章主要介紹了IDEA關(guān)于.properties資源文件的編碼調(diào)整問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-06-06
  • Java正則替換手機(jī)號代碼實(shí)例

    Java正則替換手機(jī)號代碼實(shí)例

    本文的主要內(nèi)容是Java語言中正則表達(dá)式替換手機(jī)號的第4到第7位,實(shí)現(xiàn)方法十分簡單,同時(shí)涉及了一些正則表達(dá)式的相關(guān)用法,需要的朋友可以參考下。
    2017-09-09
  • Java實(shí)現(xiàn)ArrayList排序的方法詳解

    Java實(shí)現(xiàn)ArrayList排序的方法詳解

    Java中常見的ArrayList排序方法主要為三種:JDK8的stream、Comparator#compare()和Comparable#compareTo(),本文將詳解這三者的使用,需要的可以參考一下
    2022-05-05
  • Java中toString函數(shù)的使用示例代碼

    Java中toString函數(shù)的使用示例代碼

    toString()函數(shù)用于將當(dāng)前對象以字符串的形式返回,比如我定義了一個(gè)User類,創(chuàng)建了一個(gè)user對象,然后使用相應(yīng)命令去打印user對象,本文結(jié)合示例代碼介紹了toString函數(shù)的使用,需要的朋友可以參考下
    2024-02-02
  • Mybatis-Plus 官方神器發(fā)布

    Mybatis-Plus 官方神器發(fā)布

    mybatis-mate 為 mp 企業(yè)級模塊,支持分庫分表,數(shù)據(jù)審計(jì)、數(shù)據(jù)敏感詞過濾(AC算法),字段加密,字典回寫(數(shù)據(jù)綁定),數(shù)據(jù)權(quán)限,表結(jié)構(gòu)自動(dòng)生成 SQL 維護(hù)等,旨在更敏捷優(yōu)雅處理數(shù)據(jù),今天介紹一個(gè) MyBatis - Plus 官方發(fā)布的神器,感興趣的朋友一起看看吧
    2021-11-11
  • JPA @Query時(shí),無法使用limit函數(shù)的問題及解決

    JPA @Query時(shí),無法使用limit函數(shù)的問題及解決

    這篇文章主要介紹了JPA @Query時(shí),無法使用limit函數(shù)的問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Java圖文并茂詳解NIO與零拷貝

    Java圖文并茂詳解NIO與零拷貝

    零拷貝是網(wǎng)絡(luò)編程的關(guān)鍵,很多性能優(yōu)化都離不開。在?Java?程序中,常用的零拷貝有?mmap(memory?map,內(nèi)存映射)?和?sendFile。那么它們在?OS(操作系統(tǒng))?中,到底是怎么樣的一個(gè)的設(shè)計(jì)?另外我們看下NIO?中如何使用零拷貝
    2022-11-11
  • 解決idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException的問題

    解決idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException的問題

    這篇文章主要介紹了idea啟動(dòng)報(bào)錯(cuò)javax.imageio.IIOException,解決打不開idea問題,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • Spring cloud config 配置文件加密方式

    Spring cloud config 配置文件加密方式

    這篇文章給大家介紹了Spring cloud config 配置文件加密方式,非常不錯(cuò),具有一定的參考借鑒價(jià)值,感興趣的朋友跟隨腳步之家小編一起學(xué)習(xí)吧
    2018-05-05

最新評論