Redis消息隊列實現(xiàn)異步秒殺功能
1 Redis消息隊列
在高并發(fā)場景下,為了提高秒殺業(yè)務的性能,可將部分工作交給 Redis 處理,并通過異步方式執(zhí)行。Redis 提供了多種數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)消息隊列,總結(jié)三種。
1.1 List 結(jié)構(gòu)
- 原理:基于 List 結(jié)構(gòu)模擬消息隊列,使用
BRPUSH
生產(chǎn)消息,BRPOP
消費消息。 - 命令示例
- 生產(chǎn)消息:
BRPUSH key value [value ...]
,將一個或多個元素推入到指定列表的頭部。如果列表不存在,會自動創(chuàng)建一個新的列表。 - 消費消息:
BRPOP key [key ...] timeout
,從指定的一個或多個列表中彈出最后一個元素。如果列表為空,該命令會導致客戶端阻塞,直到有數(shù)據(jù)可用或超過指定的超時時間。
- 生產(chǎn)消息:
- 優(yōu)缺點
- 優(yōu)點:不會內(nèi)存超限、可以持久化、消息有序性。
- 缺點:無法避免數(shù)據(jù)丟失、只支持單消費者。
1.2 Pub/Sub 模式
- 原理:發(fā)布訂閱模式,基本的點對點消息模型,支持多生產(chǎn)、多消費者。
- 命令示例
- 生產(chǎn)消息:
PUBLISH channel message
,用于向指定頻道發(fā)布一條消息。 - 消費消息
SUBSCRIBE channel [channel]
:訂閱一個或多個頻道。UNSUBSCRIBE [channel [channel ...]]
:取消訂閱一個或多個頻道。PSUBSCRIBE pattern [pattern ...]
:訂閱一個或多個符合給定模式的頻道,接收消息。PUNSUBSCRIBE [pattern [pattern ...]]
:取消訂閱一個或多個符合給定模式的頻道。
- 生產(chǎn)消息:
- 優(yōu)缺點
- 優(yōu)點:支持多生產(chǎn)、多消費者。
- 缺點:不支持持久化、無法避免數(shù)據(jù)丟失,消息堆積有上限(消費者會緩存消息),超出會丟失消息。
1.3 Stream 結(jié)構(gòu)
- 原理:Redis 5.0 引入的專門為消息隊列設計的數(shù)據(jù)類型,支持消息可回溯、一個消息可以被多個消費者消費、可以阻塞讀取。
- 命令示例
- 生產(chǎn)消息:
XADD key *|ID value [value ...]
,向指定的 Stream 流中添加一個消息。例如:XADD users * name jack age 21
,創(chuàng)建名為users
的隊列,并向其中發(fā)送一個消息,內(nèi)容是{name=jack,age=21}
,使用 Redis 自動生成 ID。 - 消費消息:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID
。例如:XREAD COUNT 1 STREAMS users 0
:讀取users
隊列中的第一條消息。XREAD COUNT 1 BLOCK 1000 STREAMS users $
:阻塞 1 秒鐘后從users
隊列中讀取最新消息。
- 生產(chǎn)消息:
- 消費者組模式
- 特點:消息分流、消息標識、消息確認。
- 命令示例
XGROUP CREATE key groupName ID
:創(chuàng)建消費者組。XGROUP DESTORY key groupName
:刪除指定的消費者組。XGROUP CREATECONSUMER key groupName consumerName
:給指定的消費者組添加消費者。XGROUP DELCONSUMER key groupName consumerName
:刪除消費者組中指定消費者。XREADGROUP GROUP
:從消費者組中讀取消息。
- 優(yōu)缺點
- 優(yōu)點:消息可回溯、可以多消費者爭搶消息,加快消費速度、可以阻塞讀取、沒有消息漏讀的風險、有消息確認機制,保證消息至少被消費一次。
- 缺點:有消息漏讀的風險(單消費方式下)。
1.4 Redis Stream消息隊列的特點
Redis 5.0引入的Stream
類型是專門為消息隊列設計的,支持以下特性:
- 消息持久化:消息存儲在內(nèi)存中,支持持久化到磁盤,避免消息丟失。
- 消費者組(Consumer Group):
- 消息分流:一個隊列可以被多個消費者組訂閱,組內(nèi)多個消費者分攤消息處理。
- 消息回溯:支持按消息ID回溯歷史消息。
- 消息確認(ACK):消費者處理完消息后需確認,否則消息會進入
pending-list
等待重試。
- 阻塞讀取:消費者可以阻塞等待新消息,減少CPU空轉(zhuǎn)。
- 避免消息丟失:通過
pending-list
機制,確保消息至少被消費一次。
2 秒殺業(yè)務處理
2.1 使用Lua腳本處理庫存和訂單
目標:在Redis中完成庫存判斷和訂單校驗,確保原子性。
-- 參數(shù):優(yōu)惠券ID、用戶ID、訂單ID local voucherId = ARGV[1] local userId = ARGV[2] local orderId = ARGV[3] -- 庫存Key和訂單Key local stockKey = 'seckill:stock:' .. voucherId local orderKey = 'seckill:order:' .. voucherId -- 判斷庫存是否充足 if (tonumber(redis.call('GET', stockKey)) <= 0 then return 1 -- 庫存不足 end -- 判斷用戶是否已下單 if (redis.call('SISMEMBER', orderKey, userId) == 1 then return 2 -- 用戶已下單 end -- 扣減庫存并記錄訂單 redis.call('DECR', stockKey) redis.call('SADD', orderKey, userId) -- 將訂單信息發(fā)送到消息隊列 redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0 -- 成功
腳本說明:
- 原子性操作:庫存檢查、訂單校驗、消息發(fā)送在一個腳本中完成。
- 消息發(fā)送:使用
XADD
將訂單信息寫入stream.orders
隊列。
2.2 創(chuàng)建消費者組
XGROUP CREATE stream.orders g1 0 MKSTREAM
g1
:消費者組名稱。
MKSTREAM
:如果隊列不存在則自動創(chuàng)建。
2.3 Java代碼實現(xiàn)
init
方法:在類初始化時創(chuàng)建消息隊列,并啟動一個線程任務從消息隊列中獲取訂單信息。VoucherOrderHandler
類:實現(xiàn)Runnable
接口,作為線程任務,不斷從消息隊列中獲取訂單信息。如果獲取成功,將消息轉(zhuǎn)換為VoucherOrder
對象,調(diào)用handleVoucherOrder
方法處理訂單,并進行 ACK 確認;如果出現(xiàn)異常,調(diào)用handlePendingList
方法處理異常消息。handlePendingList
方法:從pendingList
中獲取訂單信息,處理訂單并進行 ACK 確認,直到pendingList
中沒有消息。handleVoucherOrder
方法:使用 Redisson 分布式鎖確保一人一單,調(diào)用代理對象的createVoucherOrder
方法創(chuàng)建訂單。seckillVoucher
方法:執(zhí)行 Lua 腳本判斷用戶是否具有秒殺資格,如果具有資格,將訂單信息發(fā)送到消息隊列,并返回下單成功信息。createVoucherOrder
方法:判斷當前用戶是否是第一單,如果是則扣減庫存并將訂單保存到數(shù)據(jù)庫。
系統(tǒng)啟動與初始化
系統(tǒng)啟動時,VoucherOrderServiceImpl
類的 @PostConstruct
注解會觸發(fā) init
方法執(zhí)行。該方法先加載創(chuàng)建消息隊列的 Lua 腳本,通過 stringRedisTemplate.execute
方法執(zhí)行腳本創(chuàng)建 Redis Stream 消息隊列和消費者組。若創(chuàng)建成功或隊列已存在,會記錄相應日志。之后,使用線程池 SECKILL_ORDER_EXECUTOR
啟動 VoucherOrderHandler
線程,該線程負責后續(xù)從消息隊列獲取訂單信息并處理。
用戶發(fā)起秒殺請求
用戶發(fā)起秒殺請求后,系統(tǒng)調(diào)用 VoucherOrderServiceImpl
的 seckillVoucher
方法。此方法先從 ThreadLocalUtls
中獲取用戶 ID,用 redisIdWorker
生成訂單 ID。接著執(zhí)行判斷用戶秒殺資格的 Lua 腳本,該腳本接收優(yōu)惠券 ID、用戶 ID 和訂單 ID 作為參數(shù)。若腳本返回值表明庫存不足或用戶已下單,方法返回相應的失敗提示;若返回值為 0,說明用戶有秒殺資格,創(chuàng)建代理對象并返回下單成功結(jié)果。
Lua 腳本執(zhí)行邏輯
Lua 腳本接收到參數(shù)后,根據(jù)優(yōu)惠券 ID 拼接庫存和訂單的 Redis key。先通過 GET
命令獲取庫存,若庫存小于等于 0 則返回 1 表示庫存不足。若庫存充足,使用 SISMEMBER
命令檢查用戶是否已下單,若已下單則返回 2。若庫存充足且用戶未下單,使用 INCRBY
命令扣減庫存,SADD
命令記錄訂單信息,最后返回 0 表示下單成功。
訂單處理線程工作
VoucherOrderHandler
線程啟動后進入無限循環(huán),不斷從 Redis Stream 消息隊列獲取訂單信息。若未獲取到消息,繼續(xù)下一次循環(huán);若獲取到消息,將消息轉(zhuǎn)換為 VoucherOrder
對象,調(diào)用 handleVoucherOrder
方法處理訂單,處理完成后向消息隊列發(fā)送 ACK 確認消息。若處理過程中出現(xiàn)異常,調(diào)用 handlePendingList
方法處理異常消息。
訂單處理方法 handleVoucherOrder
handleVoucherOrder
方法接收 VoucherOrder
對象,根據(jù)用戶 ID 獲取 Redisson 分布式鎖。嘗試獲取鎖,若失敗記錄錯誤日志并返回;若成功,調(diào)用代理對象的 createVoucherOrder
方法創(chuàng)建訂單,最后釋放鎖。
訂單創(chuàng)建方法 createVoucherOrder
該方法先判斷當前用戶是否是第一單,通過查詢數(shù)據(jù)庫中該用戶的訂單數(shù)量來判斷。若不是第一單,記錄錯誤日志并返回;若是第一單,嘗試扣減秒殺券庫存,若扣減失敗拋出異常。若庫存扣減成功,將訂單信息保存到數(shù)據(jù)庫,若保存失敗也拋出異常。
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; /** * 當前類初始化完畢就立馬執(zhí)行該方法 */ @PostConstruct private void init() { // 創(chuàng)建消息隊列 DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>(); mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua")); mqScript.setResultType(Long.class); Long result = null; try { result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME); } catch (Exception e) { log.error("隊列創(chuàng)建失敗", e); return; } int r = result.intValue(); String info = r == 1 ? "隊列創(chuàng)建成功" : "隊列已存在"; log.debug(info); // 執(zhí)行線程任務 SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } /** * 線程池 */ private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); /** * 隊列名 */ private static final String QUEUE_NAME = "stream.orders"; /** * 組名 */ private static final String GROUP_NAME = "g1"; /** * 線程任務: 不斷從消息隊列中獲取訂單 */ private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1、從消息隊列中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order > List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()) ); // 2、判斷消息獲取是否成功 if (messageList == null || messageList.isEmpty()) { // 2.1 消息獲取失敗,說明沒有消息,進入下一次循環(huán)獲取消息 continue; } // 3、消息獲取成功,可以下單 // 將消息轉(zhuǎn)成VoucherOrder對象 MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4、ACK確認 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("處理訂單異常", e); // 處理異常消息 handlePendingList(); } } } } private void handlePendingList() { while (true) { try { // 1、從pendingList中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0 List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.from("0")) ); // 2、判斷pendingList中是否有效性 if (messageList == null || messageList.isEmpty()) { // 2.1 pendingList中沒有消息,直接結(jié)束循環(huán) break; } // 3、pendingList中有消息 // 將消息轉(zhuǎn)成VoucherOrder對象 MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4、ACK確認 SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("處理訂單異常", e); // 這里不用調(diào)自己,直接就進入下一次循環(huán),再從pendingList中取,這里只需要休眠一下,防止獲取消息太頻繁 try { Thread.sleep(20); } catch (InterruptedException ex) { log.error("線程休眠異常", ex); } } } } /** * 創(chuàng)建訂單 * * @param voucherOrder */ private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock) { // 索取鎖失敗,重試或者直接拋異常(這個業(yè)務是一人一單,所以直接返回失敗信息) log.error("一人只能下一單"); return; } try { // 創(chuàng)建訂單(使用代理對象調(diào)用,是為了確保事務生效) proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } /** * 加載 判斷秒殺券庫存是否充足 并且 判斷用戶是否已下單 的Lua腳本 */ private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } /** * VoucherOrderServiceImpl類的代理對象 * 將代理對象的作用域進行提升,方面子線程取用 */ private IVoucherOrderService proxy; /** * 搶購秒殺券 * * @param voucherId * @return */ @Transactional @Override public Result seckillVoucher(Long voucherId) { Long userId = ThreadLocalUtls.getUser().getId(); long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER); // 1、執(zhí)行Lua腳本,判斷用戶是否具有秒殺資格 Long result = null; try { result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); } catch (Exception e) { log.error("Lua腳本執(zhí)行失敗"); throw new RuntimeException(e); } if (result != null && !result.equals(0L)) { // result為1表示庫存不足,result為2表示用戶已下單 int r = result.intValue(); return Result.fail(r == 2 ? "不能重復下單" : "庫存不足"); } // 2、result為0,下單成功,直接返回ok // 索取鎖成功,創(chuàng)建代理對象,使用代理對象調(diào)用第三方事務方法, 防止事務失效 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); this.proxy = proxy; return Result.ok(); } /** * 創(chuàng)建訂單 * * @param voucherOrder * @return */ @Transactional @Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); // 1、判斷當前用戶是否是第一單 int count = this.count(new LambdaQueryWrapper<VoucherOrder>() .eq(VoucherOrder::getUserId, userId)); if (count >= 1) { // 當前用戶不是第一單 log.error("當前用戶不是第一單"); return; } // 2、用戶是第一單,可以下單,秒殺券庫存數(shù)量減一 boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0) .setSql("stock = stock -1")); if (!flag) { throw new RuntimeException("秒殺券扣減失敗"); } // 3、將訂單保存到數(shù)據(jù)庫 flag = this.save(voucherOrder); if (!flag) { throw new RuntimeException("創(chuàng)建秒殺券訂單失敗"); } } }
3 秒殺流程剖析
3.1 初始化操作
Lua 腳本準備:編寫 Lua 腳本,接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),判斷庫存是否充足以及用戶是否已下單。若庫存不足返回 1,用戶已下單返回 2,下單成功返回 0。
-- 優(yōu)惠券id local voucherId = ARGV[1]; -- 用戶id local userId = ARGV[2]; local stockKey = 'seckill:stock:' .. voucherId; local orderKey = 'seckill:order:' .. voucherId; local stock = redis.call('GET', stockKey); if (tonumber(stock) <= 0) then return 1; end if (redis.call('SISMEMBER', orderKey, userId) == 1) then return 2; end redis.call('INCRBY', stockKey, -1); redis.call('SADD', orderKey, userId); return 0;
消息隊列創(chuàng)建:在 Java 代碼的 @PostConstruct
方法中,通過執(zhí)行 Lua 腳本創(chuàng)建 Redis 的 Stream 消息隊列和消費者組。
@PostConstruct private void init() { DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>(); mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua")); mqScript.setResultType(Long.class); Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME); if (result == 1) { log.debug("隊列創(chuàng)建成功"); } else { log.debug("隊列已存在"); } SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); }
3.2 秒殺請求處理
資格判斷:用戶發(fā)起秒殺請求,系統(tǒng)執(zhí)行 Lua 腳本,根據(jù)返回結(jié)果判斷用戶是否具有秒殺資格。若返回 1 表示庫存不足,返回 2 表示用戶已下單,均返回失敗信息;返回 0 則表示具有秒殺資格。
@Override public Result seckillVoucher(Long voucherId) { Long userId = ThreadLocalUtls.getUser().getId(); long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER); Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId)); if (result != 0) { return Result.fail(result == 2 ? "不能重復下單" : "庫存不足"); } IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); this.proxy = proxy; return Result.ok(); }
訂單入隊:具有秒殺資格后,生成訂單 ID,創(chuàng)建訂單對象,將訂單信息發(fā)送到 Redis 的 Stream 消息隊列。
3.3 消息隊列消費
訂單處理線程:使用線程池啟動一個線程任務 VoucherOrderHandler
,不斷從消息隊列中獲取訂單信息。
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()) ); if (messageList == null || messageList.isEmpty()) { continue; } MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("處理訂單異常", e); handlePendingList(); } } } }
異常處理:若處理訂單過程中出現(xiàn)異常,調(diào)用 handlePendingList
方法從 pendingList
中獲取未處理的訂單信息,繼續(xù)處理。
3.4 訂單創(chuàng)建
分布式鎖保障:使用 Redisson 分布式鎖,確保同一用戶同一時間只能創(chuàng)建一個訂單,避免一人多單問題。
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock) { log.error("一人只能下一單"); return; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
數(shù)據(jù)庫操作:判斷用戶是否是第一單,若是則扣減庫存并將訂單保存到數(shù)據(jù)庫。
@Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId)); if (count >= 1) { log.error("當前用戶不是第一單"); return; } boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0) .setSql("stock = stock -1")); if (!flag) { throw new RuntimeException("秒殺券扣減失敗"); } flag = this.save(voucherOrder); if (!flag) { throw new RuntimeException("創(chuàng)建秒殺券訂單失敗"); } }
4 秒殺流程(文字版)
1. 初始化準備
在系統(tǒng)啟動階段,我們會完成一些必要的初始化工作。一方面,編寫好用于判斷庫存和訂單情況的 Lua 腳本。這個腳本會接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),通過 Redis 的相關命令判斷庫存是否充足以及用戶是否已下單,保證這些判斷操作的原子性。另一方面,在 Java 代碼里利用 @PostConstruct
注解,通過執(zhí)行另一個 Lua 腳本來創(chuàng)建 Redis 的 Stream 消息隊列和消費者組,為后續(xù)處理訂單消息做好準備。
2. 用戶請求與資格判斷
當用戶發(fā)起秒殺請求后,系統(tǒng)會立即執(zhí)行之前準備好的 Lua 腳本來判斷用戶是否具有秒殺資格。
- 如果腳本返回庫存不足的標識,系統(tǒng)會迅速返回 “庫存不足” 的提示信息,結(jié)束本次請求處理。
- 若返回用戶已下單的標識,就會返回 “不能重復下單” 的提示,流程終止。
- 當判定用戶具有秒殺資格時,系統(tǒng)會生成唯一的訂單 ID,創(chuàng)建訂單對象,然后將訂單信息發(fā)送到 Redis 的 Stream 消息隊列,進入異步處理階段。
3. 消息隊列消費
有一個專門的消息隊列消費者線程會持續(xù)監(jiān)聽 Redis 的 Stream 消息隊列。
- 如果沒有獲取到新的訂單信息,線程會繼續(xù)保持監(jiān)聽狀態(tài)。
- 一旦獲取到訂單信息,線程會馬上嘗試獲取 Redisson 分布式鎖。這個鎖非常關鍵,它能確保同一用戶同一時間只能處理一個訂單,有效避免一人多單的問題。
4. 訂單創(chuàng)建與處理
獲取到鎖之后,系統(tǒng)會進一步處理訂單。
- 首先判斷當前用戶是否是第一單。如果不是,系統(tǒng)會記錄錯誤日志并釋放鎖,結(jié)束流程。
- 若是第一單,系統(tǒng)會嘗試扣減庫存。如果庫存扣減失敗,會拋出異常并釋放鎖;若扣減成功,就將訂單信息保存到數(shù)據(jù)庫。
- 在保存訂單時,若保存失敗會拋出異常并釋放鎖;保存成功后,系統(tǒng)會向 Redis 的 Stream 消息隊列發(fā)送 ACK 確認消息,最后釋放鎖,完成整個秒殺流程。
到此這篇關于Redis消息隊列實現(xiàn)異步秒殺的文章就介紹到這了,更多相關Redis異步秒殺內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程
這篇文章主要介紹了Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-04-04Redis中LRU算法和LFU算法的區(qū)別小結(jié)
在Redis中,LRU算法和LFU算法是兩種常用的緩存淘汰算法,它們可以幫助我們優(yōu)化緩存性能,本文主要介紹了Redis中LRU算法和LFU算法的區(qū)別,感興趣的可以了解一下2023-12-12