欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺功能

 更新時(shí)間:2025年04月22日 15:06:56   作者:starrismq  
在高并發(fā)場景下,為了提高秒殺業(yè)務(wù)的性能,可將部分工作交給 Redis 處理,并通過異步方式執(zhí)行,Redis 提供了多種數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)消息隊(duì)列,總結(jié)三種,本文詳細(xì)介紹Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺功能,感興趣的朋友一起看看吧

1 Redis消息隊(duì)列

在高并發(fā)場景下,為了提高秒殺業(yè)務(wù)的性能,可將部分工作交給 Redis 處理,并通過異步方式執(zhí)行。Redis 提供了多種數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)消息隊(duì)列,總結(jié)三種。

1.1 List 結(jié)構(gòu)

  • 原理:基于 List 結(jié)構(gòu)模擬消息隊(duì)列,使用 BRPUSH 生產(chǎn)消息,BRPOP 消費(fèi)消息。
  • 命令示例
    • 生產(chǎn)消息BRPUSH key value [value ...],將一個(gè)或多個(gè)元素推入到指定列表的頭部。如果列表不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)新的列表。
    • 消費(fèi)消息BRPOP key [key ...] timeout,從指定的一個(gè)或多個(gè)列表中彈出最后一個(gè)元素。如果列表為空,該命令會(huì)導(dǎo)致客戶端阻塞,直到有數(shù)據(jù)可用或超過指定的超時(shí)時(shí)間。
  • 優(yōu)缺點(diǎn)
  • 優(yōu)點(diǎn):不會(huì)內(nèi)存超限、可以持久化、消息有序性。
  • 缺點(diǎn):無法避免數(shù)據(jù)丟失、只支持單消費(fèi)者。

1.2 Pub/Sub 模式

  • 原理:發(fā)布訂閱模式,基本的點(diǎn)對(duì)點(diǎn)消息模型,支持多生產(chǎn)、多消費(fèi)者。
  • 命令示例
    • 生產(chǎn)消息PUBLISH channel message,用于向指定頻道發(fā)布一條消息。
    • 消費(fèi)消息 SUBSCRIBE channel [channel]:訂閱一個(gè)或多個(gè)頻道。
      • UNSUBSCRIBE [channel [channel ...]]:取消訂閱一個(gè)或多個(gè)頻道。
      • PSUBSCRIBE pattern [pattern ...]:訂閱一個(gè)或多個(gè)符合給定模式的頻道,接收消息。
      • PUNSUBSCRIBE [pattern [pattern ...]]:取消訂閱一個(gè)或多個(gè)符合給定模式的頻道。
  • 優(yōu)缺點(diǎn)
    • 優(yōu)點(diǎn):支持多生產(chǎn)、多消費(fèi)者。
    • 缺點(diǎn):不支持持久化、無法避免數(shù)據(jù)丟失,消息堆積有上限(消費(fèi)者會(huì)緩存消息),超出會(huì)丟失消息。

1.3 Stream 結(jié)構(gòu)

  • 原理:Redis 5.0 引入的專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,支持消息可回溯、一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)、可以阻塞讀取。
  • 命令示例
    • 生產(chǎn)消息XADD key *|ID value [value ...],向指定的 Stream 流中添加一個(gè)消息。例如:XADD users * name jack age 21,創(chuàng)建名為 users 的隊(duì)列,并向其中發(fā)送一個(gè)消息,內(nèi)容是 {name=jack,age=21},使用 Redis 自動(dòng)生成 ID。
    • 消費(fèi)消息XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID。例如:
      • XREAD COUNT 1 STREAMS users 0:讀取 users 隊(duì)列中的第一條消息。
      • XREAD COUNT 1 BLOCK 1000 STREAMS users $:阻塞 1 秒鐘后從 users 隊(duì)列中讀取最新消息。
  • 消費(fèi)者組模式
    • 特點(diǎn):消息分流、消息標(biāo)識(shí)、消息確認(rèn)。
    • 命令示例
      • XGROUP CREATE key groupName ID:創(chuàng)建消費(fèi)者組。
      • XGROUP DESTORY key groupName:刪除指定的消費(fèi)者組。
      • XGROUP CREATECONSUMER key groupName consumerName:給指定的消費(fèi)者組添加消費(fèi)者。
      • XGROUP DELCONSUMER key groupName consumerName:刪除消費(fèi)者組中指定消費(fèi)者。
      • XREADGROUP GROUP:從消費(fèi)者組中讀取消息。
  • 優(yōu)缺點(diǎn)
    • 優(yōu)點(diǎn):消息可回溯、可以多消費(fèi)者爭搶消息,加快消費(fèi)速度、可以阻塞讀取、沒有消息漏讀的風(fēng)險(xiǎn)、有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次。
    • 缺點(diǎn):有消息漏讀的風(fēng)險(xiǎn)(單消費(fèi)方式下)。

1.4 Redis Stream消息隊(duì)列的特點(diǎn)

Redis 5.0引入的Stream類型是專門為消息隊(duì)列設(shè)計(jì)的,支持以下特性:

  • 消息持久化:消息存儲(chǔ)在內(nèi)存中,支持持久化到磁盤,避免消息丟失。
  • 消費(fèi)者組(Consumer Group)
    • 消息分流:一個(gè)隊(duì)列可以被多個(gè)消費(fèi)者組訂閱,組內(nèi)多個(gè)消費(fèi)者分?jǐn)傁⑻幚怼?/li>
    • 消息回溯:支持按消息ID回溯歷史消息。
    • 消息確認(rèn)(ACK):消費(fèi)者處理完消息后需確認(rèn),否則消息會(huì)進(jìn)入pending-list等待重試。
  • 阻塞讀取:消費(fèi)者可以阻塞等待新消息,減少CPU空轉(zhuǎn)。
  • 避免消息丟失:通過pending-list機(jī)制,確保消息至少被消費(fèi)一次。

2 秒殺業(yè)務(wù)處理

2.1 使用Lua腳本處理庫存和訂單

目標(biāo):在Redis中完成庫存判斷和訂單校驗(yàn),確保原子性。

-- 參數(shù):優(yōu)惠券ID、用戶ID、訂單ID
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 庫存Key和訂單Key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
-- 判斷庫存是否充足
if (tonumber(redis.call('GET', stockKey)) <= 0 then
    return 1 -- 庫存不足
end
-- 判斷用戶是否已下單
if (redis.call('SISMEMBER', orderKey, userId) == 1 then
    return 2 -- 用戶已下單
end
-- 扣減庫存并記錄訂單
redis.call('DECR', stockKey)
redis.call('SADD', orderKey, userId)
-- 將訂單信息發(fā)送到消息隊(duì)列
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0 -- 成功

腳本說明

  • 原子性操作:庫存檢查、訂單校驗(yàn)、消息發(fā)送在一個(gè)腳本中完成。
  • 消息發(fā)送:使用XADD將訂單信息寫入stream.orders隊(duì)列。

 2.2 創(chuàng)建消費(fèi)者組

  • XGROUP CREATE stream.orders g1 0 MKSTREAM

g1:消費(fèi)者組名稱。

MKSTREAM:如果隊(duì)列不存在則自動(dòng)創(chuàng)建。

2.3 Java代碼實(shí)現(xiàn)

  • init 方法:在類初始化時(shí)創(chuàng)建消息隊(duì)列,并啟動(dòng)一個(gè)線程任務(wù)從消息隊(duì)列中獲取訂單信息。
  • VoucherOrderHandler 類:實(shí)現(xiàn) Runnable 接口,作為線程任務(wù),不斷從消息隊(duì)列中獲取訂單信息。如果獲取成功,將消息轉(zhuǎn)換為 VoucherOrder 對(duì)象,調(diào)用 handleVoucherOrder 方法處理訂單,并進(jìn)行 ACK 確認(rèn);如果出現(xiàn)異常,調(diào)用 handlePendingList 方法處理異常消息。
  • handlePendingList 方法:從 pendingList 中獲取訂單信息,處理訂單并進(jìn)行 ACK 確認(rèn),直到 pendingList 中沒有消息。
  • handleVoucherOrder 方法:使用 Redisson 分布式鎖確保一人一單,調(diào)用代理對(duì)象的 createVoucherOrder 方法創(chuàng)建訂單。
  • seckillVoucher 方法:執(zhí)行 Lua 腳本判斷用戶是否具有秒殺資格,如果具有資格,將訂單信息發(fā)送到消息隊(duì)列,并返回下單成功信息。
  • createVoucherOrder 方法:判斷當(dāng)前用戶是否是第一單,如果是則扣減庫存并將訂單保存到數(shù)據(jù)庫。

系統(tǒng)啟動(dòng)與初始化

系統(tǒng)啟動(dòng)時(shí),VoucherOrderServiceImpl 類的 @PostConstruct 注解會(huì)觸發(fā) init 方法執(zhí)行。該方法先加載創(chuàng)建消息隊(duì)列的 Lua 腳本,通過 stringRedisTemplate.execute 方法執(zhí)行腳本創(chuàng)建 Redis Stream 消息隊(duì)列和消費(fèi)者組。若創(chuàng)建成功或隊(duì)列已存在,會(huì)記錄相應(yīng)日志。之后,使用線程池 SECKILL_ORDER_EXECUTOR 啟動(dòng) VoucherOrderHandler 線程,該線程負(fù)責(zé)后續(xù)從消息隊(duì)列獲取訂單信息并處理。

用戶發(fā)起秒殺請(qǐng)求

用戶發(fā)起秒殺請(qǐng)求后,系統(tǒng)調(diào)用 VoucherOrderServiceImplseckillVoucher 方法。此方法先從 ThreadLocalUtls 中獲取用戶 ID,用 redisIdWorker 生成訂單 ID。接著執(zhí)行判斷用戶秒殺資格的 Lua 腳本,該腳本接收優(yōu)惠券 ID、用戶 ID 和訂單 ID 作為參數(shù)。若腳本返回值表明庫存不足或用戶已下單,方法返回相應(yīng)的失敗提示;若返回值為 0,說明用戶有秒殺資格,創(chuàng)建代理對(duì)象并返回下單成功結(jié)果。

Lua 腳本執(zhí)行邏輯

Lua 腳本接收到參數(shù)后,根據(jù)優(yōu)惠券 ID 拼接庫存和訂單的 Redis key。先通過 GET 命令獲取庫存,若庫存小于等于 0 則返回 1 表示庫存不足。若庫存充足,使用 SISMEMBER 命令檢查用戶是否已下單,若已下單則返回 2。若庫存充足且用戶未下單,使用 INCRBY 命令扣減庫存,SADD 命令記錄訂單信息,最后返回 0 表示下單成功。

訂單處理線程工作

VoucherOrderHandler 線程啟動(dòng)后進(jìn)入無限循環(huán),不斷從 Redis Stream 消息隊(duì)列獲取訂單信息。若未獲取到消息,繼續(xù)下一次循環(huán);若獲取到消息,將消息轉(zhuǎn)換為 VoucherOrder 對(duì)象,調(diào)用 handleVoucherOrder 方法處理訂單,處理完成后向消息隊(duì)列發(fā)送 ACK 確認(rèn)消息。若處理過程中出現(xiàn)異常,調(diào)用 handlePendingList 方法處理異常消息。

訂單處理方法 handleVoucherOrder

handleVoucherOrder 方法接收 VoucherOrder 對(duì)象,根據(jù)用戶 ID 獲取 Redisson 分布式鎖。嘗試獲取鎖,若失敗記錄錯(cuò)誤日志并返回;若成功,調(diào)用代理對(duì)象的 createVoucherOrder 方法創(chuàng)建訂單,最后釋放鎖。

訂單創(chuàng)建方法 createVoucherOrder

該方法先判斷當(dāng)前用戶是否是第一單,通過查詢數(shù)據(jù)庫中該用戶的訂單數(shù)量來判斷。若不是第一單,記錄錯(cuò)誤日志并返回;若是第一單,嘗試扣減秒殺券庫存,若扣減失敗拋出異常。若庫存扣減成功,將訂單信息保存到數(shù)據(jù)庫,若保存失敗也拋出異常。

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    /**
     * 當(dāng)前類初始化完畢就立馬執(zhí)行該方法
     */
    @PostConstruct
    private void init() {
        // 創(chuàng)建消息隊(duì)列
        DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
        mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
        mqScript.setResultType(Long.class);
        Long result = null;
        try {
            result = stringRedisTemplate.execute(mqScript,
                    Collections.emptyList(),
                    QUEUE_NAME,
                    GROUP_NAME);
        } catch (Exception e) {
            log.error("隊(duì)列創(chuàng)建失敗", e);
            return;
        }
        int r = result.intValue();
        String info = r == 1 ? "隊(duì)列創(chuàng)建成功" : "隊(duì)列已存在";
        log.debug(info);
        // 執(zhí)行線程任務(wù)
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }
    /**
     * 線程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    /**
     * 隊(duì)列名
     */
    private static final String QUEUE_NAME = "stream.orders";
    /**
     * 組名
     */
    private static final String GROUP_NAME = "g1";
    /**
     * 線程任務(wù): 不斷從消息隊(duì)列中獲取訂單
     */
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 1、從消息隊(duì)列中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order >
                    List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
                            Consumer.from(GROUP_NAME, "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
                            StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
                    );
                    // 2、判斷消息獲取是否成功
                    if (messageList == null || messageList.isEmpty()) {
                        // 2.1 消息獲取失敗,說明沒有消息,進(jìn)入下一次循環(huán)獲取消息
                        continue;
                    }
                    // 3、消息獲取成功,可以下單
                    // 將消息轉(zhuǎn)成VoucherOrder對(duì)象
                    MapRecord<String, Object, Object> record = messageList.get(0);
                    Map<Object, Object> messageMap = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
                    handleVoucherOrder(voucherOrder);
                    // 4、ACK確認(rèn) SACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
                } catch (Exception e) {
                    log.error("處理訂單異常", e);
                    // 處理異常消息
                    handlePendingList();
                }
            }
        }
    }
    private void handlePendingList() {
        while (true) {
            try {
                // 1、從pendingList中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0
                List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
                        Consumer.from(GROUP_NAME, "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
                        StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
                );
                // 2、判斷pendingList中是否有效性
                if (messageList == null || messageList.isEmpty()) {
                    // 2.1 pendingList中沒有消息,直接結(jié)束循環(huán)
                    break;
                }
                // 3、pendingList中有消息
                // 將消息轉(zhuǎn)成VoucherOrder對(duì)象
                MapRecord<String, Object, Object> record = messageList.get(0);
                Map<Object, Object> messageMap = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                // 4、ACK確認(rèn) SACK stream.orders g1 id
                stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
            } catch (Exception e) {
                log.error("處理訂單異常", e);
                // 這里不用調(diào)自己,直接就進(jìn)入下一次循環(huán),再從pendingList中取,這里只需要休眠一下,防止獲取消息太頻繁
                try {
                    Thread.sleep(20);
                } catch (InterruptedException ex) {
                    log.error("線程休眠異常", ex);
                }
            }
        }
    }
    /**
     * 創(chuàng)建訂單
     *
     * @param voucherOrder
     */
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
        boolean isLock = lock.tryLock();
        if (!isLock) {
            // 索取鎖失敗,重試或者直接拋異常(這個(gè)業(yè)務(wù)是一人一單,所以直接返回失敗信息)
            log.error("一人只能下一單");
            return;
        }
        try {
            // 創(chuàng)建訂單(使用代理對(duì)象調(diào)用,是為了確保事務(wù)生效)
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    /**
     * 加載 判斷秒殺券庫存是否充足 并且 判斷用戶是否已下單 的Lua腳本
     */
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    /**
     * VoucherOrderServiceImpl類的代理對(duì)象
     * 將代理對(duì)象的作用域進(jìn)行提升,方面子線程取用
     */
    private IVoucherOrderService proxy;
    /**
     * 搶購秒殺券
     *
     * @param voucherId
     * @return
     */
    @Transactional
    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = ThreadLocalUtls.getUser().getId();
        long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
        // 1、執(zhí)行Lua腳本,判斷用戶是否具有秒殺資格
        Long result = null;
        try {
            result = stringRedisTemplate.execute(
                    SECKILL_SCRIPT,
                    Collections.emptyList(),
                    voucherId.toString(),
                    userId.toString(),
                    String.valueOf(orderId)
            );
        } catch (Exception e) {
            log.error("Lua腳本執(zhí)行失敗");
            throw new RuntimeException(e);
        }
        if (result != null && !result.equals(0L)) {
            // result為1表示庫存不足,result為2表示用戶已下單
            int r = result.intValue();
            return Result.fail(r == 2 ? "不能重復(fù)下單" : "庫存不足");
        }
        // 2、result為0,下單成功,直接返回ok
        // 索取鎖成功,創(chuàng)建代理對(duì)象,使用代理對(duì)象調(diào)用第三方事務(wù)方法, 防止事務(wù)失效
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        this.proxy = proxy;
        return Result.ok();
    }
    /**
     * 創(chuàng)建訂單
     *
     * @param voucherOrder
     * @return
     */
    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        // 1、判斷當(dāng)前用戶是否是第一單
        int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                .eq(VoucherOrder::getUserId, userId));
        if (count >= 1) {
            // 當(dāng)前用戶不是第一單
            log.error("當(dāng)前用戶不是第一單");
            return;
        }
        // 2、用戶是第一單,可以下單,秒殺券庫存數(shù)量減一
        boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                .eq(SeckillVoucher::getVoucherId, voucherId)
                .gt(SeckillVoucher::getStock, 0)
                .setSql("stock = stock -1"));
        if (!flag) {
            throw new RuntimeException("秒殺券扣減失敗");
        }
        // 3、將訂單保存到數(shù)據(jù)庫
        flag = this.save(voucherOrder);
        if (!flag) {
            throw new RuntimeException("創(chuàng)建秒殺券訂單失敗");
        }
    }
}

3 秒殺流程剖析

3.1 初始化操作

Lua 腳本準(zhǔn)備:編寫 Lua 腳本,接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),判斷庫存是否充足以及用戶是否已下單。若庫存不足返回 1,用戶已下單返回 2,下單成功返回 0。

-- 優(yōu)惠券id
local voucherId = ARGV[1];
-- 用戶id
local userId = ARGV[2];
local stockKey = 'seckill:stock:' .. voucherId;
local orderKey = 'seckill:order:' .. voucherId;
local stock = redis.call('GET', stockKey);
if (tonumber(stock) <= 0) then
    return 1;
end
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
    return 2;
end
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
return 0;

消息隊(duì)列創(chuàng)建:在 Java 代碼的 @PostConstruct 方法中,通過執(zhí)行 Lua 腳本創(chuàng)建 Redis 的 Stream 消息隊(duì)列和消費(fèi)者組。

@PostConstruct
private void init() {
    DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
    mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
    mqScript.setResultType(Long.class);
    Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME);
    if (result == 1) {
        log.debug("隊(duì)列創(chuàng)建成功");
    } else {
        log.debug("隊(duì)列已存在");
    }
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

3.2 秒殺請(qǐng)求處理

資格判斷:用戶發(fā)起秒殺請(qǐng)求,系統(tǒng)執(zhí)行 Lua 腳本,根據(jù)返回結(jié)果判斷用戶是否具有秒殺資格。若返回 1 表示庫存不足,返回 2 表示用戶已下單,均返回失敗信息;返回 0 則表示具有秒殺資格。

@Override
public Result seckillVoucher(Long voucherId) {
    Long userId = ThreadLocalUtls.getUser().getId();
    long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
    Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), 
                                            voucherId.toString(), userId.toString(), String.valueOf(orderId));
    if (result != 0) {
        return Result.fail(result == 2 ? "不能重復(fù)下單" : "庫存不足");
    }
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    this.proxy = proxy;
    return Result.ok();
}

訂單入隊(duì):具有秒殺資格后,生成訂單 ID,創(chuàng)建訂單對(duì)象,將訂單信息發(fā)送到 Redis 的 Stream 消息隊(duì)列。

3.3 消息隊(duì)列消費(fèi)

訂單處理線程:使用線程池啟動(dòng)一個(gè)線程任務(wù) VoucherOrderHandler,不斷從消息隊(duì)列中獲取訂單信息。

private class VoucherOrderHandler implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
                    Consumer.from(GROUP_NAME, "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
                    StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
                );
                if (messageList == null || messageList.isEmpty()) {
                    continue;
                }
                MapRecord<String, Object, Object> record = messageList.get(0);
                Map<Object, Object> messageMap = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
                handleVoucherOrder(voucherOrder);
                stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
            } catch (Exception e) {
                log.error("處理訂單異常", e);
                handlePendingList();
            }
        }
    }
}

異常處理:若處理訂單過程中出現(xiàn)異常,調(diào)用 handlePendingList 方法從 pendingList 中獲取未處理的訂單信息,繼續(xù)處理。

 3.4 訂單創(chuàng)建

分布式鎖保障:使用 Redisson 分布式鎖,確保同一用戶同一時(shí)間只能創(chuàng)建一個(gè)訂單,避免一人多單問題。

private void handleVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();
    RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
    boolean isLock = lock.tryLock();
    if (!isLock) {
        log.error("一人只能下一單");
        return;
    }
    try {
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        lock.unlock();
    }
}

數(shù)據(jù)庫操作:判斷用戶是否是第一單,若是則扣減庫存并將訂單保存到數(shù)據(jù)庫。

@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
    Long userId = voucherOrder.getUserId();
    Long voucherId = voucherOrder.getVoucherId();
    int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));
    if (count >= 1) {
        log.error("當(dāng)前用戶不是第一單");
        return;
    }
    boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
        .eq(SeckillVoucher::getVoucherId, voucherId)
        .gt(SeckillVoucher::getStock, 0)
        .setSql("stock = stock -1"));
    if (!flag) {
        throw new RuntimeException("秒殺券扣減失敗");
    }
    flag = this.save(voucherOrder);
    if (!flag) {
        throw new RuntimeException("創(chuàng)建秒殺券訂單失敗");
    }
}

4 秒殺流程(文字版)

1. 初始化準(zhǔn)備

在系統(tǒng)啟動(dòng)階段,我們會(huì)完成一些必要的初始化工作。一方面,編寫好用于判斷庫存和訂單情況的 Lua 腳本。這個(gè)腳本會(huì)接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),通過 Redis 的相關(guān)命令判斷庫存是否充足以及用戶是否已下單,保證這些判斷操作的原子性。另一方面,在 Java 代碼里利用 @PostConstruct 注解,通過執(zhí)行另一個(gè) Lua 腳本來創(chuàng)建 Redis 的 Stream 消息隊(duì)列和消費(fèi)者組,為后續(xù)處理訂單消息做好準(zhǔn)備。

2. 用戶請(qǐng)求與資格判斷

當(dāng)用戶發(fā)起秒殺請(qǐng)求后,系統(tǒng)會(huì)立即執(zhí)行之前準(zhǔn)備好的 Lua 腳本來判斷用戶是否具有秒殺資格。

  • 如果腳本返回庫存不足的標(biāo)識(shí),系統(tǒng)會(huì)迅速返回 “庫存不足” 的提示信息,結(jié)束本次請(qǐng)求處理。
  • 若返回用戶已下單的標(biāo)識(shí),就會(huì)返回 “不能重復(fù)下單” 的提示,流程終止。
  • 當(dāng)判定用戶具有秒殺資格時(shí),系統(tǒng)會(huì)生成唯一的訂單 ID,創(chuàng)建訂單對(duì)象,然后將訂單信息發(fā)送到 Redis 的 Stream 消息隊(duì)列,進(jìn)入異步處理階段。

3. 消息隊(duì)列消費(fèi)

有一個(gè)專門的消息隊(duì)列消費(fèi)者線程會(huì)持續(xù)監(jiān)聽 Redis 的 Stream 消息隊(duì)列。

  • 如果沒有獲取到新的訂單信息,線程會(huì)繼續(xù)保持監(jiān)聽狀態(tài)。
  • 一旦獲取到訂單信息,線程會(huì)馬上嘗試獲取 Redisson 分布式鎖。這個(gè)鎖非常關(guān)鍵,它能確保同一用戶同一時(shí)間只能處理一個(gè)訂單,有效避免一人多單的問題。

4. 訂單創(chuàng)建與處理

獲取到鎖之后,系統(tǒng)會(huì)進(jìn)一步處理訂單。

  • 首先判斷當(dāng)前用戶是否是第一單。如果不是,系統(tǒng)會(huì)記錄錯(cuò)誤日志并釋放鎖,結(jié)束流程。
  • 若是第一單,系統(tǒng)會(huì)嘗試扣減庫存。如果庫存扣減失敗,會(huì)拋出異常并釋放鎖;若扣減成功,就將訂單信息保存到數(shù)據(jù)庫。
  • 在保存訂單時(shí),若保存失敗會(huì)拋出異常并釋放鎖;保存成功后,系統(tǒng)會(huì)向 Redis 的 Stream 消息隊(duì)列發(fā)送 ACK 確認(rèn)消息,最后釋放鎖,完成整個(gè)秒殺流程。

到此這篇關(guān)于Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺的文章就介紹到這了,更多相關(guān)Redis異步秒殺內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis?Brpop?命令作用詳解

    Redis?Brpop?命令作用詳解

    BRPOP?是一個(gè)阻塞的列表彈出原語,該命令會(huì)按照給出的?key?順序查看?list,并在找到的第一個(gè)非空?list?的尾部彈出一個(gè)元素,今天通過本文給大家介紹Redis?Brpop?命令相關(guān)知識(shí),感興趣的朋友一起看看吧
    2023-07-07
  • Redis解決Session共享問題的方法詳解

    Redis解決Session共享問題的方法詳解

    這篇文章主要為大家詳細(xì)介紹了分布式系統(tǒng)Redis解決Session共享問題的方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2022-07-07
  • Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程

    Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程

    這篇文章主要介紹了Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-04-04
  • 解決Redis開啟遠(yuǎn)程訪問及密碼問題

    解決Redis開啟遠(yuǎn)程訪問及密碼問題

    這篇文章主要介紹了Redis開啟遠(yuǎn)程訪問及密碼的教程,文中給大家提到了Redis啟動(dòng)報(bào)錯(cuò)解決方法,需要的朋友可以參考下
    2019-10-10
  • Redis緩存降級(jí)的四種策略

    Redis緩存降級(jí)的四種策略

    在高并發(fā)系統(tǒng)架構(gòu)中,Redis作為核心緩存組件扮演著至關(guān)重要的角色,它不僅能夠顯著提升系統(tǒng)響應(yīng)速度,還能有效減輕數(shù)據(jù)庫壓力,然而,當(dāng)Redis服務(wù)出現(xiàn)故障、性能下降或連接超時(shí)時(shí),如果沒有適當(dāng)?shù)慕导?jí)機(jī)制,可能導(dǎo)致系統(tǒng)雪崩,所以本文給大家介紹了Redis緩存降級(jí)的四種策略
    2025-04-04
  • redis如何后臺(tái)啟動(dòng)的方法

    redis如何后臺(tái)啟動(dòng)的方法

    這篇文章主要介紹了redis如何后臺(tái)啟動(dòng)的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Redis概述及l(fā)inux安裝redis的詳細(xì)教程

    Redis概述及l(fā)inux安裝redis的詳細(xì)教程

    這篇文章主要介紹了Redis概述及l(fā)inux安裝redis的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-10-10
  • redis服務(wù)如何啟動(dòng)

    redis服務(wù)如何啟動(dòng)

    這篇文章主要介紹了redis服務(wù)如何啟動(dòng)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • Redis中LRU算法和LFU算法的區(qū)別小結(jié)

    Redis中LRU算法和LFU算法的區(qū)別小結(jié)

    在Redis中,LRU算法和LFU算法是兩種常用的緩存淘汰算法,它們可以幫助我們優(yōu)化緩存性能,本文主要介紹了Redis中LRU算法和LFU算法的區(qū)別,感興趣的可以了解一下
    2023-12-12
  • redis中的事務(wù)操作案例分析

    redis中的事務(wù)操作案例分析

    這篇文章主要介紹了redis中的事務(wù)操作案例,結(jié)合具體實(shí)例形式詳細(xì)分析了redis事務(wù)操作的概念、原理、使用技巧與相關(guān)注意事項(xiàng),需要的朋友可以參考下
    2019-07-07

最新評(píng)論