RabbitMQ+redis+Redisson分布式鎖+seata實(shí)現(xiàn)訂單服務(wù)的流程分析
引言
訂單服務(wù)涉及許多方面,分布式事務(wù),分布式鎖,例如訂單超時(shí)未支付要取消訂單,訂單如何防止重復(fù)提交,如何防止超賣(mài)、這里都會(huì)使用到。
- 開(kāi)啟分布式事務(wù)可以保證跨多個(gè)服務(wù)的數(shù)據(jù)操作的一致性和完整性,
- 使用分布式鎖可以確保在同一時(shí)間只有一個(gè)操作能夠成功執(zhí)行,避免并發(fā)引起的問(wèn)題。
訂單流程(只展示重要的內(nèi)容,具體可以到源碼查看)
訂單確認(rèn)
public OrderConfirmVO confirmOrder(Long skuId) { Long memberId = SecurityUtils.getMemberId(); // 解決子線程無(wú)法獲取HttpServletRequest請(qǐng)求對(duì)象中數(shù)據(jù)的問(wèn)題,子線程指getOrderItemsFuture,getMemberAddressFuture,generateOrderTokenFuture //feign遠(yuǎn)程調(diào)用會(huì)被攔截提取attributes并轉(zhuǎn)發(fā) RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); RequestContextHolder.setRequestAttributes(attributes, true); // 獲取訂單商品 //使用了 CompletableFuture 來(lái)實(shí)現(xiàn)異步執(zhí)行獲取訂單項(xiàng)的操作 CompletableFuture<List<OrderItemDTO>> getOrderItemsFuture = CompletableFuture.supplyAsync( () -> this.getOrderItems(skuId, memberId), threadPoolExecutor) .exceptionally(ex -> { log.error("Failed to get order items: {}", ex.toString()); return null; }); // 用戶收貨地址 CompletableFuture<List<MemberAddressDTO>> getMemberAddressFuture = CompletableFuture.supplyAsync(() -> { Result<List<MemberAddressDTO>> getMemberAddressResult = memberFeignClient.listMemberAddresses(memberId); if (Result.isSuccess(getMemberAddressResult)) { return getMemberAddressResult.getData(); } return null; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to get addresses for memberId {} : {}", memberId, ex.toString()); return null; }); // 生成唯一令牌,防止重復(fù)提交(原理:提交會(huì)消耗令牌,令牌被消耗無(wú)法再次提交) CompletableFuture<String> generateOrderTokenFuture = CompletableFuture.supplyAsync(() -> { String orderToken = this.generateTradeNo(memberId); redisTemplate.opsForValue().set(OrderConstants.ORDER_TOKEN_PREFIX + orderToken, orderToken); return orderToken; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to generate order token ."); return null; }); //CompletableFuture.allOf 方法,可以等待所有 CompletableFuture 對(duì)象都完成再進(jìn)行后續(xù)操作, // 確保獲取和設(shè)置屬性的操作都能夠成功執(zhí)行。這樣可以避免程序出現(xiàn)異常, CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, generateOrderTokenFuture).join(); OrderConfirmVO orderConfirmVO = new OrderConfirmVO(); orderConfirmVO.setOrderItems(getOrderItemsFuture.join()); orderConfirmVO.setAddresses(getMemberAddressFuture.join()); orderConfirmVO.setOrderToken(generateOrderTokenFuture.join()); log.info("Order confirm response for skuId {}: {}", skuId, orderConfirmVO); return orderConfirmVO; }
防止訂單重復(fù)提交
通過(guò)生成唯一令牌解決,方法:
根據(jù)自定義方法generateTradeNo生成訂單號(hào),將時(shí)間戳+3位隨機(jī)數(shù)+5位id(由會(huì)員id組成,不夠5位補(bǔ)0,超過(guò)5位保留后5位)組成訂單號(hào)
private String generateTradeNo(Long memberId) { //當(dāng) memberId 的位數(shù)小于 5 位時(shí),使用 0 來(lái)填充位數(shù)不足的部分,如果 memberId 已經(jīng)是 5 位數(shù)或更長(zhǎng),則不進(jìn)行填充 String userIdFilledZero = String.format("%05d", memberId); //超出五位的保留后五位 String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5); // 在前面加上wxo(wx order)等前綴是為了人工可以快速分辨訂單號(hào)是下單還是退款、來(lái)自哪家支付機(jī)構(gòu)等 // 將時(shí)間戳+3位隨機(jī)數(shù)+五位id組成商戶訂單號(hào),規(guī)則參考自<a rel="external nofollow" >大眾點(diǎn)評(píng)</a> return System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId; }
根據(jù)訂單防重提交令牌緩存鍵前綴+訂單號(hào)作為key存入redis,值為訂單號(hào)。
訂單提交的時(shí)候,會(huì)通過(guò)lua腳本驗(yàn)證redis是否含有該key,有則返回0,通過(guò)斷言阻止重復(fù)提交,下面訂單提交方法的代碼含有該部分。
擴(kuò)展:CompletableFuture 來(lái)實(shí)現(xiàn)異步執(zhí)行獲取訂單項(xiàng)的操作,可以提高響應(yīng)速度,減少阻塞時(shí)間,同時(shí)通過(guò)CompletableFuture可以更方便地處理異常,每個(gè)異步操作可以獨(dú)立地捕獲和處理異常,避免異常傳遞給上層調(diào)用者,提高了系統(tǒng)的健壯性和容錯(cuò)性。CompletableFuture對(duì)于處理大量并發(fā)請(qǐng)求的場(chǎng)景非常重要,可以提升系統(tǒng)的性能和用戶體驗(yàn)。
涉及的自定義線程池(相關(guān)解釋直接在代碼注釋上了)
/** * 自定義訂單線程池 * */ @Configuration @Slf4j public class ThreadPoolConfig { @Bean public ThreadPoolExecutor threadPoolExecutor() { int cpuCoreSize = Runtime.getRuntime().availableProcessors();//使用 Java 獲取可用的 CPU 核心數(shù) log.info("當(dāng)前CPU核心數(shù):{}", cpuCoreSize); /* * 計(jì)算密集型: 核心線程數(shù)=CPU核心 +1 √ * I/O密集型: 核心線程數(shù)=2*CPU核心 +1 */ int corePoolSize = cpuCoreSize + 1; return new ThreadPoolExecutor( //核心線程數(shù) corePoolSize, //最大線程數(shù) 2 * corePoolSize, //線程空閑(存活)時(shí)間;當(dāng)線程數(shù)超過(guò)核心線程數(shù),并且空閑時(shí)間超過(guò)指定時(shí)間時(shí),多余的線程會(huì)被銷(xiāo)毀 30, TimeUnit.SECONDS, //任務(wù)隊(duì)列(選取數(shù)組阻塞隊(duì)列)特點(diǎn):固定容量,公平性,先進(jìn)先出 new ArrayBlockingQueue<>(1000), //線程工廠 new NamedThreadFactory("order") // 訂單線程 ); } }
訂單提交
@GlobalTransactional public String submitOrder(OrderSubmitForm submitForm) { log.info("訂單提交參數(shù):{}", JSONUtil.toJsonStr(submitForm)); String orderToken = submitForm.getOrderToken(); // 1. 判斷訂單是否重復(fù)提交(LUA腳本保證獲取和刪除的原子性,成功返回1,否則返回0) //KEYS[1]指OrderConstants.ORDER_TOKEN_PREFIX + orderToken,ARGV[1]指orderToken String lockAcquireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //這一行代碼使用RedisTemplate的execute方法執(zhí)行Lua腳本。通過(guò)new DefaultRedisScript<>創(chuàng)建一個(gè)Redis腳本對(duì)象,并指定腳本字符串和返回結(jié)果的類(lèi)型(Long)。 // 然后使用Collections.singletonList來(lái)創(chuàng)建包含鎖鍵名和訂單令牌參數(shù)的列表,傳遞給execute方法。 //執(zhí)行 Redis 腳本后,結(jié)果會(huì)賦值給 lockAcquired 變量,它的類(lèi)型是 Long。 // lockAcquired這個(gè)值表示獲取鎖的結(jié)果,如果成功獲取并刪除了鎖,則為 1;如果獲取鎖失敗,則為 0。 Long lockAcquired = this.redisTemplate.execute( new DefaultRedisScript<>(lockAcquireScript, Long.class), Collections.singletonList(OrderConstants.ORDER_TOKEN_PREFIX + orderToken), orderToken ); Assert.isTrue(lockAcquired != null && lockAcquired.equals(1L), "訂單重復(fù)提交,請(qǐng)刷新頁(yè)面后重試"); // 2. 訂單商品校驗(yàn) (PS:校驗(yàn)進(jìn)入訂單確認(rèn)頁(yè)面到提交過(guò)程商品(價(jià)格、上架狀態(tài))變化) List<OrderSubmitForm.OrderItem> orderItems = submitForm.getOrderItems(); List<Long> skuIds = orderItems.stream() .map(OrderSubmitForm.OrderItem::getSkuId) .collect(Collectors.toList()); List<SkuInfoDTO> skuList = skuFeignClient.getSkuInfoList(skuIds); for (OrderSubmitForm.OrderItem item : orderItems) { SkuInfoDTO skuInfo = skuList.stream().filter(sku -> sku.getId().equals(item.getSkuId())) .findFirst() .orElse(null); Assert.isTrue(skuInfo != null, "商品({})已下架或刪除"); //如果調(diào)用對(duì)象小于被比較對(duì)象,compareTo 方法返回一個(gè)負(fù)整數(shù)。 // 如果調(diào)用對(duì)象大于被比較對(duì)象,compareTo 方法返回一個(gè)正整數(shù)。 Assert.isTrue(item.getPrice().compareTo(skuInfo.getPrice()) == 0, "商品({})價(jià)格發(fā)生變動(dòng),請(qǐng)刷新頁(yè)面", item.getSkuName()); } // 3. 校驗(yàn)庫(kù)存并鎖定庫(kù)存 List<LockedSkuDTO> lockedSkuList = orderItems.stream() .map(item -> new LockedSkuDTO(item.getSkuId(), item.getQuantity(), item.getSkuSn())) .collect(Collectors.toList()); boolean lockStockResult = skuFeignClient.lockStock(orderToken, lockedSkuList); Assert.isTrue(lockStockResult, "訂單提交失?。烘i定商品庫(kù)存失??!"); // 4. 生成訂單 boolean result = this.saveOrder(submitForm); log.info("order ({}) create result:{}", orderToken, result); return orderToken; }
前提:此處涉及seata分布式事務(wù)和Redisson實(shí)現(xiàn)的分布式鎖
lua腳本
KEYS[1] 指OrderConstants.ORDER_TOKEN_PREFIX + orderToken
ARGV[1] 指orderToken
判斷是否含有這個(gè)鎖key,有就刪除這個(gè)鎖并返回1L,沒(méi)有就返回0
通過(guò)斷言判斷l(xiāng)ockAcquired是否成功獲取并刪除了鎖,不成功會(huì)報(bào)錯(cuò)
校驗(yàn)商品
- 根據(jù)傳入的表單對(duì)象拿到訂單商品集合,然后通過(guò)stream拿到訂單商品id集合
- 通過(guò)Feign遠(yuǎn)程調(diào)用得到對(duì)應(yīng)訂單id集合的庫(kù)存商品集合
- 遍歷訂單商品集合,通過(guò)stream過(guò)濾拿到id對(duì)應(yīng)的商品,判斷商品是否為null,
以及是否跟庫(kù)存商品價(jià)格不一致 - 訂單商品集合通過(guò)stream返回用于鎖定庫(kù)存的對(duì)象集合,傳入對(duì)象的有
庫(kù)存商品id(Skuid),訂單商品數(shù)量(Quantity),訂單商品編號(hào)(skuSn)
3. 鎖定庫(kù)存方法
@Transactional public boolean lockStock(String orderToken, List<LockedSkuDTO> lockedSkuList) { Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkuList), "訂單({})未包含任何商品", orderToken); // 校驗(yàn)庫(kù)存數(shù)量是否足夠以及鎖定庫(kù)存 for (LockedSkuDTO lockedSku : lockedSkuList) { Long skuId = lockedSku.getSkuId(); //商品分布式鎖緩存鍵前綴:ProductConstants.SKU_LOCK_PREFIX //每次getLock都會(huì)返回一個(gè)獨(dú)立的分布式鎖對(duì)象,但它們共享一個(gè)鎖資源。 RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + skuId); // 構(gòu)建商品鎖對(duì)象 try { //共享一個(gè)鎖資源意味著多個(gè)分布式鎖對(duì)象共同使用同一個(gè)鎖來(lái)實(shí)現(xiàn)互斥訪問(wèn)。 //雖然每個(gè)分布式鎖對(duì)象是獨(dú)立創(chuàng)建的,但它們會(huì)使用相同的鎖資源來(lái)進(jìn)行加鎖和釋放鎖的操作。 lock.lock();//鎖定操作 Integer quantity = lockedSku.getQuantity(); // 訂單的商品數(shù)量 // 庫(kù)存足夠 boolean lockResult = this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock + " + quantity) // 修改鎖定商品數(shù) .eq(PmsSku::getId, lockedSku.getSkuId()) //通過(guò) apply 方法添加動(dòng)態(tài) SQL 條件,確保 stock 減去 locked_stock 的值大于等于給定的 quantity。 //使用了占位符 {0} 來(lái)引用 quantity,0表示第一個(gè)傳入的值。 .apply("stock - locked_stock >= {0}", quantity) // 剩余商品數(shù) ≥ 訂單商品數(shù) ); Assert.isTrue(lockResult, "商品({})庫(kù)存不足", lockedSku.getSkuSn()); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 鎖定的商品緩存至 Redis (后續(xù)使用:1.取消訂單解鎖庫(kù)存;2:支付訂單扣減庫(kù)存) redisTemplate.opsForValue().set(ProductConstants.LOCKED_SKUS_PREFIX + orderToken, lockedSkuList); return true; }
- 斷言判斷鎖定庫(kù)存對(duì)象集合是否為null
- 遍歷鎖定庫(kù)存對(duì)象集合,拿到庫(kù)存商品id,通過(guò)商品分布式鎖緩存鍵前綴+庫(kù)存商品id創(chuàng)建 分布式鎖,然后進(jìn)行加鎖操作(具體細(xì)節(jié)看代碼注釋就明白了)通過(guò)更新語(yǔ)句來(lái)更新鎖 定商品數(shù)以及判斷剩余商品數(shù)是否大于訂單商品數(shù)。
- try語(yǔ)句最后通過(guò)判斷分布式鎖是否被鎖定,是就釋放鎖。
- 將商品分布式鎖緩存鍵前綴+訂單號(hào)(orderToken)作為鍵,鎖定庫(kù)存集合作為值
存入redis,以便后續(xù)取消訂單時(shí)解鎖庫(kù)存和支付訂單時(shí)扣減庫(kù)存。
擴(kuò)展:解鎖庫(kù)存
/** *解鎖庫(kù)存 *<P> *訂單超時(shí)未支付,釋放鎖定的商品庫(kù)存 */ public boolean unlockStock(String orderSn) { //鎖定庫(kù)存對(duì)象集合:lockedSkus List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("釋放訂單({})鎖定的商品庫(kù)存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); // 庫(kù)存已釋放 if (CollectionUtil.isEmpty(lockedSkus)) { return true; } // 遍歷恢復(fù)鎖定的商品庫(kù)存 for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 獲取商品分布式鎖 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { //判斷當(dāng)前分布式鎖是否已被鎖定,通過(guò)這個(gè)判斷可以防止非法釋放鎖等潛在問(wèn)題 if (lock.isLocked()) { lock.unlock(); } } } // 移除 redis 訂單鎖定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
- 通過(guò)redis獲取鎖定庫(kù)存對(duì)象集合,判斷集合是否為空,空則表示庫(kù)存已釋放,直接返回true
- 不為空就進(jìn)行遍歷鎖定庫(kù)存對(duì)象集合(lockedSkus),獲取分布式鎖對(duì)象,進(jìn)行加鎖操作,然后 執(zhí)行更新語(yǔ)句,扣減鎖定商品數(shù)(即恢復(fù)原來(lái)的鎖定商品數(shù)),最后釋放鎖
- 釋放庫(kù)存完后就可以移除redis訂單鎖定的商品,這樣再執(zhí)行釋放庫(kù)存的時(shí)候就直接返回true
擴(kuò)展:扣減庫(kù)存
/** * 扣減庫(kù)存 * <p> * 訂單支付扣減商品庫(kù)存和釋放鎖定庫(kù)存 * * @param orderSn 訂單編號(hào) * @return ture/false */ public boolean deductStock(String orderSn) { // 獲取訂單提交時(shí)鎖定的商品 List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("訂單({})支付成功,扣減訂單商品庫(kù)存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkus), "扣減商品庫(kù)存失?。河唵?{})未包含商品"); for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 獲取商品分布式鎖 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("stock = stock - " + lockedSku.getQuantity()) .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 移除訂單鎖定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
原理跟釋放庫(kù)存基本一致,更新語(yǔ)句發(fā)生變化
更新語(yǔ)句:將商品庫(kù)存數(shù)進(jìn)行扣減,扣減數(shù)量為訂單商品數(shù),然后鎖定商品數(shù)也進(jìn)行扣 減,扣減數(shù)量為訂單商品數(shù)
4. 生成訂單
通過(guò)saveOrder創(chuàng)建訂單,返回result,打印日志。
返回訂單號(hào)
創(chuàng)建訂單
/** * 創(chuàng)建訂單 * * @param submitForm 訂單提交表單對(duì)象 * @return */ private boolean saveOrder(OrderSubmitForm submitForm) { //創(chuàng)建訂單詳情表(OmsOrder)對(duì)象 OmsOrder order = orderConverter.form2Entity(submitForm); //設(shè)置待支付狀態(tài) order.setStatus(OrderStatusEnum.UNPAID.getValue()); //設(shè)置訂單會(huì)員id order.setMemberId(SecurityUtils.getMemberId()); //設(shè)置訂單來(lái)源(0代表PC訂單,1代表APP訂單) order.setSource(submitForm.getOrderSource().getValue()); //保存到數(shù)據(jù)庫(kù) boolean result = this.save(order); Long orderId = order.getId(); if (result) { // 保存訂單明細(xì) List<OmsOrderItem> orderItemEntities = orderItemConverter.item2Entity(submitForm.getOrderItems()); orderItemEntities.forEach(item -> item.setOrderId(orderId)); orderItemService.saveBatch(orderItemEntities); // 訂單超時(shí)未支付取消 //這行代碼使用 RabbitMQ 的 Java 客戶端庫(kù)來(lái)發(fā)送一條消息到 order.exchange 交換器, // 該消息會(huì)被路由到 order.close.delay 隊(duì)列中。消息的內(nèi)容是 submitForm.getOrderToken() 方法的返回結(jié)果。 rabbitTemplate.convertAndSend("order.exchange", "order.close.delay", submitForm.getOrderToken()); } return result; }
普通的保存到數(shù)據(jù)的操作就不做解釋了,主要看訂單超時(shí)未支付取消的功能。
訂單超時(shí)未支付取消功能通過(guò)rabbitMQ實(shí)現(xiàn)
訂單超時(shí)關(guān)單延時(shí)隊(duì)列
@Component @Slf4j public class OrderRabbitConfig { // 普通延遲隊(duì)列 private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue"; private static final String ORDER_EXCHANGE = "order.exchange"; private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay"; // 死信關(guān)單隊(duì)列 private static final String ORDER_ClOSE_QUEUE = "order.close.queue"; private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange"; private static final String ORDER_ClOSE_ROUTING_KEY = "order.close"; /** * 定義交換機(jī) */ @Bean public Exchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); } /** * 死信交換機(jī) */ @Bean public Exchange orderDlxExchange() { return new DirectExchange(ORDER_DLX_EXCHANGE, true, false); } /** * 延時(shí)隊(duì)列 */ @Bean public Queue orderDelayQueue() { // 延時(shí)隊(duì)列的消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),根據(jù)routingKey發(fā)送到指定的exchange中,exchange路由到死信隊(duì)列 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE); args.put("x-dead-letter-routing-key", ORDER_ClOSE_ROUTING_KEY); // 死信路由Key args.put("x-message-ttl", 10 * 1000L); // 單位毫秒,10s用于測(cè)試 return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args); } /** * 延時(shí)隊(duì)列綁定交換機(jī) */ @Bean public Binding orderDelayQueueBinding() { return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE, ORDER_CLOSE_DELAY_ROUTING_KEY, null); } /** * 關(guān)單隊(duì)列 */ @Bean public Queue orderCloseQueue() { log.info("死信隊(duì)列(order.close.queue)創(chuàng)建"); return new Queue(ORDER_ClOSE_QUEUE, true, false, false); } /** * 關(guān)單隊(duì)列綁定死信交換機(jī) */ @Bean public Binding orderCloseQueueBinding() { return new Binding(ORDER_ClOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE, ORDER_ClOSE_ROUTING_KEY, null); } }
- 綁定死信交換機(jī)后,超過(guò)10s就會(huì)路由到死信隊(duì)列(order.close.queue)中。
訂單超時(shí)未支付系統(tǒng)自動(dòng)取消監(jiān)聽(tīng)器
/** * 訂單超時(shí)未支付系統(tǒng)自動(dòng)取消監(jiān)聽(tīng)器 * */ @Component @RequiredArgsConstructor @Slf4j public class OrderCloseListener { private final OrderService orderService; private final RabbitTemplate rabbitTemplate; @RabbitListener(queues = "order.close.queue") public void closeOrder(String orderSn, Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息序號(hào)(消息隊(duì)列中的位置) log.info("訂單({})超時(shí)未支付,系統(tǒng)自動(dòng)關(guān)閉訂單", orderSn); try { boolean closeOrderResult = orderService.closeOrder(orderSn); log.info("關(guān)單結(jié)果:{}", closeOrderResult); if (closeOrderResult) { // 關(guān)單成功:釋放庫(kù)存 //發(fā)送訂單號(hào) rabbitTemplate.convertAndSend("stock.exchange", "stock.unlock", orderSn); } else { // 關(guān)單失?。河唵我驯魂P(guān)閉,手動(dòng)ACK確認(rèn)并從隊(duì)列移除消息 channel.basicAck(deliveryTag, false); // false: 不批量確認(rèn),僅確認(rèn)當(dāng)前單個(gè)消息 } } catch (Exception e) { // 關(guān)單異常:拒絕消息并重新入隊(duì) try { channel.basicReject(deliveryTag, true); // true: 重新放回隊(duì)列 // channel.basicReject(deliveryTag, false); // false: 直接丟棄消息 (TODO 定時(shí)任務(wù)補(bǔ)償) } catch (IOException ex) { log.error("訂單({})關(guān)閉失敗,原因:{}", orderSn, ex.getMessage()); } } } }
監(jiān)聽(tīng)死信關(guān)單隊(duì)列,拿到消息序號(hào)deliveryTag(便于后續(xù)手動(dòng)ACK和重新入隊(duì)的操作)
進(jìn)行關(guān)單操作orderService.closeOrder,主要修改訂單詳情表的狀態(tài)從待支付改為已關(guān)閉。
public boolean closeOrder(String orderSn) { return this.update(new LambdaUpdateWrapper<OmsOrder>() .eq(OmsOrder::getOrderSn, orderSn) .eq(OmsOrder::getStatus, OrderStatusEnum.UNPAID.getValue()) .set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue()) ); }
判斷關(guān)單是否成功,成功就釋放庫(kù)存,發(fā)送訂單號(hào)給釋放庫(kù)存的交換機(jī),路由鍵為stock.unlock。
/** *庫(kù)存釋放監(jiān)聽(tīng)器 */ @Component @Slf4j @RequiredArgsConstructor public class StockReleaseListener { private final SkuService skuService; private static final String STOCK_UNLOCK_QUEUE = "stock.unlock.queue"; private static final String STOCK_EXCHANGE = "stock.exchange"; private static final String STOCK_UNLOCK_ROUTING_KEY = "stock.unlock"; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = STOCK_UNLOCK_QUEUE,durable = "true"), exchange = @Exchange(value = STOCK_EXCHANGE), key = {STOCK_UNLOCK_ROUTING_KEY} ), ackMode = "MANUAL")//手動(dòng)ACK @RabbitHandler public void UnlockStock(String orderSn, Message message, Channel channel){ log.info("訂單{}取消釋放庫(kù)存",orderSn); long deliverTag=message.getMessageProperties().getDeliveryTag(); try{ skuService.unlockStock(orderSn); channel.basicAck(deliverTag,false); }catch (Exception e){ try { channel.basicAck(deliverTag,true); }catch (IOException ex){ log.error("訂單{}關(guān)閉失敗,原因:{}",orderSn,ex.getMessage()); } } } }
如果收到釋放庫(kù)存的消息,就會(huì)執(zhí)行釋放庫(kù)存的方法unlockStock,上面鎖定庫(kù)存方法那節(jié)已解釋。
訂單支付
@GlobalTransactional public <T> T payOrder(OrderPaymentForm paymentForm) { String orderSn = paymentForm.getOrderSn(); OmsOrder order = this.getOne(new LambdaQueryWrapper<OmsOrder>().eq(OmsOrder::getOrderSn, orderSn)); Assert.isTrue(order != null, "訂單不存在"); Assert.isTrue(OrderStatusEnum.UNPAID.getValue().equals(order.getStatus()), "訂單不可支付,請(qǐng)檢查訂單狀態(tài)"); RLock lock = redissonClient.getLock(OrderConstants.ORDER_LOCK_PREFIX + order.getOrderSn()); try { lock.lock(); T result; switch (paymentForm.getPaymentMethod()) { case WX_JSAPI: result = (T) wxJsapiPay(paymentForm.getAppId(), order.getOrderSn(), order.getPaymentAmount()); break; default: result = (T) balancePay(order); break; } return result; } finally { //釋放鎖 if (lock.isLocked()) { lock.unlock(); } } }
代碼摘自
到此這篇關(guān)于RabbitMQ+redis+Redisson分布式鎖+seata實(shí)現(xiàn)訂單服務(wù)的文章就介紹到這了,更多相關(guān)Redisson分布式鎖訂單服務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis 設(shè)置密碼無(wú)效問(wèn)題解決
本文主要介紹了Redis 設(shè)置密碼無(wú)效問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02高并發(fā)場(chǎng)景分析之redis+lua防重校驗(yàn)
這篇文章主要介紹了高并發(fā)場(chǎng)景分析之redis+lua防重校驗(yàn),本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-07-07RedisDesktopManager遠(yuǎn)程連接redis的實(shí)現(xiàn)
本文主要介紹了RedisDesktopManager遠(yuǎn)程連接redis的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05Redis數(shù)據(jù)庫(kù)分布式設(shè)計(jì)方案介紹
大家好,本篇文章主要講的是Redis數(shù)據(jù)庫(kù)分布式設(shè)計(jì)方案介紹,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下2022-01-01redis?bitmap數(shù)據(jù)結(jié)構(gòu)之java對(duì)等操作詳解
bitmap是以其高性能出名。其基本原理是一位存儲(chǔ)一個(gè)標(biāo)識(shí),其他衍生知道咱就不說(shuō)了,而redis就是以這種原生格式存儲(chǔ)的,這篇文章主要介紹了redis?bitmap數(shù)據(jù)結(jié)構(gòu)之java對(duì)等操作,需要的朋友可以參考下2022-10-10淺析PHP分布式中Redis實(shí)現(xiàn)Session的方法
這篇文章主要介紹了PHP分布式中Redis實(shí)現(xiàn)Session的方法,文中詳細(xì)介紹了兩種方法的使用方法,并給出了測(cè)試的示例代碼,有需要的朋友可以參考借鑒,下面來(lái)一起看看吧,2016-12-12window手動(dòng)操作清理redis緩存的技巧總結(jié)
在本篇文章中小編給大家分享了關(guān)于window環(huán)境手動(dòng)操作清理redis緩存的方法和技巧,有興趣的朋友們可以跟著學(xué)習(xí)下。2019-07-07