Redis 實(shí)現(xiàn)消息隊(duì)列實(shí)際案例
一、為什么選擇 Redis 做消息隊(duì)列?
1.1 Redis 消息隊(duì)列的核心優(yōu)勢
輕量級部署:無需單獨(dú)部署 RabbitMQ、Kafka 等消息隊(duì)列服務(wù),可以直接復(fù)用現(xiàn)有 Redis 集群。例如一個電商系統(tǒng)可能已經(jīng)使用 Redis 做緩存,現(xiàn)在只需增加消息隊(duì)列功能,無需額外維護(hù)其他中間件,顯著降低運(yùn)維成本;
高性能:基于內(nèi)存操作,單節(jié)點(diǎn) QPS 可達(dá) 10 萬級,滿足高吞吐場景。實(shí)測表明,在標(biāo)準(zhǔn)服務(wù)器配置下,Redis 處理簡單消息的延遲可低至 0.1ms,遠(yuǎn)優(yōu)于傳統(tǒng)磁盤存儲的消息隊(duì)列;
API 簡潔:依托 Redis 原生命令即可實(shí)現(xiàn)完整隊(duì)列功能:
- LPUSH/RPUSH 用于生產(chǎn)者推送消息
- BLPOP/BRPOP 實(shí)現(xiàn)消費(fèi)者阻塞式拉取
- PUBLISH/SUBSCRIBE 支持發(fā)布訂閱模式
- XADD/XREAD 提供 Stream 類型支持 開發(fā)人員無需學(xué)習(xí)復(fù)雜的新 API,顯著降低開發(fā)成本;
支持多語言:所有主流語言的 Redis 客戶端(Java/Jedis、Python/redis-py、Go/redigo 等)均原生支持消息隊(duì)列相關(guān)命令。例如 Java 開發(fā)者可以直接使用 Jedis 的 lpush() 方法發(fā)送消息,無需額外依賴;
可擴(kuò)展性:通過 Redis Cluster 可以輕松實(shí)現(xiàn)消息隊(duì)列的橫向擴(kuò)展。例如可以將不同業(yè)務(wù)的消息分配到不同分片,同時利用 Redis Sentinel 實(shí)現(xiàn)高可用,確保消息服務(wù)不間斷。
1.2 適用場景與不適用場景
適用場景
- 輕量級異步通信:如電商系統(tǒng)中的訂單狀態(tài)變更通知、APP 的日志上報等。例如用戶下單后,系統(tǒng)可以通過 Redis 隊(duì)列異步通知庫存系統(tǒng)扣減庫存,而不影響主流程響應(yīng)速度;
- 高吞吐但允許少量重復(fù)的場景:如用戶行為數(shù)據(jù)同步、監(jiān)控數(shù)據(jù)采集等。例如一個短視頻平臺需要將用戶的觀看記錄同步到推薦系統(tǒng),即使偶爾出現(xiàn)重復(fù)消息也不影響業(yè)務(wù)邏輯;
- 中小型系統(tǒng)的解耦需求:當(dāng)系統(tǒng)規(guī)模尚未達(dá)到需要引入 Kafka 等重量級組件時。例如一個初創(chuàng)公司的支付系統(tǒng)與通知系統(tǒng)之間使用 Redis 隊(duì)列解耦,避免系統(tǒng)間直接依賴。
不適用場景
- 金融級事務(wù)消息:如銀行轉(zhuǎn)賬、證券交易等需要強(qiáng)一致性和零丟失的場景。Redis 的持久化機(jī)制(RDB/AOF)無法保證 100% 不丟失消息,且缺乏事務(wù)消息的回查機(jī)制;
- 復(fù)雜路由需求:如需要死信隊(duì)列、優(yōu)先級隊(duì)列、延遲隊(duì)列等高級特性時。雖然 Redis 可以通過 Sorted Set 實(shí)現(xiàn)簡單延遲隊(duì)列,但相比 RabbitMQ 的專業(yè)實(shí)現(xiàn)功能有限;
- 海量消息存儲:如需要保存數(shù)月歷史消息的聊天系統(tǒng)。Redis 作為內(nèi)存數(shù)據(jù)庫,存儲容量受服務(wù)器內(nèi)存限制,且長期存儲成本過高。例如一個日均百萬消息的客服系統(tǒng),使用 Redis 存儲一周消息就可能需要上百 GB 內(nèi)存。
二、Redis 實(shí)現(xiàn)消息隊(duì)列的 3 種核心方案
方案一、基于 Redis List 的簡單消息隊(duì)列實(shí)現(xiàn)
1. 方案概述
Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,具有以下特性使其非常適合實(shí)現(xiàn)消息隊(duì)列:
- 支持從兩端(O(1)時間復(fù)雜度)插入和刪除元素
- 天然支持"生產(chǎn)者-消費(fèi)者"模型
- 提供阻塞式獲取消息的命令
- 內(nèi)存存儲,性能極高(每秒可處理數(shù)萬次操作)
1.1 核心命令詳解
| 角色 | 核心命令 | 作用說明 | 時間復(fù)雜度 |
|---|---|---|---|
| 生產(chǎn)者 | LPUSH key value1 value2 | 從 List 左側(cè)插入消息(頭部插入),支持批量插入,返回插入后 List 的長度 | O(1) |
| 生產(chǎn)者 | RPUSH key value1 value2 | 從 List 右側(cè)插入消息(尾部插入),支持批量插入 | O(1) |
| 消費(fèi)者 | BLPOP key timeout | 從 List 左側(cè)阻塞獲取消息(頭部取出),若 List 為空則等待timeout秒 | O(1) |
| 消費(fèi)者 | BRPOP key timeout | 從 List 右側(cè)阻塞獲取消息(尾部取出),若 List 為空則等待timeout秒 | O(1) |
| 監(jiān)控 | LLEN key | 獲取當(dāng)前隊(duì)列的消息數(shù)量 | O(1) |
| 監(jiān)控 | LRANGE key start end | 查看隊(duì)列中從start到end的消息(如LRANGE queue 0 9查看前10條) | O(S+N) |
2. 代碼實(shí)戰(zhàn)(Java + Jedis)
2.1 環(huán)境準(zhǔn)備
首先引入 Jedis 依賴(Maven):
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.3</version> <!-- 建議使用最新穩(wěn)定版 -->
</dependency>2.2 生產(chǎn)者實(shí)現(xiàn)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class ListMQProducer {
// 隊(duì)列key命名規(guī)范:業(yè)務(wù)域:組件類型:數(shù)據(jù)結(jié)構(gòu):具體業(yè)務(wù)
private static final String QUEUE_KEY = "redis:mq:list:order";
// 使用連接池提高性能
private static final JedisPool jedisPool = new JedisPool(
new JedisPoolConfig(),
"localhost",
6379,
2000, // 連接超時時間
null // 密碼
);
public static void main(String[] args) throws InterruptedException {
try (Jedis jedis = jedisPool.getResource()) {
// 模擬發(fā)送10條訂單消息
for (int i = 1; i <= 10; i++) {
// 消息內(nèi)容格式:業(yè)務(wù)標(biāo)識_序號_時間戳
String message = String.format("order_%d_%d", i, System.currentTimeMillis());
// LPUSH命令將消息放入隊(duì)列頭部
long queueLength = jedis.lpush(QUEUE_KEY, message);
System.out.printf("生產(chǎn)者發(fā)送消息:%s,當(dāng)前隊(duì)列長度:%d%n", message, queueLength);
// 模擬業(yè)務(wù)處理間隔
Thread.sleep(500);
}
} catch (Exception e) {
System.err.println("生產(chǎn)者異常:" + e.getMessage());
} finally {
jedisPool.close();
}
}
}2.3 消費(fèi)者實(shí)現(xiàn)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
public class ListMQConsumer {
private static final String QUEUE_KEY = "redis:mq:list:order";
private static final JedisPool jedisPool = new JedisPool(
new JedisPoolConfig(),
"localhost",
6379
);
public static void main(String[] args) {
System.out.println("消費(fèi)者啟動,等待接收消息...");
while (true) {
try (Jedis jedis = jedisPool.getResource()) {
// BRPOP命令參數(shù):
// 1. 超時時間3秒(避免空輪詢消耗CPU)
// 2. 可以監(jiān)聽多個隊(duì)列
List<String> messages = jedis.brpop(3, QUEUE_KEY);
if (messages != null) {
// BRPOP返回結(jié)果格式:
// 第一個元素是隊(duì)列key
// 第二個元素是消息內(nèi)容
String message = messages.get(1);
System.out.println("消費(fèi)者接收消息:" + message);
// 業(yè)務(wù)處理邏輯示例
processMessage(message);
} else {
System.out.println("隊(duì)列暫無消息,繼續(xù)等待...");
}
} catch (Exception e) {
System.err.println("消費(fèi)者處理消息異常:" + e.getMessage());
// 異常處理策略:
// 1. 記錄錯誤日志
// 2. 重試機(jī)制
// 3. 告警通知
try {
Thread.sleep(5000); // 出錯后暫停5秒
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
private static void processMessage(String message) throws InterruptedException {
// 模擬業(yè)務(wù)處理
System.out.println("處理消息:" + message);
// 解析消息內(nèi)容
String[] parts = message.split("_");
String orderId = parts[1];
// 模擬業(yè)務(wù)處理耗時
Thread.sleep(1000);
System.out.println("訂單" + orderId + "處理完成");
}
}3. 方案優(yōu)化與問題解決
3.1 標(biāo)準(zhǔn)方案的局限性
- 消息丟失風(fēng)險:
- 消費(fèi)者獲取消息后,如果處理過程中崩潰,消息將永久丟失
- 無消息確認(rèn)機(jī)制
- 功能限制:
- 不支持廣播模式(多個消費(fèi)者同時消費(fèi)同一條消息)
- 無優(yōu)先級隊(duì)列
- 無延遲隊(duì)列功能
- 監(jiān)控缺失:
- 缺乏消息處理狀態(tài)跟蹤
- 無死信隊(duì)列處理機(jī)制
3.2 消息可靠性優(yōu)化方案
3.2.1 消息確認(rèn)機(jī)制實(shí)現(xiàn)
private static final String CONFIRM_QUEUE_KEY = "redis:mq:list:order:confirm";
private static final String DEAD_QUEUE_KEY = "redis:mq:list:order:dead";
private static final int MAX_RETRY = 3;
// 優(yōu)化后的消費(fèi)者處理邏輯
List<String> messages = jedis.brpop(3, QUEUE_KEY);
if (messages != null) {
String message = messages.get(1);
// 1. 將消息移到待確認(rèn)隊(duì)列(使用RPUSH保持順序)
jedis.rpush(CONFIRM_QUEUE_KEY, message);
try {
// 2. 處理業(yè)務(wù)邏輯
processMessage(message);
// 3. 處理成功,從待確認(rèn)隊(duì)列刪除
jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
} catch (Exception e) {
System.err.println("處理消息失?。? + message);
// 4. 檢查重試次數(shù)
long retryCount = jedis.incr("retry:" + message);
if (retryCount <= MAX_RETRY) {
// 放回主隊(duì)列重試
jedis.lpush(QUEUE_KEY, message);
} else {
// 超過重試次數(shù),放入死信隊(duì)列
jedis.rpush(DEAD_QUEUE_KEY, message);
}
// 無論重試還是加入死信隊(duì)列,都要從待確認(rèn)隊(duì)列刪除
jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
}
}3.2.2 定時補(bǔ)償任務(wù)
// 定時檢查待確認(rèn)隊(duì)列(每分鐘執(zhí)行)
public void checkConfirmQueue() {
try (Jedis jedis = jedisPool.getResource()) {
// 獲取待確認(rèn)隊(duì)列所有消息
List<String> pendingMessages = jedis.lrange(CONFIRM_QUEUE_KEY, 0, -1);
for (String message : pendingMessages) {
// 檢查消息滯留時間
long createTime = Long.parseLong(message.split("_")[2]);
long currentTime = System.currentTimeMillis();
long delay = currentTime - createTime;
// 超過30秒未處理則重試
if (delay > 30000) {
jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
jedis.lpush(QUEUE_KEY, message);
System.out.println("消息超時重試:" + message);
}
}
}
}3.3 性能優(yōu)化策略
- 橫向擴(kuò)展:
- 增加消費(fèi)者實(shí)例數(shù)量,利用 List 的 BRPOP 命令天然支持多消費(fèi)者競爭
- 可采用消費(fèi)者組模式,每個組獨(dú)立消費(fèi)
- 批量處理:
// 生產(chǎn)者批量發(fā)送 jedis.lpush(QUEUE_KEY, "msg1", "msg2", "msg3"); // 消費(fèi)者批量獲取(非阻塞) List<String> batch = jedis.rpop(QUEUE_KEY, 10); // 獲取最多10條
管道(Pipeline)優(yōu)化:
try (Pipeline p = jedis.pipelined()) {
p.lpush(QUEUE_KEY, "msg1");
p.lpush(QUEUE_KEY, "msg2");
p.sync(); // 批量提交
}監(jiān)控指標(biāo):
隊(duì)列長度監(jiān)控:LLEN key
消費(fèi)者積壓:比較生產(chǎn)和消費(fèi)速率
異常告警:死信隊(duì)列增長監(jiān)控
4. 適用場景分析
4.1 推薦使用場景
- 異步任務(wù)處理:
- 訂單創(chuàng)建后的后續(xù)處理(如發(fā)送通知、更新庫存)
- 日志收集和分析
- 削峰填谷:
- 秒殺系統(tǒng)請求緩沖
- 突發(fā)流量處理
- 系統(tǒng)解耦:
- 微服務(wù)間通信
- 事件驅(qū)動架構(gòu)
4.2 不適用場景
- 嚴(yán)格順序要求:List雖然有序,但在多消費(fèi)者場景下不能保證全局順序
- 廣播模式需求:需要所有消費(fèi)者收到相同消息
- 持久化要求高:Redis是內(nèi)存數(shù)據(jù)庫,雖然支持持久化但不保證100%可靠
- 復(fù)雜路由需求:需要根據(jù)消息內(nèi)容路由到不同隊(duì)列
5. 生產(chǎn)環(huán)境建議
- Redis配置:
- 啟用AOF持久化:
appendonly yes - 合理設(shè)置內(nèi)存淘汰策略:
maxmemory-policy volatile-lru - 設(shè)置合理超時:
timeout 300(秒)
- 啟用AOF持久化:
- 高可用:
- 使用Redis Sentinel或Cluster
- 客戶端實(shí)現(xiàn)故障轉(zhuǎn)移
- 監(jiān)控指標(biāo):
# 監(jiān)控隊(duì)列長度 redis-cli llen redis:mq:list:order # 監(jiān)控Redis內(nèi)存 redis-cli info memory
- 命名規(guī)范:
- 業(yè)務(wù)域:組件類型:數(shù)據(jù)結(jié)構(gòu):具體業(yè)務(wù)
示例:payment:mq:list:refund
方案二、基于 Pub/Sub 的廣播式消息隊(duì)列方案詳解
Redis Pub/Sub 模型介紹
Redis 的 Pub/Sub(發(fā)布 - 訂閱)模型是一種高效的"一對多"消息通信機(jī)制,它允許生產(chǎn)者將消息發(fā)布到特定的頻道(Channel),而所有訂閱該頻道的消費(fèi)者都能即時接收到這些消息。這種模式特別適合需要實(shí)時廣播的場景,如新聞推送、實(shí)時聊天系統(tǒng)等。
核心命令及功能詳解
| 角色 | 核心命令 | 作用說明 |
|---|---|---|
| 生產(chǎn)者 | PUBLISH channel message | 向指定頻道發(fā)布消息,返回接收消息的消費(fèi)者數(shù)量 |
| 消費(fèi)者 | SUBSCRIBE channel1 channel2 | 訂閱一個或多個頻道,阻塞等待消息(訂閱狀態(tài)下只能接收消息,無法執(zhí)行其他命令) |
| 消費(fèi)者 | PSUBSCRIBE pattern | 使用模式匹配訂閱頻道(如PSUBSCRIBE redis:mq:pubsub:*訂閱所有匹配前綴的頻道) |
2.1 代碼實(shí)戰(zhàn)(Java + Jedis)
生產(chǎn)者實(shí)現(xiàn)(發(fā)布消息)
import redis.clients.jedis.Jedis;
public class PubSubProducer {
// 定義頻道名稱,采用命名空間方式避免沖突
private static final String CHANNEL_KEY = "redis:mq:pubsub:news";
// 創(chuàng)建Redis連接實(shí)例
private static final Jedis jedis = new Jedis("localhost", 6379);
public static void main(String[] args) throws InterruptedException {
// 模擬發(fā)布3條新聞消息,實(shí)際應(yīng)用中可接入實(shí)時數(shù)據(jù)源
String[] news = {
"Redis 7.2版本發(fā)布,新增Stream增強(qiáng)功能",
"基于Redis的消息隊(duì)列在電商場景的實(shí)踐",
"Redis Cluster集群部署最佳實(shí)踐"
};
// 循環(huán)發(fā)布消息
for (String msg : news) {
// 發(fā)布消息并獲取接收者數(shù)量
long receiverCount = jedis.publish(CHANNEL_KEY, msg);
System.out.println(String.format(
"【生產(chǎn)者】發(fā)布消息:%s,當(dāng)前訂閱者數(shù)量:%d",
msg, receiverCount));
// 模擬消息間隔
Thread.sleep(1000);
}
// 關(guān)閉連接
jedis.close();
}
}
消費(fèi)者實(shí)現(xiàn)(訂閱消息)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PubSubConsumer {
private static final String CHANNEL_KEY = "redis:mq:pubsub:news";
private static final Jedis jedis = new Jedis("localhost", 6379);
public static void main(String[] args) {
// 創(chuàng)建自定義的消息處理器
JedisPubSub pubSub = new JedisPubSub() {
// 接收到消息時的回調(diào)方法
@Override
public void onMessage(String channel, String message) {
System.out.println(String.format(
"【消費(fèi)者1】接收到新消息(頻道:%s):%s",
channel, message));
// 此處可添加業(yè)務(wù)處理邏輯
// 例如:解析消息內(nèi)容、寫入數(shù)據(jù)庫、觸發(fā)其他操作等
}
// 成功訂閱頻道時的回調(diào)
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format(
"【消費(fèi)者1】成功訂閱頻道:%s,當(dāng)前訂閱總數(shù):%d",
channel, subscribedChannels));
}
// 可添加其他回調(diào)方法如onUnsubscribe、onPSubscribe等
};
System.out.println("【消費(fèi)者1】啟動并開始監(jiān)聽...");
// 開始訂閱(該方法會阻塞當(dāng)前線程)
jedis.subscribe(pubSub, CHANNEL_KEY);
// 注意:在實(shí)際應(yīng)用中,通常會將訂閱邏輯放在獨(dú)立線程中
// 以避免阻塞主線程
}
}
2.2 方案深度分析與應(yīng)用場景
優(yōu)點(diǎn)詳解
- 實(shí)時廣播能力:天然支持一對多的消息分發(fā),一條消息可以同時被多個消費(fèi)者接收
- 實(shí)現(xiàn)簡單:無需額外中間件,使用Redis原生命令即可實(shí)現(xiàn)
- 低延遲:消息發(fā)布后立即推送給所有訂閱者,延遲通常在毫秒級
- 動態(tài)擴(kuò)展:消費(fèi)者可以隨時加入或退出訂閱,系統(tǒng)自動處理連接管理
缺點(diǎn)與限制
- 消息持久化問題:
- Redis重啟后所有未消費(fèi)的消息都會丟失
- 消費(fèi)者離線期間的消息無法恢復(fù)
- 可靠性限制:
- 缺乏消息確認(rèn)機(jī)制,無法保證消息必達(dá)
- 網(wǎng)絡(luò)中斷可能導(dǎo)致消息丟失
- 流量控制缺失:
- 沒有背壓機(jī)制,生產(chǎn)者可能壓垮消費(fèi)者
- 無法限制消息堆積(因?yàn)楦静欢逊e)
適用場景分析
- 實(shí)時通知系統(tǒng):
- 網(wǎng)站全局公告推送
- 在線聊天室消息分發(fā)
- 游戲服務(wù)器中的全服通知
- 日志收集與監(jiān)控:
- 多個監(jiān)控系統(tǒng)同時接收相同的日志流
- 實(shí)時統(tǒng)計系統(tǒng)指標(biāo)
- 分布式系統(tǒng)的調(diào)試信息廣播
- 臨時性事件廣播:
- 系統(tǒng)配置變更通知
- 緩存失效廣播
- 服務(wù)注冊中心的服務(wù)變更通知
- 不要求可靠性的場景:
- 實(shí)時數(shù)據(jù)統(tǒng)計(允許少量數(shù)據(jù)丟失)
- 非關(guān)鍵業(yè)務(wù)的實(shí)時通知
- 輔助性的系統(tǒng)狀態(tài)更新
不適用場景
- 金融交易等要求消息100%可靠的系統(tǒng)
- 需要保證消息順序的場景
- 需要消息重放或回溯的業(yè)務(wù)
- 消費(fèi)者處理能力遠(yuǎn)低于生產(chǎn)者速率的場景
使用建議
- 頻道命名規(guī)范:建議采用
業(yè)務(wù)域:子系統(tǒng):消息類型的層次結(jié)構(gòu),如trade:order:create - 消費(fèi)者實(shí)現(xiàn):
- 為每個訂閱者創(chuàng)建獨(dú)立連接
- 將訂閱邏輯放在獨(dú)立線程中
- 實(shí)現(xiàn)重連機(jī)制處理網(wǎng)絡(luò)中斷
- 監(jiān)控指標(biāo):
- 跟蹤每個頻道的訂閱者數(shù)量
- 監(jiān)控消息發(fā)布速率
- 記錄消息丟失情況(需應(yīng)用層實(shí)現(xiàn))
- 性能優(yōu)化:
- 對于高頻消息,考慮消息聚合
- 大消息可考慮只發(fā)送引用ID
- 合理設(shè)置Redis的TCP-Keepalive參數(shù)
方案 3:基于 Stream 的可靠消息隊(duì)列(Redis 5.0+)
Redis 5.0 推出的 Stream 數(shù)據(jù)結(jié)構(gòu)是專門為消息隊(duì)列場景設(shè)計的,它完美解決了傳統(tǒng) List 和 Pub/Sub 模式的諸多缺陷。Stream 支持消息持久化存儲、消息確認(rèn)機(jī)制、消費(fèi)者組管理、死信隊(duì)列等企業(yè)級特性,是目前 Redis 實(shí)現(xiàn)可靠消息隊(duì)列的最佳方案。在實(shí)際應(yīng)用中,如電商訂單處理、支付流水記錄、日志收集等場景都能發(fā)揮重要作用。
3.1 Stream 核心概念
Stream:消息隊(duì)列的主體,每個 Stream 有唯一的 key(如"order:stream")。消息以"條目(Entry)"形式存儲,每個條目包含:
- 唯一 ID:自動生成的格式為"時間戳-序列號"(如1680000000000-0)
- 多個字段值對:如{"order_id":"1001","amount":"199.00"}
消費(fèi)者組(Consumer Group):通過將多個消費(fèi)者歸為一組,實(shí)現(xiàn):
- 組內(nèi)消費(fèi)者共享消息,避免重復(fù)消費(fèi)
- 自動負(fù)載均衡,消息均勻分配給各消費(fèi)者
- 支持水平擴(kuò)展,可隨時增加消費(fèi)者
消息確認(rèn)(ACK)機(jī)制:
- 消費(fèi)者獲取消息后,消息進(jìn)入"Pending"狀態(tài)
- 處理完成后需顯式發(fā)送ACK命令
- 未確認(rèn)的消息會在消費(fèi)者斷開后重新分配
Pending 列表:
- 存儲所有已獲取但未確認(rèn)的消息
- 記錄每個消息的消費(fèi)者名稱、獲取時間、重試次數(shù)
- 支持通過XPENDING命令查看待處理消息
死信隊(duì)列:
- 當(dāng)消息超過最大重試次數(shù)(如3次)仍未處理成功
- 可自動/手動轉(zhuǎn)移到專門設(shè)計的死信Stream
- 便于后續(xù)人工干預(yù)或特殊處理
3.2 核心命令詳解
基本操作命令
| 操作類型 | 命令格式 | 說明 |
|---|---|---|
| 添加消息 | XADD key * field1 value1 [field2 value2...] | *表示自動生成ID,可指定ID保證順序 |
| 創(chuàng)建消費(fèi)者組 | XGROUP CREATE key groupname id [MKSTREAM] | MKSTREAM選項(xiàng)在Stream不存在時自動創(chuàng)建 |
| 消費(fèi)消息 | XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key [id] | id通常為>表示新消息,0表示Pending消息 |
| 消息確認(rèn) | XACK key groupname id [id...] | 支持批量確認(rèn)多個消息 |
| 查看Pending消息 | XPENDING key groupname [start end count] [consumer] | 可查看指定消費(fèi)者的未確認(rèn)消息 |
| 消息所有權(quán)轉(zhuǎn)移 | XCLAIM key groupname consumer min-idle-time id [id...] | 將空閑超時的消息轉(zhuǎn)給其他消費(fèi)者處理 |
高級管理命令
- 消息回溯:
XREAD STREAMS key 0-0從最早消息開始讀取 - 范圍查詢:
XRANGE key start end [COUNT n]按ID范圍查詢 - 監(jiān)控命令:
XINFO GROUPS key查看消費(fèi)者組信息
3.3 代碼實(shí)戰(zhàn)(Java + Jedis)
1. 環(huán)境準(zhǔn)備
// Maven依賴
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
</dependency>
// 連接配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(10);
try (JedisPool pool = new JedisPool(config, "localhost", 6379)) {
Jedis jedis = pool.getResource();
// 業(yè)務(wù)代碼...
}2. 生產(chǎn)消費(fèi)完整流程
生產(chǎn)者增強(qiáng)版:
public class EnhancedProducer {
private static final String[] ORDER_STATUS = {"PENDING", "PAID", "SHIPPED", "COMPLETED"};
public void sendOrderEvent(Order order) {
try (Jedis jedis = pool.getResource()) {
Map<String, String> fields = new HashMap<>();
fields.put("order_id", order.getId());
fields.put("user_id", order.getUserId());
fields.put("amount", order.getAmount().toString());
fields.put("status", ORDER_STATUS[0]);
fields.put("create_time", Instant.now().toString());
// 使用事務(wù)保證原子性
Transaction t = jedis.multi();
t.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, fields);
t.sadd("order:ids", order.getId()); // 記錄訂單ID集合
t.exec();
// 添加監(jiān)控埋點(diǎn)
Metrics.counter("mq.produce.count").increment();
}
}
}消費(fèi)者增強(qiáng)版:
public class ReliableConsumer implements Runnable {
private static final int MAX_RETRY = 3;
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
GROUP_NAME, consumerName,
new StreamParams().count(1).block(2000),
new StreamOffset(STREAM_KEY, ">")
);
if (messages != null) {
messages.forEach((stream, entries) -> {
entries.forEach(entry -> {
processWithRetry(entry);
});
});
}
} catch (Exception e) {
logger.error("消費(fèi)異常", e);
sleep(1000);
}
}
}
private void processWithRetry(StreamEntry entry) {
int retryCount = getRetryCount(entry.getID());
if (retryCount >= MAX_RETRY) {
moveToDeadLetter(entry);
return;
}
try {
Order order = parseOrder(entry.getFields());
orderService.process(order);
jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
} catch (Exception e) {
logger.warn("處理失敗準(zhǔn)備重試", e);
sleep(1000 * (retryCount + 1));
}
}
}3. 死信隊(duì)列管理
public class DeadLetterMonitor {
public void checkPendingMessages() {
// 獲取所有超時未確認(rèn)的消息
List<StreamEntry> pending = getPendingMessages(TIMEOUT_MS);
pending.forEach(entry -> {
// 檢查重試次數(shù)
if (getRetryCount(entry.getID()) > MAX_RETRY) {
// 轉(zhuǎn)移到死信隊(duì)列
jedis.xadd(DEAD_STREAM_KEY, StreamEntryID.NEW_ENTRY, entry.getFields());
jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
logger.warn("消息轉(zhuǎn)入死信隊(duì)列: {}", entry.getID());
// 發(fā)送告警通知
alertService.notifyAdmin(entry);
}
});
}
public void reprocessDeadLetters() {
// 從死信隊(duì)列重新處理
List<StreamEntry> deadMessages = jedis.xrange(DEAD_STREAM_KEY, "-", "+");
deadMessages.forEach(entry -> {
try {
manualProcess(entry.getFields());
jedis.xdel(DEAD_STREAM_KEY, entry.getID());
} catch (Exception e) {
logger.error("死信處理失敗", e);
}
});
}
}3.4 最佳實(shí)踐建議
- 消費(fèi)者設(shè)計原則:
- 每個消費(fèi)者設(shè)置唯一標(biāo)識
- 實(shí)現(xiàn)冪等性處理邏輯
- 添加合理的阻塞超時時間(通常1-5秒)
- 性能優(yōu)化:
// 批量消費(fèi)提高吞吐量
jedis.xreadGroup(GROUP_NAME, consumerName,
new StreamParams().count(100).block(1000),
new StreamOffset(STREAM_KEY, ">"));
// 批量確認(rèn)減少網(wǎng)絡(luò)開銷
jedis.xack(STREAM_KEY, GROUP_NAME, id1, id2, id3);
- 監(jiān)控指標(biāo):
- 待處理消息數(shù)(XPENDING)
- 消費(fèi)者延遲(當(dāng)前時間 - 消息創(chuàng)建時間)
- 死信隊(duì)列大小
- 消費(fèi)成功率
- 異常處理:
// 消費(fèi)者崩潰后的恢復(fù)處理
public void recoverConsumer(String failedConsumer) {
List<PendingEntry> pendings = jedis.xpending(
STREAM_KEY, GROUP_NAME, "-", "+", 100, failedConsumer);
pendings.forEach(pending -> {
jedis.xclaim(STREAM_KEY, GROUP_NAME, currentConsumer,
TIMEOUT_MS, pending.getIdAsString());
});
}通過以上實(shí)現(xiàn),基于Redis Stream的消息隊(duì)列可以達(dá)到:
- 99.9%的消息可靠性
- 每秒萬級的吞吐量
- 秒級的端到端延遲
- 完善的故障恢復(fù)機(jī)制
三、三種方案的選型對比與最佳實(shí)踐
3.1 方案選型對比表:
| 對比維度 | List 方案 | Pub/Sub 方案 | Stream 方案(推薦) |
|---|---|---|---|
| 消息持久化 | 支持(需手動處理) | 不支持 | 原生支持 |
| 消息確認(rèn) | 需自定義(如RPOPLPUSH) | 不支持 | 原生支持(ACK機(jī)制) |
| 廣播能力 | 不支持 | 原生支持(全量廣播) | 支持(通過多消費(fèi)者組實(shí)現(xiàn)) |
| 消費(fèi)者負(fù)載均衡 | 支持(競爭消費(fèi)模式) | 不支持(全量推送) | 支持(消費(fèi)者組內(nèi)自動均衡) |
| 死信隊(duì)列 | 需自定義(備份List) | 不支持 | 支持(通過XCLAIM命令) |
| 實(shí)現(xiàn)復(fù)雜度 | 低(基礎(chǔ)命令即可) | 低(訂閱/發(fā)布模式) | 中(需理解消費(fèi)者組概念) |
| 內(nèi)存占用 | 線性增長 | 瞬時內(nèi)存 | 可控制(支持消息修剪) |
| 歷史消息回溯 | 有限支持(需保存完整List) | 不支持 | 完整支持(消息ID時間序列) |
| 適用場景 | 簡單異步通信 | 實(shí)時廣播通知 | 可靠消息、企業(yè)級場景 |
3.2 最佳實(shí)踐建議
選型決策樹:
- 首要判斷消息可靠性需求:
- 必須保證不丟失 → 直接選擇Stream
- 可接受偶爾丟失 → 進(jìn)入下一判斷
- 次要判斷消息分發(fā)模式:
- 需要廣播 → 選擇Pub/Sub
- 點(diǎn)對點(diǎn)消費(fèi) → 選擇List或Stream
- 最后評估開發(fā)成本:
- 快速實(shí)現(xiàn) → 選擇List
- 長期維護(hù) → 選擇Stream
- 首要判斷消息可靠性需求:
Stream方案實(shí)施細(xì)節(jié):
- 消費(fèi)者組創(chuàng)建示例:
XGROUP CREATE mystream mygroup $ MKSTREAM
- 典型消費(fèi)代碼邏輯:
- 使用XREADGROUP阻塞讀取
- 業(yè)務(wù)處理成功后發(fā)送XACK
- 處理失敗時使用XCLAIM轉(zhuǎn)移消息
- 設(shè)置合理的PEL(Pending Entries List)超時
- 消費(fèi)者組創(chuàng)建示例:
List方案優(yōu)化建議:
- 可靠消費(fèi)模式實(shí)現(xiàn):
RPOPLPUSH source_list backup_list # 原子操作 # 處理成功后再LREM備份列表
- 性能提升技巧:
- 批量生產(chǎn):使用Pipeline打包多個LPUSH
- 批量消費(fèi):LUA腳本實(shí)現(xiàn)多消息批量獲取
- 可靠消費(fèi)模式實(shí)現(xiàn):
集群環(huán)境特別注意事項(xiàng):
- 跨slot訪問問題:
- 所有相關(guān)key必須使用相同hash tag(如{msg})
- 或者采用客戶端分片路由
- 監(jiān)控重點(diǎn)指標(biāo):
- Stream方案的PEL積壓長度
- List方案的內(nèi)存增長曲線
- Pub/Sub的客戶端連接數(shù)
- 跨slot訪問問題:
運(yùn)維管理建議:
- 容量規(guī)劃:
- 按業(yè)務(wù)峰值QPS的1.5倍預(yù)留資源
- Stream建議單分片不超過10MB/s寫入
- 監(jiān)控告警:
- 設(shè)置消息積壓閾值(如Stream的PEL>1000)
- 監(jiān)控消費(fèi)者延遲(XINFO GROUPS)
- 災(zāi)備方案:
- 定期備份Stream的RDB快照
- 對于關(guān)鍵業(yè)務(wù)實(shí)現(xiàn)雙寫機(jī)制
- 容量規(guī)劃:
四、實(shí)際應(yīng)用案例:電商訂單異步處理
4.1 業(yè)務(wù)流程詳解
電商平臺的訂單處理采用異步消息隊(duì)列模式,通過Redis Stream實(shí)現(xiàn)可靠的消息傳遞和消費(fèi)。整個流程包含以下關(guān)鍵環(huán)節(jié):
訂單創(chuàng)建階段
- 用戶下單后,訂單服務(wù)作為生產(chǎn)者將訂單數(shù)據(jù)持久化到MySQL數(shù)據(jù)庫
- 同時將訂單關(guān)鍵信息(訂單ID、用戶ID、商品ID、數(shù)量等)封裝為消息,發(fā)送到名為"order_create"的Stream中
- 消息格式示例:
{ "order_id": "ORD20231125001", "user_id": "U10086", "product_id": "P8808", "quantity": "2" }
并行消費(fèi)階段
通知服務(wù)(消費(fèi)者1):專門處理用戶通知
- 消費(fèi)消息后調(diào)用短信平臺API或極光推送服務(wù)
- 通知內(nèi)容示例:"尊敬的會員,您的訂單ORD20231125001已創(chuàng)建成功,我們將盡快為您處理"
- 支持重試機(jī)制:若首次發(fā)送失敗,會按照指數(shù)退避策略重試3次
庫存服務(wù)(消費(fèi)者2):負(fù)責(zé)庫存扣減
- 采用樂觀鎖機(jī)制更新庫存:
UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ? - 實(shí)現(xiàn)分布式事務(wù):若扣減失敗會記錄操作日志,便于后續(xù)人工核對
- 采用樂觀鎖機(jī)制更新庫存:
異常處理機(jī)制
- 當(dāng)庫存扣減失敗時,消息會進(jìn)入Pending列表并設(shè)置5分鐘超時
- 超時后自動轉(zhuǎn)移到死信隊(duì)列"DLQ:order_create"
- 運(yùn)維人員通過管理后臺查看死信隊(duì)列,可:
- 人工補(bǔ)扣庫存
- 觸發(fā)訂單取消流程
- 聯(lián)系用戶協(xié)商處理
4.2 核心代碼實(shí)現(xiàn)(生產(chǎn)級優(yōu)化版)
訂單服務(wù)(生產(chǎn)者)增強(qiáng)實(shí)現(xiàn)
// 訂單服務(wù)(生產(chǎn)者)發(fā)送消息 - 增強(qiáng)版
public void createOrder(Order order) {
// 1. 數(shù)據(jù)庫事務(wù)確保數(shù)據(jù)一致性
TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
try {
// 1.1 保存主訂單
orderMapper.insert(order);
// 1.2 保存訂單明細(xì)
order.getItems().forEach(item -> {
item.setOrderId(order.getId());
orderItemMapper.insert(item);
});
// 2. 構(gòu)建消息體(添加時間戳和業(yè)務(wù)標(biāo)識)
Map<String, String> message = new HashMap<>();
message.put("order_id", order.getId());
message.put("user_id", order.getUserId());
message.put("product_id", order.getProductId());
message.put("quantity", order.getQuantity() + "");
message.put("create_time", System.currentTimeMillis() + "");
message.put("biz_type", "NORMAL_ORDER");
// 3. 發(fā)送消息(添加重試機(jī)制)
int retryTimes = 0;
while (retryTimes < 3) {
try {
jedis.xadd("redis:mq:stream:order_create", null, message);
break;
} catch (Exception e) {
retryTimes++;
if (retryTimes == 3) {
throw new RuntimeException("消息發(fā)送失敗", e);
}
Thread.sleep(1000 * retryTimes);
}
}
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw new BusinessException("訂單創(chuàng)建失敗", e);
}
}通知服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)
// 通知服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)
public void handleNotification() {
// 初始化消費(fèi)者組(冪等操作)
initConsumerGroup("redis:mq:stream:order_create", "order_group");
while (!Thread.currentThread().isInterrupted()) {
try {
Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
"order_group",
"notify_consumer_" + instanceId, // 使用實(shí)例ID區(qū)分消費(fèi)者
1,
5000,
false,
Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
);
if (messages != null && !messages.isEmpty()) {
for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
Map<String, String> content = entry.getFields();
String userId = content.get("user_id");
String orderId = content.get("order_id");
// 1. 發(fā)送短信(帶熔斷機(jī)制)
boolean smsSent = circuitBreaker.execute(() ->
smsService.send(userId, "訂單通知", "您的訂單" + orderId + "已創(chuàng)建成功"));
// 2. 發(fā)送APP推送
boolean pushSent = pushService.send(userId, "訂單創(chuàng)建通知",
Map.of("orderId", orderId, "type", "order_created"));
if (smsSent || pushSent) {
// 至少一個通知發(fā)送成功才確認(rèn)消息
jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
monitorService.recordSuccess("order_notify");
} else {
monitorService.recordFailure("order_notify");
}
}
}
} catch (Exception e) {
log.error("通知處理異常", e);
monitorService.recordError("order_notify", e);
Thread.sleep(5000); // 異常休眠避免循環(huán)異常
}
}
}
private void initConsumerGroup(String streamKey, String groupName) {
try {
jedis.xgroupCreate(streamKey, groupName, StreamEntryID.LAST_ENTRY, true);
} catch (RedisBusyException e) {
log.info("消費(fèi)者組已存在: {}", groupName);
}
}庫存服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)
// 庫存服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)
public void handleInventory() {
// 初始化消費(fèi)者組
initConsumerGroup("redis:mq:stream:order_create", "order_group");
while (!Thread.currentThread().isInterrupted()) {
try {
Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
"order_group",
"inventory_consumer_" + instanceId,
1,
5000,
false,
Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
);
if (messages != null && !messages.isEmpty()) {
for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
Map<String, String> content = entry.getFields();
String productId = content.get("product_id");
int quantity = Integer.parseInt(content.get("quantity"));
String orderId = content.get("order_id");
// 1. 扣減庫存(帶事務(wù))
boolean success = inventoryService.deductWithLog(
productId,
quantity,
orderId,
"ORDER_DEDUCTION"
);
if (success) {
// 2. 確認(rèn)消息
jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
monitorService.recordSuccess("inventory_deduct");
} else {
// 3. 記錄失敗日志
log.warn("庫存扣減失敗 orderId={}, productId={}", orderId, productId);
monitorService.recordFailure("inventory_deduct");
// 不確認(rèn)消息,讓其進(jìn)入Pending狀態(tài)
}
}
}
} catch (Exception e) {
log.error("庫存處理異常", e);
monitorService.recordError("inventory_deduct", e);
Thread.sleep(5000);
}
}
}
// 庫存扣減服務(wù)方法
@Transactional
public boolean deductWithLog(String productId, int quantity, String bizId, String bizType) {
// 1. 扣減庫存
int affected = inventoryMapper.deductWithVersion(
productId,
quantity,
getCurrentVersion(productId)
);
if (affected == 0) {
return false;
}
// 2. 記錄操作流水
InventoryLog log = new InventoryLog();
log.setLogId(UUID.randomUUID().toString());
log.setProductId(productId);
log.setChangedAmount(-quantity);
log.setBizId(bizId);
log.setBizType(bizType);
log.setRemarks("訂單扣減");
inventoryLogMapper.insert(log);
return true;
}4.3 監(jiān)控與運(yùn)維設(shè)計
監(jiān)控指標(biāo)
- 消息堆積量:
XLEN redis:mq:stream:order_create - Pending列表數(shù)量:
XPENDING redis:mq:stream:order_create order_group - 消費(fèi)者延遲:通過消息時間戳與當(dāng)前時間差值計算
- 消息堆積量:
運(yùn)維命令示例
# 查看消費(fèi)者組信息 XINFO GROUPS redis:mq:stream:order_create # 處理死信消息 XRANGE DLQ:order_create - + COUNT 10 XACK DLQ:order_create manual_group <entry_id>
自動恢復(fù)方案
- 定時任務(wù)每小時檢查Pending列表
- 對于超時1小時未處理的消息:
- 嘗試重新投遞到原Stream
- 超過3次重試則轉(zhuǎn)入死信隊(duì)列
- 觸發(fā)企業(yè)微信告警通知運(yùn)維人員
到此這篇關(guān)于Redis 是如何實(shí)現(xiàn)消息隊(duì)列的?的文章就介紹到這了,更多相關(guān)Redis消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 如何使用redis的stream數(shù)據(jù)類型做消息隊(duì)列
- SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例
- Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺功能
- 基于Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
相關(guān)文章
高效異步redis客戶端aredis優(yōu)劣勢原理解析
這篇文章主要介紹了高效異步redis客戶端aredis優(yōu)劣勢原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09
基于Redis結(jié)合SpringBoot的秒殺案例詳解
這篇文章主要介紹了Redis結(jié)合SpringBoot的秒殺案例,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-09-09
redis.clients.jedis.exceptions.JedisDataException異常的錯誤解決
本文主要介紹了redis.clients.jedis.exceptions.JedisDataException異常的錯誤解決,這個異常通常發(fā)生在嘗試連接到一個?Redis?服務(wù)器時,客戶端發(fā)送了一個?AUTH?命令來驗(yàn)證密碼,但是沒有配置密碼驗(yàn)證,下來就來解決一下2024-05-05
Redis的Python客戶端redis-py安裝使用說明文檔
這篇文章主要介紹了Redis的Python客戶端redis-py安裝使用說明文檔,本文講解了安裝方法、入門使用實(shí)例、API參考和詳細(xì)說明,需要的朋友可以參考下2015-06-06
深度剖析Redis字符串操作指南從入門到實(shí)戰(zhàn)應(yīng)用
Redis字符串類型二進(jìn)制安全,支持文本、數(shù)字、二進(jìn)制等數(shù)據(jù),涵蓋基礎(chǔ)操作、數(shù)字計算、過期管理及分布式鎖等應(yīng)用,結(jié)合優(yōu)化策略提升系統(tǒng)性能,本文給大家介紹Redis字符串操作指南,感興趣的朋友一起看看吧2025-07-07

