Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺功能
1 Redis消息隊(duì)列
在高并發(fā)場(chǎng)景下,為了提高秒殺業(yè)務(wù)的性能,可將部分工作交給 Redis 處理,并通過(guò)異步方式執(zhí)行。Redis 提供了多種數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)消息隊(duì)列,總結(jié)三種。
1.1 List 結(jié)構(gòu)
- 原理:基于 List 結(jié)構(gòu)模擬消息隊(duì)列,使用
BRPUSH生產(chǎn)消息,BRPOP消費(fèi)消息。 - 命令示例
- 生產(chǎn)消息:
BRPUSH key value [value ...],將一個(gè)或多個(gè)元素推入到指定列表的頭部。如果列表不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)新的列表。 - 消費(fèi)消息:
BRPOP key [key ...] timeout,從指定的一個(gè)或多個(gè)列表中彈出最后一個(gè)元素。如果列表為空,該命令會(huì)導(dǎo)致客戶端阻塞,直到有數(shù)據(jù)可用或超過(guò)指定的超時(shí)時(shí)間。
- 生產(chǎn)消息:
- 優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn):不會(huì)內(nèi)存超限、可以持久化、消息有序性。
- 缺點(diǎn):無(wú)法避免數(shù)據(jù)丟失、只支持單消費(fèi)者。
1.2 Pub/Sub 模式
- 原理:發(fā)布訂閱模式,基本的點(diǎn)對(duì)點(diǎn)消息模型,支持多生產(chǎn)、多消費(fèi)者。
- 命令示例
- 生產(chǎn)消息:
PUBLISH channel message,用于向指定頻道發(fā)布一條消息。 - 消費(fèi)消息
SUBSCRIBE channel [channel]:訂閱一個(gè)或多個(gè)頻道。UNSUBSCRIBE [channel [channel ...]]:取消訂閱一個(gè)或多個(gè)頻道。PSUBSCRIBE pattern [pattern ...]:訂閱一個(gè)或多個(gè)符合給定模式的頻道,接收消息。PUNSUBSCRIBE [pattern [pattern ...]]:取消訂閱一個(gè)或多個(gè)符合給定模式的頻道。
- 生產(chǎn)消息:
- 優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn):支持多生產(chǎn)、多消費(fèi)者。
- 缺點(diǎn):不支持持久化、無(wú)法避免數(shù)據(jù)丟失,消息堆積有上限(消費(fèi)者會(huì)緩存消息),超出會(huì)丟失消息。
1.3 Stream 結(jié)構(gòu)
- 原理:Redis 5.0 引入的專門為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類型,支持消息可回溯、一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)、可以阻塞讀取。
- 命令示例
- 生產(chǎn)消息:
XADD key *|ID value [value ...],向指定的 Stream 流中添加一個(gè)消息。例如:XADD users * name jack age 21,創(chuàng)建名為users的隊(duì)列,并向其中發(fā)送一個(gè)消息,內(nèi)容是{name=jack,age=21},使用 Redis 自動(dòng)生成 ID。 - 消費(fèi)消息:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID。例如:XREAD COUNT 1 STREAMS users 0:讀取users隊(duì)列中的第一條消息。XREAD COUNT 1 BLOCK 1000 STREAMS users $:阻塞 1 秒鐘后從users隊(duì)列中讀取最新消息。
- 生產(chǎn)消息:
- 消費(fèi)者組模式
- 特點(diǎn):消息分流、消息標(biāo)識(shí)、消息確認(rèn)。
- 命令示例
XGROUP CREATE key groupName ID:創(chuàng)建消費(fèi)者組。XGROUP DESTORY key groupName:刪除指定的消費(fèi)者組。XGROUP CREATECONSUMER key groupName consumerName:給指定的消費(fèi)者組添加消費(fèi)者。XGROUP DELCONSUMER key groupName consumerName:刪除消費(fèi)者組中指定消費(fèi)者。XREADGROUP GROUP:從消費(fèi)者組中讀取消息。
- 優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn):消息可回溯、可以多消費(fèi)者爭(zhēng)搶消息,加快消費(fèi)速度、可以阻塞讀取、沒(méi)有消息漏讀的風(fēng)險(xiǎn)、有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次。
- 缺點(diǎn):有消息漏讀的風(fēng)險(xiǎn)(單消費(fèi)方式下)。
1.4 Redis Stream消息隊(duì)列的特點(diǎn)
Redis 5.0引入的Stream類型是專門為消息隊(duì)列設(shè)計(jì)的,支持以下特性:
- 消息持久化:消息存儲(chǔ)在內(nèi)存中,支持持久化到磁盤,避免消息丟失。
- 消費(fèi)者組(Consumer Group):
- 消息分流:一個(gè)隊(duì)列可以被多個(gè)消費(fèi)者組訂閱,組內(nèi)多個(gè)消費(fèi)者分?jǐn)傁⑻幚怼?/li>
- 消息回溯:支持按消息ID回溯歷史消息。
- 消息確認(rèn)(ACK):消費(fèi)者處理完消息后需確認(rèn),否則消息會(huì)進(jìn)入
pending-list等待重試。
- 阻塞讀取:消費(fèi)者可以阻塞等待新消息,減少CPU空轉(zhuǎn)。
- 避免消息丟失:通過(guò)
pending-list機(jī)制,確保消息至少被消費(fèi)一次。
2 秒殺業(yè)務(wù)處理

2.1 使用Lua腳本處理庫(kù)存和訂單
目標(biāo):在Redis中完成庫(kù)存判斷和訂單校驗(yàn),確保原子性。
-- 參數(shù):優(yōu)惠券ID、用戶ID、訂單ID
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 庫(kù)存Key和訂單Key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
-- 判斷庫(kù)存是否充足
if (tonumber(redis.call('GET', stockKey)) <= 0 then
return 1 -- 庫(kù)存不足
end
-- 判斷用戶是否已下單
if (redis.call('SISMEMBER', orderKey, userId) == 1 then
return 2 -- 用戶已下單
end
-- 扣減庫(kù)存并記錄訂單
redis.call('DECR', stockKey)
redis.call('SADD', orderKey, userId)
-- 將訂單信息發(fā)送到消息隊(duì)列
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0 -- 成功腳本說(shuō)明:
- 原子性操作:庫(kù)存檢查、訂單校驗(yàn)、消息發(fā)送在一個(gè)腳本中完成。
- 消息發(fā)送:使用
XADD將訂單信息寫入stream.orders隊(duì)列。
2.2 創(chuàng)建消費(fèi)者組
XGROUP CREATE stream.orders g1 0 MKSTREAM
g1:消費(fèi)者組名稱。
MKSTREAM:如果隊(duì)列不存在則自動(dòng)創(chuàng)建。
2.3 Java代碼實(shí)現(xiàn)
init方法:在類初始化時(shí)創(chuàng)建消息隊(duì)列,并啟動(dòng)一個(gè)線程任務(wù)從消息隊(duì)列中獲取訂單信息。VoucherOrderHandler類:實(shí)現(xiàn)Runnable接口,作為線程任務(wù),不斷從消息隊(duì)列中獲取訂單信息。如果獲取成功,將消息轉(zhuǎn)換為VoucherOrder對(duì)象,調(diào)用handleVoucherOrder方法處理訂單,并進(jìn)行 ACK 確認(rèn);如果出現(xiàn)異常,調(diào)用handlePendingList方法處理異常消息。handlePendingList方法:從pendingList中獲取訂單信息,處理訂單并進(jìn)行 ACK 確認(rèn),直到pendingList中沒(méi)有消息。handleVoucherOrder方法:使用 Redisson 分布式鎖確保一人一單,調(diào)用代理對(duì)象的createVoucherOrder方法創(chuàng)建訂單。seckillVoucher方法:執(zhí)行 Lua 腳本判斷用戶是否具有秒殺資格,如果具有資格,將訂單信息發(fā)送到消息隊(duì)列,并返回下單成功信息。createVoucherOrder方法:判斷當(dāng)前用戶是否是第一單,如果是則扣減庫(kù)存并將訂單保存到數(shù)據(jù)庫(kù)。
系統(tǒng)啟動(dòng)與初始化
系統(tǒng)啟動(dòng)時(shí),VoucherOrderServiceImpl 類的 @PostConstruct 注解會(huì)觸發(fā) init 方法執(zhí)行。該方法先加載創(chuàng)建消息隊(duì)列的 Lua 腳本,通過(guò) stringRedisTemplate.execute 方法執(zhí)行腳本創(chuàng)建 Redis Stream 消息隊(duì)列和消費(fèi)者組。若創(chuàng)建成功或隊(duì)列已存在,會(huì)記錄相應(yīng)日志。之后,使用線程池 SECKILL_ORDER_EXECUTOR 啟動(dòng) VoucherOrderHandler 線程,該線程負(fù)責(zé)后續(xù)從消息隊(duì)列獲取訂單信息并處理。
用戶發(fā)起秒殺請(qǐng)求
用戶發(fā)起秒殺請(qǐng)求后,系統(tǒng)調(diào)用 VoucherOrderServiceImpl 的 seckillVoucher 方法。此方法先從 ThreadLocalUtls 中獲取用戶 ID,用 redisIdWorker 生成訂單 ID。接著執(zhí)行判斷用戶秒殺資格的 Lua 腳本,該腳本接收優(yōu)惠券 ID、用戶 ID 和訂單 ID 作為參數(shù)。若腳本返回值表明庫(kù)存不足或用戶已下單,方法返回相應(yīng)的失敗提示;若返回值為 0,說(shuō)明用戶有秒殺資格,創(chuàng)建代理對(duì)象并返回下單成功結(jié)果。
Lua 腳本執(zhí)行邏輯
Lua 腳本接收到參數(shù)后,根據(jù)優(yōu)惠券 ID 拼接庫(kù)存和訂單的 Redis key。先通過(guò) GET 命令獲取庫(kù)存,若庫(kù)存小于等于 0 則返回 1 表示庫(kù)存不足。若庫(kù)存充足,使用 SISMEMBER 命令檢查用戶是否已下單,若已下單則返回 2。若庫(kù)存充足且用戶未下單,使用 INCRBY 命令扣減庫(kù)存,SADD 命令記錄訂單信息,最后返回 0 表示下單成功。
訂單處理線程工作
VoucherOrderHandler 線程啟動(dòng)后進(jìn)入無(wú)限循環(huán),不斷從 Redis Stream 消息隊(duì)列獲取訂單信息。若未獲取到消息,繼續(xù)下一次循環(huán);若獲取到消息,將消息轉(zhuǎn)換為 VoucherOrder 對(duì)象,調(diào)用 handleVoucherOrder 方法處理訂單,處理完成后向消息隊(duì)列發(fā)送 ACK 確認(rèn)消息。若處理過(guò)程中出現(xiàn)異常,調(diào)用 handlePendingList 方法處理異常消息。
訂單處理方法 handleVoucherOrder
handleVoucherOrder 方法接收 VoucherOrder 對(duì)象,根據(jù)用戶 ID 獲取 Redisson 分布式鎖。嘗試獲取鎖,若失敗記錄錯(cuò)誤日志并返回;若成功,調(diào)用代理對(duì)象的 createVoucherOrder 方法創(chuàng)建訂單,最后釋放鎖。
訂單創(chuàng)建方法 createVoucherOrder
該方法先判斷當(dāng)前用戶是否是第一單,通過(guò)查詢數(shù)據(jù)庫(kù)中該用戶的訂單數(shù)量來(lái)判斷。若不是第一單,記錄錯(cuò)誤日志并返回;若是第一單,嘗試扣減秒殺券庫(kù)存,若扣減失敗拋出異常。若庫(kù)存扣減成功,將訂單信息保存到數(shù)據(jù)庫(kù),若保存失敗也拋出異常。
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
/**
* 當(dāng)前類初始化完畢就立馬執(zhí)行該方法
*/
@PostConstruct
private void init() {
// 創(chuàng)建消息隊(duì)列
DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
mqScript.setResultType(Long.class);
Long result = null;
try {
result = stringRedisTemplate.execute(mqScript,
Collections.emptyList(),
QUEUE_NAME,
GROUP_NAME);
} catch (Exception e) {
log.error("隊(duì)列創(chuàng)建失敗", e);
return;
}
int r = result.intValue();
String info = r == 1 ? "隊(duì)列創(chuàng)建成功" : "隊(duì)列已存在";
log.debug(info);
// 執(zhí)行線程任務(wù)
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
/**
* 線程池
*/
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
/**
* 隊(duì)列名
*/
private static final String QUEUE_NAME = "stream.orders";
/**
* 組名
*/
private static final String GROUP_NAME = "g1";
/**
* 線程任務(wù): 不斷從消息隊(duì)列中獲取訂單
*/
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1、從消息隊(duì)列中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
);
// 2、判斷消息獲取是否成功
if (messageList == null || messageList.isEmpty()) {
// 2.1 消息獲取失敗,說(shuō)明沒(méi)有消息,進(jìn)入下一次循環(huán)獲取消息
continue;
}
// 3、消息獲取成功,可以下單
// 將消息轉(zhuǎn)成VoucherOrder對(duì)象
MapRecord<String, Object, Object> record = messageList.get(0);
Map<Object, Object> messageMap = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
// 4、ACK確認(rèn) SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
} catch (Exception e) {
log.error("處理訂單異常", e);
// 處理異常消息
handlePendingList();
}
}
}
}
private void handlePendingList() {
while (true) {
try {
// 1、從pendingList中獲取訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 1000 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
StreamOffset.create(QUEUE_NAME, ReadOffset.from("0"))
);
// 2、判斷pendingList中是否有效性
if (messageList == null || messageList.isEmpty()) {
// 2.1 pendingList中沒(méi)有消息,直接結(jié)束循環(huán)
break;
}
// 3、pendingList中有消息
// 將消息轉(zhuǎn)成VoucherOrder對(duì)象
MapRecord<String, Object, Object> record = messageList.get(0);
Map<Object, Object> messageMap = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
// 4、ACK確認(rèn) SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
} catch (Exception e) {
log.error("處理訂單異常", e);
// 這里不用調(diào)自己,直接就進(jìn)入下一次循環(huán),再?gòu)膒endingList中取,這里只需要休眠一下,防止獲取消息太頻繁
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
log.error("線程休眠異常", ex);
}
}
}
}
/**
* 創(chuàng)建訂單
*
* @param voucherOrder
*/
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
// 索取鎖失敗,重試或者直接拋異常(這個(gè)業(yè)務(wù)是一人一單,所以直接返回失敗信息)
log.error("一人只能下一單");
return;
}
try {
// 創(chuàng)建訂單(使用代理對(duì)象調(diào)用,是為了確保事務(wù)生效)
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
/**
* 加載 判斷秒殺券庫(kù)存是否充足 并且 判斷用戶是否已下單 的Lua腳本
*/
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/stream-seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* VoucherOrderServiceImpl類的代理對(duì)象
* 將代理對(duì)象的作用域進(jìn)行提升,方面子線程取用
*/
private IVoucherOrderService proxy;
/**
* 搶購(gòu)秒殺券
*
* @param voucherId
* @return
*/
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = ThreadLocalUtls.getUser().getId();
long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
// 1、執(zhí)行Lua腳本,判斷用戶是否具有秒殺資格
Long result = null;
try {
result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);
} catch (Exception e) {
log.error("Lua腳本執(zhí)行失敗");
throw new RuntimeException(e);
}
if (result != null && !result.equals(0L)) {
// result為1表示庫(kù)存不足,result為2表示用戶已下單
int r = result.intValue();
return Result.fail(r == 2 ? "不能重復(fù)下單" : "庫(kù)存不足");
}
// 2、result為0,下單成功,直接返回ok
// 索取鎖成功,創(chuàng)建代理對(duì)象,使用代理對(duì)象調(diào)用第三方事務(wù)方法, 防止事務(wù)失效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
this.proxy = proxy;
return Result.ok();
}
/**
* 創(chuàng)建訂單
*
* @param voucherOrder
* @return
*/
@Transactional
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 1、判斷當(dāng)前用戶是否是第一單
int count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId));
if (count >= 1) {
// 當(dāng)前用戶不是第一單
log.error("當(dāng)前用戶不是第一單");
return;
}
// 2、用戶是第一單,可以下單,秒殺券庫(kù)存數(shù)量減一
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock -1"));
if (!flag) {
throw new RuntimeException("秒殺券扣減失敗");
}
// 3、將訂單保存到數(shù)據(jù)庫(kù)
flag = this.save(voucherOrder);
if (!flag) {
throw new RuntimeException("創(chuàng)建秒殺券訂單失敗");
}
}
}3 秒殺流程剖析
3.1 初始化操作
Lua 腳本準(zhǔn)備:編寫 Lua 腳本,接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),判斷庫(kù)存是否充足以及用戶是否已下單。若庫(kù)存不足返回 1,用戶已下單返回 2,下單成功返回 0。
-- 優(yōu)惠券id
local voucherId = ARGV[1];
-- 用戶id
local userId = ARGV[2];
local stockKey = 'seckill:stock:' .. voucherId;
local orderKey = 'seckill:order:' .. voucherId;
local stock = redis.call('GET', stockKey);
if (tonumber(stock) <= 0) then
return 1;
end
if (redis.call('SISMEMBER', orderKey, userId) == 1) then
return 2;
end
redis.call('INCRBY', stockKey, -1);
redis.call('SADD', orderKey, userId);
return 0;消息隊(duì)列創(chuàng)建:在 Java 代碼的 @PostConstruct 方法中,通過(guò)執(zhí)行 Lua 腳本創(chuàng)建 Redis 的 Stream 消息隊(duì)列和消費(fèi)者組。
@PostConstruct
private void init() {
DefaultRedisScript<Long> mqScript = new DefaultRedisScript<>();
mqScript.setLocation(new ClassPathResource("lua/stream-mq.lua"));
mqScript.setResultType(Long.class);
Long result = stringRedisTemplate.execute(mqScript, Collections.emptyList(), QUEUE_NAME, GROUP_NAME);
if (result == 1) {
log.debug("隊(duì)列創(chuàng)建成功");
} else {
log.debug("隊(duì)列已存在");
}
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}3.2 秒殺請(qǐng)求處理
資格判斷:用戶發(fā)起秒殺請(qǐng)求,系統(tǒng)執(zhí)行 Lua 腳本,根據(jù)返回結(jié)果判斷用戶是否具有秒殺資格。若返回 1 表示庫(kù)存不足,返回 2 表示用戶已下單,均返回失敗信息;返回 0 則表示具有秒殺資格。
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = ThreadLocalUtls.getUser().getId();
long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId));
if (result != 0) {
return Result.fail(result == 2 ? "不能重復(fù)下單" : "庫(kù)存不足");
}
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
this.proxy = proxy;
return Result.ok();
}訂單入隊(duì):具有秒殺資格后,生成訂單 ID,創(chuàng)建訂單對(duì)象,將訂單信息發(fā)送到 Redis 的 Stream 消息隊(duì)列。
3.3 消息隊(duì)列消費(fèi)
訂單處理線程:使用線程池啟動(dòng)一個(gè)線程任務(wù) VoucherOrderHandler,不斷從消息隊(duì)列中獲取訂單信息。
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
List<MapRecord<String, Object, Object>> messageList = stringRedisTemplate.opsForStream().read(
Consumer.from(GROUP_NAME, "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(1)),
StreamOffset.create(QUEUE_NAME, ReadOffset.lastConsumed())
);
if (messageList == null || messageList.isEmpty()) {
continue;
}
MapRecord<String, Object, Object> record = messageList.get(0);
Map<Object, Object> messageMap = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(messageMap, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
stringRedisTemplate.opsForStream().acknowledge(QUEUE_NAME, GROUP_NAME, record.getId());
} catch (Exception e) {
log.error("處理訂單異常", e);
handlePendingList();
}
}
}
}異常處理:若處理訂單過(guò)程中出現(xiàn)異常,調(diào)用 handlePendingList 方法從 pendingList 中獲取未處理的訂單信息,繼續(xù)處理。
3.4 訂單創(chuàng)建
分布式鎖保障:使用 Redisson 分布式鎖,確保同一用戶同一時(shí)間只能創(chuàng)建一個(gè)訂單,避免一人多單問(wèn)題。
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("一人只能下一單");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}數(shù)據(jù)庫(kù)操作:判斷用戶是否是第一單,若是則扣減庫(kù)存并將訂單保存到數(shù)據(jù)庫(kù)。
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));
if (count >= 1) {
log.error("當(dāng)前用戶不是第一單");
return;
}
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock -1"));
if (!flag) {
throw new RuntimeException("秒殺券扣減失敗");
}
flag = this.save(voucherOrder);
if (!flag) {
throw new RuntimeException("創(chuàng)建秒殺券訂單失敗");
}
}4 秒殺流程(文字版)
1. 初始化準(zhǔn)備
在系統(tǒng)啟動(dòng)階段,我們會(huì)完成一些必要的初始化工作。一方面,編寫好用于判斷庫(kù)存和訂單情況的 Lua 腳本。這個(gè)腳本會(huì)接收優(yōu)惠券 ID 和用戶 ID 作為參數(shù),通過(guò) Redis 的相關(guān)命令判斷庫(kù)存是否充足以及用戶是否已下單,保證這些判斷操作的原子性。另一方面,在 Java 代碼里利用 @PostConstruct 注解,通過(guò)執(zhí)行另一個(gè) Lua 腳本來(lái)創(chuàng)建 Redis 的 Stream 消息隊(duì)列和消費(fèi)者組,為后續(xù)處理訂單消息做好準(zhǔn)備。
2. 用戶請(qǐng)求與資格判斷
當(dāng)用戶發(fā)起秒殺請(qǐng)求后,系統(tǒng)會(huì)立即執(zhí)行之前準(zhǔn)備好的 Lua 腳本來(lái)判斷用戶是否具有秒殺資格。
- 如果腳本返回庫(kù)存不足的標(biāo)識(shí),系統(tǒng)會(huì)迅速返回 “庫(kù)存不足” 的提示信息,結(jié)束本次請(qǐng)求處理。
- 若返回用戶已下單的標(biāo)識(shí),就會(huì)返回 “不能重復(fù)下單” 的提示,流程終止。
- 當(dāng)判定用戶具有秒殺資格時(shí),系統(tǒng)會(huì)生成唯一的訂單 ID,創(chuàng)建訂單對(duì)象,然后將訂單信息發(fā)送到 Redis 的 Stream 消息隊(duì)列,進(jìn)入異步處理階段。
3. 消息隊(duì)列消費(fèi)
有一個(gè)專門的消息隊(duì)列消費(fèi)者線程會(huì)持續(xù)監(jiān)聽(tīng) Redis 的 Stream 消息隊(duì)列。
- 如果沒(méi)有獲取到新的訂單信息,線程會(huì)繼續(xù)保持監(jiān)聽(tīng)狀態(tài)。
- 一旦獲取到訂單信息,線程會(huì)馬上嘗試獲取 Redisson 分布式鎖。這個(gè)鎖非常關(guān)鍵,它能確保同一用戶同一時(shí)間只能處理一個(gè)訂單,有效避免一人多單的問(wèn)題。
4. 訂單創(chuàng)建與處理
獲取到鎖之后,系統(tǒng)會(huì)進(jìn)一步處理訂單。
- 首先判斷當(dāng)前用戶是否是第一單。如果不是,系統(tǒng)會(huì)記錄錯(cuò)誤日志并釋放鎖,結(jié)束流程。
- 若是第一單,系統(tǒng)會(huì)嘗試扣減庫(kù)存。如果庫(kù)存扣減失敗,會(huì)拋出異常并釋放鎖;若扣減成功,就將訂單信息保存到數(shù)據(jù)庫(kù)。
- 在保存訂單時(shí),若保存失敗會(huì)拋出異常并釋放鎖;保存成功后,系統(tǒng)會(huì)向 Redis 的 Stream 消息隊(duì)列發(fā)送 ACK 確認(rèn)消息,最后釋放鎖,完成整個(gè)秒殺流程。
到此這篇關(guān)于Redis消息隊(duì)列實(shí)現(xiàn)異步秒殺的文章就介紹到這了,更多相關(guān)Redis異步秒殺內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程
這篇文章主要介紹了Redis Desktop Manager(Redis可視化工具)安裝及使用圖文教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04
解決Redis開(kāi)啟遠(yuǎn)程訪問(wèn)及密碼問(wèn)題
這篇文章主要介紹了Redis開(kāi)啟遠(yuǎn)程訪問(wèn)及密碼的教程,文中給大家提到了Redis啟動(dòng)報(bào)錯(cuò)解決方法,需要的朋友可以參考下2019-10-10
Redis概述及l(fā)inux安裝redis的詳細(xì)教程
這篇文章主要介紹了Redis概述及l(fā)inux安裝redis的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Redis中LRU算法和LFU算法的區(qū)別小結(jié)
在Redis中,LRU算法和LFU算法是兩種常用的緩存淘汰算法,它們可以幫助我們優(yōu)化緩存性能,本文主要介紹了Redis中LRU算法和LFU算法的區(qū)別,感興趣的可以了解一下2023-12-12

