欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于Redis Streams的實時消息處理實戰(zhàn)指南

 更新時間:2025年07月16日 09:12:39   作者:淺沫云歸  
這篇文章主要為大家詳細介紹了在生產環(huán)境中基于 Redis Streams 構建實時消息處理的完整經驗,包括技術選型、核心代碼示例、踩坑解決和優(yōu)化方案,希望對大家有所幫助

業(yè)務場景描述

在我們公司的電商平臺中,存在大量異步事件需要實時處理,例如用戶下單、庫存更新、支付回調等。這些事件對消息的可靠性、順序性和高吞吐量有較高要求。傳統(tǒng)的消息中間件(如Kafka、RabbitMQ)在運維成本或部署復雜度上存在一定挑戰(zhàn),在部分場景下難以滿足“輕量、低延遲、易集成” 的需求。

經過調研和驗證,Redis 6.0+ 提供的 Streams 特性在嵌入式部署、快速上手方面具有顯著優(yōu)勢。本篇文章將分享我們在生產環(huán)境中基于 Redis Streams 構建實時消息處理的完整經驗,包括技術選型、核心代碼示例、踩坑解決和優(yōu)化方案。

技術選型過程

  • 消息可靠性:Redis Streams 支持持久化,且提供 ACK 機制和 Pending List,能夠有效追蹤消費進度。
  • 順序消費:同一消費者組內,可保證分片流(同一 key)中消息按寫入順序被串行消費。
  • 橫向擴展:可通過 Stream 分片(多個 Stream Key)或消費者組內多實例并行消費提高吞吐。
  • 運營成本:Redis 已是團隊基礎設施,集群部署與監(jiān)控成熟度高,二次成本低。
  • 客戶端生態(tài):Lettuce、Jedis、Redisson 等客戶端均有支持,編碼友好。

基于以上考量,最終選型 Redis Streams,落地于現(xiàn)有 Redis 集群,無需額外獨立中間件部署。

實現(xiàn)方案詳解

環(huán)境與依賴

Maven 依賴(以 Lettuce 客戶端為例):

<dependencies>
    <dependency>
        <groupId>io.lettuce</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.1.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>

SpringBoot 配置(application.yml):

spring:
  redis:
    host: redis-cluster-host
    port: 6379
    password: your_password
    timeout: 2000ms

流程設計

  • Producer 將事件寫入 Stream:XADD
  • 多消費者(Consumer Group)并行讀?。篨READGROUP
  • 消費確認:XACK
  • 異常消息追蹤:Pending-List 與 XCLAIM 回補處理

生產者實現(xiàn)

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import java.util.HashMap;
import java.util.Map;

public class RedisStreamProducer {
    private RedisClient client;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;
    private static final String STREAM_KEY = "orderStream";

    public RedisStreamProducer(String uri) {
        client = RedisClient.create(uri);
        connection = client.connect();
        commands = connection.sync();
    }

    public String sendMessage(Map<String, String> message) {
        // XADD key * field value [field value ...]
        return commands.xadd(STREAM_KEY, message);
    }

    public void shutdown() {
        connection.close();
        client.shutdown();
    }

    public static void main(String[] args) {
        RedisStreamProducer producer = new RedisStreamProducer("redis://:your_password@redis-host:6379/0");
        Map<String, String> order = new HashMap<>();
        order.put("orderId", "123456");
        order.put("userId", "u7890");
        order.put("amount", "258.50");
        String messageId = producer.sendMessage(order);
        System.out.println("消息發(fā)送成功, ID=" + messageId);
        producer.shutdown();
    }
}

消費者實現(xiàn)

import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.models.stream.Consumer;
import io.lettuce.core.models.stream.PendingMessage;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class RedisStreamConsumer {
    private RedisClient client;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;

    private static final String STREAM_KEY = "orderStream";
    private static final String GROUP_NAME = "orderGroup";
    private static final String CONSUMER_NAME = "consumer-1";

    public RedisStreamConsumer(String uri) {
        client = RedisClient.create(uri);
        connection = client.connect();
        commands = connection.sync();
        // 創(chuàng)建消費者組, 如果已創(chuàng)建可 ignore
        try {
            commands.xgroupCreate(STREAM_KEY, GROUP_NAME, "$", true);
        } catch (Exception e) {
            // Group exists
        }
    }

    public void consume() {
        while (true) {
            // 從 Pending List 先處理未 ack 的消息
            List<PendingMessage> pending = commands.xpending(STREAM_KEY, GROUP_NAME, Range.unbounded(), Limit.from(10));
            for (PendingMessage pm : pending) {
                // 重新消費
                StreamMessage<String, String> msg = commands.xclaim(
                    STREAM_KEY,
                    GROUP_NAME,
                    CONSUMER_NAME,
                    5000,
                    pm.getId());
                process(msg.getBody());
                commands.xack(STREAM_KEY, GROUP_NAME, pm.getId());
            }

            // 正常讀取新消息
            List<StreamMessage<String, String>> messages = commands.xreadgroup(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                XReadArgs.StreamOffset.lastConsumed(STREAM_KEY));
            if (messages != null) {
                for (StreamMessage<String, String> msg : messages) {
                    process(msg.getBody());
                    commands.xack(STREAM_KEY, GROUP_NAME, msg.getId());
                }
            }

            // 輪詢間隔
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void process(Map<String, String> body) {
        // 業(yè)務處理邏輯
        System.out.println("處理訂單: " + body);
    }

    public void shutdown() {
        connection.close();
        client.shutdown();
    }

    public static void main(String[] args) {
        RedisStreamConsumer consumer = new RedisStreamConsumer("redis://:your_password@redis-host:6379/0");
        consumer.consume();
        consumer.shutdown();
    }
}

踩過的坑與解決方案

1.消息重復消費

  • 問題:消費者處理過程中拋出異常導致 ack 未發(fā)送,Pending List 中累積大量消息。
  • 解決:定期掃描 Pending List,并結合 XCLAIM 將“活躍但掛起”消息重新分配給健康消費者處理;同時在業(yè)務端做好冪等控制。

2.消息積壓與內存壓力

  • 問題:Stream 長度持續(xù)增長,Redis 實例內存壓力上升。
  • 解決:使用 XTRIM MAXLEN ~ N 對流進行修剪,結合業(yè)務保留時間策略,定期分批清理歷史消息。

3.消費者實例重啟后狀態(tài)丟失

  • 問題:未及時恢復 Pending List 中未處理消息,導致部分消息長時間滯留。
  • 解決:消費者啟動時優(yōu)先處理 Pending List,再進入正常消費流程;并通過定時任務對掛起較久的消息進行報警或二次補償處理。

總結與最佳實踐

  • Redis Streams 適合輕量級、低運維成本的實時消息場景,結合 ACK、Pending List 能保證高可靠性。
  • 采用消費者組(Consumer Group)可支持橫向擴展,讀寫分離與順序消費兼得。
  • 業(yè)務側必須做好冪等設計,避免消息重復帶來的副作用。
  • 對 Stream 進行合理修剪,避免數(shù)據(jù)無節(jié)制增長導致內存問題。
  • 建議結合監(jiān)控告警,對 Pending List 長度、消費者積壓情況進行實時監(jiān)控。

到此這篇關于基于Redis Streams的實時消息處理實戰(zhàn)指南的文章就介紹到這了,更多相關Redis Streams消息處理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Redisson 加鎖解鎖的實現(xiàn)

    Redisson 加鎖解鎖的實現(xiàn)

    本文主要介紹了Redisson 加鎖解鎖的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-08-08
  • redis搭建哨兵模式實現(xiàn)一主兩從三哨兵

    redis搭建哨兵模式實現(xiàn)一主兩從三哨兵

    本文主要介紹了redis搭建哨兵模式實現(xiàn)一主兩從三哨兵,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-08-08
  • 一篇文章帶你徹底搞懂Redis?事務

    一篇文章帶你徹底搞懂Redis?事務

    這篇文章主要介紹了一篇文章帶你徹底搞懂Redis?事務的相關資料,需要的朋友可以參考下
    2022-10-10
  • Redis中的配置文件,數(shù)據(jù)持久化,事務

    Redis中的配置文件,數(shù)據(jù)持久化,事務

    這篇文章主要介紹了Redis中的配置文件,數(shù)據(jù)持久化,事務問題,具有很好的參考價值,希望對大家有所幫助。
    2022-12-12
  • 詳解Redis如何處理Hash沖突

    詳解Redis如何處理Hash沖突

    在 Redis 中,哈希表是一種常見的數(shù)據(jù)結構,通常用于存儲對象的屬性,對于哈希表,最常遇到的是哈希沖突,那么,當 Redis遇到Hash沖突會如何處理?本文我們將詳細介紹Redis如何處理哈希沖突,需要的朋友可以參考下
    2024-09-09
  • Redis數(shù)據(jù)庫分布式設計方案介紹

    Redis數(shù)據(jù)庫分布式設計方案介紹

    大家好,本篇文章主要講的是Redis數(shù)據(jù)庫分布式設計方案介紹,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下
    2022-01-01
  • Redis安裝教程圖解

    Redis安裝教程圖解

    Redis是一個開源的使用ANSI C語言編寫、支持網絡、可基于內存亦可持久化的日志型、Key-Value數(shù)據(jù)庫,并提供多種語言的API。本文就教大家如何安裝Redis,需要的朋友可以參考下
    2015-10-10
  • 圖解Redis主從復制與Redis哨兵機制

    圖解Redis主從復制與Redis哨兵機制

    這篇文章主要介紹了圖解Redis主從復制與Redis哨兵機制,今天分享一下Redis的持久化、事務、管道相關的知識點,需要的朋友可以參考下
    2023-03-03
  • SpringBoot整合Mybatis-plus和Redis實現(xiàn)投票功能

    SpringBoot整合Mybatis-plus和Redis實現(xiàn)投票功能

    投票功能是一個非常常見的Web應用場景,這篇文章將為大家介紹一下如何將Redis和Mybatis-plus整合到SpringBoot中,實現(xiàn)投票功能,感興趣的可以了解一下
    2023-05-05
  • 基于Redis實現(xiàn)附近商鋪查詢功能

    基于Redis實現(xiàn)附近商鋪查詢功能

    這篇文章主要介紹了基于Redis實現(xiàn)-附近商鋪查詢功能,這個功能將使用到Redis中的GEO這種數(shù)據(jù)結構來實現(xiàn),需要的朋友可以參考下
    2025-05-05

最新評論