Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺功能
1 Redis消息隊(duì)列
在高并發(fā)場景下,為了提高秒殺業(yè)務(wù)的性能,可將部分工作交給 Redis 處理,并通過異步方式執(zhí)行。Redis 提供了多種數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)消息隊(duì)列,總結(jié)三種。
1.1 List 結(jié)構(gòu)
- 原理:基于 List 結(jié)構(gòu)模擬消息隊(duì)列,使用
BRPUSH
生產(chǎn)消息,BRPOP
消費(fèi)消息。 - 命令示例
- 生產(chǎn)消息:
BRPUSH key value [value ...]
,將一個(gè)或多個(gè)元素推入到指定列表的頭部。如果列表不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)新的列表。 - 消費(fèi)消息:
BRPOP key [key ...] timeout
,從指定的一個(gè)或多個(gè)列表中彈出最后一個(gè)元素。如果列表為空,該命令會(huì)導(dǎo)致客戶端阻塞,直到有數(shù)據(jù)可用或超過指定的超時(shí)時(shí)間。
- 生產(chǎn)消息:
- 優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn):不會(huì)內(nèi)存超限、可以持久化、消息有序性。
- 缺點(diǎn):無法避免數(shù)據(jù)丟失、只支持單消費(fèi)者。
1.2 Pub/Sub 模式
- 原理:發(fā)布訂閱模式,基本的點(diǎn)對(duì)點(diǎn)消息模型,支持多生產(chǎn)、多消費(fèi)者。
- 命令示例
- 生產(chǎn)消息:
PUBLISH channel message
,用于向指定頻道發(fā)布一條消息。 - 消費(fèi)消息
SUBSCRIBE channel [channel]
:訂閱一個(gè)或多個(gè)頻道。UNSUBSCRIBE [channel [channel ...]]
:取消訂閱一個(gè)或多個(gè)頻道。PSUBSCRIBE pattern [pattern ...]
:訂閱一個(gè)或多個(gè)符合給定模式的頻道,接收消息。PUNSUBSCRIBE [pattern [pattern ...]]
:取消訂閱一個(gè)或多個(gè)符合給定模式的頻道。
- 生產(chǎn)消息:
- 優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn):支持多生產(chǎn)、多消費(fèi)者。
- 缺點(diǎn):不支持持久化、無法避免數(shù)據(jù)丟失,消息堆積有上限(消費(fèi)者會(huì)緩存消息),超出會(huì)丟失消息。
1.3 Stream 結(jié)構(gòu)
- 原理:Redis 5.0 引入的專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,支持消息可回溯、一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)、可以阻塞讀取。
- 命令示例
- 生產(chǎn)消息:
XADD key *|ID value [value ...]
,向指定的 Stream 流中添加一個(gè)消息。例如:XADD users * name jack age 21
,創(chuàng)建名為users
的隊(duì)列,并向其中發(fā)送一個(gè)消息,內(nèi)容是{name=jack,age=21}
,使用 Redis 自動(dòng)生成 ID。 - 消費(fèi)消息:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID
。例如:XREAD COUNT 1 STREAMS users 0
:讀取users
隊(duì)列中的第一條消息。XREAD COUNT 1 BLOCK 1000 STREAMS users $
:阻塞 1 秒鐘后從users
隊(duì)列中讀取最新消息。
- 生產(chǎn)消息:
- 消費(fèi)者組模式
- 特點(diǎn):消息分流、消息標(biāo)識(shí)、消息確認(rèn)。
- 命令示例
XGROUP CREATE key groupName ID
:創(chuàng)建消費(fèi)者組。XGROUP DESTORY key groupName
:刪除指定的消費(fèi)者組。XGROUP CREATECONSUMER key groupName consumerName
:給指定的消費(fèi)者組添加消費(fèi)者。XGROUP DELCONSUMER key groupName consumerName
:刪除消費(fèi)者組中指定消費(fèi)者。XREADGROUP GROUP
:從消費(fèi)者組中讀取消息。
- 優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn):消息可回溯、可以多消費(fèi)者爭搶消息,加快消費(fèi)速度、可以阻塞讀取、沒有消息漏讀的風(fēng)險(xiǎn)、有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次。
- 缺點(diǎn):有消息漏讀的風(fēng)險(xiǎn)(單消費(fèi)方式下)。
1.4 Redis Stream消息隊(duì)列的特點(diǎn)
Redis 5.0引入的Stream
類型是專門為消息隊(duì)列設(shè)計(jì)的,支持以下特性:
- 消息持久化:消息存儲(chǔ)在內(nèi)存中,支持持久化到磁盤,避免消息丟失。
- 消費(fèi)者組(Consumer Group):
- 消息分流:一個(gè)隊(duì)列可以被多個(gè)消費(fèi)者組訂閱,組內(nèi)多個(gè)消費(fèi)者分?jǐn)傁⑻幚怼?/li>
- 消息回溯:支持按消息ID回溯歷史消息。
- 消息確認(rèn)(ACK):消費(fèi)者處理完消息后需確認(rèn),否則消息會(huì)進(jìn)入
pending-list
等待重試。
- 阻塞讀取:消費(fèi)者可以阻塞等待新消息,減少CPU空轉(zhuǎn)。
- 避免消息丟失:通過
pending-list
機(jī)制,確保消息至少被消費(fèi)一次。
2 秒殺業(yè)務(wù)處理
2.1 使用Lua腳本處理庫存和訂單
目標(biāo):在Redis中完成庫存判斷和訂單校驗(yàn),確保原子性。
-- 參數(shù):優(yōu)惠券ID、用戶ID、訂單ID local voucherId = ARGV[1] local userId = ARGV[2] local orderId = ARGV[3] -- 庫存Key和訂單Key local stockKey = 'seckill:stock:' .. voucherId local orderKey = 'seckill:order:' .. voucherId -- 判斷庫存是否充足 if (tonumber(redis.call('GET', stockKey)) <= 0 then return 1 -- 庫存不足 end -- 判斷用戶是否已下單 if (redis.call('SISMEMBER', orderKey, userId) == 1 then return 2 -- 用戶已下單 end -- 扣減庫存并記錄訂單 redis.call('DECR', stockKey) redis.call('SADD', orderKey, userId) -- 將訂單信息發(fā)送到消息隊(duì)列 redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0 -- 成功
腳本說明:
- 原子性操作:庫存檢查、訂單校驗(yàn)、消息發(fā)送在一個(gè)腳本中完成。
- 消息發(fā)送:使用
XADD
將訂單信息寫入stream.orders
隊(duì)列。
2.2 創(chuàng)建消費(fèi)者組
XGROUP CREATE stream.orders g1 0 MKSTREAM
g1
:消費(fèi)者組名稱。
MKSTREAM
:如果隊(duì)列不存在則自動(dòng)創(chuàng)建。
2.3 Java代碼實(shí)現(xiàn)
init
方法:在類初始化時(shí)創(chuàng)建消息隊(duì)列,并啟動(dòng)一個(gè)線程任務(wù)從消息隊(duì)列中獲取訂單信息。VoucherOrderHandler
類:實(shí)現(xiàn)Runnable
接口,作為線程任務(wù),不斷從消息隊(duì)列中獲取訂單信息。如果獲取成功,將消息轉(zhuǎn)換為VoucherOrder
對(duì)象,調(diào)用handleVoucherOrder
方法處理訂單,并進(jìn)行 ACK 確認(rèn);如果出現(xiàn)異常,調(diào)用handlePendingList
方法處理異常消息。handlePendingList
方法:從pendingList
中獲取訂單信息,處理訂單并進(jìn)行 ACK 確認(rèn),直到pendingList
中沒有消息。handleVoucherOrder
方法:使用 Redisson 分布式鎖確保一人一單,調(diào)用代理對(duì)象的createVoucherOrder
方法創(chuàng)建訂單。seckillVoucher
方法:執(zhí)行 Lua 腳本判斷用戶是否具有秒殺資格,如果具有資格,將訂單信息發(fā)送到消息隊(duì)列,并返回下單成功信息。createVoucherOrder
方法:判斷當(dāng)前用戶是否是第一單,如果是則扣減庫存并將訂單保存到數(shù)據(jù)庫。
系統(tǒng)啟動(dòng)與初始化
系統(tǒng)啟動(dòng)時(shí),VoucherOrderServiceImpl
類的 @PostConstruct
注解會(huì)觸發(fā) init
方法執(zhí)行。該方法先加載創(chuàng)建消息隊(duì)列的 Lua 腳本,通過 stringRedisTemplate.execute
方法執(zhí)行腳本創(chuàng)建 Redis Stream 消息隊(duì)列和消費(fèi)者組。若創(chuàng)建成功或隊(duì)列已存在,會(huì)記錄相應(yīng)日志。之后,使用線程池 SECKILL_ORDER_EXECUTOR
啟動(dòng) VoucherOrderHandler
線程,該線程負(fù)責(zé)后續(xù)從消息隊(duì)列獲取訂單信息并處理。
用戶發(fā)起秒殺請(qǐng)求
用戶發(fā)起秒殺請(qǐng)求后,系統(tǒng)調(diào)用 VoucherOrderServiceImpl
的 seckillVoucher
方法。此方法先從 ThreadLocalUtls
中獲取用戶 ID,用 redisIdWorker
生成訂單 ID。接著執(zhí)行判斷用戶秒殺資格的 Lua 腳本,該腳本接收優(yōu)惠券 ID、用戶 ID 和訂單 ID 作為參數(shù)。若腳本返回值表明庫存不足或用戶已下單,方法返回相應(yīng)的失敗提示;若返回值為 0,說明用戶有秒殺資格,創(chuàng)建代理對(duì)象并返回下單成功結(jié)果。
Lua 腳本執(zhí)行邏輯
Lua 腳本接收到參數(shù)后,根據(jù)優(yōu)惠券 ID 拼接庫存和訂單的 Redis key。先通過 GET
命令獲取庫存,若庫存小于等于 0 則返回 1 表示庫存不足。若庫存充足,使用 SISMEMBER
命令檢查用戶是否已下單,若已下單則返回 2。若庫存充足且用戶未下單,使用 INCRBY
命令扣減庫存,SADD
命令記錄訂單信息,最后返回 0 表示下單成功。
訂單處理線程工作
VoucherOrderHandler
線程啟動(dòng)后進(jìn)入無限循環(huán),不斷從 Redis Stream 消息隊(duì)列獲取訂單信息。若未獲取到消息,繼續(xù)下一次循環(huán);若獲取到消息,將消息轉(zhuǎn)換為 VoucherOrder
對(duì)象,調(diào)用 handleVoucherOrder
方法處理訂單,處理完成后向消息隊(duì)列發(fā)送 ACK 確認(rèn)消息。若處理過程中出現(xiàn)異常,調(diào)用 handlePendingList
方法處理異常消息。
訂單處理方法 handleVoucherOrder
handleVoucherOrder
方法接收 VoucherOrder
對(duì)象,根據(jù)用戶 ID 獲取 Redisson 分布式鎖。嘗試獲取鎖,若失敗記錄錯(cuò)誤日志并返回;若成功,調(diào)用代理對(duì)象的 createVoucherOrder
方法創(chuàng)建訂單,最后釋放鎖。
訂單創(chuàng)建方法 createVoucherOrder
該方法先判斷當(dāng)前用戶是否是第一單,通過查詢數(shù)據(jù)庫中該用戶的訂單數(shù)量來判斷。若不是第一單,記錄錯(cuò)誤日志并返回;若是第一單,嘗試扣減秒殺券庫存,若扣減失敗拋出異常。若庫存扣減成功,將訂單信息保存到數(shù)據(jù)庫,若保存失敗也拋出異常。
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; /** * 當(dāng)前類初始化完畢就立馬執(zhí)行該方法 */ @PostConstruct private void init() { // 創(chuàng)建消息隊(duì)列 DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>(); mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua")); mqScript.setResultType(Long.class); Long result = null; try { result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME); } catch (Exception e) { log.error("隊(duì)列創(chuàng)建失敗", e); return; } int r = result.intValue(); String info = r == 1 ? "隊(duì)列創(chuàng)建成功" : "隊(duì)列已存在"; log.debug(info); // 執(zhí)行線程任務(wù) SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } /** * 線程池 */ private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); /** * 隊(duì)列名 */ private static final String QUEUE_NAME = "stream.orders"; /** * 組名 */ private static final String GROUP_NAME = "g1"; /** * 線程任務(wù): 不斷從消息隊(duì)列中獲取訂單 */ private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1、從消息隊(duì)列中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order > List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()) ); // 2、判斷消息獲取是否成功 if (messageList == null || messageList.isEmpty()) { // 2.1 消息獲取失敗,說明沒有消息,進(jìn)入下一次循環(huán)獲取消息 continue; } // 3、消息獲取成功,可以下單 // 將消息轉(zhuǎn)成VoucherOrder對(duì)象 MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4、ACK確認(rèn) SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("處理訂單異常", e); // 處理異常消息 handlePendingList(); } } } } private void handlePendingList() { while (true) { try { // 1、從pendingList中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0 List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.from("0")) ); // 2、判斷pendingList中是否有效性 if (messageList == null || messageList.isEmpty()) { // 2.1 pendingList中沒有消息,直接結(jié)束循環(huán) break; } // 3、pendingList中有消息 // 將消息轉(zhuǎn)成VoucherOrder對(duì)象 MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); // 4、ACK確認(rèn) SACK stream.orders g1 id stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("處理訂單異常", e); // 這里不用調(diào)自己,直接就進(jìn)入下一次循環(huán),再從pendingList中取,這里只需要休眠一下,防止獲取消息太頻繁 try { Thread.sleep(20); } catch (InterruptedException ex) { log.error("線程休眠異常", ex); } } } } /** * 創(chuàng)建訂單 * * @param voucherOrder */ private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock) { // 索取鎖失敗,重試或者直接拋異常(這個(gè)業(yè)務(wù)是一人一單,所以直接返回失敗信息) log.error("一人只能下一單"); return; } try { // 創(chuàng)建訂單(使用代理對(duì)象調(diào)用,是為了確保事務(wù)生效) proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } } /** * 加載 判斷秒殺券庫存是否充足 并且 判斷用戶是否已下單 的Lua腳本 */ private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua")); SECKILL_SCRIPT.setResultType(Long.class); } /** * VoucherOrderServiceImpl類的代理對(duì)象 * 將代理對(duì)象的作用域進(jìn)行提升,方面子線程取用 */ private IVoucherOrderService proxy; /** * 搶購秒殺券 * * @param voucherId * @return */ @Transactional @Override public Result seckillVoucher(Long voucherId) { Long userId = ThreadLocalUtls.getUser().getId(); long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER); // 1、執(zhí)行Lua腳本,判斷用戶是否具有秒殺資格 Long result = null; try { result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); } catch (Exception e) { log.error("Lua腳本執(zhí)行失敗"); throw new RuntimeException(e); } if (result != null && !result.equals(0L)) { // result為1表示庫存不足,result為2表示用戶已下單 int r = result.intValue(); return Result.fail(r == 2 ? "不能重復(fù)下單" : "庫存不足"); } // 2、result為0,下單成功,直接返回ok // 索取鎖成功,創(chuàng)建代理對(duì)象,使用代理對(duì)象調(diào)用第三方事務(wù)方法, 防止事務(wù)失效 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); this.proxy = proxy; return Result.ok(); } /** * 創(chuàng)建訂單 * * @param voucherOrder * @return */ @Transactional @Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); // 1、判斷當(dāng)前用戶是否是第一單 int count = this.count(new LambdaQueryWrapper<VoucherOrder>() .eq(VoucherOrder::getUserId, userId)); if (count >= 1) { // 當(dāng)前用戶不是第一單 log.error("當(dāng)前用戶不是第一單"); return; } // 2、用戶是第一單,可以下單,秒殺券庫存數(shù)量減一 boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0) .setSql("stock = stock -1")); if (!flag) { throw new RuntimeException("秒殺券扣減失敗"); } // 3、將訂單保存到數(shù)據(jù)庫 flag = this.save(voucherOrder); if (!flag) { throw new RuntimeException("創(chuàng)建秒殺券訂單失敗"); } } }
3 秒殺流程剖析
3.1 初始化操作
Lua 腳本準(zhǔn)備:編寫 Lua 腳本,接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),判斷庫存是否充足以及用戶是否已下單。若庫存不足返回 1,用戶已下單返回 2,下單成功返回 0。
-- 優(yōu)惠券id local voucherId = ARGV[1]; -- 用戶id local userId = ARGV[2]; local stockKey = 'seckill:stock:' .. voucherId; local orderKey = 'seckill:order:' .. voucherId; local stock = redis.call('GET', stockKey); if (tonumber(stock) <= 0) then return 1; end if (redis.call('SISMEMBER', orderKey, userId) == 1) then return 2; end redis.call('INCRBY', stockKey, -1); redis.call('SADD', orderKey, userId); return 0;
消息隊(duì)列創(chuàng)建:在 Java 代碼的 @PostConstruct
方法中,通過執(zhí)行 Lua 腳本創(chuàng)建 Redis 的 Stream 消息隊(duì)列和消費(fèi)者組。
@PostConstruct private void init() { DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>(); mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua")); mqScript.setResultType(Long.class); Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME); if (result == 1) { log.debug("隊(duì)列創(chuàng)建成功"); } else { log.debug("隊(duì)列已存在"); } SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); }
3.2 秒殺請(qǐng)求處理
資格判斷:用戶發(fā)起秒殺請(qǐng)求,系統(tǒng)執(zhí)行 Lua 腳本,根據(jù)返回結(jié)果判斷用戶是否具有秒殺資格。若返回 1 表示庫存不足,返回 2 表示用戶已下單,均返回失敗信息;返回 0 則表示具有秒殺資格。
@Override public Result seckillVoucher(Long voucherId) { Long userId = ThreadLocalUtls.getUser().getId(); long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER); Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId)); if (result != 0) { return Result.fail(result == 2 ? "不能重復(fù)下單" : "庫存不足"); } IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); this.proxy = proxy; return Result.ok(); }
訂單入隊(duì):具有秒殺資格后,生成訂單 ID,創(chuàng)建訂單對(duì)象,將訂單信息發(fā)送到 Redis 的 Stream 消息隊(duì)列。
3.3 消息隊(duì)列消費(fèi)
訂單處理線程:使用線程池啟動(dòng)一個(gè)線程任務(wù) VoucherOrderHandler
,不斷從消息隊(duì)列中獲取訂單信息。
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)), StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed()) ); if (messageList == null || messageList.isEmpty()) { continue; } MapRecord<String, Object, Object> record = messageList.get(0); Map<Object, Object> messageMap = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true); handleVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId()); } catch (Exception e) { log.error("處理訂單異常", e); handlePendingList(); } } } }
異常處理:若處理訂單過程中出現(xiàn)異常,調(diào)用 handlePendingList
方法從 pendingList
中獲取未處理的訂單信息,繼續(xù)處理。
3.4 訂單創(chuàng)建
分布式鎖保障:使用 Redisson 分布式鎖,確保同一用戶同一時(shí)間只能創(chuàng)建一個(gè)訂單,避免一人多單問題。
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId); boolean isLock = lock.tryLock(); if (!isLock) { log.error("一人只能下一單"); return; } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
數(shù)據(jù)庫操作:判斷用戶是否是第一單,若是則扣減庫存并將訂單保存到數(shù)據(jù)庫。
@Override public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId)); if (count >= 1) { log.error("當(dāng)前用戶不是第一單"); return; } boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>() .eq(SeckillVoucher::getVoucherId, voucherId) .gt(SeckillVoucher::getStock, 0) .setSql("stock = stock -1")); if (!flag) { throw new RuntimeException("秒殺券扣減失敗"); } flag = this.save(voucherOrder); if (!flag) { throw new RuntimeException("創(chuàng)建秒殺券訂單失敗"); } }
4 秒殺流程(文字版)
1. 初始化準(zhǔn)備
在系統(tǒng)啟動(dòng)階段,我們會(huì)完成一些必要的初始化工作。一方面,編寫好用于判斷庫存和訂單情況的 Lua 腳本。這個(gè)腳本會(huì)接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),通過 Redis 的相關(guān)命令判斷庫存是否充足以及用戶是否已下單,保證這些判斷操作的原子性。另一方面,在 Java 代碼里利用 @PostConstruct
注解,通過執(zhí)行另一個(gè) Lua 腳本來創(chuàng)建 Redis 的 Stream 消息隊(duì)列和消費(fèi)者組,為后續(xù)處理訂單消息做好準(zhǔn)備。
2. 用戶請(qǐng)求與資格判斷
當(dāng)用戶發(fā)起秒殺請(qǐng)求后,系統(tǒng)會(huì)立即執(zhí)行之前準(zhǔn)備好的 Lua 腳本來判斷用戶是否具有秒殺資格。
- 如果腳本返回庫存不足的標(biāo)識(shí),系統(tǒng)會(huì)迅速返回 “庫存不足” 的提示信息,結(jié)束本次請(qǐng)求處理。
- 若返回用戶已下單的標(biāo)識(shí),就會(huì)返回 “不能重復(fù)下單” 的提示,流程終止。
- 當(dāng)判定用戶具有秒殺資格時(shí),系統(tǒng)會(huì)生成唯一的訂單 ID,創(chuàng)建訂單對(duì)象,然后將訂單信息發(fā)送到 Redis 的 Stream 消息隊(duì)列,進(jìn)入異步處理階段。
3. 消息隊(duì)列消費(fèi)
有一個(gè)專門的消息隊(duì)列消費(fèi)者線程會(huì)持續(xù)監(jiān)聽 Redis 的 Stream 消息隊(duì)列。
- 如果沒有獲取到新的訂單信息,線程會(huì)繼續(xù)保持監(jiān)聽狀態(tài)。
- 一旦獲取到訂單信息,線程會(huì)馬上嘗試獲取 Redisson 分布式鎖。這個(gè)鎖非常關(guān)鍵,它能確保同一用戶同一時(shí)間只能處理一個(gè)訂單,有效避免一人多單的問題。
4. 訂單創(chuàng)建與處理
獲取到鎖之后,系統(tǒng)會(huì)進(jìn)一步處理訂單。
- 首先判斷當(dāng)前用戶是否是第一單。如果不是,系統(tǒng)會(huì)記錄錯(cuò)誤日志并釋放鎖,結(jié)束流程。
- 若是第一單,系統(tǒng)會(huì)嘗試扣減庫存。如果庫存扣減失敗,會(huì)拋出異常并釋放鎖;若扣減成功,就將訂單信息保存到數(shù)據(jù)庫。
- 在保存訂單時(shí),若保存失敗會(huì)拋出異常并釋放鎖;保存成功后,系統(tǒng)會(huì)向 Redis 的 Stream 消息隊(duì)列發(fā)送 ACK 確認(rèn)消息,最后釋放鎖,完成整個(gè)秒殺流程。
到此這篇關(guān)于Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺的文章就介紹到這了,更多相關(guān)Redis異步秒殺內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程
這篇文章主要介紹了Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04Redis概述及l(fā)inux安裝redis的詳細(xì)教程
這篇文章主要介紹了Redis概述及l(fā)inux安裝redis的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10Redis中LRU算法和LFU算法的區(qū)別小結(jié)
在Redis中,LRU算法和LFU算法是兩種常用的緩存淘汰算法,它們可以幫助我們優(yōu)化緩存性能,本文主要介紹了Redis中LRU算法和LFU算法的區(qū)別,感興趣的可以了解一下2023-12-12