一文帶你搞懂Redis Stream的6種消息處理模式
Redis 5.0版本引入的Stream數(shù)據(jù)類型,為Redis生態(tài)帶來了強(qiáng)大而靈活的消息隊列功能,彌補(bǔ)了之前發(fā)布/訂閱模式的不足,如消息持久化、消費者組、消息確認(rèn)等特性。
Redis Stream結(jié)合了傳統(tǒng)消息隊列和時序數(shù)據(jù)庫的特點,適用于日志收集、事件驅(qū)動應(yīng)用、實時分析等多種場景。
本文將介紹Redis Stream的6種消息處理模式。
1. 簡單消費模式(Simple Consumption)
基本概念
簡單消費模式是Redis Stream最基礎(chǔ)的使用方式,不使用消費者組,直接讀取流中的消息。生產(chǎn)者將消息追加到流中,消費者通過指定起始ID來讀取消息。
核心命令
# 發(fā)布消息 XADD stream_name [ID] field value [field value ...] # 讀取消息 XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id
實現(xiàn)示例
Redis CLI
# 添加消息到stream > XADD mystream * sensor_id 1234 temperature 19.8 humidity 56 "1647257548956-0" # 從頭開始讀取所有消息 > XREAD STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) "1647257548956-0" 2) 1) "sensor_id" 2) "1234" 3) "temperature" 4) "19.8" 5) "humidity" 6) "56" # 從指定ID開始讀取 > XREAD STREAMS mystream 1647257548956-0 (empty list or set) # 從最新的消息ID之后開始讀取(阻塞等待新消息) > XREAD BLOCK 5000 STREAMS mystream $ (nil)
Java Spring Boot示例
@Service public class SimpleStreamService { @Autowired private StringRedisTemplate redisTemplate; /** * 發(fā)布消息到Stream */ public String publishEvent(String streamKey, Map<String, Object> eventData) { StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey); return redisTemplate.opsForStream().add(record).getValue(); } /** * 從指定位置開始讀取消息 */ public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) { StreamReadOptions readOptions = StreamReadOptions.empty().count(count); return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId))); } /** * 阻塞式讀取消息 */ public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) { StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis)); return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey)); } }
使用場景
- 簡單的事件日志記錄
- 單一消費者場景
- 時間序列數(shù)據(jù)收集
- 開發(fā)和調(diào)試階段
優(yōu)缺點
優(yōu)點
- 實現(xiàn)簡單,無需創(chuàng)建和管理消費者組
- 直接控制從哪個位置開始消費消息
- 適合單個消費者場景
缺點
- 無法實現(xiàn)負(fù)載均衡
- 無法追蹤消息確認(rèn)狀態(tài)
- 需要手動管理已讀消息ID
- 服務(wù)重啟需自行記錄上次讀取位置
2. 消費者組模式(Consumer Groups)
基本概念
消費者組允許多個消費者共同處理一個流的消息,實現(xiàn)負(fù)載均衡,并提供消息確認(rèn)機(jī)制,確保消息至少被處理一次。每個消費者組維護(hù)自己的消費位置,不同消費者組之間互不干擾。
核心命令
# 創(chuàng)建消費者組 XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM] # 從消費者組讀取消息 XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID] # 確認(rèn)消息處理完成 XACK stream_name group_name message_id [message_id ...]
實現(xiàn)示例
Redis CLI
# 創(chuàng)建消費者組 > XGROUP CREATE mystream processing-group $ MKSTREAM OK # 消費者1讀取消息 > XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) "1647257548956-0" 2) 1) "sensor_id" 2) "1234" 3) "temperature" 4) "19.8" 5) "humidity" 6) "56" # 確認(rèn)消息已處理 > XACK mystream processing-group 1647257548956-0 (integer) 1 # 消費者2讀取消息(已無未處理消息) > XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) (empty list or set)
Java Spring Boot示例
@Service public class ConsumerGroupService { @Autowired private StringRedisTemplate redisTemplate; /** * 創(chuàng)建消費者組 */ public void createGroup(String streamKey, String groupName) { try { redisTemplate.opsForStream().createGroup(streamKey, groupName); } catch (RedisSystemException e) { // 處理流不存在的情況 if (e.getRootCause() instanceof RedisCommandExecutionException && e.getRootCause().getMessage().contains("NOGROUP")) { redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName); } else { throw e; } } } /** * 從消費者組讀取消息 */ public List<MapRecord<String, Object, Object>> readFromGroup( String streamKey, String groupName, String consumerName, int count) { StreamReadOptions options = StreamReadOptions.empty().count(count); return redisTemplate.opsForStream().read( Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ); } /** * 阻塞式從消費者組讀取消息 */ public List<MapRecord<String, Object, Object>> readFromGroupBlocking( String streamKey, String groupName, String consumerName, int count, Duration timeout) { StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout); return redisTemplate.opsForStream().read( Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ); } /** * 確認(rèn)消息已處理 */ public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) { return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds); } }
使用場景
- 需要橫向擴(kuò)展消息處理能力的系統(tǒng)
- 要求消息可靠處理的業(yè)務(wù)場景
- 實現(xiàn)消息工作隊列
- 微服務(wù)間的事件傳遞
優(yōu)缺點
優(yōu)點
- 多個消費者可以并行處理消息
- 提供消息確認(rèn)機(jī)制,保證消息不丟失
- 支持消費者崩潰后恢復(fù)處理
- 每個消費者組維護(hù)獨立的消費位置
缺點
- 實現(xiàn)相對復(fù)雜
- 需要妥善管理消費者組和消費者
- 需要顯式處理消息確認(rèn)
- 需要定期處理未確認(rèn)的消息
3. 阻塞式消費模式(Blocking Consumption)
基本概念
阻塞式消費允許消費者在沒有新消息時保持連接,等待新消息到達(dá)。這種模式減少了輪詢開銷,提高了實時性,適合對消息處理時效性要求高的場景。
核心命令
# 阻塞式簡單消費 XREAD BLOCK milliseconds STREAMS stream_name ID # 阻塞式消費者組消費 XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >
實現(xiàn)示例
Redis CLI
# 阻塞等待新消息(最多等待10秒) > XREAD BLOCK 10000 STREAMS mystream $ (nil) # 如果10秒內(nèi)沒有新消息 # 使用消費者組的阻塞式消費 > XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream > (nil) # 如果10秒內(nèi)沒有新分配的消息
Java Spring Boot示例
@Service public class BlockingStreamConsumerService { @Autowired private StringRedisTemplate redisTemplate; /** * 阻塞式消息消費者任務(wù) */ @Async public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) { StreamReadOptions options = StreamReadOptions.empty() .count(1) .block(timeout); while (!Thread.currentThread().isInterrupted()) { try { // 阻塞讀取消息 List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream() .read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId))); if (records != null && !records.isEmpty()) { for (MapRecord<String, Object, Object> record : records) { // 處理消息 processMessage(record); // 更新最后讀取的ID lastId = record.getId().getValue(); } } else { // 超時未讀取到消息,可以執(zhí)行一些其他邏輯 } } catch (Exception e) { // 異常處理 log.error("Error reading from stream: {}", e.getMessage(), e); try { Thread.sleep(1000); // 出錯后等待一段時間再重試 } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } /** * 阻塞式消費者組消費 */ @Async public void startGroupBlockingConsumer( String streamKey, String groupName, String consumerName, Duration timeout) { StreamReadOptions options = StreamReadOptions.empty() .count(1) .block(timeout); while (!Thread.currentThread().isInterrupted()) { try { // 阻塞讀取消息 List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream() .read(Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed())); if (records != null && !records.isEmpty()) { for (MapRecord<String, Object, Object> record : records) { try { // 處理消息 processMessage(record); // 確認(rèn)消息 redisTemplate.opsForStream() .acknowledge(streamKey, groupName, record.getId().getValue()); } catch (Exception e) { // 處理失敗,記錄日志 log.error("Error processing message: {}", e.getMessage(), e); } } } } catch (Exception e) { log.error("Error reading from stream group: {}", e.getMessage(), e); try { Thread.sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } private void processMessage(MapRecord<String, Object, Object> record) { // 實際消息處理邏輯 log.info("Processing message: {}", record); // ...處理消息的具體業(yè)務(wù)邏輯 } }
使用場景
- 實時數(shù)據(jù)處理系統(tǒng)
- 事件驅(qū)動的任務(wù)處理
- 低延遲要求的應(yīng)用
- 即時通訊系統(tǒng)
- 通知服務(wù)
優(yōu)缺點
優(yōu)點
- 減少輪詢帶來的資源浪費
- 實時性好,消息到達(dá)后立即處理
- 降低Redis和客戶端的負(fù)載
- 節(jié)省CPU和網(wǎng)絡(luò)資源
缺點
- 長連接可能占用Redis連接資源
- 需要合理設(shè)置超時時間
- 可能需要處理網(wǎng)絡(luò)中斷后的重連
- 消費者需要具備并發(fā)處理能力
4. 扇出模式(Fan-out Pattern)
基本概念
扇出模式允許多個獨立的消費者組同時消費同一個流中的所有消息,類似于發(fā)布/訂閱模式,但具有消息持久化和回溯能力。每個消費者組獨立維護(hù)自己的消費位置。
核心命令
創(chuàng)建多個消費者組,它們都獨立消費同一個流:
XGROUP CREATE stream_name group_name_1 $ MKSTREAM XGROUP CREATE stream_name group_name_2 $ MKSTREAM XGROUP CREATE stream_name group_name_3 $ MKSTREAM
實現(xiàn)示例
Redis CLI
# 創(chuàng)建多個消費者組 > XGROUP CREATE notifications analytics-group $ MKSTREAM OK > XGROUP CREATE notifications email-group $ MKSTREAM OK > XGROUP CREATE notifications mobile-group $ MKSTREAM OK # 添加一條消息 > XADD notifications * type user_signup user_id 1001 email "user@example.com" "1647345678912-0" # 從各個消費者組讀?。總€組都能收到所有消息) > XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications > 1) 1) "notifications" 2) 1) 1) "1647345678912-0" 2) 1) "type" 2) "user_signup" 3) "user_id" 4) "1001" 5) "email" 6) "user@example.com" > XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications > 1) 1) "notifications" 2) 1) 1) "1647345678912-0" 2) 1) "type" 2) "user_signup" 3) "user_id" 4) "1001" 5) "email" 6) "user@example.com" > XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications > 1) 1) "notifications" 2) 1) 1) "1647345678912-0" 2) 1) "type" 2) "user_signup" 3) "user_id" 4) "1001" 5) "email" 6) "user@example.com"
Java Spring Boot示例
@Service public class FanOutService { @Autowired private StringRedisTemplate redisTemplate; /** * 初始化扇出消費者組 */ public void initializeFanOutGroups(String streamKey, List<String> groupNames) { // 確保流存在 try { StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey); } catch (Exception e) { // 流不存在,發(fā)送一個初始消息 Map<String, Object> initialMessage = new HashMap<>(); initialMessage.put("init", "true"); redisTemplate.opsForStream().add(streamKey, initialMessage); } // 創(chuàng)建所有消費者組 for (String groupName : groupNames) { try { redisTemplate.opsForStream().createGroup(streamKey, groupName); } catch (Exception e) { // 忽略組已存在的錯誤 log.info("Group {} may already exist: {}", groupName, e.getMessage()); } } } /** * 發(fā)布扇出消息 */ public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) { StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey); return redisTemplate.opsForStream().add(record).getValue(); } /** * 為特定組啟動消費者 */ @Async public void startGroupConsumer( String streamKey, String groupName, String consumerName, Consumer<MapRecord<String, Object, Object>> messageHandler) { StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2)); while (!Thread.currentThread().isInterrupted()) { try { List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read( Consumer.from(groupName, consumerName), options, StreamOffset.create(streamKey, ReadOffset.lastConsumed()) ); if (messages != null && !messages.isEmpty()) { for (MapRecord<String, Object, Object> message : messages) { try { // 處理消息 messageHandler.accept(message); // 確認(rèn)消息 redisTemplate.opsForStream().acknowledge( streamKey, groupName, message.getId().getValue()); } catch (Exception e) { log.error("Error processing message in group {}: {}", groupName, e.getMessage(), e); } } } } catch (Exception e) { log.error("Error reading from stream for group {}: {}", groupName, e.getMessage(), e); try { Thread.sleep(1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; } } } } }
使用示例
@Service public class NotificationService { @Autowired private FanOutService fanOutService; @PostConstruct public void init() { // 初始化扇出組 List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group"); fanOutService.initializeFanOutGroups("user-events", groups); // 啟動各個消費者組的處理器 fanOutService.startGroupConsumer( "user-events", "email-group", "email-consumer", this::processEmailNotification); fanOutService.startGroupConsumer( "user-events", "sms-group", "sms-consumer", this::processSmsNotification); fanOutService.startGroupConsumer( "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent); } private void processEmailNotification(MapRecord<String, Object, Object> message) { Map<Object, Object> messageData = message.getValue(); log.info("Processing email notification: {}", messageData); // 郵件發(fā)送邏輯 } private void processSmsNotification(MapRecord<String, Object, Object> message) { Map<Object, Object> messageData = message.getValue(); log.info("Processing SMS notification: {}", messageData); // 短信發(fā)送邏輯 } private void processAnalyticsEvent(MapRecord<String, Object, Object> message) { Map<Object, Object> messageData = message.getValue(); log.info("Processing analytics event: {}", messageData); // 分析事件處理邏輯 } public void publishUserEvent(String eventType, Map<String, Object> eventData) { Map<String, Object> message = new HashMap<>(eventData); message.put("event_type", eventType); message.put("timestamp", System.currentTimeMillis()); fanOutService.publishFanOutMessage("user-events", message); } }
使用場景
- 多個系統(tǒng)需要獨立處理同一事件流
- 實現(xiàn)事件廣播機(jī)制
- 系統(tǒng)集成:一個事件觸發(fā)多個業(yè)務(wù)流程
- 日志統(tǒng)一處理并分發(fā)到不同服務(wù)
- 通知系統(tǒng):一個事件需要通過多種方式通知用戶
優(yōu)缺點
優(yōu)點
- 實現(xiàn)一次發(fā)布多次消費
- 各消費者組獨立工作,互不影響
- 新增消費者組可以從頭開始消費所有歷史消息
- 可靠性高,消息持久化存儲
缺點
- 隨著流數(shù)據(jù)增長,可能占用較多存儲空間
- 需要合理設(shè)置流的最大長度或過期策略
- 消費者組數(shù)量過多可能增加Redis負(fù)載
- 需要單獨管理每個消費者組的狀態(tài)
5. 重試與恢復(fù)模式(Retry and Recovery)
基本概念
這種模式關(guān)注處理失敗消息的恢復(fù)和重試機(jī)制。Redis Stream消費者組會跟蹤每個消息的處理狀態(tài),允許查看和管理未確認(rèn)(PEL - Pending Entry List)的消息,實現(xiàn)可靠的消息處理。
核心命令
# 查看消費者組中未確認(rèn)的消息 XPENDING stream_name group_name [start_id end_id count] [consumer_name] # 查看消費者組中長時間未確認(rèn)的消息詳情 XPENDING stream_name group_name start_id end_id count [consumer_name] # 認(rèn)領(lǐng)處理超時的消息 XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]
實現(xiàn)示例
Redis CLI
# 查看未確認(rèn)的消息數(shù)量 > XPENDING mystream processing-group 1) (integer) 2 # 未確認(rèn)消息數(shù)量 2) "1647257548956-0" # 最小ID 3) "1647257549123-0" # 最大ID 4) 1) 1) "consumer-1" # 各個消費者的未確認(rèn)消息數(shù) 2) (integer) 1 2) 1) "consumer-2" 2) (integer) 1 # 查看特定消費者的未確認(rèn)消息 > XPENDING mystream processing-group - + 10 consumer-1 1) 1) "1647257548956-0" # 消息ID 2) "consumer-1" # 當(dāng)前持有的消費者 3) (integer) 120000 # 空閑時間(毫秒) 4) (integer) 2 # 傳遞次數(shù) # 認(rèn)領(lǐng)超過2分鐘未處理的消息 > XCLAIM mystream processing-group consumer-2 120000 1647257548956-0 1) 1) "1647257548956-0" 2) 1) "sensor_id" 2) "1234" 3) "temperature" 4) "19.8" 5) "humidity" 6) "56"
Java Spring Boot示例
@Service public class MessageRecoveryService { @Autowired private StringRedisTemplate redisTemplate; /** * 獲取消費者組中的未確認(rèn)消息 */ public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) { return redisTemplate.opsForStream().pending(streamKey, groupName); } /** * 獲取指定消費者的詳細(xì)未確認(rèn)消息 */ public PendingMessages getPendingMessages( String streamKey, String groupName, String consumerName, Range<String> idRange, long count) { return redisTemplate.opsForStream().pending( streamKey, Consumer.from(groupName, consumerName), idRange, count); } /** * 認(rèn)領(lǐng)長時間未處理的消息 */ public List<MapRecord<String, Object, Object>> claimMessages( String streamKey, String groupName, String newConsumerName, Duration minIdleTime, String... messageIds) { return redisTemplate.opsForStream().claim( streamKey, Consumer.from(groupName, newConsumerName), minIdleTime, messageIds); } /** * 定時檢查和恢復(fù)未處理的消息 */ @Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次 public void recoverStaleMessages() { // 配置參數(shù) String streamKey = "mystream"; String groupName = "processing-group"; String recoveryConsumer = "recovery-consumer"; Duration minIdleTime = Duration.ofMinutes(5); // 超過5分鐘未處理的消息 try { // 1. 獲取所有未確認(rèn)消息的摘要 PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName); if (summary != null && summary.getTotalPendingMessages() > 0) { // 2. 遍歷每個消費者的未確認(rèn)消息 for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) { // 獲取該消費者的詳細(xì)未確認(rèn)消息列表 PendingMessages pendingMessages = getPendingMessages( streamKey, groupName, consumer.getName(), Range.unbounded(), 50); // 每次最多處理50條 if (pendingMessages != null) { // 3. 篩選出空閑時間超過閾值的消息 List<String> staleMessageIds = new ArrayList<>(); for (PendingMessage message : pendingMessages) { if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) { staleMessageIds.add(message.getIdAsString()); } } // 4. 認(rèn)領(lǐng)這些消息 if (!staleMessageIds.isEmpty()) { log.info("Claiming {} stale messages from consumer {}", staleMessageIds.size(), consumer.getName()); List<MapRecord<String, Object, Object>> claimedMessages = claimMessages( streamKey, groupName, recoveryConsumer, minIdleTime, staleMessageIds.toArray(new String[0])); // 5. 處理這些被認(rèn)領(lǐng)的消息 processClaimedMessages(streamKey, groupName, claimedMessages); } } } } } catch (Exception e) { log.error("Error recovering stale messages: {}", e.getMessage(), e); } } /** * 處理被認(rèn)領(lǐng)的消息 */ private void processClaimedMessages( String streamKey, String groupName, List<MapRecord<String, Object, Object>> messages) { if (messages == null || messages.isEmpty()) { return; } for (MapRecord<String, Object, Object> message : messages) { try { // 執(zhí)行消息處理邏輯 processMessage(message); // 確認(rèn)消息 redisTemplate.opsForStream().acknowledge( streamKey, groupName, message.getId().getValue()); log.info("Successfully processed recovered message: {}", message.getId()); } catch (Exception e) { log.error("Failed to process recovered message {}: {}", message.getId(), e.getMessage(), e); // 根據(jù)業(yè)務(wù)需求決定是否將消息加入死信隊列 moveToDeadLetterQueue(streamKey, message); } } } /** * 將消息移至死信隊列 */ private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) { String deadLetterStream = sourceStream + ":dead-letter"; Map<Object, Object> messageData = message.getValue(); Map<String, Object> dlqMessage = new HashMap<>(); messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v)); // 添加元數(shù)據(jù) dlqMessage.put("original_id", message.getId().getValue()); dlqMessage.put("error_time", System.currentTimeMillis()); redisTemplate.opsForStream().add(deadLetterStream, dlqMessage); // 可選:從原消費者組確認(rèn)該消息 // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue()); } private void processMessage(MapRecord<String, Object, Object> message) { // 實際的消息處理邏輯 log.info("Processing recovered message: {}", message); // ... } }
使用場景
- 需要可靠消息處理的關(guān)鍵業(yè)務(wù)系統(tǒng)
- 處理時間較長的任務(wù)
- 需要錯誤重試機(jī)制的工作流
- 監(jiān)控和診斷消息處理過程
- 實現(xiàn)死信隊列處理特定失敗場景
優(yōu)缺點
優(yōu)點
- 提高系統(tǒng)容錯性和可靠性
- 自動恢復(fù)因消費者崩潰導(dǎo)致的未處理消息
- 可以識別和處理長時間未確認(rèn)的消息
- 支持實現(xiàn)復(fù)雜的重試策略和死信處理
缺點
- 需要額外開發(fā)和維護(hù)恢復(fù)機(jī)制
- 可能導(dǎo)致消息重復(fù)處理,需要確保業(yè)務(wù)邏輯冪等
- 系統(tǒng)復(fù)雜度增加
- 需要監(jiān)控和管理PEL(未確認(rèn)消息列表)的大小
6. 流處理窗口模式(Streaming Window Processing)
基本概念
流處理窗口模式基于時間或消息計數(shù)劃分?jǐn)?shù)據(jù)流,在每個窗口內(nèi)執(zhí)行聚合或分析操作。這種模式適用于實時分析、趨勢監(jiān)測和時間序列處理。雖然Redis Stream本身不直接提供窗口操作,但可以結(jié)合Redis的其他特性實現(xiàn)。
實現(xiàn)方式
主要通過以下幾種方式實現(xiàn):
1. 基于消息ID的時間范圍(Redis消息ID包含毫秒時間戳)
2. 結(jié)合Redis的排序集合(SortedSet)存儲窗口數(shù)據(jù)
3. 使用Redis的過期鍵實現(xiàn)滑動窗口
實現(xiàn)示例
Redis CLI
窗口數(shù)據(jù)收集與查詢:
# 添加帶時間戳的數(shù)據(jù) > XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000 "1647257550123-0" > XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000 "1647257560234-0" > XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000 "1647257570345-0" # 查詢特定時間范圍的數(shù)據(jù) > XRANGE temperature 1647257550000-0 1647257570000-0 1) 1) "1647257550123-0" 2) 1) "sensor_id" 2) "1" 3) "value" 4) "21.5" 5) "timestamp" 6) "1647257548000" 2) 1) "1647257560234-0" 2) 1) "sensor_id" 2) "1" 3) "value" 4) "21.8" 5) "timestamp" 6) "1647257558000"
Java Spring Boot示例
@Service public class TimeWindowProcessingService { @Autowired private StringRedisTemplate redisTemplate; /** * 添加數(shù)據(jù)點到流,并存儲到相應(yīng)的時間窗口 */ public String addDataPoint(String streamKey, String sensorId, double value) { long timestamp = System.currentTimeMillis(); // 1. 添加到原始數(shù)據(jù)流 Map<String, Object> dataPoint = new HashMap<>(); dataPoint.put("sensor_id", sensorId); dataPoint.put("value", String.valueOf(value)); dataPoint.put("timestamp", String.valueOf(timestamp)); StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey); RecordId recordId = redisTemplate.opsForStream().add(record); // 2. 計算所屬的窗口(這里以5分鐘為一個窗口) long windowStart = timestamp - (timestamp % (5 * 60 * 1000)); String windowKey = streamKey + ":window:" + windowStart; // 3. 將數(shù)據(jù)點添加到窗口的有序集合中,分?jǐn)?shù)為時間戳 String dataPointJson = new ObjectMapper().writeValueAsString(dataPoint); redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp); // 4. 設(shè)置窗口鍵的過期時間(保留24小時) redisTemplate.expire(windowKey, Duration.ofHours(24)); return recordId.getValue(); } /** * 獲取指定時間窗口內(nèi)的數(shù)據(jù)點 */ public List<Map<String, Object>> getWindowData( String streamKey, long windowStartTime, long windowEndTime) { // 計算可能的窗口鍵(每5分鐘一個窗口) List<String> windowKeys = new ArrayList<>(); long current = windowStartTime - (windowStartTime % (5 * 60 * 1000)); while (current <= windowEndTime) { windowKeys.add(streamKey + ":window:" + current); current += (5 * 60 * 1000); } // 從各個窗口獲取數(shù)據(jù)點 List<Map<String, Object>> results = new ArrayList<>(); ObjectMapper mapper = new ObjectMapper(); for (String windowKey : windowKeys) { Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore( windowKey, windowStartTime, windowEndTime); if (dataPoints != null) { for (String dataPointJson : dataPoints) { try { Map<String, Object> dataPoint = mapper.readValue( dataPointJson, new TypeReference<Map<String, Object>>() {}); results.add(dataPoint); } catch (Exception e) { log.error("Error parsing data point: {}", e.getMessage(), e); } } } } // 按時間戳排序 results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString()))); return results; } /** * 計算窗口內(nèi)數(shù)據(jù)的聚合統(tǒng)計 */ public Map<String, Object> getWindowStats( String streamKey, String sensorId, long windowStartTime, long windowEndTime) { List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime); // 過濾特定傳感器的數(shù)據(jù) List<Double> values = windowData.stream() .filter(dp -> sensorId.equals(dp.get("sensor_id").toString())) .map(dp -> Double.parseDouble(dp.get("value").toString())) .collect(Collectors.toList()); Map<String, Object> stats = new HashMap<>(); stats.put("count", values.size()); if (!values.isEmpty()) { DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v)); stats.put("min", summaryStats.getMin()); stats.put("max", summaryStats.getMax()); stats.put("avg", summaryStats.getAverage()); stats.put("sum", summaryStats.getSum()); } stats.put("start_time", windowStartTime); stats.put("end_time", windowEndTime); stats.put("sensor_id", sensorId); return stats; } /** * 實現(xiàn)滑動窗口處理 */ @Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次 public void processSlidingWindows() { String streamKey = "temperature"; long now = System.currentTimeMillis(); // 處理過去10分鐘窗口的數(shù)據(jù) long windowEndTime = now; long windowStartTime = now - (10 * 60 * 1000); List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例傳感器ID for (String sensorId : sensorIds) { try { // 獲取窗口統(tǒng)計 Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime); // 根據(jù)統(tǒng)計結(jié)果執(zhí)行業(yè)務(wù)邏輯 if (stats.containsKey("avg")) { double avgTemp = (double) stats.get("avg"); if (avgTemp > 25.0) { // 觸發(fā)高溫警報 log.warn("High temperature alert for sensor {}: {} °C", sensorId, avgTemp); triggerAlert(sensorId, "HIGH_TEMP", avgTemp); } } // 存儲聚合結(jié)果用于歷史趨勢分析 saveAggregatedResults(streamKey, sensorId, stats); } catch (Exception e) { log.error("Error processing sliding window for sensor {}: {}", sensorId, e.getMessage(), e); } } } /** * 觸發(fā)警報 */ private void triggerAlert(String sensorId, String alertType, double value) { Map<String, Object> alertData = new HashMap<>(); alertData.put("sensor_id", sensorId); alertData.put("alert_type", alertType); alertData.put("value", value); alertData.put("timestamp", System.currentTimeMillis()); redisTemplate.opsForStream().add("alerts", alertData); } /** * 保存聚合結(jié)果 */ private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) { long windowTime = (long) stats.get("end_time"); String aggregateKey = streamKey + ":aggregate:" + sensorId; // 使用時間作為分?jǐn)?shù)存儲聚合結(jié)果 redisTemplate.opsForZSet().add( aggregateKey, new ObjectMapper().writeValueAsString(stats), windowTime); // 保留30天的聚合數(shù)據(jù) redisTemplate.expire(aggregateKey, Duration.ofDays(30)); } }
使用場景
- 實時數(shù)據(jù)分析與統(tǒng)計
- 趨勢檢測和預(yù)測
- 異常值和閾值監(jiān)控
- 時間序列數(shù)據(jù)處理
- IoT數(shù)據(jù)流處理和聚合
- 用戶行為分析
優(yōu)缺點
優(yōu)點
- 支持基于時間的數(shù)據(jù)分析
- 可以實現(xiàn)實時聚合和計算
- 靈活的窗口定義(滑動窗口、滾動窗口)
- 可擴(kuò)展以支持復(fù)雜的分析場景
缺點
- 實現(xiàn)復(fù)雜度較高
- 可能需要額外的數(shù)據(jù)結(jié)構(gòu)和存儲空間
- 對于大數(shù)據(jù)量的窗口計算可能影響性能
- 需要小心管理內(nèi)存使用和數(shù)據(jù)過期策略
結(jié)論
Redis Stream提供了強(qiáng)大而靈活的消息處理功能,通過組合這些模式,可以構(gòu)建出高性能、可靠且靈活的消息處理系統(tǒng),滿足從簡單的任務(wù)隊列到復(fù)雜的實時數(shù)據(jù)處理等各種應(yīng)用需求。
在選擇和實現(xiàn)這些模式時,應(yīng)充分考慮業(yè)務(wù)特性、性能需求、可靠性要求以及系統(tǒng)規(guī)模,結(jié)合Redis Stream的特性,打造最適合自己應(yīng)用場景的消息處理解決方案。
以上就是一文帶你搞懂Redis Stream的6種消息處理模式的詳細(xì)內(nèi)容,更多關(guān)于Redis Stream消息處理模式的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Redis?鍵值對(key-value)數(shù)據(jù)庫實現(xiàn)方法
Redis 的鍵值對中的 key 就是字符串對象,而 value 可以是字符串對象,也可以是集合數(shù)據(jù)類型的對象,比如 List 對象,Hash 對象、Set 對象和 Zset 對象,這篇文章主要介紹了Redis?鍵值對數(shù)據(jù)庫是怎么實現(xiàn)的,需要的朋友可以參考下2024-05-05Windows系統(tǒng)安裝Redis的詳細(xì)圖文教程
但有時候想在windows下折騰下Redis,那么就可以參考下面的方法了,雖然腳本之家小編以前整理了一些,發(fā)現(xiàn)這篇做的比較詳細(xì),下載也給出來了2018-08-08ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式)
這篇文章主要介紹了ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式),需要的朋友可以參考下2018-03-03RedisTemplate中boundHashOps的使用小結(jié)
redisTemplate.boundHashOps(key)?是 RedisTemplate 類的一個方法,本文主要介紹了RedisTemplate中boundHashOps的使用小結(jié),具有一定的參考價值,感興趣的可以了解一下2024-04-04使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決
這篇文章主要介紹了使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11Redis 如何批量設(shè)置過期時間(PIPLINE的使用)
有時候我們并不希望redis的key一直存在。例如緩存,驗證碼等數(shù)據(jù),我們希望它們能在一定時間內(nèi)自動的被銷毀。本文就詳細(xì)的介紹一下Redis 如何批量設(shè)置過期時間,感興趣的可以了解一下2021-11-11Redis所實現(xiàn)的Reactor模型設(shè)計方案
這篇文章主要介紹了Redis所實現(xiàn)的Reactor模型,本文將帶領(lǐng)讀者從源碼的角度來查看redis關(guān)于reactor模型的設(shè)計,需要的朋友可以參考下2024-06-06詳解Redis中地理位置功能Geospatial的應(yīng)用
Geospatial?Indexes?是?Redis?提供的一種數(shù)據(jù)結(jié)構(gòu),用于存儲和查詢地理位置信息,這篇文章就來和大家詳細(xì)講講Geospatial的具體應(yīng)用吧2023-06-06