欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文帶你搞懂Redis Stream的6種消息處理模式

 更新時間:2025年05月04日 07:56:18   作者:風(fēng)象南  
Redis 5.0版本引入的Stream數(shù)據(jù)類型,為Redis生態(tài)帶來了強(qiáng)大而靈活的消息隊列功能,本文將為大家詳細(xì)介紹Redis Stream的6種消息處理模式,感興趣的小伙伴可以了解一下

Redis 5.0版本引入的Stream數(shù)據(jù)類型,為Redis生態(tài)帶來了強(qiáng)大而靈活的消息隊列功能,彌補(bǔ)了之前發(fā)布/訂閱模式的不足,如消息持久化、消費者組、消息確認(rèn)等特性。

Redis Stream結(jié)合了傳統(tǒng)消息隊列和時序數(shù)據(jù)庫的特點,適用于日志收集、事件驅(qū)動應(yīng)用、實時分析等多種場景。

本文將介紹Redis Stream的6種消息處理模式。

1. 簡單消費模式(Simple Consumption)

基本概念

簡單消費模式是Redis Stream最基礎(chǔ)的使用方式,不使用消費者組,直接讀取流中的消息。生產(chǎn)者將消息追加到流中,消費者通過指定起始ID來讀取消息。

核心命令

# 發(fā)布消息
XADD stream_name [ID] field value [field value ...]

# 讀取消息
XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id

實現(xiàn)示例

Redis CLI

# 添加消息到stream
> XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"

# 從頭開始讀取所有消息
> XREAD STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# 從指定ID開始讀取
> XREAD STREAMS mystream 1647257548956-0
(empty list or set)

# 從最新的消息ID之后開始讀取(阻塞等待新消息)
> XREAD BLOCK 5000 STREAMS mystream $
(nil)

Java Spring Boot示例

@Service
public class SimpleStreamService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 發(fā)布消息到Stream
     */
    public String publishEvent(String streamKey, Map<String, Object> eventData) {
        StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey);
        return redisTemplate.opsForStream().add(record).getValue();
    }
    
    /**
     * 從指定位置開始讀取消息
     */
    public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
        StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId)));
    }
    
    /**
     * 阻塞式讀取消息
     */
    public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
        StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis));
        return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey));
    }
}

使用場景

  • 簡單的事件日志記錄
  • 單一消費者場景
  • 時間序列數(shù)據(jù)收集
  • 開發(fā)和調(diào)試階段

優(yōu)缺點

優(yōu)點

  • 實現(xiàn)簡單,無需創(chuàng)建和管理消費者組
  • 直接控制從哪個位置開始消費消息
  • 適合單個消費者場景

缺點

  • 無法實現(xiàn)負(fù)載均衡
  • 無法追蹤消息確認(rèn)狀態(tài)
  • 需要手動管理已讀消息ID
  • 服務(wù)重啟需自行記錄上次讀取位置

2. 消費者組模式(Consumer Groups)

基本概念

消費者組允許多個消費者共同處理一個流的消息,實現(xiàn)負(fù)載均衡,并提供消息確認(rèn)機(jī)制,確保消息至少被處理一次。每個消費者組維護(hù)自己的消費位置,不同消費者組之間互不干擾。

核心命令

# 創(chuàng)建消費者組
XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM]

# 從消費者組讀取消息
XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID]

# 確認(rèn)消息處理完成
XACK stream_name group_name message_id [message_id ...]

實現(xiàn)示例

Redis CLI

# 創(chuàng)建消費者組
> XGROUP CREATE mystream processing-group $ MKSTREAM
OK

# 消費者1讀取消息
> XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# 確認(rèn)消息已處理
> XACK mystream processing-group 1647257548956-0
(integer) 1

# 消費者2讀取消息(已無未處理消息)
> XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) (empty list or set)

Java Spring Boot示例

@Service
public class ConsumerGroupService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 創(chuàng)建消費者組
     */
    public void createGroup(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        } catch (RedisSystemException e) {
            // 處理流不存在的情況
            if (e.getRootCause() instanceof RedisCommandExecutionException 
                    && e.getRootCause().getMessage().contains("NOGROUP")) {
                redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName);
            } else {
                throw e;
            }
        }
    }
    
    /**
     * 從消費者組讀取消息
     */
    public List<MapRecord<String, Object, Object>> readFromGroup(
            String streamKey, String groupName, String consumerName, int count) {
        
        StreamReadOptions options = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }
    
    /**
     * 阻塞式從消費者組讀取消息
     */
    public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
            String streamKey, String groupName, String consumerName, int count, Duration timeout) {
        
        StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }
    
    /**
     * 確認(rèn)消息已處理
     */
    public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
        return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
    }
}

使用場景

  • 需要橫向擴(kuò)展消息處理能力的系統(tǒng)
  • 要求消息可靠處理的業(yè)務(wù)場景
  • 實現(xiàn)消息工作隊列
  • 微服務(wù)間的事件傳遞

優(yōu)缺點

優(yōu)點

  • 多個消費者可以并行處理消息
  • 提供消息確認(rèn)機(jī)制,保證消息不丟失
  • 支持消費者崩潰后恢復(fù)處理
  • 每個消費者組維護(hù)獨立的消費位置

缺點

  • 實現(xiàn)相對復(fù)雜
  • 需要妥善管理消費者組和消費者
  • 需要顯式處理消息確認(rèn)
  • 需要定期處理未確認(rèn)的消息

3. 阻塞式消費模式(Blocking Consumption)

基本概念

阻塞式消費允許消費者在沒有新消息時保持連接,等待新消息到達(dá)。這種模式減少了輪詢開銷,提高了實時性,適合對消息處理時效性要求高的場景。

核心命令

# 阻塞式簡單消費
XREAD BLOCK milliseconds STREAMS stream_name ID

# 阻塞式消費者組消費
XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >

實現(xiàn)示例

Redis CLI

# 阻塞等待新消息(最多等待10秒)
> XREAD BLOCK 10000 STREAMS mystream $
(nil)  # 如果10秒內(nèi)沒有新消息

# 使用消費者組的阻塞式消費
> XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream >
(nil)  # 如果10秒內(nèi)沒有新分配的消息

Java Spring Boot示例

@Service
public class BlockingStreamConsumerService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 阻塞式消息消費者任務(wù)
     */
    @Async
    public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 阻塞讀取消息
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId)));
                
                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        // 處理消息
                        processMessage(record);
                        
                        // 更新最后讀取的ID
                        lastId = record.getId().getValue();
                    }
                } else {
                    // 超時未讀取到消息,可以執(zhí)行一些其他邏輯
                }
            } catch (Exception e) {
                // 異常處理
                log.error("Error reading from stream: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000); // 出錯后等待一段時間再重試
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    /**
     * 阻塞式消費者組消費
     */
    @Async
    public void startGroupBlockingConsumer(
            String streamKey, String groupName, String consumerName, Duration timeout) {
        
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 阻塞讀取消息
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(Consumer.from(groupName, consumerName),
                               options,
                               StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
                
                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        try {
                            // 處理消息
                            processMessage(record);
                            
                            // 確認(rèn)消息
                            redisTemplate.opsForStream()
                                    .acknowledge(streamKey, groupName, record.getId().getValue());
                        } catch (Exception e) {
                            // 處理失敗,記錄日志
                            log.error("Error processing message: {}", e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream group: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
    
    private void processMessage(MapRecord<String, Object, Object> record) {
        // 實際消息處理邏輯
        log.info("Processing message: {}", record);
        // ...處理消息的具體業(yè)務(wù)邏輯
    }
}

使用場景

  • 實時數(shù)據(jù)處理系統(tǒng)
  • 事件驅(qū)動的任務(wù)處理
  • 低延遲要求的應(yīng)用
  • 即時通訊系統(tǒng)
  • 通知服務(wù)

優(yōu)缺點

優(yōu)點

  • 減少輪詢帶來的資源浪費
  • 實時性好,消息到達(dá)后立即處理
  • 降低Redis和客戶端的負(fù)載
  • 節(jié)省CPU和網(wǎng)絡(luò)資源

缺點

  • 長連接可能占用Redis連接資源
  • 需要合理設(shè)置超時時間
  • 可能需要處理網(wǎng)絡(luò)中斷后的重連
  • 消費者需要具備并發(fā)處理能力

4. 扇出模式(Fan-out Pattern)

基本概念

扇出模式允許多個獨立的消費者組同時消費同一個流中的所有消息,類似于發(fā)布/訂閱模式,但具有消息持久化和回溯能力。每個消費者組獨立維護(hù)自己的消費位置。

核心命令

創(chuàng)建多個消費者組,它們都獨立消費同一個流:

XGROUP CREATE stream_name group_name_1 $ MKSTREAM
XGROUP CREATE stream_name group_name_2 $ MKSTREAM
XGROUP CREATE stream_name group_name_3 $ MKSTREAM

實現(xiàn)示例

Redis CLI

# 創(chuàng)建多個消費者組
> XGROUP CREATE notifications analytics-group $ MKSTREAM
OK
> XGROUP CREATE notifications email-group $ MKSTREAM
OK
> XGROUP CREATE notifications mobile-group $ MKSTREAM
OK

# 添加一條消息
> XADD notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"

# 從各個消費者組讀?。總€組都能收到所有消息)
> XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

> XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

> XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

Java Spring Boot示例

@Service
public class FanOutService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 初始化扇出消費者組
     */
    public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
        // 確保流存在
        try {
            StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey);
        } catch (Exception e) {
            // 流不存在,發(fā)送一個初始消息
            Map<String, Object> initialMessage = new HashMap<>();
            initialMessage.put("init", "true");
            redisTemplate.opsForStream().add(streamKey, initialMessage);
        }
        
        // 創(chuàng)建所有消費者組
        for (String groupName : groupNames) {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                // 忽略組已存在的錯誤
                log.info("Group {} may already exist: {}", groupName, e.getMessage());
            }
        }
    }
    
    /**
     * 發(fā)布扇出消息
     */
    public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
        StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey);
        return redisTemplate.opsForStream().add(record).getValue();
    }
    
    /**
     * 為特定組啟動消費者
     */
    @Async
    public void startGroupConsumer(
            String streamKey, String groupName, String consumerName, 
            Consumer<MapRecord<String, Object, Object>> messageHandler) {
        
        StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2));
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        options,
                        StreamOffset.create(streamKey, ReadOffset.lastConsumed())
                );
                
                if (messages != null && !messages.isEmpty()) {
                    for (MapRecord<String, Object, Object> message : messages) {
                        try {
                            // 處理消息
                            messageHandler.accept(message);
                            
                            // 確認(rèn)消息
                            redisTemplate.opsForStream().acknowledge(
                                    streamKey, groupName, message.getId().getValue());
                        } catch (Exception e) {
                            log.error("Error processing message in group {}: {}", 
                                    groupName, e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream for group {}: {}", 
                        groupName, e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

使用示例

@Service
public class NotificationService {
    
    @Autowired
    private FanOutService fanOutService;
    
    @PostConstruct
    public void init() {
        // 初始化扇出組
        List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
        fanOutService.initializeFanOutGroups("user-events", groups);
        
        // 啟動各個消費者組的處理器
        fanOutService.startGroupConsumer(
                "user-events", "email-group", "email-consumer", this::processEmailNotification);
        
        fanOutService.startGroupConsumer(
                "user-events", "sms-group", "sms-consumer", this::processSmsNotification);
        
        fanOutService.startGroupConsumer(
                "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
    }
    
    private void processEmailNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing email notification: {}", messageData);
        // 郵件發(fā)送邏輯
    }
    
    private void processSmsNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing SMS notification: {}", messageData);
        // 短信發(fā)送邏輯
    }
    
    private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing analytics event: {}", messageData);
        // 分析事件處理邏輯
    }
    
    public void publishUserEvent(String eventType, Map<String, Object> eventData) {
        Map<String, Object> message = new HashMap<>(eventData);
        message.put("event_type", eventType);
        message.put("timestamp", System.currentTimeMillis());
        
        fanOutService.publishFanOutMessage("user-events", message);
    }
}

使用場景

  • 多個系統(tǒng)需要獨立處理同一事件流
  • 實現(xiàn)事件廣播機(jī)制
  • 系統(tǒng)集成:一個事件觸發(fā)多個業(yè)務(wù)流程
  • 日志統(tǒng)一處理并分發(fā)到不同服務(wù)
  • 通知系統(tǒng):一個事件需要通過多種方式通知用戶

優(yōu)缺點

優(yōu)點

  • 實現(xiàn)一次發(fā)布多次消費
  • 各消費者組獨立工作,互不影響
  • 新增消費者組可以從頭開始消費所有歷史消息
  • 可靠性高,消息持久化存儲

缺點

  • 隨著流數(shù)據(jù)增長,可能占用較多存儲空間
  • 需要合理設(shè)置流的最大長度或過期策略
  • 消費者組數(shù)量過多可能增加Redis負(fù)載
  • 需要單獨管理每個消費者組的狀態(tài)

5. 重試與恢復(fù)模式(Retry and Recovery)

基本概念

這種模式關(guān)注處理失敗消息的恢復(fù)和重試機(jī)制。Redis Stream消費者組會跟蹤每個消息的處理狀態(tài),允許查看和管理未確認(rèn)(PEL - Pending Entry List)的消息,實現(xiàn)可靠的消息處理。

核心命令

# 查看消費者組中未確認(rèn)的消息
XPENDING stream_name group_name [start_id end_id count] [consumer_name]

# 查看消費者組中長時間未確認(rèn)的消息詳情
XPENDING stream_name group_name start_id end_id count [consumer_name]

# 認(rèn)領(lǐng)處理超時的消息
XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]

實現(xiàn)示例

Redis CLI

# 查看未確認(rèn)的消息數(shù)量
> XPENDING mystream processing-group
1) (integer) 2         # 未確認(rèn)消息數(shù)量
2) "1647257548956-0"   # 最小ID
3) "1647257549123-0"   # 最大ID
4) 1) 1) "consumer-1"  # 各個消費者的未確認(rèn)消息數(shù)
      2) (integer) 1
   2) 1) "consumer-2"
      2) (integer) 1

# 查看特定消費者的未確認(rèn)消息
> XPENDING mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0"   # 消息ID
   2) "consumer-1"         # 當(dāng)前持有的消費者
   3) (integer) 120000     # 空閑時間(毫秒)
   4) (integer) 2          # 傳遞次數(shù)

# 認(rèn)領(lǐng)超過2分鐘未處理的消息
> XCLAIM mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
   2) 1) "sensor_id"
      2) "1234"
      3) "temperature"
      4) "19.8"
      5) "humidity"
      6) "56"

Java Spring Boot示例

@Service
public class MessageRecoveryService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 獲取消費者組中的未確認(rèn)消息
     */
    public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
        return redisTemplate.opsForStream().pending(streamKey, groupName);
    }
    
    /**
     * 獲取指定消費者的詳細(xì)未確認(rèn)消息
     */
    public PendingMessages getPendingMessages(
            String streamKey, String groupName, String consumerName, 
            Range<String> idRange, long count) {
        
        return redisTemplate.opsForStream().pending(
                streamKey, 
                Consumer.from(groupName, consumerName), 
                idRange, 
                count);
    }
    
    /**
     * 認(rèn)領(lǐng)長時間未處理的消息
     */
    public List<MapRecord<String, Object, Object>> claimMessages(
            String streamKey, String groupName, String newConsumerName, 
            Duration minIdleTime, String... messageIds) {
        
        return redisTemplate.opsForStream().claim(
                streamKey, 
                Consumer.from(groupName, newConsumerName), 
                minIdleTime, 
                messageIds);
    }
    
    /**
     * 定時檢查和恢復(fù)未處理的消息
     */
    @Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次
    public void recoverStaleMessages() {
        // 配置參數(shù)
        String streamKey = "mystream";
        String groupName = "processing-group";
        String recoveryConsumer = "recovery-consumer";
        Duration minIdleTime = Duration.ofMinutes(5); // 超過5分鐘未處理的消息
        
        try {
            // 1. 獲取所有未確認(rèn)消息的摘要
            PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
            
            if (summary != null && summary.getTotalPendingMessages() > 0) {
                // 2. 遍歷每個消費者的未確認(rèn)消息
                for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) {
                    // 獲取該消費者的詳細(xì)未確認(rèn)消息列表
                    PendingMessages pendingMessages = getPendingMessages(
                            streamKey, groupName, consumer.getName(), 
                            Range.unbounded(), 50); // 每次最多處理50條
                    
                    if (pendingMessages != null) {
                        // 3. 篩選出空閑時間超過閾值的消息
                        List<String> staleMessageIds = new ArrayList<>();
                        
                        for (PendingMessage message : pendingMessages) {
                            if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
                                staleMessageIds.add(message.getIdAsString());
                            }
                        }
                        
                        // 4. 認(rèn)領(lǐng)這些消息
                        if (!staleMessageIds.isEmpty()) {
                            log.info("Claiming {} stale messages from consumer {}", 
                                    staleMessageIds.size(), consumer.getName());
                            
                            List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
                                    streamKey, groupName, recoveryConsumer, minIdleTime, 
                                    staleMessageIds.toArray(new String[0]));
                            
                            // 5. 處理這些被認(rèn)領(lǐng)的消息
                            processClaimedMessages(streamKey, groupName, claimedMessages);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error recovering stale messages: {}", e.getMessage(), e);
        }
    }
    
    /**
     * 處理被認(rèn)領(lǐng)的消息
     */
    private void processClaimedMessages(
            String streamKey, String groupName, 
            List<MapRecord<String, Object, Object>> messages) {
        
        if (messages == null || messages.isEmpty()) {
            return;
        }
        
        for (MapRecord<String, Object, Object> message : messages) {
            try {
                // 執(zhí)行消息處理邏輯
                processMessage(message);
                
                // 確認(rèn)消息
                redisTemplate.opsForStream().acknowledge(
                        streamKey, groupName, message.getId().getValue());
                
                log.info("Successfully processed recovered message: {}", message.getId());
            } catch (Exception e) {
                log.error("Failed to process recovered message {}: {}", 
                        message.getId(), e.getMessage(), e);
                // 根據(jù)業(yè)務(wù)需求決定是否將消息加入死信隊列
                moveToDeadLetterQueue(streamKey, message);
            }
        }
    }
    
    /**
     * 將消息移至死信隊列
     */
    private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
        String deadLetterStream = sourceStream + ":dead-letter";
        Map<Object, Object> messageData = message.getValue();
        
        Map<String, Object> dlqMessage = new HashMap<>();
        messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v));
        
        // 添加元數(shù)據(jù)
        dlqMessage.put("original_id", message.getId().getValue());
        dlqMessage.put("error_time", System.currentTimeMillis());
        
        redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);
        
        // 可選:從原消費者組確認(rèn)該消息
        // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
    }
    
    private void processMessage(MapRecord<String, Object, Object> message) {
        // 實際的消息處理邏輯
        log.info("Processing recovered message: {}", message);
        // ...
    }
}

使用場景

  • 需要可靠消息處理的關(guān)鍵業(yè)務(wù)系統(tǒng)
  • 處理時間較長的任務(wù)
  • 需要錯誤重試機(jī)制的工作流
  • 監(jiān)控和診斷消息處理過程
  • 實現(xiàn)死信隊列處理特定失敗場景

優(yōu)缺點

優(yōu)點

  • 提高系統(tǒng)容錯性和可靠性
  • 自動恢復(fù)因消費者崩潰導(dǎo)致的未處理消息
  • 可以識別和處理長時間未確認(rèn)的消息
  • 支持實現(xiàn)復(fù)雜的重試策略和死信處理

缺點

  • 需要額外開發(fā)和維護(hù)恢復(fù)機(jī)制
  • 可能導(dǎo)致消息重復(fù)處理,需要確保業(yè)務(wù)邏輯冪等
  • 系統(tǒng)復(fù)雜度增加
  • 需要監(jiān)控和管理PEL(未確認(rèn)消息列表)的大小

6. 流處理窗口模式(Streaming Window Processing)

基本概念

流處理窗口模式基于時間或消息計數(shù)劃分?jǐn)?shù)據(jù)流,在每個窗口內(nèi)執(zhí)行聚合或分析操作。這種模式適用于實時分析、趨勢監(jiān)測和時間序列處理。雖然Redis Stream本身不直接提供窗口操作,但可以結(jié)合Redis的其他特性實現(xiàn)。

實現(xiàn)方式

主要通過以下幾種方式實現(xiàn):

1. 基于消息ID的時間范圍(Redis消息ID包含毫秒時間戳)

2. 結(jié)合Redis的排序集合(SortedSet)存儲窗口數(shù)據(jù)

3. 使用Redis的過期鍵實現(xiàn)滑動窗口

實現(xiàn)示例

Redis CLI

窗口數(shù)據(jù)收集與查詢:

# 添加帶時間戳的數(shù)據(jù)
> XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"

# 查詢特定時間范圍的數(shù)據(jù)
> XRANGE temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.5"
      5) "timestamp"
      6) "1647257548000"
2) 1) "1647257560234-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.8"
      5) "timestamp"
      6) "1647257558000"

Java Spring Boot示例

@Service
public class TimeWindowProcessingService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
     * 添加數(shù)據(jù)點到流,并存儲到相應(yīng)的時間窗口
     */
    public String addDataPoint(String streamKey, String sensorId, double value) {
        long timestamp = System.currentTimeMillis();
        
        // 1. 添加到原始數(shù)據(jù)流
        Map<String, Object> dataPoint = new HashMap<>();
        dataPoint.put("sensor_id", sensorId);
        dataPoint.put("value", String.valueOf(value));
        dataPoint.put("timestamp", String.valueOf(timestamp));
        
        StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey);
        RecordId recordId = redisTemplate.opsForStream().add(record);
        
        // 2. 計算所屬的窗口(這里以5分鐘為一個窗口)
        long windowStart = timestamp - (timestamp % (5 * 60 * 1000));
        String windowKey = streamKey + ":window:" + windowStart;
        
        // 3. 將數(shù)據(jù)點添加到窗口的有序集合中,分?jǐn)?shù)為時間戳
        String dataPointJson = new ObjectMapper().writeValueAsString(dataPoint);
        redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);
        
        // 4. 設(shè)置窗口鍵的過期時間(保留24小時)
        redisTemplate.expire(windowKey, Duration.ofHours(24));
        
        return recordId.getValue();
    }
    
    /**
     * 獲取指定時間窗口內(nèi)的數(shù)據(jù)點
     */
    public List<Map<String, Object>> getWindowData(
            String streamKey, long windowStartTime, long windowEndTime) {
        
        // 計算可能的窗口鍵(每5分鐘一個窗口)
        List<String> windowKeys = new ArrayList<>();
        long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
        
        while (current <= windowEndTime) {
            windowKeys.add(streamKey + ":window:" + current);
            current += (5 * 60 * 1000);
        }
        
        // 從各個窗口獲取數(shù)據(jù)點
        List<Map<String, Object>> results = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        
        for (String windowKey : windowKeys) {
            Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore(
                    windowKey, windowStartTime, windowEndTime);
            
            if (dataPoints != null) {
                for (String dataPointJson : dataPoints) {
                    try {
                        Map<String, Object> dataPoint = mapper.readValue(
                                dataPointJson, new TypeReference<Map<String, Object>>() {});
                        results.add(dataPoint);
                    } catch (Exception e) {
                        log.error("Error parsing data point: {}", e.getMessage(), e);
                    }
                }
            }
        }
        
        // 按時間戳排序
        results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString())));
        
        return results;
    }
    
    /**
     * 計算窗口內(nèi)數(shù)據(jù)的聚合統(tǒng)計
     */
    public Map<String, Object> getWindowStats(
            String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
        
        List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime);
        
        // 過濾特定傳感器的數(shù)據(jù)
        List<Double> values = windowData.stream()
                .filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
                .map(dp -> Double.parseDouble(dp.get("value").toString()))
                .collect(Collectors.toList());
        
        Map<String, Object> stats = new HashMap<>();
        stats.put("count", values.size());
        
        if (!values.isEmpty()) {
            DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v));
            stats.put("min", summaryStats.getMin());
            stats.put("max", summaryStats.getMax());
            stats.put("avg", summaryStats.getAverage());
            stats.put("sum", summaryStats.getSum());
        }
        
        stats.put("start_time", windowStartTime);
        stats.put("end_time", windowEndTime);
        stats.put("sensor_id", sensorId);
        
        return stats;
    }
    
    /**
     * 實現(xiàn)滑動窗口處理
     */
    @Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次
    public void processSlidingWindows() {
        String streamKey = "temperature";
        long now = System.currentTimeMillis();
        
        // 處理過去10分鐘窗口的數(shù)據(jù)
        long windowEndTime = now;
        long windowStartTime = now - (10 * 60 * 1000);
        
        List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例傳感器ID
        
        for (String sensorId : sensorIds) {
            try {
                // 獲取窗口統(tǒng)計
                Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);
                
                // 根據(jù)統(tǒng)計結(jié)果執(zhí)行業(yè)務(wù)邏輯
                if (stats.containsKey("avg")) {
                    double avgTemp = (double) stats.get("avg");
                    if (avgTemp > 25.0) {
                        // 觸發(fā)高溫警報
                        log.warn("High temperature alert for sensor {}: {} °C", sensorId, avgTemp);
                        triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
                    }
                }
                
                // 存儲聚合結(jié)果用于歷史趨勢分析
                saveAggregatedResults(streamKey, sensorId, stats);
                
            } catch (Exception e) {
                log.error("Error processing sliding window for sensor {}: {}", 
                        sensorId, e.getMessage(), e);
            }
        }
    }
    
    /**
     * 觸發(fā)警報
     */
    private void triggerAlert(String sensorId, String alertType, double value) {
        Map<String, Object> alertData = new HashMap<>();
        alertData.put("sensor_id", sensorId);
        alertData.put("alert_type", alertType);
        alertData.put("value", value);
        alertData.put("timestamp", System.currentTimeMillis());
        
        redisTemplate.opsForStream().add("alerts", alertData);
    }
    
    /**
     * 保存聚合結(jié)果
     */
    private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
        long windowTime = (long) stats.get("end_time");
        String aggregateKey = streamKey + ":aggregate:" + sensorId;
        
        // 使用時間作為分?jǐn)?shù)存儲聚合結(jié)果
        redisTemplate.opsForZSet().add(
                aggregateKey, 
                new ObjectMapper().writeValueAsString(stats),
                windowTime);
        
        // 保留30天的聚合數(shù)據(jù)
        redisTemplate.expire(aggregateKey, Duration.ofDays(30));
    }
}

使用場景

  • 實時數(shù)據(jù)分析與統(tǒng)計
  • 趨勢檢測和預(yù)測
  • 異常值和閾值監(jiān)控
  • 時間序列數(shù)據(jù)處理
  • IoT數(shù)據(jù)流處理和聚合
  • 用戶行為分析

優(yōu)缺點

優(yōu)點

  • 支持基于時間的數(shù)據(jù)分析
  • 可以實現(xiàn)實時聚合和計算
  • 靈活的窗口定義(滑動窗口、滾動窗口)
  • 可擴(kuò)展以支持復(fù)雜的分析場景

缺點

  • 實現(xiàn)復(fù)雜度較高
  • 可能需要額外的數(shù)據(jù)結(jié)構(gòu)和存儲空間
  • 對于大數(shù)據(jù)量的窗口計算可能影響性能
  • 需要小心管理內(nèi)存使用和數(shù)據(jù)過期策略

結(jié)論

Redis Stream提供了強(qiáng)大而靈活的消息處理功能,通過組合這些模式,可以構(gòu)建出高性能、可靠且靈活的消息處理系統(tǒng),滿足從簡單的任務(wù)隊列到復(fù)雜的實時數(shù)據(jù)處理等各種應(yīng)用需求。

在選擇和實現(xiàn)這些模式時,應(yīng)充分考慮業(yè)務(wù)特性、性能需求、可靠性要求以及系統(tǒng)規(guī)模,結(jié)合Redis Stream的特性,打造最適合自己應(yīng)用場景的消息處理解決方案。

以上就是一文帶你搞懂Redis Stream的6種消息處理模式的詳細(xì)內(nèi)容,更多關(guān)于Redis Stream消息處理模式的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Redis?鍵值對(key-value)數(shù)據(jù)庫實現(xiàn)方法

    Redis?鍵值對(key-value)數(shù)據(jù)庫實現(xiàn)方法

    Redis 的鍵值對中的 key 就是字符串對象,而 value 可以是字符串對象,也可以是集合數(shù)據(jù)類型的對象,比如 List 對象,Hash 對象、Set 對象和 Zset 對象,這篇文章主要介紹了Redis?鍵值對數(shù)據(jù)庫是怎么實現(xiàn)的,需要的朋友可以參考下
    2024-05-05
  • Windows系統(tǒng)安裝Redis的詳細(xì)圖文教程

    Windows系統(tǒng)安裝Redis的詳細(xì)圖文教程

    但有時候想在windows下折騰下Redis,那么就可以參考下面的方法了,雖然腳本之家小編以前整理了一些,發(fā)現(xiàn)這篇做的比較詳細(xì),下載也給出來了
    2018-08-08
  • ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式)

    ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式)

    這篇文章主要介紹了ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式),需要的朋友可以參考下
    2018-03-03
  • RedisTemplate中boundHashOps的使用小結(jié)

    RedisTemplate中boundHashOps的使用小結(jié)

    redisTemplate.boundHashOps(key)?是 RedisTemplate 類的一個方法,本文主要介紹了RedisTemplate中boundHashOps的使用小結(jié),具有一定的參考價值,感興趣的可以了解一下
    2024-04-04
  • Redis底層類型之json命令使用

    Redis底層類型之json命令使用

    這篇文章主要為大家介紹了Redis底層類型之json命令使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-09-09
  • 使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決

    使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決

    這篇文章主要介紹了使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • Redis 如何批量設(shè)置過期時間(PIPLINE的使用)

    Redis 如何批量設(shè)置過期時間(PIPLINE的使用)

    有時候我們并不希望redis的key一直存在。例如緩存,驗證碼等數(shù)據(jù),我們希望它們能在一定時間內(nèi)自動的被銷毀。本文就詳細(xì)的介紹一下Redis 如何批量設(shè)置過期時間,感興趣的可以了解一下
    2021-11-11
  • Redis在項目中的使用(JedisPool方式)

    Redis在項目中的使用(JedisPool方式)

    項目操作redis是使用的RedisTemplate方式,另外還可以完全使用JedisPool和Jedis來操作redis,本文給大家介紹Redis在項目中的使用,JedisPool方式,感興趣的朋友跟隨小編一起看看吧
    2021-12-12
  • Redis所實現(xiàn)的Reactor模型設(shè)計方案

    Redis所實現(xiàn)的Reactor模型設(shè)計方案

    這篇文章主要介紹了Redis所實現(xiàn)的Reactor模型,本文將帶領(lǐng)讀者從源碼的角度來查看redis關(guān)于reactor模型的設(shè)計,需要的朋友可以參考下
    2024-06-06
  • 詳解Redis中地理位置功能Geospatial的應(yīng)用

    詳解Redis中地理位置功能Geospatial的應(yīng)用

    Geospatial?Indexes?是?Redis?提供的一種數(shù)據(jù)結(jié)構(gòu),用于存儲和查詢地理位置信息,這篇文章就來和大家詳細(xì)講講Geospatial的具體應(yīng)用吧
    2023-06-06

最新評論