一文帶你搞懂Redis Stream的6種消息處理模式
Redis 5.0版本引入的Stream數(shù)據(jù)類型,為Redis生態(tài)帶來了強大而靈活的消息隊列功能,彌補了之前發(fā)布/訂閱模式的不足,如消息持久化、消費者組、消息確認等特性。
Redis Stream結合了傳統(tǒng)消息隊列和時序數(shù)據(jù)庫的特點,適用于日志收集、事件驅動應用、實時分析等多種場景。
本文將介紹Redis Stream的6種消息處理模式。
1. 簡單消費模式(Simple Consumption)
基本概念
簡單消費模式是Redis Stream最基礎的使用方式,不使用消費者組,直接讀取流中的消息。生產(chǎn)者將消息追加到流中,消費者通過指定起始ID來讀取消息。
核心命令
# 發(fā)布消息 XADD stream_name [ID] field value [field value ...] # 讀取消息 XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id
實現(xiàn)示例
Redis CLI
# 添加消息到stream
> XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"
# 從頭開始讀取所有消息
> XREAD STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"
# 從指定ID開始讀取
> XREAD STREAMS mystream 1647257548956-0
(empty list or set)
# 從最新的消息ID之后開始讀取(阻塞等待新消息)
> XREAD BLOCK 5000 STREAMS mystream $
(nil)Java Spring Boot示例
@Service
public class SimpleStreamService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 發(fā)布消息到Stream
*/
public String publishEvent(String streamKey, Map<String, Object> eventData) {
StringRecord record = StreamRecords.string(eventData).withStreamKey(streamKey);
return redisTemplate.opsForStream().add(record).getValue();
}
/**
* 從指定位置開始讀取消息
*/
public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
return redisTemplate.opsForStream().read(readOptions, StreamOffset.from(streamKey, ReadOffset.from(startId)));
}
/**
* 阻塞式讀取消息
*/
public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
StreamReadOptions readOptions = StreamReadOptions.empty().count(10).block(Duration.ofMillis(timeoutMillis));
return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey));
}
}使用場景
- 簡單的事件日志記錄
- 單一消費者場景
- 時間序列數(shù)據(jù)收集
- 開發(fā)和調(diào)試階段
優(yōu)缺點
優(yōu)點
- 實現(xiàn)簡單,無需創(chuàng)建和管理消費者組
- 直接控制從哪個位置開始消費消息
- 適合單個消費者場景
缺點
- 無法實現(xiàn)負載均衡
- 無法追蹤消息確認狀態(tài)
- 需要手動管理已讀消息ID
- 服務重啟需自行記錄上次讀取位置
2. 消費者組模式(Consumer Groups)
基本概念
消費者組允許多個消費者共同處理一個流的消息,實現(xiàn)負載均衡,并提供消息確認機制,確保消息至少被處理一次。每個消費者組維護自己的消費位置,不同消費者組之間互不干擾。
核心命令
# 創(chuàng)建消費者組 XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM] # 從消費者組讀取消息 XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID] # 確認消息處理完成 XACK stream_name group_name message_id [message_id ...]
實現(xiàn)示例
Redis CLI
# 創(chuàng)建消費者組
> XGROUP CREATE mystream processing-group $ MKSTREAM
OK
# 消費者1讀取消息
> XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"
# 確認消息已處理
> XACK mystream processing-group 1647257548956-0
(integer) 1
# 消費者2讀取消息(已無未處理消息)
> XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) (empty list or set)Java Spring Boot示例
@Service
public class ConsumerGroupService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 創(chuàng)建消費者組
*/
public void createGroup(String streamKey, String groupName) {
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
} catch (RedisSystemException e) {
// 處理流不存在的情況
if (e.getRootCause() instanceof RedisCommandExecutionException
&& e.getRootCause().getMessage().contains("NOGROUP")) {
redisTemplate.opsForStream().createGroup(ReadOffset.from("0"), streamKey, groupName);
} else {
throw e;
}
}
}
/**
* 從消費者組讀取消息
*/
public List<MapRecord<String, Object, Object>> readFromGroup(
String streamKey, String groupName, String consumerName, int count) {
StreamReadOptions options = StreamReadOptions.empty().count(count);
return redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
}
/**
* 阻塞式從消費者組讀取消息
*/
public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
String streamKey, String groupName, String consumerName, int count, Duration timeout) {
StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
return redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
}
/**
* 確認消息已處理
*/
public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
}
}使用場景
- 需要橫向擴展消息處理能力的系統(tǒng)
- 要求消息可靠處理的業(yè)務場景
- 實現(xiàn)消息工作隊列
- 微服務間的事件傳遞
優(yōu)缺點
優(yōu)點
- 多個消費者可以并行處理消息
- 提供消息確認機制,保證消息不丟失
- 支持消費者崩潰后恢復處理
- 每個消費者組維護獨立的消費位置
缺點
- 實現(xiàn)相對復雜
- 需要妥善管理消費者組和消費者
- 需要顯式處理消息確認
- 需要定期處理未確認的消息
3. 阻塞式消費模式(Blocking Consumption)
基本概念
阻塞式消費允許消費者在沒有新消息時保持連接,等待新消息到達。這種模式減少了輪詢開銷,提高了實時性,適合對消息處理時效性要求高的場景。
核心命令
# 阻塞式簡單消費 XREAD BLOCK milliseconds STREAMS stream_name ID # 阻塞式消費者組消費 XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >
實現(xiàn)示例
Redis CLI
# 阻塞等待新消息(最多等待10秒) > XREAD BLOCK 10000 STREAMS mystream $ (nil) # 如果10秒內(nèi)沒有新消息 # 使用消費者組的阻塞式消費 > XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream > (nil) # 如果10秒內(nèi)沒有新分配的消息
Java Spring Boot示例
@Service
public class BlockingStreamConsumerService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 阻塞式消息消費者任務
*/
@Async
public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
StreamReadOptions options = StreamReadOptions.empty()
.count(1)
.block(timeout);
while (!Thread.currentThread().isInterrupted()) {
try {
// 阻塞讀取消息
List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
.read(options, StreamOffset.from(streamKey, ReadOffset.from(lastId)));
if (records != null && !records.isEmpty()) {
for (MapRecord<String, Object, Object> record : records) {
// 處理消息
processMessage(record);
// 更新最后讀取的ID
lastId = record.getId().getValue();
}
} else {
// 超時未讀取到消息,可以執(zhí)行一些其他邏輯
}
} catch (Exception e) {
// 異常處理
log.error("Error reading from stream: {}", e.getMessage(), e);
try {
Thread.sleep(1000); // 出錯后等待一段時間再重試
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* 阻塞式消費者組消費
*/
@Async
public void startGroupBlockingConsumer(
String streamKey, String groupName, String consumerName, Duration timeout) {
StreamReadOptions options = StreamReadOptions.empty()
.count(1)
.block(timeout);
while (!Thread.currentThread().isInterrupted()) {
try {
// 阻塞讀取消息
List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
.read(Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
if (records != null && !records.isEmpty()) {
for (MapRecord<String, Object, Object> record : records) {
try {
// 處理消息
processMessage(record);
// 確認消息
redisTemplate.opsForStream()
.acknowledge(streamKey, groupName, record.getId().getValue());
} catch (Exception e) {
// 處理失敗,記錄日志
log.error("Error processing message: {}", e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("Error reading from stream group: {}", e.getMessage(), e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
private void processMessage(MapRecord<String, Object, Object> record) {
// 實際消息處理邏輯
log.info("Processing message: {}", record);
// ...處理消息的具體業(yè)務邏輯
}
}使用場景
- 實時數(shù)據(jù)處理系統(tǒng)
- 事件驅動的任務處理
- 低延遲要求的應用
- 即時通訊系統(tǒng)
- 通知服務
優(yōu)缺點
優(yōu)點
- 減少輪詢帶來的資源浪費
- 實時性好,消息到達后立即處理
- 降低Redis和客戶端的負載
- 節(jié)省CPU和網(wǎng)絡資源
缺點
- 長連接可能占用Redis連接資源
- 需要合理設置超時時間
- 可能需要處理網(wǎng)絡中斷后的重連
- 消費者需要具備并發(fā)處理能力
4. 扇出模式(Fan-out Pattern)
基本概念
扇出模式允許多個獨立的消費者組同時消費同一個流中的所有消息,類似于發(fā)布/訂閱模式,但具有消息持久化和回溯能力。每個消費者組獨立維護自己的消費位置。
核心命令
創(chuàng)建多個消費者組,它們都獨立消費同一個流:
XGROUP CREATE stream_name group_name_1 $ MKSTREAM XGROUP CREATE stream_name group_name_2 $ MKSTREAM XGROUP CREATE stream_name group_name_3 $ MKSTREAM
實現(xiàn)示例
Redis CLI
# 創(chuàng)建多個消費者組
> XGROUP CREATE notifications analytics-group $ MKSTREAM
OK
> XGROUP CREATE notifications email-group $ MKSTREAM
OK
> XGROUP CREATE notifications mobile-group $ MKSTREAM
OK
# 添加一條消息
> XADD notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"
# 從各個消費者組讀?。總€組都能收到所有消息)
> XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"
> XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"
> XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
2) 1) 1) "1647345678912-0"
2) 1) "type"
2) "user_signup"
3) "user_id"
4) "1001"
5) "email"
6) "user@example.com"Java Spring Boot示例
@Service
public class FanOutService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 初始化扇出消費者組
*/
public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
// 確保流存在
try {
StreamInfo.XInfoStream info = redisTemplate.opsForStream().info(streamKey);
} catch (Exception e) {
// 流不存在,發(fā)送一個初始消息
Map<String, Object> initialMessage = new HashMap<>();
initialMessage.put("init", "true");
redisTemplate.opsForStream().add(streamKey, initialMessage);
}
// 創(chuàng)建所有消費者組
for (String groupName : groupNames) {
try {
redisTemplate.opsForStream().createGroup(streamKey, groupName);
} catch (Exception e) {
// 忽略組已存在的錯誤
log.info("Group {} may already exist: {}", groupName, e.getMessage());
}
}
}
/**
* 發(fā)布扇出消息
*/
public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
StringRecord record = StreamRecords.string(messageData).withStreamKey(streamKey);
return redisTemplate.opsForStream().add(record).getValue();
}
/**
* 為特定組啟動消費者
*/
@Async
public void startGroupConsumer(
String streamKey, String groupName, String consumerName,
Consumer<MapRecord<String, Object, Object>> messageHandler) {
StreamReadOptions options = StreamReadOptions.empty().count(10).block(Duration.ofSeconds(2));
while (!Thread.currentThread().isInterrupted()) {
try {
List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
Consumer.from(groupName, consumerName),
options,
StreamOffset.create(streamKey, ReadOffset.lastConsumed())
);
if (messages != null && !messages.isEmpty()) {
for (MapRecord<String, Object, Object> message : messages) {
try {
// 處理消息
messageHandler.accept(message);
// 確認消息
redisTemplate.opsForStream().acknowledge(
streamKey, groupName, message.getId().getValue());
} catch (Exception e) {
log.error("Error processing message in group {}: {}",
groupName, e.getMessage(), e);
}
}
}
} catch (Exception e) {
log.error("Error reading from stream for group {}: {}",
groupName, e.getMessage(), e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}使用示例
@Service
public class NotificationService {
@Autowired
private FanOutService fanOutService;
@PostConstruct
public void init() {
// 初始化扇出組
List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
fanOutService.initializeFanOutGroups("user-events", groups);
// 啟動各個消費者組的處理器
fanOutService.startGroupConsumer(
"user-events", "email-group", "email-consumer", this::processEmailNotification);
fanOutService.startGroupConsumer(
"user-events", "sms-group", "sms-consumer", this::processSmsNotification);
fanOutService.startGroupConsumer(
"user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
}
private void processEmailNotification(MapRecord<String, Object, Object> message) {
Map<Object, Object> messageData = message.getValue();
log.info("Processing email notification: {}", messageData);
// 郵件發(fā)送邏輯
}
private void processSmsNotification(MapRecord<String, Object, Object> message) {
Map<Object, Object> messageData = message.getValue();
log.info("Processing SMS notification: {}", messageData);
// 短信發(fā)送邏輯
}
private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
Map<Object, Object> messageData = message.getValue();
log.info("Processing analytics event: {}", messageData);
// 分析事件處理邏輯
}
public void publishUserEvent(String eventType, Map<String, Object> eventData) {
Map<String, Object> message = new HashMap<>(eventData);
message.put("event_type", eventType);
message.put("timestamp", System.currentTimeMillis());
fanOutService.publishFanOutMessage("user-events", message);
}
}使用場景
- 多個系統(tǒng)需要獨立處理同一事件流
- 實現(xiàn)事件廣播機制
- 系統(tǒng)集成:一個事件觸發(fā)多個業(yè)務流程
- 日志統(tǒng)一處理并分發(fā)到不同服務
- 通知系統(tǒng):一個事件需要通過多種方式通知用戶
優(yōu)缺點
優(yōu)點
- 實現(xiàn)一次發(fā)布多次消費
- 各消費者組獨立工作,互不影響
- 新增消費者組可以從頭開始消費所有歷史消息
- 可靠性高,消息持久化存儲
缺點
- 隨著流數(shù)據(jù)增長,可能占用較多存儲空間
- 需要合理設置流的最大長度或過期策略
- 消費者組數(shù)量過多可能增加Redis負載
- 需要單獨管理每個消費者組的狀態(tài)
5. 重試與恢復模式(Retry and Recovery)
基本概念
這種模式關注處理失敗消息的恢復和重試機制。Redis Stream消費者組會跟蹤每個消息的處理狀態(tài),允許查看和管理未確認(PEL - Pending Entry List)的消息,實現(xiàn)可靠的消息處理。
核心命令
# 查看消費者組中未確認的消息 XPENDING stream_name group_name [start_id end_id count] [consumer_name] # 查看消費者組中長時間未確認的消息詳情 XPENDING stream_name group_name start_id end_id count [consumer_name] # 認領處理超時的消息 XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]
實現(xiàn)示例
Redis CLI
# 查看未確認的消息數(shù)量
> XPENDING mystream processing-group
1) (integer) 2 # 未確認消息數(shù)量
2) "1647257548956-0" # 最小ID
3) "1647257549123-0" # 最大ID
4) 1) 1) "consumer-1" # 各個消費者的未確認消息數(shù)
2) (integer) 1
2) 1) "consumer-2"
2) (integer) 1
# 查看特定消費者的未確認消息
> XPENDING mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0" # 消息ID
2) "consumer-1" # 當前持有的消費者
3) (integer) 120000 # 空閑時間(毫秒)
4) (integer) 2 # 傳遞次數(shù)
# 認領超過2分鐘未處理的消息
> XCLAIM mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
2) 1) "sensor_id"
2) "1234"
3) "temperature"
4) "19.8"
5) "humidity"
6) "56"Java Spring Boot示例
@Service
public class MessageRecoveryService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 獲取消費者組中的未確認消息
*/
public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
return redisTemplate.opsForStream().pending(streamKey, groupName);
}
/**
* 獲取指定消費者的詳細未確認消息
*/
public PendingMessages getPendingMessages(
String streamKey, String groupName, String consumerName,
Range<String> idRange, long count) {
return redisTemplate.opsForStream().pending(
streamKey,
Consumer.from(groupName, consumerName),
idRange,
count);
}
/**
* 認領長時間未處理的消息
*/
public List<MapRecord<String, Object, Object>> claimMessages(
String streamKey, String groupName, String newConsumerName,
Duration minIdleTime, String... messageIds) {
return redisTemplate.opsForStream().claim(
streamKey,
Consumer.from(groupName, newConsumerName),
minIdleTime,
messageIds);
}
/**
* 定時檢查和恢復未處理的消息
*/
@Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次
public void recoverStaleMessages() {
// 配置參數(shù)
String streamKey = "mystream";
String groupName = "processing-group";
String recoveryConsumer = "recovery-consumer";
Duration minIdleTime = Duration.ofMinutes(5); // 超過5分鐘未處理的消息
try {
// 1. 獲取所有未確認消息的摘要
PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
if (summary != null && summary.getTotalPendingMessages() > 0) {
// 2. 遍歷每個消費者的未確認消息
for (Consumer consumer : summary.getPendingMessagesPerConsumer().keySet()) {
// 獲取該消費者的詳細未確認消息列表
PendingMessages pendingMessages = getPendingMessages(
streamKey, groupName, consumer.getName(),
Range.unbounded(), 50); // 每次最多處理50條
if (pendingMessages != null) {
// 3. 篩選出空閑時間超過閾值的消息
List<String> staleMessageIds = new ArrayList<>();
for (PendingMessage message : pendingMessages) {
if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
staleMessageIds.add(message.getIdAsString());
}
}
// 4. 認領這些消息
if (!staleMessageIds.isEmpty()) {
log.info("Claiming {} stale messages from consumer {}",
staleMessageIds.size(), consumer.getName());
List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
streamKey, groupName, recoveryConsumer, minIdleTime,
staleMessageIds.toArray(new String[0]));
// 5. 處理這些被認領的消息
processClaimedMessages(streamKey, groupName, claimedMessages);
}
}
}
}
} catch (Exception e) {
log.error("Error recovering stale messages: {}", e.getMessage(), e);
}
}
/**
* 處理被認領的消息
*/
private void processClaimedMessages(
String streamKey, String groupName,
List<MapRecord<String, Object, Object>> messages) {
if (messages == null || messages.isEmpty()) {
return;
}
for (MapRecord<String, Object, Object> message : messages) {
try {
// 執(zhí)行消息處理邏輯
processMessage(message);
// 確認消息
redisTemplate.opsForStream().acknowledge(
streamKey, groupName, message.getId().getValue());
log.info("Successfully processed recovered message: {}", message.getId());
} catch (Exception e) {
log.error("Failed to process recovered message {}: {}",
message.getId(), e.getMessage(), e);
// 根據(jù)業(yè)務需求決定是否將消息加入死信隊列
moveToDeadLetterQueue(streamKey, message);
}
}
}
/**
* 將消息移至死信隊列
*/
private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
String deadLetterStream = sourceStream + ":dead-letter";
Map<Object, Object> messageData = message.getValue();
Map<String, Object> dlqMessage = new HashMap<>();
messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v));
// 添加元數(shù)據(jù)
dlqMessage.put("original_id", message.getId().getValue());
dlqMessage.put("error_time", System.currentTimeMillis());
redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);
// 可選:從原消費者組確認該消息
// redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
}
private void processMessage(MapRecord<String, Object, Object> message) {
// 實際的消息處理邏輯
log.info("Processing recovered message: {}", message);
// ...
}
}使用場景
- 需要可靠消息處理的關鍵業(yè)務系統(tǒng)
- 處理時間較長的任務
- 需要錯誤重試機制的工作流
- 監(jiān)控和診斷消息處理過程
- 實現(xiàn)死信隊列處理特定失敗場景
優(yōu)缺點
優(yōu)點
- 提高系統(tǒng)容錯性和可靠性
- 自動恢復因消費者崩潰導致的未處理消息
- 可以識別和處理長時間未確認的消息
- 支持實現(xiàn)復雜的重試策略和死信處理
缺點
- 需要額外開發(fā)和維護恢復機制
- 可能導致消息重復處理,需要確保業(yè)務邏輯冪等
- 系統(tǒng)復雜度增加
- 需要監(jiān)控和管理PEL(未確認消息列表)的大小
6. 流處理窗口模式(Streaming Window Processing)
基本概念
流處理窗口模式基于時間或消息計數(shù)劃分數(shù)據(jù)流,在每個窗口內(nèi)執(zhí)行聚合或分析操作。這種模式適用于實時分析、趨勢監(jiān)測和時間序列處理。雖然Redis Stream本身不直接提供窗口操作,但可以結合Redis的其他特性實現(xiàn)。
實現(xiàn)方式
主要通過以下幾種方式實現(xiàn):
1. 基于消息ID的時間范圍(Redis消息ID包含毫秒時間戳)
2. 結合Redis的排序集合(SortedSet)存儲窗口數(shù)據(jù)
3. 使用Redis的過期鍵實現(xiàn)滑動窗口
實現(xiàn)示例
Redis CLI
窗口數(shù)據(jù)收集與查詢:
# 添加帶時間戳的數(shù)據(jù)
> XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"
# 查詢特定時間范圍的數(shù)據(jù)
> XRANGE temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
2) 1) "sensor_id"
2) "1"
3) "value"
4) "21.5"
5) "timestamp"
6) "1647257548000"
2) 1) "1647257560234-0"
2) 1) "sensor_id"
2) "1"
3) "value"
4) "21.8"
5) "timestamp"
6) "1647257558000"Java Spring Boot示例
@Service
public class TimeWindowProcessingService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 添加數(shù)據(jù)點到流,并存儲到相應的時間窗口
*/
public String addDataPoint(String streamKey, String sensorId, double value) {
long timestamp = System.currentTimeMillis();
// 1. 添加到原始數(shù)據(jù)流
Map<String, Object> dataPoint = new HashMap<>();
dataPoint.put("sensor_id", sensorId);
dataPoint.put("value", String.valueOf(value));
dataPoint.put("timestamp", String.valueOf(timestamp));
StringRecord record = StreamRecords.string(dataPoint).withStreamKey(streamKey);
RecordId recordId = redisTemplate.opsForStream().add(record);
// 2. 計算所屬的窗口(這里以5分鐘為一個窗口)
long windowStart = timestamp - (timestamp % (5 * 60 * 1000));
String windowKey = streamKey + ":window:" + windowStart;
// 3. 將數(shù)據(jù)點添加到窗口的有序集合中,分數(shù)為時間戳
String dataPointJson = new ObjectMapper().writeValueAsString(dataPoint);
redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);
// 4. 設置窗口鍵的過期時間(保留24小時)
redisTemplate.expire(windowKey, Duration.ofHours(24));
return recordId.getValue();
}
/**
* 獲取指定時間窗口內(nèi)的數(shù)據(jù)點
*/
public List<Map<String, Object>> getWindowData(
String streamKey, long windowStartTime, long windowEndTime) {
// 計算可能的窗口鍵(每5分鐘一個窗口)
List<String> windowKeys = new ArrayList<>();
long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
while (current <= windowEndTime) {
windowKeys.add(streamKey + ":window:" + current);
current += (5 * 60 * 1000);
}
// 從各個窗口獲取數(shù)據(jù)點
List<Map<String, Object>> results = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
for (String windowKey : windowKeys) {
Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore(
windowKey, windowStartTime, windowEndTime);
if (dataPoints != null) {
for (String dataPointJson : dataPoints) {
try {
Map<String, Object> dataPoint = mapper.readValue(
dataPointJson, new TypeReference<Map<String, Object>>() {});
results.add(dataPoint);
} catch (Exception e) {
log.error("Error parsing data point: {}", e.getMessage(), e);
}
}
}
}
// 按時間戳排序
results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString())));
return results;
}
/**
* 計算窗口內(nèi)數(shù)據(jù)的聚合統(tǒng)計
*/
public Map<String, Object> getWindowStats(
String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime);
// 過濾特定傳感器的數(shù)據(jù)
List<Double> values = windowData.stream()
.filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
.map(dp -> Double.parseDouble(dp.get("value").toString()))
.collect(Collectors.toList());
Map<String, Object> stats = new HashMap<>();
stats.put("count", values.size());
if (!values.isEmpty()) {
DoubleSummaryStatistics summaryStats = values.stream().collect(Collectors.summarizingDouble(v -> v));
stats.put("min", summaryStats.getMin());
stats.put("max", summaryStats.getMax());
stats.put("avg", summaryStats.getAverage());
stats.put("sum", summaryStats.getSum());
}
stats.put("start_time", windowStartTime);
stats.put("end_time", windowEndTime);
stats.put("sensor_id", sensorId);
return stats;
}
/**
* 實現(xiàn)滑動窗口處理
*/
@Scheduled(fixedRate = 60000) // 每分鐘執(zhí)行一次
public void processSlidingWindows() {
String streamKey = "temperature";
long now = System.currentTimeMillis();
// 處理過去10分鐘窗口的數(shù)據(jù)
long windowEndTime = now;
long windowStartTime = now - (10 * 60 * 1000);
List<String> sensorIds = Arrays.asList("1", "2", "3"); // 示例傳感器ID
for (String sensorId : sensorIds) {
try {
// 獲取窗口統(tǒng)計
Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);
// 根據(jù)統(tǒng)計結果執(zhí)行業(yè)務邏輯
if (stats.containsKey("avg")) {
double avgTemp = (double) stats.get("avg");
if (avgTemp > 25.0) {
// 觸發(fā)高溫警報
log.warn("High temperature alert for sensor {}: {} °C", sensorId, avgTemp);
triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
}
}
// 存儲聚合結果用于歷史趨勢分析
saveAggregatedResults(streamKey, sensorId, stats);
} catch (Exception e) {
log.error("Error processing sliding window for sensor {}: {}",
sensorId, e.getMessage(), e);
}
}
}
/**
* 觸發(fā)警報
*/
private void triggerAlert(String sensorId, String alertType, double value) {
Map<String, Object> alertData = new HashMap<>();
alertData.put("sensor_id", sensorId);
alertData.put("alert_type", alertType);
alertData.put("value", value);
alertData.put("timestamp", System.currentTimeMillis());
redisTemplate.opsForStream().add("alerts", alertData);
}
/**
* 保存聚合結果
*/
private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
long windowTime = (long) stats.get("end_time");
String aggregateKey = streamKey + ":aggregate:" + sensorId;
// 使用時間作為分數(shù)存儲聚合結果
redisTemplate.opsForZSet().add(
aggregateKey,
new ObjectMapper().writeValueAsString(stats),
windowTime);
// 保留30天的聚合數(shù)據(jù)
redisTemplate.expire(aggregateKey, Duration.ofDays(30));
}
}使用場景
- 實時數(shù)據(jù)分析與統(tǒng)計
- 趨勢檢測和預測
- 異常值和閾值監(jiān)控
- 時間序列數(shù)據(jù)處理
- IoT數(shù)據(jù)流處理和聚合
- 用戶行為分析
優(yōu)缺點
優(yōu)點
- 支持基于時間的數(shù)據(jù)分析
- 可以實現(xiàn)實時聚合和計算
- 靈活的窗口定義(滑動窗口、滾動窗口)
- 可擴展以支持復雜的分析場景
缺點
- 實現(xiàn)復雜度較高
- 可能需要額外的數(shù)據(jù)結構和存儲空間
- 對于大數(shù)據(jù)量的窗口計算可能影響性能
- 需要小心管理內(nèi)存使用和數(shù)據(jù)過期策略
結論
Redis Stream提供了強大而靈活的消息處理功能,通過組合這些模式,可以構建出高性能、可靠且靈活的消息處理系統(tǒng),滿足從簡單的任務隊列到復雜的實時數(shù)據(jù)處理等各種應用需求。
在選擇和實現(xiàn)這些模式時,應充分考慮業(yè)務特性、性能需求、可靠性要求以及系統(tǒng)規(guī)模,結合Redis Stream的特性,打造最適合自己應用場景的消息處理解決方案。
以上就是一文帶你搞懂Redis Stream的6種消息處理模式的詳細內(nèi)容,更多關于Redis Stream消息處理模式的資料請關注腳本之家其它相關文章!
相關文章
Redis?鍵值對(key-value)數(shù)據(jù)庫實現(xiàn)方法
Redis 的鍵值對中的 key 就是字符串對象,而 value 可以是字符串對象,也可以是集合數(shù)據(jù)類型的對象,比如 List 對象,Hash 對象、Set 對象和 Zset 對象,這篇文章主要介紹了Redis?鍵值對數(shù)據(jù)庫是怎么實現(xiàn)的,需要的朋友可以參考下2024-05-05
ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式)
這篇文章主要介紹了ubuntu 16.04安裝redis的兩種方式教程詳解(apt和編譯方式),需要的朋友可以參考下2018-03-03
RedisTemplate中boundHashOps的使用小結
redisTemplate.boundHashOps(key)?是 RedisTemplate 類的一個方法,本文主要介紹了RedisTemplate中boundHashOps的使用小結,具有一定的參考價值,感興趣的可以了解一下2024-04-04
使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決
這篇文章主要介紹了使用RedisAtomicInteger計數(shù)出現(xiàn)少計問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11

