Redis實戰(zhàn)之Redis實現異步秒殺優(yōu)化詳解
秒殺優(yōu)化-異步秒殺思路
未優(yōu)化的思路
當用戶發(fā)起請求,此時會請求nginx,nginx會訪問到tomcat,而tomcat中的程序,會進行串行操作,分成如下幾個步驟
1、查詢優(yōu)惠卷
2、判斷秒殺庫存是否足夠
3、查詢訂單
4、校驗是否是一人一單
5、扣減庫存
6、創(chuàng)建訂單
在這六步操作中,又有很多操作是要去操作數據庫的,而且還是一個線程串行執(zhí)行, 這樣就會導致我們的程序執(zhí)行的很慢
優(yōu)化方案
我們將耗時比較短的邏輯判斷放入到redis中,比如是否庫存足夠,比如是否一人一單,這樣的操作,只要這種邏輯可以完成,就意味著我們是一定可以下單完成的,我們只需要進行快速的邏輯判斷,根本就不用等下單邏輯走完,我們直接給用戶返回成功, 再在后臺開一個線程,后臺線程慢慢的去執(zhí)行queue里邊的消息,即不追求時效性,讓用戶先成功下單,后續(xù)再完善數據庫數據
整體思路
用戶下單之后,判斷庫存是否充足只需要到redis中去根據key找對應的value是否大于0即可,如果不充足,則直接結束,如果充足,繼續(xù)在redis中判斷用戶是否可以下單,如果set集合中沒有這條數據,說明他可以下單,如果set集合中沒有這條記錄,則將userId和優(yōu)惠卷存入到redis中,并且返回0,整個過程需要保證是原子性的,我們可以使用lua來操作
當以上判斷邏輯走完之后,我們可以判斷當前redis中返回的結果是否是0 ,如果是0,則表示可以下單,則將之前說的信息存入到到queue中去,然后返回,然后再來個線程異步的下單,前端可以通過返回的訂單id來判斷是否下單成功。
難點
- 怎么在redis中去快速校驗一人一單,還有庫存判斷
- 由于我們校驗和tomct下單是兩個線程,那么我們如何知道到底哪個單他最后是否成功,或者是下單完成,為了完成這件事我們在redis操作完之后,我們會將一些信息返回給前端,同時也會把這些信息丟到異步queue中去,后續(xù)操作中,可以通過這個id來查詢我們tomcat中的下單邏輯是否完成了。
代碼實現
需求:
- 新增秒殺優(yōu)惠券的同時,將優(yōu)惠券信息,優(yōu)惠券id和庫存信息保存到Redis中
- 基于Lua腳本,判斷秒殺庫存、一人一單,決定用戶是否搶購成功
- 如果搶購成功,將優(yōu)惠券id和用戶id封裝后存入阻塞隊列
- 開啟線程任務,不斷從阻塞隊列中獲取信息,實現異步下單功能
新增優(yōu)惠券,將優(yōu)惠券信息入庫并寫入redis
@Override @Transactional public void addSeckillVoucher(Voucher voucher) { // 保存優(yōu)惠券 save(voucher); // 保存秒殺信息 SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); //存入redis stringRedisTemplate.opsForValue().setIfAbsent(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
判斷秒殺庫存、一人一單,決定用戶是否搶購成功,考慮到操作的原子性,采用lua腳本完成這一連串的操作
--- --- Generated by EmmyLua(https://github.com/EmmyLua) --- Created by Lenovo. --- DateTime: 2023/9/5 20:57 --- -- 1.參數列表 -- 1.1.優(yōu)惠券id local voucherId = ARGV[1] -- 1.2.用戶id local userId = ARGV[2] ---- 1.3.訂單id local orderId = ARGV[3] -- 2.數據key -- 2.1.庫存key local stockKey = 'seckill:stock:' .. voucherId ---- 2.2.訂單key local orderKey = 'seckill:order:' .. voucherId -- 3.腳本業(yè)務 -- 3.1.判斷庫存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.庫存不足,返回1 return 1 end -- 3.2.判斷用戶是否下單 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,說明是重復下單,返回2 return 2 end -- 3.4.扣庫存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下單(保存用戶)sadd orderKey userId redis.call('sadd', orderKey, userId) ---- 3.6.發(fā)送消息到隊列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
執(zhí)行l(wèi)ua腳本,判斷是否搶購成功,如果搶購成功,要放入堵塞隊列中
@Override public Result seckillVoucher(Long voucherId) { SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId); //判斷是否開始,開始時間如果在當前時間之后就是尚未開始 if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒殺尚未開始"); } //判斷是否結束,結束時間如果在當前時間之前就是已經結束 if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒殺已經結束"); } Long userId = UserHolder.getUser().getId(); long orderId = new RedisIdWorker(stringRedisTemplate).nextId("order"); Long execute = stringRedisTemplate.execute(SILLL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = execute.intValue(); if (r != 0) { return Result.fail(r == 1 ? "庫存不足" : "不能重復下單"); } VoucherOrder voucherOrder = new VoucherOrder(); //訂單id voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); //將訂單信息放入阻塞隊列 orderTakes.add(voucherOrder); return Result.ok(orderId); }
定義線程內部類,不斷從堵塞隊列中讀取訂單
//從阻塞隊列里面取訂單信息 private class voucherOrderHander implements Runnable { @Override public void run() { while (true) { try { VoucherOrder take = orderTakes.take(); handleVoucherOrder(take); } catch (Exception e) { log.error("異常信息如下", e); } } }
獲取訂單信息的具體方法,這里依然加了分布式鎖,是為了保險起見
private void handleVoucherOrder(VoucherOrder take) { Long userId = take.getId(); //創(chuàng)建鎖對象 RLock lock = redissonClient.getLock("lock:order:" + userId); //嘗試獲取鎖 boolean isLock = lock.tryLock(); //獲取鎖失敗 if (!isLock) { log.error("不允許重復下單"); return; } try { voucherOrderService.createVoucherOrder(take); } finally { //釋放鎖 lock.unlock(); } } }
這里又有一個問題,就是我們訂單信息入庫應該是在該類對象被創(chuàng)建的時候就要開啟線程在堵塞隊列等待讀取是否有訂單信息,然后順利入庫,所以我們用了aop的@PostConstruct,保證該對象被創(chuàng)建時,線程也能順利創(chuàng)建,這里用了線程池來提交線程任務
@PostConstruct public void init() { SECKILL_ORDER_EXECUTOR.execute(new voucherOrderHander()); }
完整代碼實現
@Service public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService { @Autowired private ISeckillVoucherService seckillVoucherService; @Autowired private RedisIdWorker redisIdWorker; @Autowired private IVoucherOrderService voucherOrderService; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedissonClient redissonClient; private static final DefaultRedisScript<Long> SILLL_SCRIPT; BlockingQueue<VoucherOrder> orderTakes = new ArrayBlockingQueue<>(1024 * 1024); //異步處理線程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); static { SILLL_SCRIPT = new DefaultRedisScript<>(); SILLL_SCRIPT.setLocation(new ClassPathResource("skill.lua")); SILLL_SCRIPT.setResultType(Long.class); } @PostConstruct public void init() { SECKILL_ORDER_EXECUTOR.execute(new voucherOrderHander()); } //從阻塞隊列里面取用戶信息 private class voucherOrderHander implements Runnable { @Override public void run() { while (true) { try { VoucherOrder take = orderTakes.take(); handleVoucherOrder(take); } catch (Exception e) { log.error("異常信息如下", e); } } } private void handleVoucherOrder(VoucherOrder take) { Long userId = take.getId(); //創(chuàng)建鎖對象 RLock lock = redissonClient.getLock("lock:order:" + userId); //嘗試獲取鎖 boolean isLock = lock.tryLock(); //獲取鎖失敗 if (!isLock) { log.error("不允許重復下單"); return; } try { voucherOrderService.createVoucherOrder(take); } finally { //釋放鎖 lock.unlock(); } } } @Override public Result seckillVoucher(Long voucherId) { SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId); //判斷是否開始,開始時間如果在當前時間之后就是尚未開始 if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒殺尚未開始"); } //判斷是否結束,結束時間如果在當前時間之前就是已經結束 if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒殺已經結束"); } Long userId = UserHolder.getUser().getId(); long orderId = new RedisIdWorker(stringRedisTemplate).nextId("order"); Long execute = stringRedisTemplate.execute(SILLL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = execute.intValue(); if (r != 0) { return Result.fail(r == 1 ? "庫存不足" : "不能重復下單"); } VoucherOrder voucherOrder = new VoucherOrder(); //訂單id voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); voucherOrder.setId(orderId); //將訂單信息放入阻塞隊列 orderTakes.add(voucherOrder); return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查詢訂單 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(); // 5.2.判斷是否存在 if (count > 0) { // 用戶已經購買過了 log.error("用戶已經購買過了"); return; } // 6.扣減庫存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣減失敗 log.error("庫存不足"); return; } save(voucherOrder); }
以上就是Redis實戰(zhàn)之Redis實現異步秒殺優(yōu)化詳解的詳細內容,更多關于Redis實現異步秒殺優(yōu)化的資料請關注腳本之家其它相關文章!
相關文章
redis?for?windows?6.2.6安裝包最新步驟詳解
這篇文章主要介紹了redis?for?windows?6.2.6安裝包全網首發(fā),使用Windows計劃任務自動運行redis服務,文章給大家講解的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-04-04