欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

解讀Redis秒殺優(yōu)化方案(阻塞隊列+基于Stream流的消息隊列)

 更新時間:2025年02月05日 16:44:38   作者:記得開心一點(diǎn)嘛  
該文章介紹了使用Redis的阻塞隊列和Stream流的消息隊列來優(yōu)化秒殺系統(tǒng)的方案,通過將秒殺流程拆分為兩條流水線,使用Redis緩存緩解數(shù)據(jù)庫壓力,并結(jié)合Lua腳本進(jìn)行原子性判斷,使用阻塞隊列和消息隊列異步處理訂單,有效提高了系統(tǒng)的并發(fā)處理能力和可用性

Redis秒殺優(yōu)化方案(阻塞隊列+Stream流的消息隊列)

下面是我們的秒殺流程:

對于正常的秒殺處理,我們需要多次查詢數(shù)據(jù)庫,會給數(shù)據(jù)庫造成相當(dāng)大的壓力,這個時候我們需要加入緩存,進(jìn)而緩解數(shù)據(jù)庫壓力。

在上面的圖示中,我們可以將一條流水線的任務(wù)拆成兩條流水線來做,如果我們直接將判斷秒殺庫存與校驗(yàn)一人一單放在流水線A上,剩下的放在另一條流水線B,那么如果流水線A就可以相當(dāng)于服務(wù)員直接判斷是否符合資格,如果符合資格那么直接生成信息給另一條流水線B去處理業(yè)務(wù),這里的流水線就是咱們的線程,而流水線A也是基于數(shù)據(jù)庫進(jìn)行查詢,也會壓力數(shù)據(jù)庫,那么這種情況我們就可以將待查詢信息保存在Redis緩存中。

但是我們不能再流水線A判斷完成后去直接調(diào)用流水線B,這樣的效率是大打折扣的,這種情況我們需要開啟獨(dú)立線程去執(zhí)行流水線B的操作,如何知道給哪個用戶創(chuàng)建訂單呢?這個時候就要流水線A在判斷成功后去生成信息給獨(dú)立線程。

最后的業(yè)務(wù)就變成,用戶直接訪問流水線A,通過流水線A去判斷,如果通過則生成信息給流水線B去創(chuàng)建訂單,過程如下圖:

那么什么樣的數(shù)據(jù)結(jié)構(gòu)滿足下面條件:

  • ① 一個key能夠保存很多值
  • ②唯一性:一人一單需要保證用戶id不能重復(fù)。

所以我們需要使用set:

那么如何判斷校驗(yàn)用戶的購買資格呢?

而上述判斷需要保證原子性,所以我們需要使用Lua腳本進(jìn)行編寫:

local voucherId = ARGV[1]; -- 優(yōu)惠劵id
local userId = ARGV[2]; -- 用戶id

-- 庫存key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 訂單key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 判斷庫存是否充足
if(tonumber(redis.call('get',stockKey) <= 0)) then
    -- 庫存不足,返回1
    return 1;
end;
-- 判斷用戶是否下單
if(redis.call('sismember',orderKey,userId)) then
    -- 存在,說明重復(fù)下單,返回2
    return 2;
end
-- 扣減庫存 incrby stockKey -1
redis.call('incrby',stockKey,-1);
-- 下單(保存用戶) sadd orderKey userId
redis.call('sadd',orderKey,userId);
return 0;

之后我們按照下面步驟來實(shí)現(xiàn)代碼:

在方法體內(nèi)執(zhí)行Lua腳本來原子性判斷,然后判斷是否能夠處理并傳入阻塞隊列:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService seckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型內(nèi)填入返回值類型
    static { // 靜態(tài)屬性要使用靜態(tài)代碼塊進(jìn)行初始化
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setResultType(Long.class);
        SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    }
    public Result seckillVoucherMax(Long voucherId) {
        // 獲取用戶信息
        Long userId = UserHolder.getUser().getId();
        // 1.執(zhí)行Lua腳本來判斷用戶資格
        Long result = stringRedisTemplate.execute(
                            SECKILL_SCRIPT,
                            Collections.emptyList(), // Lua無需接受key
                            voucherId.toString(),
                            userId.toString()
                        );
        // 2.判斷結(jié)果是否為0
        int r = result.intValue();
        if(r != 0) {
            // 不為0代表無資格購買
            return Result.fail(r == 1 ? "庫存不足" : "不能重復(fù)下單");
        }
        // 3.有購買資格則將下單信息保存到阻塞隊列中
        // ... 
        return Result.ok();
    }

}

接下來我們創(chuàng)建阻塞隊列,線程池以及線程方法,隨后使用Springboot提供的注解在@PostConstruct去給線程池傳入線程方法:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService seckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型內(nèi)填入返回值類型
    static { // 靜態(tài)屬性要使用靜態(tài)代碼塊進(jìn)行初始化
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setResultType(Long.class);
        SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    }
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 創(chuàng)建阻塞隊列
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();  // 創(chuàng)建線程池
    // 讓大類在開始初始化時就能夠執(zhí)行線程任務(wù)
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
    }
    // 創(chuàng)建線程任務(wù)
    private class VoucherOrderTask implements Runnable {
        @Override
        public void run() {
            while(true){
                try {
                    // 獲取隊列中的訂單信息
                    VoucherOrder voucherOrder = orderTasks.take();// 取出頭部信息
                    // 創(chuàng)建訂單
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("處理訂單異常",e);
                }
            }
        }
    }
    // 創(chuàng)建訂單
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId().toString());
        boolean isLock = lock.tryLock();
        // 判斷是否獲取鎖成功
        if (!isLock) {
            // 獲取鎖失敗,返回錯誤或重試
            log.error("不允許重復(fù)下單");
            return ;
        }
        try {
            proxy.createVoucherOrderMax(voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void createVoucherOrderMax(VoucherOrder voucherOrder) {
        // 一人一單
        Long userId = voucherOrder.getUserId();
        // 查詢訂單
        int count = query().eq("user_id",userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 判斷是否存在
        if(count > 0){
            // 用戶已經(jīng)購買過
            log.error("用戶已經(jīng)購買過");
            return ;
        }
        // CAS改進(jìn):將庫存判斷改成stock > 0以此來提高性能
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).eq("stock",0) // where id = ? and stock > 0
                .update();
        if (!success) {
            //扣減庫存
            log.error("庫存不足!");
            return ;
        }
        //6.創(chuàng)建訂單
        save(voucherOrder);
    }
    private IVoucherOrderService proxy; // 代理對象
    public Result seckillVoucherMax(Long voucherId) {
        // 獲取用戶信息
        Long userId = UserHolder.getUser().getId();
        // 1.執(zhí)行Lua腳本來判斷用戶資格
        Long result = stringRedisTemplate.execute(
                            SECKILL_SCRIPT,
                            Collections.emptyList(), // Lua無需接受key
                            voucherId.toString(),
                            userId.toString()
                        );
        // 2.判斷結(jié)果是否為0
        int r = result.intValue();
        if(r != 0) {
            // 不為0代表無資格購買
            return Result.fail(r == 1 ? "庫存不足" : "不能重復(fù)下單");
        }
        // 3.有購買資格則將下單信息保存到阻塞隊列中
        Long orderId = redisIdWorker.nextId("order");
        // 創(chuàng)建訂單
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        // 放入阻塞隊列
        orderTasks.add(voucherOrder);
        // 4.獲取代理對象(線程異步執(zhí)行,需要手動在方法內(nèi)獲取)
        proxy = (IVoucherOrderService)AopContext.currentProxy(); // 獲取當(dāng)前類的代理對象  (需要引入aspectjweaver依賴,并且在實(shí)現(xiàn)類加入@EnableAspectJAutoProxy(exposeProxy = true)以此來暴露代理對象)
        return Result.ok();
    }

}

在上面代碼中,我們使用下面代碼創(chuàng)建了一個單線程的線程池。它保證所有提交的任務(wù)都按照提交的順序執(zhí)行,每次只有一個線程在工作。

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

下面代碼是一個常見的阻塞隊列實(shí)現(xiàn),具有固定大小(在這里是 1024 * 1024),它的作用是緩沖和排隊任務(wù)。ArrayBlockingQueue 是一個線程安全的隊列,它會自動處理線程之間的同步問題。當(dāng)隊列滿時,調(diào)用 put() 方法的線程會被阻塞,直到隊列有空間;當(dāng)隊列為空時,調(diào)用 take() 方法的線程會被阻塞,直到隊列中有數(shù)據(jù)。

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

在下面代碼中,orderTasks 阻塞隊列用于存放需要處理的訂單對象,每個訂單的處理邏輯都由 VoucherOrderTask 線程池中的線程異步執(zhí)行:

VoucherOrder voucherOrder = orderTasks.take();
handleVoucherOrder(voucherOrder);

之后我們需要調(diào)用 Runnable 接口去實(shí)現(xiàn)VoucherOrderTask類以此來創(chuàng)建線程方法

private class VoucherOrderTask implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                // 獲取隊列中的訂單信息
                VoucherOrder voucherOrder = orderTasks.take(); // 獲取訂單
                // 創(chuàng)建訂單
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.error("處理訂單異常", e);
            }
        }
    }
}

隨后將線程方法通過 submit() 方法將 VoucherOrderTask 提交到線程池中,這個任務(wù)是一個無限循環(huán)的任務(wù),它會不斷從阻塞隊列中取出訂單并處理,直到線程池關(guān)閉。

這種方式使得訂單處理任務(wù)可以異步執(zhí)行,而不阻塞主線程,提高了系統(tǒng)的響應(yīng)能力:

@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
}

但是在高并發(fā)的情況下就會產(chǎn)生大量訂單,就會超出JVM阻塞隊列的上線,并且每當(dāng)服務(wù)重啟或者宕機(jī)的情況發(fā)生,阻塞隊列的所有訂單任務(wù)就都會丟失。

所以為了解決這種情況,我們就要使用消息隊列去解決這個問題:

什么是消息隊列?

消息隊列(Message Queue, MQ)是一種用于在應(yīng)用程序之間傳遞消息的通信方式。它允許應(yīng)用程序通過發(fā)送和接收消息來解耦,從而提高系統(tǒng)的可擴(kuò)展性、可靠性和靈活性。消息隊列通常用于異步通信、任務(wù)隊列、事件驅(qū)動架構(gòu)等場景。

消息隊列的核心概念 :

  • 生產(chǎn)者(Producer):發(fā)送消息到消息隊列的應(yīng)用程序。
  • 消費(fèi)者(Consumer):從消息隊列中接收并處理消息的應(yīng)用程序。
  • 隊列(Queue):消息的存儲區(qū)域,生產(chǎn)者將消息發(fā)送到隊列,消費(fèi)者從隊列中獲取消息。
  • 消息(Message):在生產(chǎn)者與消費(fèi)者之間傳遞的數(shù)據(jù)單元。
  • Broker:消息隊列的服務(wù)器,負(fù)責(zé)接收、存儲和轉(zhuǎn)發(fā)消息。

消息隊列是在JVM以外的一個獨(dú)立的服務(wù),能夠不受JVM內(nèi)存的限制,并且存入MQ的信息都可以做持久化存儲。

詳細(xì)教學(xué)可以查詢下面鏈接:微服務(wù)架構(gòu) --- 使用RabbitMQ進(jìn)行異步處理

但是這樣的方式是需要額外提供服務(wù)的,所以我們可以使用Redis提供的三種不同的方式來實(shí)現(xiàn)消息隊列

  1. List 結(jié)構(gòu)實(shí)現(xiàn)消息隊列
  2. Pub/Sub(發(fā)布/訂閱)模式
  3. Stream 結(jié)構(gòu)(Redis 5.0 及以上版本)(推薦使用)(詳細(xì)介紹)

使用 List 結(jié)構(gòu)實(shí)現(xiàn)消息隊列:

Redis 的 List 數(shù)據(jù)結(jié)構(gòu)是一個雙向鏈表,支持從頭部或尾部插入和彈出元素。我們可以利用 LPUSHBRPOP 命令實(shí)現(xiàn)一個簡單的消息隊列。

實(shí)現(xiàn)步驟:

  • 生產(chǎn)者:使用 LPUSH 將消息推入隊列。
  • 消費(fèi)者:使用 BRPOP 阻塞地從隊列中獲取消息。

生產(chǎn)者代碼:

import redis.clients.jedis.Jedis;

public class ListProducer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
        String queueName = "myQueue";

        // 發(fā)送消息
        for (int i = 1; i <= 5; i++) {
            String message = "Message " + i;
            jedis.lpush(queueName, message); // 將消息推入隊列
            System.out.println("Sent: " + message);
        }

        jedis.close(); // 關(guān)閉連接
    }
}

消費(fèi)者代碼:

import redis.clients.jedis.Jedis;

public class ListConsumer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
        String queueName = "myQueue";

        while (true) {
            // 阻塞獲取消息,超時時間為 0(無限等待)
            var result = jedis.brpop(0, queueName);
            String message = result.get(1); // 獲取消息內(nèi)容
            System.out.println("Received: " + message);
        }
    }
}
  • 優(yōu)點(diǎn):簡單易用,適合輕量級場景。
  • 缺點(diǎn)不支持消息確認(rèn)機(jī)制,消息一旦被消費(fèi)(從隊列內(nèi)取出)就會從隊列中刪除。并且只支持單消費(fèi)者(一個消息只能拿出一次)

使用 Pub/Sub 模式實(shí)現(xiàn)消息隊列:

Redis 的 Pub/Sub 模式是一種發(fā)布-訂閱模型,生產(chǎn)者將消息發(fā)布到頻道,消費(fèi)者訂閱頻道以接收消息。

實(shí)現(xiàn)步驟:

  • 生產(chǎn)者:使用 PUBLISH 命令向頻道發(fā)布消息。
  • 消費(fèi)者:使用 SUBSCRIBE 命令訂閱頻道。

生產(chǎn)者代碼:

import redis.clients.jedis.Jedis;

public class PubSubProducer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
        String channelName = "myChannel";

        // 發(fā)布消息
        for (int i = 1; i <= 5; i++) {
            String message = "Message " + i;
            jedis.publish(channelName, message); // 發(fā)布消息到頻道
            System.out.println("Published: " + message);
        }

        jedis.close(); // 關(guān)閉連接
    }
}

消費(fèi)者代碼:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PubSubConsumer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 連接 Redis
        String channelName = "myChannel";

        // 創(chuàng)建訂閱者
        JedisPubSub subscriber = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("Received: " + message);
            }
        };

        // 訂閱頻道
        jedis.subscribe(subscriber, channelName);
    }
}
  • 優(yōu)點(diǎn):支持一對多的消息廣播。
  • 缺點(diǎn):消息是即時的,如果消費(fèi)者不在線,消息會丟失。

但是上面兩方式都是有缺點(diǎn)的:

  1. 不支持消息確認(rèn)機(jī)制,消息一旦被消費(fèi)(從隊列內(nèi)取出)就會從隊列中刪除。并且只支持單消費(fèi)者(一個消息只能拿出一次)
  2. 消息是即時的,如果消費(fèi)者不在線,消息會丟失。

所以根據(jù)上面的兩種方式,我們推出一款全新的方式 ->

使用 Stream 結(jié)構(gòu)實(shí)現(xiàn)消息隊列:

Redis Stream 是一種強(qiáng)大的數(shù)據(jù)結(jié)構(gòu),用于管理消息流。它將消息存儲在 Redis 中,并允許消費(fèi)者按順序獲取消息。Stream 具有以下特點(diǎn):

  • 有序消息:消息按插入順序排列。
  • 消費(fèi)者組:一個消費(fèi)者組可以有多個消費(fèi)者,每個消費(fèi)者可以獨(dú)立消費(fèi)不同的消息。
  • 消息 ID:每條消息都有唯一的 ID(如:1588890470850-0),ID 按時間戳生成。
  • 自動分配消息:多個消費(fèi)者可以從 Stream 中并行消費(fèi)消息,保證消息不會重復(fù)消費(fèi)。

在 Redis Stream 中,一個隊列可以有多個消費(fèi)者組,每個消費(fèi)者組可以獨(dú)立地消費(fèi)隊列中的消息。每個消費(fèi)者組內(nèi)有多個消費(fèi)者,而消費(fèi)者是基于 消費(fèi)者名稱 進(jìn)行識別的。

消費(fèi)者組的工作方式

  • 每個消費(fèi)者組擁有自己的 消費(fèi)進(jìn)度,也就是每個消費(fèi)者組會從 自己獨(dú)立的消息 ID 開始消費(fèi)
  • 多個消費(fèi)者組之間是相互獨(dú)立的,即使它們消費(fèi)的是同一個隊列,它們也可以從不同的位置開始消費(fèi)隊列中的消息。
  • 每個消費(fèi)者組都可以有多個 消費(fèi)者(在同一個組內(nèi),多個消費(fèi)者可以并行消費(fèi)同一個隊列的消息,但每個消息在消費(fèi)者組內(nèi)只能被一個消費(fèi)者處理一次)。

假設(shè)有一個隊列(Stream)mystream,可以為它創(chuàng)建多個消費(fèi)者組:

XGROUP CREATE mystream group1 $ MKSTREAM
XGROUP CREATE mystream group2 $ MKSTREAM

這樣,mystream 隊列上就有了兩個消費(fèi)者組:group1group2。每個消費(fèi)者組可以有自己的消費(fèi)者并從該隊列中讀取消息。此時,group1group2 都在消費(fèi)同一個隊列 mystream,但它們的消費(fèi)進(jìn)度是獨(dú)立的,它們各自有自己的消息 ID 記錄。

每個消費(fèi)者組可以有多個消費(fèi)者,而每個消費(fèi)者通過一個 唯一的消費(fèi)者名稱 來標(biāo)識。

每個消費(fèi)者組有獨(dú)立的消費(fèi)進(jìn)度

每個消費(fèi)者組會記錄自己的消費(fèi)進(jìn)度,也就是它消費(fèi)到隊列中的 哪個消息 ID。即使多個消費(fèi)者組在消費(fèi)同一個消息隊列,它們每個組都會從 不同的消費(fèi)位置(消息 ID)開始讀取消息。

例如,假設(shè)有一個隊列 mystream,同時有兩個消費(fèi)者組 group1group2,它們都從 mystream 隊列中讀取消息:

  • group1mystream 隊列中的消息 id1 開始消費(fèi),group1 的進(jìn)度會記錄在 Redis 中。
  • group2mystream 隊列中的消息 id2 開始消費(fèi),group2 的進(jìn)度也會記錄在 Redis 中。

消費(fèi)進(jìn)度互不干擾,即便 group1group2 都在消費(fèi) mystream 隊列,它們的消費(fèi)位置是獨(dú)立的。

消費(fèi)者組內(nèi)部的消息消費(fèi)

一個消費(fèi)者組內(nèi)的消費(fèi)者會 共享 組內(nèi)的消息。即使有多個消費(fèi)者,每條消息 在消費(fèi)者組內(nèi)部只會被 一個消費(fèi)者 消費(fèi)。消費(fèi)者之間會并行處理消息,但每條消息只會被一個消費(fèi)者處理。

舉個例子:假設(shè) group1 中有三個消費(fèi)者 consumer1consumer2、consumer3,如果隊列 mystream 有 6 條消息,那么它們會如下消費(fèi):

  • consumer1 處理消息 12
  • consumer2 處理消息 3、4
  • consumer3 處理消息 5、6

但對于消費(fèi)者組 group2,如果它有自己的消費(fèi)者,group2 內(nèi)的消費(fèi)者也會并行消費(fèi) mystream 中的消息,而 group1group2 之間沒有直接關(guān)系。

首先初始化一個消息隊列:

在項(xiàng)目啟動時,確保 Redis 中存在對應(yīng)的 Stream 和消費(fèi)者組。可以通過程序在啟動時檢查并創(chuàng)建(如果不存在的話)。

@Configuration
public class RedisStreamConfig {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String STREAM_KEY = "mystream";
    private static final String GROUP_NAME = "mygroup";

    @PostConstruct
    public void init() {
        // 檢查消費(fèi)者組是否存在,若不存在則創(chuàng)建
        try {
            // 如果消費(fèi)者組不存在則會拋出異常,我們捕獲異常進(jìn)行創(chuàng)建
            redisTemplate.opsForStream().groups(STREAM_KEY);
        } catch (Exception e) {
            // 創(chuàng)建消費(fèi)者組,起始位置為 $ 表示從末尾開始消費(fèi)新消息
            redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
        }
    }
}

注意:

  • opsForStream().groups(STREAM_KEY):查詢消費(fèi)者組是否已存在。
  • opsForStream().createGroup(STREAM_KEY, GROUP_NAME):如果沒有消費(fèi)者組,則創(chuàng)建一個新的組。

隨后我們生產(chǎn)者發(fā)送消息示例:

@Service  
public class RedisStreamProducerService {  // 定義生產(chǎn)者服務(wù)類 RedisStreamProducerService

    private static final String STREAM_KEY = "mystream";  // 定義 Redis Stream 的名稱,這里指定隊列名為 "mystream"

    @Autowired  
    private StringRedisTemplate redisTemplate;

    public void sendMessage(String content) {  // 定義一個方法,發(fā)送消息到 Redis Stream,參數(shù) content 是消息的內(nèi)容
        Map<String, String> map = new HashMap<>();  // 創(chuàng)建一個 Map 用來存儲消息內(nèi)容
        map.put("content", content);  // 將消息內(nèi)容添加到 Map 中,鍵是 "content",值是傳入的內(nèi)容

        // 在消息隊列中添加消息,調(diào)用 StringRedisTemplate 的 opsForStream 方法
        RecordId recordId = redisTemplate.opsForStream()  // 獲取操作 Redis Stream 的操作對象
                .add(StreamRecords.objectBacked(map)  // 創(chuàng)建一個 Stream 記錄,將 Map 轉(zhuǎn)化為對象記錄
                .withStreamKey(STREAM_KEY));  // 設(shè)置該記錄屬于的 Stream(消息隊列)的名稱
        // 輸出記錄的 ID,表示消息已經(jīng)成功發(fā)送
        System.out.println("消息發(fā)送成功,id: " + recordId.getValue());  // 打印消息的 ID,表明該消息已經(jīng)被成功加入到 Stream 中
    }
}

RecordId 是 Spring Data Redis 中的一個類,用來表示 消息的唯一標(biāo)識符。它對應(yīng) Redis Stream 中的 消息 ID,該 ID 是 Redis Stream 中每條消息的唯一標(biāo)識。Redis 中的消息 ID 通常是由時間戳和序號組成的(如 1588890470850-0)。

主要功能:

  • 表示消息 IDRecordId 是一個封裝類,表示 Redis Stream 中消息的 ID。
  • 用于識別和操作消息:在消費(fèi)和確認(rèn)消息時,RecordId 用來標(biāo)識每條消息的唯一性,并幫助 Redis 確定消息是否已經(jīng)被消費(fèi)。

使用場景:

RecordId 用來標(biāo)識從 Stream 中讀取到的消息,我們可以通過 RecordId 來進(jìn)行消息的確認(rèn)、刪除或其他操作。

RecordId recordId = redisTemplate.opsForStream().add(StreamRecords.objectBacked(map).withStreamKey("mystream"));

通過 StreamRecords.objectBacked(map)map 對象作為消息內(nèi)容,并用 add 方法將其寫入 Stream。

在然后編寫消費(fèi)者服務(wù):

使用 RedisTemplate 的 read 方法(底層執(zhí)行的是 XREADGROUP 命令)從消費(fèi)者組中拉取消息,并進(jìn)行處理。消費(fèi)者可以采用定時任務(wù)或后臺線程不斷輪詢。

@Slf4j  
@Service  
public class RedisStreamConsumerService { 
    private static final String STREAM_KEY = "mystream";  // Redis Stream 的名稱,這里指定隊列名為 "mystream"
    private static final String GROUP_NAME = "mygroup";  // 消費(fèi)者組的名稱,多個消費(fèi)者可以通過組名共享消費(fèi)隊列
    private static final String CONSUMER_NAME = "consumer-1";  // 消費(fèi)者的名稱,消費(fèi)者名稱在同一消費(fèi)者組內(nèi)必須唯一

    @Autowired  
    private StringRedisTemplate redisTemplate;

    @PostConstruct  // 使用該注解能讓方法在 Spring 完成依賴注入后自動調(diào)用,用于初始化任務(wù)
    @Async  // 將該方法標(biāo)記為異步執(zhí)行,允許它在單獨(dú)的線程中運(yùn)行,不會阻塞主線程,@EnableAsync 需要在配置類中啟用
    public void start() {  // 啟動方法,在應(yīng)用啟動時執(zhí)行
        // 無限循環(huán),不斷從 Redis Stream 中讀取消息(可以改為定時任務(wù)等方式)
        while (true) {
            try {
                // 設(shè)置 Stream 讀取的阻塞超時,設(shè)置最多等待 2 秒
                StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(2));
                // 從指定的消費(fèi)者組中讀取消息,">" 表示只消費(fèi)未被消費(fèi)過的消息
                List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                        Consumer.from(GROUP_NAME, CONSUMER_NAME),  // 指定消費(fèi)者組和消費(fèi)者名稱
                        options,  // 設(shè)置讀取選項(xiàng),包含阻塞時間
                        StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())  // 從最后消費(fèi)的消息開始讀取
                );
                // 如果沒有消息,繼續(xù)循環(huán)讀取
                if (messages == null || messages.isEmpty()) {
                    continue;  
                }
                // 處理每一條讀取到的消息
                for (MapRecord<String, Object, Object> message : messages) {
                    String messageId = message.getId();  // 獲取消息的唯一標(biāo)識符(ID)
                    Map<Object, Object> value = message.getValue();  // 獲取消息內(nèi)容(以 Map 形式存儲)
                    log.info("接收到消息,id={},內(nèi)容={}", messageId, value);  // 打印日志,記錄消息 ID 和內(nèi)容
                    // 在這里加入業(yè)務(wù)邏輯處理
                    // 例如處理消息并執(zhí)行相應(yīng)的操作
                    // ...

                    // 消息處理成功后,需要確認(rèn)消息已經(jīng)被消費(fèi)(通過 XACK 命令)
                    redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);  // 確認(rèn)消費(fèi)的消息
                }
            } catch (Exception e) {
                log.error("讀取 Redis Stream 消息異常", e);  // 異常捕獲,記錄錯誤日志
            }
        }
    }
}

MapRecord<String, Object, Object> 是 Spring Data Redis 用來表示 Redis Stream 中的 消息記錄 的類。它不僅包含了消息的 ID,還包含了消息的內(nèi)容(即消息數(shù)據(jù))。

在 Redis 中,每條消息都存儲為一個 key-value 對。

主要功能:

  • 封裝消息 ID 和消息內(nèi)容MapRecord 用來封裝消息的 ID 和消息的內(nèi)容。
  • 消息的內(nèi)容:消息的內(nèi)容通常是一個 鍵值對Map<String, Object>),可以是任意對象的數(shù)據(jù)結(jié)構(gòu)(例如,JSON、Map 或其他序列化對象)。

字段:

  • getId():返回消息的 ID(RecordId 類型)。
  • getValue():返回消息的內(nèi)容,以 Map<Object, Object> 的形式。

使用場景:

MapRecord 是用來表示從 Stream 中讀取到的消息,它將消息的 ID 和內(nèi)容(鍵值對)封裝在一起。你可以使用 MapRecord 來獲取消息的 ID 和內(nèi)容并處理。

MapRecord<String, Object, Object> message = redisTemplate.opsForStream().read(Consumer.from("mygroup", "consumer1"), options, StreamOffset.create("mystream", ReadOffset.lastConsumed()));

在這個例子中,message 是一個 MapRecord 實(shí)例,它封裝了從 mystream 隊列中讀取到的消息。我們可以通過 message.getId() 獲取消息 ID,通過 message.getValue() 獲取消息內(nèi)容。

在消費(fèi)者中,我們使用 MapRecord<String, Object, Object> 來封裝消息,獲取 message.getId() 來獲取消息的 ID(RecordId),以及通過 message.getValue() 獲取消息的內(nèi)容。 隨后在處理完消息后,調(diào)用 acknowledge() 來確認(rèn)消息已經(jīng)被消費(fèi)。

最后啟動異步支持:

@SpringBootApplication
@EnableAsync // 啟動異步支持
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

通過這種方式,Spring Data Redis 提供了高效且類型安全的接口來操作 Redis Stream,幫助我們在分布式系統(tǒng)中實(shí)現(xiàn)高效的消息隊列。

總結(jié)

以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • redis數(shù)據(jù)一致性之延時雙刪策略詳解

    redis數(shù)據(jù)一致性之延時雙刪策略詳解

    在使用redis時,需要保持redis和數(shù)據(jù)庫數(shù)據(jù)的一致性,最流行的解決方案之一就是延時雙刪策略,今天我們就來詳細(xì)刨析一下,需要的朋友可以參考下
    2023-09-09
  • 查看Redis內(nèi)存信息的命令

    查看Redis內(nèi)存信息的命令

    Redis 是一個開源、高性能的Key-Value數(shù)據(jù)庫,被廣泛應(yīng)用在服務(wù)器各種場景中。本文介紹幾個查看Redis內(nèi)存信息的命令,包括常用的info memory、info keyspace、bigkeys等。
    2020-09-09
  • Redis優(yōu)惠券秒殺企業(yè)實(shí)戰(zhàn)

    Redis優(yōu)惠券秒殺企業(yè)實(shí)戰(zhàn)

    本文主要介紹了Redis優(yōu)惠券秒殺企業(yè)實(shí)戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • Redis實(shí)現(xiàn)訂單過期刪除的方法步驟

    Redis實(shí)現(xiàn)訂單過期刪除的方法步驟

    本文主要介紹了Redis實(shí)現(xiàn)訂單過期刪除的方法步驟,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-06-06
  • 深入解析Redis中常見的應(yīng)用場景

    深入解析Redis中常見的應(yīng)用場景

    這篇文章主要給大家介紹了關(guān)于Redis中常見的應(yīng)用場景的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-09-09
  • 詳解Redis基本命令與使用場景

    詳解Redis基本命令與使用場景

    REmote DIctionary Server(Redis)是一個由Salvatore Sanfilippo寫的key-value 存儲系統(tǒng),是跨平臺的非關(guān)系型數(shù)據(jù)庫,是一個開源的使用ANSI C語言編寫、遵守BSD協(xié)議、支持網(wǎng)絡(luò)、可基于內(nèi)存、分布式、可選持久性的鍵值對(Key-Value)存儲數(shù)據(jù)庫,并提供多種語言的 API。
    2021-06-06
  • 一文教你學(xué)會Redis的事務(wù)

    一文教你學(xué)會Redis的事務(wù)

    Redis?作為內(nèi)存的存儲中間件,已經(jīng)是面試的面試題必問之一了。今天小編就來和大家一起來聊聊Redis的事務(wù)吧,希望對大家有所幫助
    2022-08-08
  • Redis服務(wù)器優(yōu)化方式

    Redis服務(wù)器優(yōu)化方式

    文章分享了常見的Redis服務(wù)器優(yōu)化技巧和策略,主要包括內(nèi)存管理、持久化配置、連接配置和網(wǎng)絡(luò)優(yōu)化四個方面,內(nèi)存管理主要是設(shè)置maxmemory參數(shù)和選擇合適的內(nèi)存淘汰策略,持久化配置包括RDB持久化和AOF持久化
    2024-09-09
  • Win10下通過Ubuntu安裝Redis的過程

    Win10下通過Ubuntu安裝Redis的過程

    這篇文章主要介紹了Win10下通過Ubuntu安裝Redis,在安裝Ubuntu需要先打開Windows功能,接著創(chuàng)建一個用戶及密碼,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-04-04
  • Redis緩存異常常用解決方案總結(jié)

    Redis緩存異常常用解決方案總結(jié)

    Redis緩存異常問題分別是緩存雪崩,緩存預(yù)熱,緩存穿透,緩存降級,緩存擊穿,本文主要介紹了Redis緩存異常常用解決方案總結(jié),具有一定的參考價值,感興趣的可以了解一下
    2023-12-12

最新評論