欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis 實(shí)現(xiàn)消息隊(duì)列實(shí)際案例

 更新時間:2025年09月30日 09:18:20   作者:祈禱蒼天賜我java之術(shù)  
文章探討Redis作為消息隊(duì)列的三大核心方案(List、Pub/Sub、Stream)及適用場景,分析了輕量部署、高性能、多語言支持等優(yōu)勢,指出Stream在可靠性、消息確認(rèn)、死信隊(duì)列等企業(yè)級需求上表現(xiàn)最佳,適合電商等高吞吐場景,并提供了實(shí)際應(yīng)用案例與優(yōu)化建議,感興趣的朋友一起看看吧

一、為什么選擇 Redis 做消息隊(duì)列?

1.1 Redis 消息隊(duì)列的核心優(yōu)勢

輕量級部署:無需單獨(dú)部署 RabbitMQ、Kafka 等消息隊(duì)列服務(wù),可以直接復(fù)用現(xiàn)有 Redis 集群。例如一個電商系統(tǒng)可能已經(jīng)使用 Redis 做緩存,現(xiàn)在只需增加消息隊(duì)列功能,無需額外維護(hù)其他中間件,顯著降低運(yùn)維成本;

高性能:基于內(nèi)存操作,單節(jié)點(diǎn) QPS 可達(dá) 10 萬級,滿足高吞吐場景。實(shí)測表明,在標(biāo)準(zhǔn)服務(wù)器配置下,Redis 處理簡單消息的延遲可低至 0.1ms,遠(yuǎn)優(yōu)于傳統(tǒng)磁盤存儲的消息隊(duì)列;

API 簡潔:依托 Redis 原生命令即可實(shí)現(xiàn)完整隊(duì)列功能:

  • LPUSH/RPUSH 用于生產(chǎn)者推送消息
  • BLPOP/BRPOP 實(shí)現(xiàn)消費(fèi)者阻塞式拉取
  • PUBLISH/SUBSCRIBE 支持發(fā)布訂閱模式
  • XADD/XREAD 提供 Stream 類型支持 開發(fā)人員無需學(xué)習(xí)復(fù)雜的新 API,顯著降低開發(fā)成本;

支持多語言:所有主流語言的 Redis 客戶端(Java/Jedis、Python/redis-py、Go/redigo 等)均原生支持消息隊(duì)列相關(guān)命令。例如 Java 開發(fā)者可以直接使用 Jedis 的 lpush() 方法發(fā)送消息,無需額外依賴;

可擴(kuò)展性:通過 Redis Cluster 可以輕松實(shí)現(xiàn)消息隊(duì)列的橫向擴(kuò)展。例如可以將不同業(yè)務(wù)的消息分配到不同分片,同時利用 Redis Sentinel 實(shí)現(xiàn)高可用,確保消息服務(wù)不間斷。

1.2 適用場景與不適用場景

適用場景

  • 輕量級異步通信:如電商系統(tǒng)中的訂單狀態(tài)變更通知、APP 的日志上報等。例如用戶下單后,系統(tǒng)可以通過 Redis 隊(duì)列異步通知庫存系統(tǒng)扣減庫存,而不影響主流程響應(yīng)速度;
  • 高吞吐但允許少量重復(fù)的場景:如用戶行為數(shù)據(jù)同步、監(jiān)控數(shù)據(jù)采集等。例如一個短視頻平臺需要將用戶的觀看記錄同步到推薦系統(tǒng),即使偶爾出現(xiàn)重復(fù)消息也不影響業(yè)務(wù)邏輯;
  • 中小型系統(tǒng)的解耦需求:當(dāng)系統(tǒng)規(guī)模尚未達(dá)到需要引入 Kafka 等重量級組件時。例如一個初創(chuàng)公司的支付系統(tǒng)與通知系統(tǒng)之間使用 Redis 隊(duì)列解耦,避免系統(tǒng)間直接依賴。

不適用場景

  • 金融級事務(wù)消息:如銀行轉(zhuǎn)賬、證券交易等需要強(qiáng)一致性和零丟失的場景。Redis 的持久化機(jī)制(RDB/AOF)無法保證 100% 不丟失消息,且缺乏事務(wù)消息的回查機(jī)制;
  • 復(fù)雜路由需求:如需要死信隊(duì)列、優(yōu)先級隊(duì)列、延遲隊(duì)列等高級特性時。雖然 Redis 可以通過 Sorted Set 實(shí)現(xiàn)簡單延遲隊(duì)列,但相比 RabbitMQ 的專業(yè)實(shí)現(xiàn)功能有限;
  • 海量消息存儲:如需要保存數(shù)月歷史消息的聊天系統(tǒng)。Redis 作為內(nèi)存數(shù)據(jù)庫,存儲容量受服務(wù)器內(nèi)存限制,且長期存儲成本過高。例如一個日均百萬消息的客服系統(tǒng),使用 Redis 存儲一周消息就可能需要上百 GB 內(nèi)存。

二、Redis 實(shí)現(xiàn)消息隊(duì)列的 3 種核心方案

方案一、基于 Redis List 的簡單消息隊(duì)列實(shí)現(xiàn)

1. 方案概述

Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,具有以下特性使其非常適合實(shí)現(xiàn)消息隊(duì)列:

  • 支持從兩端(O(1)時間復(fù)雜度)插入和刪除元素
  • 天然支持"生產(chǎn)者-消費(fèi)者"模型
  • 提供阻塞式獲取消息的命令
  • 內(nèi)存存儲,性能極高(每秒可處理數(shù)萬次操作)
1.1 核心命令詳解
角色核心命令作用說明時間復(fù)雜度
生產(chǎn)者LPUSH key value1 value2從 List 左側(cè)插入消息(頭部插入),支持批量插入,返回插入后 List 的長度O(1)
生產(chǎn)者RPUSH key value1 value2從 List 右側(cè)插入消息(尾部插入),支持批量插入O(1)
消費(fèi)者BLPOP key timeout從 List 左側(cè)阻塞獲取消息(頭部取出),若 List 為空則等待timeout秒O(1)
消費(fèi)者BRPOP key timeout從 List 右側(cè)阻塞獲取消息(尾部取出),若 List 為空則等待timeout秒O(1)
監(jiān)控LLEN key獲取當(dāng)前隊(duì)列的消息數(shù)量O(1)
監(jiān)控LRANGE key start end查看隊(duì)列中從start到end的消息(如LRANGE queue 0 9查看前10條)O(S+N)

2. 代碼實(shí)戰(zhàn)(Java + Jedis)

2.1 環(huán)境準(zhǔn)備

首先引入 Jedis 依賴(Maven):

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.3</version> <!-- 建議使用最新穩(wěn)定版 -->
</dependency>
2.2 生產(chǎn)者實(shí)現(xiàn)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class ListMQProducer {
    // 隊(duì)列key命名規(guī)范:業(yè)務(wù)域:組件類型:數(shù)據(jù)結(jié)構(gòu):具體業(yè)務(wù)
    private static final String QUEUE_KEY = "redis:mq:list:order"; 
    // 使用連接池提高性能
    private static final JedisPool jedisPool = new JedisPool(
        new JedisPoolConfig(),
        "localhost", 
        6379,
        2000,  // 連接超時時間
        null   // 密碼
    );
    public static void main(String[] args) throws InterruptedException {
        try (Jedis jedis = jedisPool.getResource()) {
            // 模擬發(fā)送10條訂單消息
            for (int i = 1; i <= 10; i++) {
                // 消息內(nèi)容格式:業(yè)務(wù)標(biāo)識_序號_時間戳
                String message = String.format("order_%d_%d", i, System.currentTimeMillis());
                // LPUSH命令將消息放入隊(duì)列頭部
                long queueLength = jedis.lpush(QUEUE_KEY, message);
                System.out.printf("生產(chǎn)者發(fā)送消息:%s,當(dāng)前隊(duì)列長度:%d%n", message, queueLength);
                // 模擬業(yè)務(wù)處理間隔
                Thread.sleep(500); 
            }
        } catch (Exception e) {
            System.err.println("生產(chǎn)者異常:" + e.getMessage());
        } finally {
            jedisPool.close();
        }
    }
}
2.3 消費(fèi)者實(shí)現(xiàn)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
public class ListMQConsumer {
    private static final String QUEUE_KEY = "redis:mq:list:order";
    private static final JedisPool jedisPool = new JedisPool(
        new JedisPoolConfig(),
        "localhost",
        6379
    );
    public static void main(String[] args) {
        System.out.println("消費(fèi)者啟動,等待接收消息...");
        while (true) {
            try (Jedis jedis = jedisPool.getResource()) {
                // BRPOP命令參數(shù):
                // 1. 超時時間3秒(避免空輪詢消耗CPU)
                // 2. 可以監(jiān)聽多個隊(duì)列
                List<String> messages = jedis.brpop(3, QUEUE_KEY);
                if (messages != null) {
                    // BRPOP返回結(jié)果格式:
                    // 第一個元素是隊(duì)列key
                    // 第二個元素是消息內(nèi)容
                    String message = messages.get(1);
                    System.out.println("消費(fèi)者接收消息:" + message);
                    // 業(yè)務(wù)處理邏輯示例
                    processMessage(message);
                } else {
                    System.out.println("隊(duì)列暫無消息,繼續(xù)等待...");
                }
            } catch (Exception e) {
                System.err.println("消費(fèi)者處理消息異常:" + e.getMessage());
                // 異常處理策略:
                // 1. 記錄錯誤日志
                // 2. 重試機(jī)制
                // 3. 告警通知
                try {
                    Thread.sleep(5000); // 出錯后暫停5秒
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    private static void processMessage(String message) throws InterruptedException {
        // 模擬業(yè)務(wù)處理
        System.out.println("處理消息:" + message);
        // 解析消息內(nèi)容
        String[] parts = message.split("_");
        String orderId = parts[1];
        // 模擬業(yè)務(wù)處理耗時
        Thread.sleep(1000);
        System.out.println("訂單" + orderId + "處理完成");
    }
}

3. 方案優(yōu)化與問題解決

3.1 標(biāo)準(zhǔn)方案的局限性
  • 消息丟失風(fēng)險
    • 消費(fèi)者獲取消息后,如果處理過程中崩潰,消息將永久丟失
    • 無消息確認(rèn)機(jī)制
  • 功能限制
    • 不支持廣播模式(多個消費(fèi)者同時消費(fèi)同一條消息)
    • 無優(yōu)先級隊(duì)列
    • 無延遲隊(duì)列功能
  • 監(jiān)控缺失
    • 缺乏消息處理狀態(tài)跟蹤
    • 無死信隊(duì)列處理機(jī)制
3.2 消息可靠性優(yōu)化方案

3.2.1 消息確認(rèn)機(jī)制實(shí)現(xiàn)

private static final String CONFIRM_QUEUE_KEY = "redis:mq:list:order:confirm";
private static final String DEAD_QUEUE_KEY = "redis:mq:list:order:dead";
private static final int MAX_RETRY = 3;
// 優(yōu)化后的消費(fèi)者處理邏輯
List<String> messages = jedis.brpop(3, QUEUE_KEY);
if (messages != null) {
    String message = messages.get(1);
    // 1. 將消息移到待確認(rèn)隊(duì)列(使用RPUSH保持順序)
    jedis.rpush(CONFIRM_QUEUE_KEY, message);
    try {
        // 2. 處理業(yè)務(wù)邏輯
        processMessage(message);
        // 3. 處理成功,從待確認(rèn)隊(duì)列刪除
        jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
    } catch (Exception e) {
        System.err.println("處理消息失?。? + message);
        // 4. 檢查重試次數(shù)
        long retryCount = jedis.incr("retry:" + message);
        if (retryCount <= MAX_RETRY) {
            // 放回主隊(duì)列重試
            jedis.lpush(QUEUE_KEY, message);
        } else {
            // 超過重試次數(shù),放入死信隊(duì)列
            jedis.rpush(DEAD_QUEUE_KEY, message);
        }
        // 無論重試還是加入死信隊(duì)列,都要從待確認(rèn)隊(duì)列刪除
        jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
    }
}

3.2.2 定時補(bǔ)償任務(wù)

// 定時檢查待確認(rèn)隊(duì)列(每分鐘執(zhí)行)
public void checkConfirmQueue() {
    try (Jedis jedis = jedisPool.getResource()) {
        // 獲取待確認(rèn)隊(duì)列所有消息
        List<String> pendingMessages = jedis.lrange(CONFIRM_QUEUE_KEY, 0, -1);
        for (String message : pendingMessages) {
            // 檢查消息滯留時間
            long createTime = Long.parseLong(message.split("_")[2]);
            long currentTime = System.currentTimeMillis();
            long delay = currentTime - createTime;
            // 超過30秒未處理則重試
            if (delay > 30000) {
                jedis.lrem(CONFIRM_QUEUE_KEY, 1, message);
                jedis.lpush(QUEUE_KEY, message);
                System.out.println("消息超時重試:" + message);
            }
        }
    }
}
3.3 性能優(yōu)化策略
  • 橫向擴(kuò)展
    • 增加消費(fèi)者實(shí)例數(shù)量,利用 List 的 BRPOP 命令天然支持多消費(fèi)者競爭
    • 可采用消費(fèi)者組模式,每個組獨(dú)立消費(fèi)
  • 批量處理
// 生產(chǎn)者批量發(fā)送
jedis.lpush(QUEUE_KEY, "msg1", "msg2", "msg3");
// 消費(fèi)者批量獲取(非阻塞)
List<String> batch = jedis.rpop(QUEUE_KEY, 10); // 獲取最多10條

管道(Pipeline)優(yōu)化

try (Pipeline p = jedis.pipelined()) {
    p.lpush(QUEUE_KEY, "msg1");
    p.lpush(QUEUE_KEY, "msg2");
    p.sync(); // 批量提交
}

監(jiān)控指標(biāo)

隊(duì)列長度監(jiān)控:LLEN key

消費(fèi)者積壓:比較生產(chǎn)和消費(fèi)速率

異常告警:死信隊(duì)列增長監(jiān)控

4. 適用場景分析

4.1 推薦使用場景
  • 異步任務(wù)處理
    • 訂單創(chuàng)建后的后續(xù)處理(如發(fā)送通知、更新庫存)
    • 日志收集和分析
  • 削峰填谷
    • 秒殺系統(tǒng)請求緩沖
    • 突發(fā)流量處理
  • 系統(tǒng)解耦
    • 微服務(wù)間通信
    • 事件驅(qū)動架構(gòu)
4.2 不適用場景
  1. 嚴(yán)格順序要求:List雖然有序,但在多消費(fèi)者場景下不能保證全局順序
  2. 廣播模式需求:需要所有消費(fèi)者收到相同消息
  3. 持久化要求高:Redis是內(nèi)存數(shù)據(jù)庫,雖然支持持久化但不保證100%可靠
  4. 復(fù)雜路由需求:需要根據(jù)消息內(nèi)容路由到不同隊(duì)列

5. 生產(chǎn)環(huán)境建議

  • Redis配置
    • 啟用AOF持久化:appendonly yes
    • 合理設(shè)置內(nèi)存淘汰策略:maxmemory-policy volatile-lru
    • 設(shè)置合理超時:timeout 300(秒)
  • 高可用
    • 使用Redis Sentinel或Cluster
    • 客戶端實(shí)現(xiàn)故障轉(zhuǎn)移
  • 監(jiān)控指標(biāo)
# 監(jiān)控隊(duì)列長度
redis-cli llen redis:mq:list:order

# 監(jiān)控Redis內(nèi)存
redis-cli info memory
  • 命名規(guī)范
    • 業(yè)務(wù)域:組件類型:數(shù)據(jù)結(jié)構(gòu):具體業(yè)務(wù)

示例:payment:mq:list:refund

方案二、基于 Pub/Sub 的廣播式消息隊(duì)列方案詳解

Redis Pub/Sub 模型介紹

Redis 的 Pub/Sub(發(fā)布 - 訂閱)模型是一種高效的"一對多"消息通信機(jī)制,它允許生產(chǎn)者將消息發(fā)布到特定的頻道(Channel),而所有訂閱該頻道的消費(fèi)者都能即時接收到這些消息。這種模式特別適合需要實(shí)時廣播的場景,如新聞推送、實(shí)時聊天系統(tǒng)等。

核心命令及功能詳解

角色核心命令作用說明
生產(chǎn)者PUBLISH channel message向指定頻道發(fā)布消息,返回接收消息的消費(fèi)者數(shù)量
消費(fèi)者SUBSCRIBE channel1 channel2訂閱一個或多個頻道,阻塞等待消息(訂閱狀態(tài)下只能接收消息,無法執(zhí)行其他命令)
消費(fèi)者PSUBSCRIBE pattern使用模式匹配訂閱頻道(如PSUBSCRIBE redis:mq:pubsub:*訂閱所有匹配前綴的頻道)

2.1 代碼實(shí)戰(zhàn)(Java + Jedis)

生產(chǎn)者實(shí)現(xiàn)(發(fā)布消息)
import redis.clients.jedis.Jedis;

public class PubSubProducer {
    // 定義頻道名稱,采用命名空間方式避免沖突
    private static final String CHANNEL_KEY = "redis:mq:pubsub:news"; 
    // 創(chuàng)建Redis連接實(shí)例
    private static final Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) throws InterruptedException {
        // 模擬發(fā)布3條新聞消息,實(shí)際應(yīng)用中可接入實(shí)時數(shù)據(jù)源
        String[] news = {
            "Redis 7.2版本發(fā)布,新增Stream增強(qiáng)功能",
            "基于Redis的消息隊(duì)列在電商場景的實(shí)踐",
            "Redis Cluster集群部署最佳實(shí)踐"
        };

        // 循環(huán)發(fā)布消息
        for (String msg : news) {
            // 發(fā)布消息并獲取接收者數(shù)量
            long receiverCount = jedis.publish(CHANNEL_KEY, msg);
            System.out.println(String.format(
                "【生產(chǎn)者】發(fā)布消息:%s,當(dāng)前訂閱者數(shù)量:%d", 
                msg, receiverCount));
            // 模擬消息間隔
            Thread.sleep(1000);
        }
        
        // 關(guān)閉連接
        jedis.close();
    }
}
消費(fèi)者實(shí)現(xiàn)(訂閱消息)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PubSubConsumer {
    private static final String CHANNEL_KEY = "redis:mq:pubsub:news";
    private static final Jedis jedis = new Jedis("localhost", 6379);

    public static void main(String[] args) {
        // 創(chuàng)建自定義的消息處理器
        JedisPubSub pubSub = new JedisPubSub() {
            // 接收到消息時的回調(diào)方法
            @Override
            public void onMessage(String channel, String message) {
                System.out.println(String.format(
                    "【消費(fèi)者1】接收到新消息(頻道:%s):%s", 
                    channel, message));
                // 此處可添加業(yè)務(wù)處理邏輯
                // 例如:解析消息內(nèi)容、寫入數(shù)據(jù)庫、觸發(fā)其他操作等
            }

            // 成功訂閱頻道時的回調(diào)
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                System.out.println(String.format(
                    "【消費(fèi)者1】成功訂閱頻道:%s,當(dāng)前訂閱總數(shù):%d", 
                    channel, subscribedChannels));
            }
            
            // 可添加其他回調(diào)方法如onUnsubscribe、onPSubscribe等
        };

        System.out.println("【消費(fèi)者1】啟動并開始監(jiān)聽...");
        // 開始訂閱(該方法會阻塞當(dāng)前線程)
        jedis.subscribe(pubSub, CHANNEL_KEY);
        
        // 注意:在實(shí)際應(yīng)用中,通常會將訂閱邏輯放在獨(dú)立線程中
        // 以避免阻塞主線程
    }
}

2.2 方案深度分析與應(yīng)用場景

優(yōu)點(diǎn)詳解
  1. 實(shí)時廣播能力:天然支持一對多的消息分發(fā),一條消息可以同時被多個消費(fèi)者接收
  2. 實(shí)現(xiàn)簡單:無需額外中間件,使用Redis原生命令即可實(shí)現(xiàn)
  3. 低延遲:消息發(fā)布后立即推送給所有訂閱者,延遲通常在毫秒級
  4. 動態(tài)擴(kuò)展:消費(fèi)者可以隨時加入或退出訂閱,系統(tǒng)自動處理連接管理
缺點(diǎn)與限制
  • 消息持久化問題
    • Redis重啟后所有未消費(fèi)的消息都會丟失
    • 消費(fèi)者離線期間的消息無法恢復(fù)
  • 可靠性限制
    • 缺乏消息確認(rèn)機(jī)制,無法保證消息必達(dá)
    • 網(wǎng)絡(luò)中斷可能導(dǎo)致消息丟失
  • 流量控制缺失
    • 沒有背壓機(jī)制,生產(chǎn)者可能壓垮消費(fèi)者
    • 無法限制消息堆積(因?yàn)楦静欢逊e)
適用場景分析
  • 實(shí)時通知系統(tǒng)
    • 網(wǎng)站全局公告推送
    • 在線聊天室消息分發(fā)
    • 游戲服務(wù)器中的全服通知
  • 日志收集與監(jiān)控
    • 多個監(jiān)控系統(tǒng)同時接收相同的日志流
    • 實(shí)時統(tǒng)計系統(tǒng)指標(biāo)
    • 分布式系統(tǒng)的調(diào)試信息廣播
  • 臨時性事件廣播
    • 系統(tǒng)配置變更通知
    • 緩存失效廣播
    • 服務(wù)注冊中心的服務(wù)變更通知
  • 不要求可靠性的場景
    • 實(shí)時數(shù)據(jù)統(tǒng)計(允許少量數(shù)據(jù)丟失)
    • 非關(guān)鍵業(yè)務(wù)的實(shí)時通知
    • 輔助性的系統(tǒng)狀態(tài)更新
不適用場景
  1. 金融交易等要求消息100%可靠的系統(tǒng)
  2. 需要保證消息順序的場景
  3. 需要消息重放或回溯的業(yè)務(wù)
  4. 消費(fèi)者處理能力遠(yuǎn)低于生產(chǎn)者速率的場景

使用建議

  • 頻道命名規(guī)范:建議采用業(yè)務(wù)域:子系統(tǒng):消息類型的層次結(jié)構(gòu),如trade:order:create
  • 消費(fèi)者實(shí)現(xiàn)
    • 為每個訂閱者創(chuàng)建獨(dú)立連接
    • 將訂閱邏輯放在獨(dú)立線程中
    • 實(shí)現(xiàn)重連機(jī)制處理網(wǎng)絡(luò)中斷
  • 監(jiān)控指標(biāo)
    • 跟蹤每個頻道的訂閱者數(shù)量
    • 監(jiān)控消息發(fā)布速率
    • 記錄消息丟失情況(需應(yīng)用層實(shí)現(xiàn))
  • 性能優(yōu)化
    • 對于高頻消息,考慮消息聚合
    • 大消息可考慮只發(fā)送引用ID
    • 合理設(shè)置Redis的TCP-Keepalive參數(shù)

方案 3:基于 Stream 的可靠消息隊(duì)列(Redis 5.0+)

Redis 5.0 推出的 Stream 數(shù)據(jù)結(jié)構(gòu)是專門為消息隊(duì)列場景設(shè)計的,它完美解決了傳統(tǒng) List 和 Pub/Sub 模式的諸多缺陷。Stream 支持消息持久化存儲、消息確認(rèn)機(jī)制、消費(fèi)者組管理、死信隊(duì)列等企業(yè)級特性,是目前 Redis 實(shí)現(xiàn)可靠消息隊(duì)列的最佳方案。在實(shí)際應(yīng)用中,如電商訂單處理、支付流水記錄、日志收集等場景都能發(fā)揮重要作用。

3.1 Stream 核心概念

Stream:消息隊(duì)列的主體,每個 Stream 有唯一的 key(如"order:stream")。消息以"條目(Entry)"形式存儲,每個條目包含:

  • 唯一 ID:自動生成的格式為"時間戳-序列號"(如1680000000000-0)
  • 多個字段值對:如{"order_id":"1001","amount":"199.00"}

消費(fèi)者組(Consumer Group):通過將多個消費(fèi)者歸為一組,實(shí)現(xiàn):

  • 組內(nèi)消費(fèi)者共享消息,避免重復(fù)消費(fèi)
  • 自動負(fù)載均衡,消息均勻分配給各消費(fèi)者
  • 支持水平擴(kuò)展,可隨時增加消費(fèi)者

消息確認(rèn)(ACK)機(jī)制

  1. 消費(fèi)者獲取消息后,消息進(jìn)入"Pending"狀態(tài)
  2. 處理完成后需顯式發(fā)送ACK命令
  3. 未確認(rèn)的消息會在消費(fèi)者斷開后重新分配

Pending 列表

  • 存儲所有已獲取但未確認(rèn)的消息
  • 記錄每個消息的消費(fèi)者名稱、獲取時間、重試次數(shù)
  • 支持通過XPENDING命令查看待處理消息

死信隊(duì)列

  • 當(dāng)消息超過最大重試次數(shù)(如3次)仍未處理成功
  • 可自動/手動轉(zhuǎn)移到專門設(shè)計的死信Stream
  • 便于后續(xù)人工干預(yù)或特殊處理

3.2 核心命令詳解

基本操作命令
操作類型命令格式說明
添加消息XADD key * field1 value1 [field2 value2...]*表示自動生成ID,可指定ID保證順序
創(chuàng)建消費(fèi)者組XGROUP CREATE key groupname id [MKSTREAM]MKSTREAM選項(xiàng)在Stream不存在時自動創(chuàng)建
消費(fèi)消息XREADGROUP GROUP group consumer [COUNT n] [BLOCK ms] STREAMS key [id]id通常為>表示新消息,0表示Pending消息
消息確認(rèn)XACK key groupname id [id...]支持批量確認(rèn)多個消息
查看Pending消息XPENDING key groupname [start end count] [consumer]可查看指定消費(fèi)者的未確認(rèn)消息
消息所有權(quán)轉(zhuǎn)移XCLAIM key groupname consumer min-idle-time id [id...]將空閑超時的消息轉(zhuǎn)給其他消費(fèi)者處理
高級管理命令
  1. 消息回溯XREAD STREAMS key 0-0從最早消息開始讀取
  2. 范圍查詢XRANGE key start end [COUNT n]按ID范圍查詢
  3. 監(jiān)控命令XINFO GROUPS key查看消費(fèi)者組信息

3.3 代碼實(shí)戰(zhàn)(Java + Jedis)

1. 環(huán)境準(zhǔn)備
// Maven依賴
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.3.1</version>
</dependency>
// 連接配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(10);
try (JedisPool pool = new JedisPool(config, "localhost", 6379)) {
    Jedis jedis = pool.getResource();
    // 業(yè)務(wù)代碼...
}
2. 生產(chǎn)消費(fèi)完整流程

生產(chǎn)者增強(qiáng)版

public class EnhancedProducer {
    private static final String[] ORDER_STATUS = {"PENDING", "PAID", "SHIPPED", "COMPLETED"};
    public void sendOrderEvent(Order order) {
        try (Jedis jedis = pool.getResource()) {
            Map<String, String> fields = new HashMap<>();
            fields.put("order_id", order.getId());
            fields.put("user_id", order.getUserId());
            fields.put("amount", order.getAmount().toString());
            fields.put("status", ORDER_STATUS[0]);
            fields.put("create_time", Instant.now().toString());
            // 使用事務(wù)保證原子性
            Transaction t = jedis.multi();
            t.xadd(STREAM_KEY, StreamEntryID.NEW_ENTRY, fields);
            t.sadd("order:ids", order.getId()); // 記錄訂單ID集合
            t.exec();
            // 添加監(jiān)控埋點(diǎn)
            Metrics.counter("mq.produce.count").increment();
        }
    }
}

消費(fèi)者增強(qiáng)版

public class ReliableConsumer implements Runnable {
    private static final int MAX_RETRY = 3;
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
                    GROUP_NAME, consumerName,
                    new StreamParams().count(1).block(2000),
                    new StreamOffset(STREAM_KEY, ">")
                );
                if (messages != null) {
                    messages.forEach((stream, entries) -> {
                        entries.forEach(entry -> {
                            processWithRetry(entry);
                        });
                    });
                }
            } catch (Exception e) {
                logger.error("消費(fèi)異常", e);
                sleep(1000);
            }
        }
    }
    private void processWithRetry(StreamEntry entry) {
        int retryCount = getRetryCount(entry.getID());
        if (retryCount >= MAX_RETRY) {
            moveToDeadLetter(entry);
            return;
        }
        try {
            Order order = parseOrder(entry.getFields());
            orderService.process(order);
            jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
        } catch (Exception e) {
            logger.warn("處理失敗準(zhǔn)備重試", e);
            sleep(1000 * (retryCount + 1));
        }
    }
}
3. 死信隊(duì)列管理
public class DeadLetterMonitor {
    public void checkPendingMessages() {
        // 獲取所有超時未確認(rèn)的消息
        List<StreamEntry> pending = getPendingMessages(TIMEOUT_MS);
        pending.forEach(entry -> {
            // 檢查重試次數(shù)
            if (getRetryCount(entry.getID()) > MAX_RETRY) {
                // 轉(zhuǎn)移到死信隊(duì)列
                jedis.xadd(DEAD_STREAM_KEY, StreamEntryID.NEW_ENTRY, entry.getFields());
                jedis.xack(STREAM_KEY, GROUP_NAME, entry.getID());
                logger.warn("消息轉(zhuǎn)入死信隊(duì)列: {}", entry.getID());
                // 發(fā)送告警通知
                alertService.notifyAdmin(entry);
            }
        });
    }
    public void reprocessDeadLetters() {
        // 從死信隊(duì)列重新處理
        List<StreamEntry> deadMessages = jedis.xrange(DEAD_STREAM_KEY, "-", "+");
        deadMessages.forEach(entry -> {
            try {
                manualProcess(entry.getFields());
                jedis.xdel(DEAD_STREAM_KEY, entry.getID());
            } catch (Exception e) {
                logger.error("死信處理失敗", e);
            }
        });
    }
}

3.4 最佳實(shí)踐建議

  • 消費(fèi)者設(shè)計原則
    • 每個消費(fèi)者設(shè)置唯一標(biāo)識
    • 實(shí)現(xiàn)冪等性處理邏輯
    • 添加合理的阻塞超時時間(通常1-5秒)
  • 性能優(yōu)化
// 批量消費(fèi)提高吞吐量
jedis.xreadGroup(GROUP_NAME, consumerName, 
    new StreamParams().count(100).block(1000),
    new StreamOffset(STREAM_KEY, ">"));

// 批量確認(rèn)減少網(wǎng)絡(luò)開銷
jedis.xack(STREAM_KEY, GROUP_NAME, id1, id2, id3);
  • 監(jiān)控指標(biāo)
    • 待處理消息數(shù)(XPENDING)
    • 消費(fèi)者延遲(當(dāng)前時間 - 消息創(chuàng)建時間)
    • 死信隊(duì)列大小
    • 消費(fèi)成功率
  • 異常處理
// 消費(fèi)者崩潰后的恢復(fù)處理
public void recoverConsumer(String failedConsumer) {
    List<PendingEntry> pendings = jedis.xpending(
        STREAM_KEY, GROUP_NAME, "-", "+", 100, failedConsumer);
    pendings.forEach(pending -> {
        jedis.xclaim(STREAM_KEY, GROUP_NAME, currentConsumer, 
            TIMEOUT_MS, pending.getIdAsString());
    });
}

通過以上實(shí)現(xiàn),基于Redis Stream的消息隊(duì)列可以達(dá)到:

  • 99.9%的消息可靠性
  • 每秒萬級的吞吐量
  • 秒級的端到端延遲
  • 完善的故障恢復(fù)機(jī)制

三、三種方案的選型對比與最佳實(shí)踐

3.1 方案選型對比表:

對比維度List 方案Pub/Sub 方案Stream 方案(推薦)
消息持久化支持(需手動處理)不支持原生支持
消息確認(rèn)需自定義(如RPOPLPUSH)不支持原生支持(ACK機(jī)制)
廣播能力不支持原生支持(全量廣播)支持(通過多消費(fèi)者組實(shí)現(xiàn))
消費(fèi)者負(fù)載均衡支持(競爭消費(fèi)模式)不支持(全量推送)支持(消費(fèi)者組內(nèi)自動均衡)
死信隊(duì)列需自定義(備份List)不支持支持(通過XCLAIM命令)
實(shí)現(xiàn)復(fù)雜度低(基礎(chǔ)命令即可)低(訂閱/發(fā)布模式)中(需理解消費(fèi)者組概念)
內(nèi)存占用線性增長瞬時內(nèi)存可控制(支持消息修剪)
歷史消息回溯有限支持(需保存完整List)不支持完整支持(消息ID時間序列)
適用場景簡單異步通信實(shí)時廣播通知可靠消息、企業(yè)級場景

3.2 最佳實(shí)踐建議

  1. 選型決策樹:

    • 首要判斷消息可靠性需求:
      • 必須保證不丟失 → 直接選擇Stream
      • 可接受偶爾丟失 → 進(jìn)入下一判斷
    • 次要判斷消息分發(fā)模式:
      • 需要廣播 → 選擇Pub/Sub
      • 點(diǎn)對點(diǎn)消費(fèi) → 選擇List或Stream
    • 最后評估開發(fā)成本:
      • 快速實(shí)現(xiàn) → 選擇List
      • 長期維護(hù) → 選擇Stream
  2. Stream方案實(shí)施細(xì)節(jié):

    • 消費(fèi)者組創(chuàng)建示例:
      XGROUP CREATE mystream mygroup $ MKSTREAM
      
    • 典型消費(fèi)代碼邏輯:
      1. 使用XREADGROUP阻塞讀取
      2. 業(yè)務(wù)處理成功后發(fā)送XACK
      3. 處理失敗時使用XCLAIM轉(zhuǎn)移消息
      4. 設(shè)置合理的PEL(Pending Entries List)超時
  3. List方案優(yōu)化建議:

    • 可靠消費(fèi)模式實(shí)現(xiàn):
      RPOPLPUSH source_list backup_list  # 原子操作
      # 處理成功后再LREM備份列表
    • 性能提升技巧:
      • 批量生產(chǎn):使用Pipeline打包多個LPUSH
      • 批量消費(fèi):LUA腳本實(shí)現(xiàn)多消息批量獲取
  4. 集群環(huán)境特別注意事項(xiàng):

    • 跨slot訪問問題:
      • 所有相關(guān)key必須使用相同hash tag(如{msg})
      • 或者采用客戶端分片路由
    • 監(jiān)控重點(diǎn)指標(biāo):
      • Stream方案的PEL積壓長度
      • List方案的內(nèi)存增長曲線
      • Pub/Sub的客戶端連接數(shù)
  5. 運(yùn)維管理建議:

    • 容量規(guī)劃:
      • 按業(yè)務(wù)峰值QPS的1.5倍預(yù)留資源
      • Stream建議單分片不超過10MB/s寫入
    • 監(jiān)控告警:
      • 設(shè)置消息積壓閾值(如Stream的PEL>1000)
      • 監(jiān)控消費(fèi)者延遲(XINFO GROUPS)
    • 災(zāi)備方案:
      • 定期備份Stream的RDB快照
      • 對于關(guān)鍵業(yè)務(wù)實(shí)現(xiàn)雙寫機(jī)制

四、實(shí)際應(yīng)用案例:電商訂單異步處理

4.1 業(yè)務(wù)流程詳解

電商平臺的訂單處理采用異步消息隊(duì)列模式,通過Redis Stream實(shí)現(xiàn)可靠的消息傳遞和消費(fèi)。整個流程包含以下關(guān)鍵環(huán)節(jié):

  1. 訂單創(chuàng)建階段

    • 用戶下單后,訂單服務(wù)作為生產(chǎn)者將訂單數(shù)據(jù)持久化到MySQL數(shù)據(jù)庫
    • 同時將訂單關(guān)鍵信息(訂單ID、用戶ID、商品ID、數(shù)量等)封裝為消息,發(fā)送到名為"order_create"的Stream中
    • 消息格式示例:
      {
        "order_id": "ORD20231125001",
        "user_id": "U10086",
        "product_id": "P8808",
        "quantity": "2"
      }
  2. 并行消費(fèi)階段

    • 通知服務(wù)(消費(fèi)者1):專門處理用戶通知

      • 消費(fèi)消息后調(diào)用短信平臺API或極光推送服務(wù)
      • 通知內(nèi)容示例:"尊敬的會員,您的訂單ORD20231125001已創(chuàng)建成功,我們將盡快為您處理"
      • 支持重試機(jī)制:若首次發(fā)送失敗,會按照指數(shù)退避策略重試3次
    • 庫存服務(wù)(消費(fèi)者2):負(fù)責(zé)庫存扣減

      • 采用樂觀鎖機(jī)制更新庫存:UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ?
      • 實(shí)現(xiàn)分布式事務(wù):若扣減失敗會記錄操作日志,便于后續(xù)人工核對
  3. 異常處理機(jī)制

    • 當(dāng)庫存扣減失敗時,消息會進(jìn)入Pending列表并設(shè)置5分鐘超時
    • 超時后自動轉(zhuǎn)移到死信隊(duì)列"DLQ:order_create"
    • 運(yùn)維人員通過管理后臺查看死信隊(duì)列,可:
      • 人工補(bǔ)扣庫存
      • 觸發(fā)訂單取消流程
      • 聯(lián)系用戶協(xié)商處理

4.2 核心代碼實(shí)現(xiàn)(生產(chǎn)級優(yōu)化版)

訂單服務(wù)(生產(chǎn)者)增強(qiáng)實(shí)現(xiàn)

// 訂單服務(wù)(生產(chǎn)者)發(fā)送消息 - 增強(qiáng)版
public void createOrder(Order order) {
    // 1. 數(shù)據(jù)庫事務(wù)確保數(shù)據(jù)一致性
    TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
    try {
        // 1.1 保存主訂單
        orderMapper.insert(order);
        // 1.2 保存訂單明細(xì)
        order.getItems().forEach(item -> {
            item.setOrderId(order.getId());
            orderItemMapper.insert(item);
        });
        // 2. 構(gòu)建消息體(添加時間戳和業(yè)務(wù)標(biāo)識)
        Map<String, String> message = new HashMap<>();
        message.put("order_id", order.getId());
        message.put("user_id", order.getUserId());
        message.put("product_id", order.getProductId()); 
        message.put("quantity", order.getQuantity() + "");
        message.put("create_time", System.currentTimeMillis() + "");
        message.put("biz_type", "NORMAL_ORDER");
        // 3. 發(fā)送消息(添加重試機(jī)制)
        int retryTimes = 0;
        while (retryTimes < 3) {
            try {
                jedis.xadd("redis:mq:stream:order_create", null, message);
                break;
            } catch (Exception e) {
                retryTimes++;
                if (retryTimes == 3) {
                    throw new RuntimeException("消息發(fā)送失敗", e);
                }
                Thread.sleep(1000 * retryTimes);
            }
        }
        transactionManager.commit(status);
    } catch (Exception e) {
        transactionManager.rollback(status);
        throw new BusinessException("訂單創(chuàng)建失敗", e);
    }
}

通知服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)

// 通知服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)
public void handleNotification() {
    // 初始化消費(fèi)者組(冪等操作)
    initConsumerGroup("redis:mq:stream:order_create", "order_group");
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
                "order_group", 
                "notify_consumer_" + instanceId, // 使用實(shí)例ID區(qū)分消費(fèi)者
                1, 
                5000, 
                false,
                Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
            );
            if (messages != null && !messages.isEmpty()) {
                for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
                    Map<String, String> content = entry.getFields();
                    String userId = content.get("user_id");
                    String orderId = content.get("order_id");
                    // 1. 發(fā)送短信(帶熔斷機(jī)制)
                    boolean smsSent = circuitBreaker.execute(() -> 
                        smsService.send(userId, "訂單通知", "您的訂單" + orderId + "已創(chuàng)建成功"));
                    // 2. 發(fā)送APP推送
                    boolean pushSent = pushService.send(userId, "訂單創(chuàng)建通知", 
                        Map.of("orderId", orderId, "type", "order_created"));
                    if (smsSent || pushSent) {
                        // 至少一個通知發(fā)送成功才確認(rèn)消息
                        jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
                        monitorService.recordSuccess("order_notify");
                    } else {
                        monitorService.recordFailure("order_notify");
                    }
                }
            }
        } catch (Exception e) {
            log.error("通知處理異常", e);
            monitorService.recordError("order_notify", e);
            Thread.sleep(5000); // 異常休眠避免循環(huán)異常
        }
    }
}
private void initConsumerGroup(String streamKey, String groupName) {
    try {
        jedis.xgroupCreate(streamKey, groupName, StreamEntryID.LAST_ENTRY, true);
    } catch (RedisBusyException e) {
        log.info("消費(fèi)者組已存在: {}", groupName);
    }
}

庫存服務(wù)(消費(fèi)者)完整實(shí)現(xiàn)

// 庫存服務(wù)(消費(fèi)者)完整實(shí)現(xiàn) 
public void handleInventory() {
    // 初始化消費(fèi)者組
    initConsumerGroup("redis:mq:stream:order_create", "order_group");
    while (!Thread.currentThread().isInterrupted()) {
        try {
            Map<String, List<StreamEntry>> messages = jedis.xreadGroup(
                "order_group",
                "inventory_consumer_" + instanceId,
                1,
                5000,
                false,
                Map.of("redis:mq:stream:order_create", StreamEntryID.UNRECEIVED_ENTRY)
            );
            if (messages != null && !messages.isEmpty()) {
                for (StreamEntry entry : messages.get("redis:mq:stream:order_create")) {
                    Map<String, String> content = entry.getFields();
                    String productId = content.get("product_id");
                    int quantity = Integer.parseInt(content.get("quantity"));
                    String orderId = content.get("order_id");
                    // 1. 扣減庫存(帶事務(wù))
                    boolean success = inventoryService.deductWithLog(
                        productId, 
                        quantity,
                        orderId,
                        "ORDER_DEDUCTION"
                    );
                    if (success) {
                        // 2. 確認(rèn)消息
                        jedis.xack("redis:mq:stream:order_create", "order_group", entry.getID());
                        monitorService.recordSuccess("inventory_deduct");
                    } else {
                        // 3. 記錄失敗日志
                        log.warn("庫存扣減失敗 orderId={}, productId={}", orderId, productId);
                        monitorService.recordFailure("inventory_deduct");
                        // 不確認(rèn)消息,讓其進(jìn)入Pending狀態(tài)
                    }
                }
            }
        } catch (Exception e) {
            log.error("庫存處理異常", e);
            monitorService.recordError("inventory_deduct", e);
            Thread.sleep(5000);
        }
    }
}
// 庫存扣減服務(wù)方法
@Transactional
public boolean deductWithLog(String productId, int quantity, String bizId, String bizType) {
    // 1. 扣減庫存
    int affected = inventoryMapper.deductWithVersion(
        productId, 
        quantity,
        getCurrentVersion(productId)
    );
    if (affected == 0) {
        return false;
    }
    // 2. 記錄操作流水
    InventoryLog log = new InventoryLog();
    log.setLogId(UUID.randomUUID().toString());
    log.setProductId(productId);
    log.setChangedAmount(-quantity);
    log.setBizId(bizId);
    log.setBizType(bizType);
    log.setRemarks("訂單扣減");
    inventoryLogMapper.insert(log);
    return true;
}

4.3 監(jiān)控與運(yùn)維設(shè)計

  1. 監(jiān)控指標(biāo)

    • 消息堆積量:XLEN redis:mq:stream:order_create
    • Pending列表數(shù)量:XPENDING redis:mq:stream:order_create order_group
    • 消費(fèi)者延遲:通過消息時間戳與當(dāng)前時間差值計算
  2. 運(yùn)維命令示例

    # 查看消費(fèi)者組信息
    XINFO GROUPS redis:mq:stream:order_create
    # 處理死信消息
    XRANGE DLQ:order_create - + COUNT 10
    XACK DLQ:order_create manual_group <entry_id>
  3. 自動恢復(fù)方案

    • 定時任務(wù)每小時檢查Pending列表
    • 對于超時1小時未處理的消息:
      • 嘗試重新投遞到原Stream
      • 超過3次重試則轉(zhuǎn)入死信隊(duì)列
      • 觸發(fā)企業(yè)微信告警通知運(yùn)維人員

到此這篇關(guān)于Redis 是如何實(shí)現(xiàn)消息隊(duì)列的?的文章就介紹到這了,更多相關(guān)Redis消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 高效異步redis客戶端aredis優(yōu)劣勢原理解析

    高效異步redis客戶端aredis優(yōu)劣勢原理解析

    這篇文章主要介紹了高效異步redis客戶端aredis優(yōu)劣勢原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2020-09-09
  • Redis關(guān)于內(nèi)存碎片的解決方法

    Redis關(guān)于內(nèi)存碎片的解決方法

    今天生產(chǎn)機(jī)報內(nèi)存爆滿異常被叫過去查看問題,通過各種排除最終定位到了Redis的內(nèi)存碎片的問題,這篇博客將詳細(xì)介紹Redis內(nèi)存碎片問題并給出最佳實(shí)踐解決此問題,需要的朋友可以參考下
    2024-07-07
  • 基于Redis結(jié)合SpringBoot的秒殺案例詳解

    基于Redis結(jié)合SpringBoot的秒殺案例詳解

    這篇文章主要介紹了Redis結(jié)合SpringBoot的秒殺案例,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-09-09
  • redis.clients.jedis.exceptions.JedisDataException異常的錯誤解決

    redis.clients.jedis.exceptions.JedisDataException異常的錯誤解決

    本文主要介紹了redis.clients.jedis.exceptions.JedisDataException異常的錯誤解決,這個異常通常發(fā)生在嘗試連接到一個?Redis?服務(wù)器時,客戶端發(fā)送了一個?AUTH?命令來驗(yàn)證密碼,但是沒有配置密碼驗(yàn)證,下來就來解決一下
    2024-05-05
  • 詳解Spring?Boot?訪問Redis的三種方式

    詳解Spring?Boot?訪問Redis的三種方式

    這篇文章主要介紹了Spring?Boot?訪問Redis的三種方式,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-12-12
  • Redis的Python客戶端redis-py安裝使用說明文檔

    Redis的Python客戶端redis-py安裝使用說明文檔

    這篇文章主要介紹了Redis的Python客戶端redis-py安裝使用說明文檔,本文講解了安裝方法、入門使用實(shí)例、API參考和詳細(xì)說明,需要的朋友可以參考下
    2015-06-06
  • 深度剖析Redis字符串操作指南從入門到實(shí)戰(zhàn)應(yīng)用

    深度剖析Redis字符串操作指南從入門到實(shí)戰(zhàn)應(yīng)用

    Redis字符串類型二進(jìn)制安全,支持文本、數(shù)字、二進(jìn)制等數(shù)據(jù),涵蓋基礎(chǔ)操作、數(shù)字計算、過期管理及分布式鎖等應(yīng)用,結(jié)合優(yōu)化策略提升系統(tǒng)性能,本文給大家介紹Redis字符串操作指南,感興趣的朋友一起看看吧
    2025-07-07
  • Redis服務(wù)器的啟動過程分析

    Redis服務(wù)器的啟動過程分析

    這篇文章主要介紹了Redis服務(wù)器的啟動過程分析,本文講解了初始化Redis服務(wù)器全局配置、加載配置文件、初始化服務(wù)器、加載數(shù)據(jù)、開始網(wǎng)絡(luò)監(jiān)聽等內(nèi)容,需要的朋友可以參考下
    2015-04-04
  • redis中l(wèi)ua腳本使用教程

    redis中l(wèi)ua腳本使用教程

    在使用redis的過程中,發(fā)現(xiàn)有些時候需要原子性去操作redis命令,而redis的lua腳本正好可以實(shí)現(xiàn)這一功能。這篇文章主要介紹了redis中l(wèi)ua腳本的簡單使用,需要的朋友可以參考下
    2021-10-10
  • Redis連接超時異常的處理方法

    Redis連接超時異常的處理方法

    這篇文章主要給大家介紹了關(guān)于Redis連接超時異常的處理方法,文中通過示例代碼以及圖文介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07

最新評論