RabbitMQ+redis+Redisson分布式鎖+seata實(shí)現(xiàn)訂單服務(wù)的流程分析
引言
訂單服務(wù)涉及許多方面,分布式事務(wù),分布式鎖,例如訂單超時未支付要取消訂單,訂單如何防止重復(fù)提交,如何防止超賣、這里都會使用到。
- 開啟分布式事務(wù)可以保證跨多個服務(wù)的數(shù)據(jù)操作的一致性和完整性,
- 使用分布式鎖可以確保在同一時間只有一個操作能夠成功執(zhí)行,避免并發(fā)引起的問題。
訂單流程(只展示重要的內(nèi)容,具體可以到源碼查看)
訂單確認(rèn)
public OrderConfirmVO confirmOrder(Long skuId) { Long memberId = SecurityUtils.getMemberId(); // 解決子線程無法獲取HttpServletRequest請求對象中數(shù)據(jù)的問題,子線程指getOrderItemsFuture,getMemberAddressFuture,generateOrderTokenFuture //feign遠(yuǎn)程調(diào)用會被攔截提取attributes并轉(zhuǎn)發(fā) RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); RequestContextHolder.setRequestAttributes(attributes, true); // 獲取訂單商品 //使用了 CompletableFuture 來實(shí)現(xiàn)異步執(zhí)行獲取訂單項(xiàng)的操作 CompletableFuture<List<OrderItemDTO>> getOrderItemsFuture = CompletableFuture.supplyAsync( () -> this.getOrderItems(skuId, memberId), threadPoolExecutor) .exceptionally(ex -> { log.error("Failed to get order items: {}", ex.toString()); return null; }); // 用戶收貨地址 CompletableFuture<List<MemberAddressDTO>> getMemberAddressFuture = CompletableFuture.supplyAsync(() -> { Result<List<MemberAddressDTO>> getMemberAddressResult = memberFeignClient.listMemberAddresses(memberId); if (Result.isSuccess(getMemberAddressResult)) { return getMemberAddressResult.getData(); } return null; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to get addresses for memberId {} : {}", memberId, ex.toString()); return null; }); // 生成唯一令牌,防止重復(fù)提交(原理:提交會消耗令牌,令牌被消耗無法再次提交) CompletableFuture<String> generateOrderTokenFuture = CompletableFuture.supplyAsync(() -> { String orderToken = this.generateTradeNo(memberId); redisTemplate.opsForValue().set(OrderConstants.ORDER_TOKEN_PREFIX + orderToken, orderToken); return orderToken; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to generate order token ."); return null; }); //CompletableFuture.allOf 方法,可以等待所有 CompletableFuture 對象都完成再進(jìn)行后續(xù)操作, // 確保獲取和設(shè)置屬性的操作都能夠成功執(zhí)行。這樣可以避免程序出現(xiàn)異常, CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, generateOrderTokenFuture).join(); OrderConfirmVO orderConfirmVO = new OrderConfirmVO(); orderConfirmVO.setOrderItems(getOrderItemsFuture.join()); orderConfirmVO.setAddresses(getMemberAddressFuture.join()); orderConfirmVO.setOrderToken(generateOrderTokenFuture.join()); log.info("Order confirm response for skuId {}: {}", skuId, orderConfirmVO); return orderConfirmVO; }
防止訂單重復(fù)提交
通過生成唯一令牌解決,方法:
根據(jù)自定義方法generateTradeNo生成訂單號,將時間戳+3位隨機(jī)數(shù)+5位id(由會員id組成,不夠5位補(bǔ)0,超過5位保留后5位)組成訂單號
private String generateTradeNo(Long memberId) { //當(dāng) memberId 的位數(shù)小于 5 位時,使用 0 來填充位數(shù)不足的部分,如果 memberId 已經(jīng)是 5 位數(shù)或更長,則不進(jìn)行填充 String userIdFilledZero = String.format("%05d", memberId); //超出五位的保留后五位 String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5); // 在前面加上wxo(wx order)等前綴是為了人工可以快速分辨訂單號是下單還是退款、來自哪家支付機(jī)構(gòu)等 // 將時間戳+3位隨機(jī)數(shù)+五位id組成商戶訂單號,規(guī)則參考自<a rel="external nofollow" >大眾點(diǎn)評</a> return System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId; }
根據(jù)訂單防重提交令牌緩存鍵前綴+訂單號作為key存入redis,值為訂單號。
訂單提交的時候,會通過lua腳本驗(yàn)證redis是否含有該key,有則返回0,通過斷言阻止重復(fù)提交,下面訂單提交方法的代碼含有該部分。
擴(kuò)展:CompletableFuture 來實(shí)現(xiàn)異步執(zhí)行獲取訂單項(xiàng)的操作,可以提高響應(yīng)速度,減少阻塞時間,同時通過CompletableFuture可以更方便地處理異常,每個異步操作可以獨(dú)立地捕獲和處理異常,避免異常傳遞給上層調(diào)用者,提高了系統(tǒng)的健壯性和容錯性。CompletableFuture對于處理大量并發(fā)請求的場景非常重要,可以提升系統(tǒng)的性能和用戶體驗(yàn)。
涉及的自定義線程池(相關(guān)解釋直接在代碼注釋上了)
/** * 自定義訂單線程池 * */ @Configuration @Slf4j public class ThreadPoolConfig { @Bean public ThreadPoolExecutor threadPoolExecutor() { int cpuCoreSize = Runtime.getRuntime().availableProcessors();//使用 Java 獲取可用的 CPU 核心數(shù) log.info("當(dāng)前CPU核心數(shù):{}", cpuCoreSize); /* * 計(jì)算密集型: 核心線程數(shù)=CPU核心 +1 √ * I/O密集型: 核心線程數(shù)=2*CPU核心 +1 */ int corePoolSize = cpuCoreSize + 1; return new ThreadPoolExecutor( //核心線程數(shù) corePoolSize, //最大線程數(shù) 2 * corePoolSize, //線程空閑(存活)時間;當(dāng)線程數(shù)超過核心線程數(shù),并且空閑時間超過指定時間時,多余的線程會被銷毀 30, TimeUnit.SECONDS, //任務(wù)隊(duì)列(選取數(shù)組阻塞隊(duì)列)特點(diǎn):固定容量,公平性,先進(jìn)先出 new ArrayBlockingQueue<>(1000), //線程工廠 new NamedThreadFactory("order") // 訂單線程 ); } }
訂單提交
@GlobalTransactional public String submitOrder(OrderSubmitForm submitForm) { log.info("訂單提交參數(shù):{}", JSONUtil.toJsonStr(submitForm)); String orderToken = submitForm.getOrderToken(); // 1. 判斷訂單是否重復(fù)提交(LUA腳本保證獲取和刪除的原子性,成功返回1,否則返回0) //KEYS[1]指OrderConstants.ORDER_TOKEN_PREFIX + orderToken,ARGV[1]指orderToken String lockAcquireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //這一行代碼使用RedisTemplate的execute方法執(zhí)行Lua腳本。通過new DefaultRedisScript<>創(chuàng)建一個Redis腳本對象,并指定腳本字符串和返回結(jié)果的類型(Long)。 // 然后使用Collections.singletonList來創(chuàng)建包含鎖鍵名和訂單令牌參數(shù)的列表,傳遞給execute方法。 //執(zhí)行 Redis 腳本后,結(jié)果會賦值給 lockAcquired 變量,它的類型是 Long。 // lockAcquired這個值表示獲取鎖的結(jié)果,如果成功獲取并刪除了鎖,則為 1;如果獲取鎖失敗,則為 0。 Long lockAcquired = this.redisTemplate.execute( new DefaultRedisScript<>(lockAcquireScript, Long.class), Collections.singletonList(OrderConstants.ORDER_TOKEN_PREFIX + orderToken), orderToken ); Assert.isTrue(lockAcquired != null && lockAcquired.equals(1L), "訂單重復(fù)提交,請刷新頁面后重試"); // 2. 訂單商品校驗(yàn) (PS:校驗(yàn)進(jìn)入訂單確認(rèn)頁面到提交過程商品(價格、上架狀態(tài))變化) List<OrderSubmitForm.OrderItem> orderItems = submitForm.getOrderItems(); List<Long> skuIds = orderItems.stream() .map(OrderSubmitForm.OrderItem::getSkuId) .collect(Collectors.toList()); List<SkuInfoDTO> skuList = skuFeignClient.getSkuInfoList(skuIds); for (OrderSubmitForm.OrderItem item : orderItems) { SkuInfoDTO skuInfo = skuList.stream().filter(sku -> sku.getId().equals(item.getSkuId())) .findFirst() .orElse(null); Assert.isTrue(skuInfo != null, "商品({})已下架或刪除"); //如果調(diào)用對象小于被比較對象,compareTo 方法返回一個負(fù)整數(shù)。 // 如果調(diào)用對象大于被比較對象,compareTo 方法返回一個正整數(shù)。 Assert.isTrue(item.getPrice().compareTo(skuInfo.getPrice()) == 0, "商品({})價格發(fā)生變動,請刷新頁面", item.getSkuName()); } // 3. 校驗(yàn)庫存并鎖定庫存 List<LockedSkuDTO> lockedSkuList = orderItems.stream() .map(item -> new LockedSkuDTO(item.getSkuId(), item.getQuantity(), item.getSkuSn())) .collect(Collectors.toList()); boolean lockStockResult = skuFeignClient.lockStock(orderToken, lockedSkuList); Assert.isTrue(lockStockResult, "訂單提交失?。烘i定商品庫存失敗!"); // 4. 生成訂單 boolean result = this.saveOrder(submitForm); log.info("order ({}) create result:{}", orderToken, result); return orderToken; }
前提:此處涉及seata分布式事務(wù)和Redisson實(shí)現(xiàn)的分布式鎖
lua腳本
KEYS[1] 指OrderConstants.ORDER_TOKEN_PREFIX + orderToken
ARGV[1] 指orderToken
判斷是否含有這個鎖key,有就刪除這個鎖并返回1L,沒有就返回0
通過斷言判斷l(xiāng)ockAcquired是否成功獲取并刪除了鎖,不成功會報(bào)錯
校驗(yàn)商品
- 根據(jù)傳入的表單對象拿到訂單商品集合,然后通過stream拿到訂單商品id集合
- 通過Feign遠(yuǎn)程調(diào)用得到對應(yīng)訂單id集合的庫存商品集合
- 遍歷訂單商品集合,通過stream過濾拿到id對應(yīng)的商品,判斷商品是否為null,
以及是否跟庫存商品價格不一致 - 訂單商品集合通過stream返回用于鎖定庫存的對象集合,傳入對象的有
庫存商品id(Skuid),訂單商品數(shù)量(Quantity),訂單商品編號(skuSn)
3. 鎖定庫存方法
@Transactional public boolean lockStock(String orderToken, List<LockedSkuDTO> lockedSkuList) { Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkuList), "訂單({})未包含任何商品", orderToken); // 校驗(yàn)庫存數(shù)量是否足夠以及鎖定庫存 for (LockedSkuDTO lockedSku : lockedSkuList) { Long skuId = lockedSku.getSkuId(); //商品分布式鎖緩存鍵前綴:ProductConstants.SKU_LOCK_PREFIX //每次getLock都會返回一個獨(dú)立的分布式鎖對象,但它們共享一個鎖資源。 RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + skuId); // 構(gòu)建商品鎖對象 try { //共享一個鎖資源意味著多個分布式鎖對象共同使用同一個鎖來實(shí)現(xiàn)互斥訪問。 //雖然每個分布式鎖對象是獨(dú)立創(chuàng)建的,但它們會使用相同的鎖資源來進(jìn)行加鎖和釋放鎖的操作。 lock.lock();//鎖定操作 Integer quantity = lockedSku.getQuantity(); // 訂單的商品數(shù)量 // 庫存足夠 boolean lockResult = this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock + " + quantity) // 修改鎖定商品數(shù) .eq(PmsSku::getId, lockedSku.getSkuId()) //通過 apply 方法添加動態(tài) SQL 條件,確保 stock 減去 locked_stock 的值大于等于給定的 quantity。 //使用了占位符 {0} 來引用 quantity,0表示第一個傳入的值。 .apply("stock - locked_stock >= {0}", quantity) // 剩余商品數(shù) ≥ 訂單商品數(shù) ); Assert.isTrue(lockResult, "商品({})庫存不足", lockedSku.getSkuSn()); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 鎖定的商品緩存至 Redis (后續(xù)使用:1.取消訂單解鎖庫存;2:支付訂單扣減庫存) redisTemplate.opsForValue().set(ProductConstants.LOCKED_SKUS_PREFIX + orderToken, lockedSkuList); return true; }
- 斷言判斷鎖定庫存對象集合是否為null
- 遍歷鎖定庫存對象集合,拿到庫存商品id,通過商品分布式鎖緩存鍵前綴+庫存商品id創(chuàng)建 分布式鎖,然后進(jìn)行加鎖操作(具體細(xì)節(jié)看代碼注釋就明白了)通過更新語句來更新鎖 定商品數(shù)以及判斷剩余商品數(shù)是否大于訂單商品數(shù)。
- try語句最后通過判斷分布式鎖是否被鎖定,是就釋放鎖。
- 將商品分布式鎖緩存鍵前綴+訂單號(orderToken)作為鍵,鎖定庫存集合作為值
存入redis,以便后續(xù)取消訂單時解鎖庫存和支付訂單時扣減庫存。
擴(kuò)展:解鎖庫存
/** *解鎖庫存 *<P> *訂單超時未支付,釋放鎖定的商品庫存 */ public boolean unlockStock(String orderSn) { //鎖定庫存對象集合:lockedSkus List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("釋放訂單({})鎖定的商品庫存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); // 庫存已釋放 if (CollectionUtil.isEmpty(lockedSkus)) { return true; } // 遍歷恢復(fù)鎖定的商品庫存 for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 獲取商品分布式鎖 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { //判斷當(dāng)前分布式鎖是否已被鎖定,通過這個判斷可以防止非法釋放鎖等潛在問題 if (lock.isLocked()) { lock.unlock(); } } } // 移除 redis 訂單鎖定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
- 通過redis獲取鎖定庫存對象集合,判斷集合是否為空,空則表示庫存已釋放,直接返回true
- 不為空就進(jìn)行遍歷鎖定庫存對象集合(lockedSkus),獲取分布式鎖對象,進(jìn)行加鎖操作,然后 執(zhí)行更新語句,扣減鎖定商品數(shù)(即恢復(fù)原來的鎖定商品數(shù)),最后釋放鎖
- 釋放庫存完后就可以移除redis訂單鎖定的商品,這樣再執(zhí)行釋放庫存的時候就直接返回true
擴(kuò)展:扣減庫存
/** * 扣減庫存 * <p> * 訂單支付扣減商品庫存和釋放鎖定庫存 * * @param orderSn 訂單編號 * @return ture/false */ public boolean deductStock(String orderSn) { // 獲取訂單提交時鎖定的商品 List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("訂單({})支付成功,扣減訂單商品庫存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkus), "扣減商品庫存失?。河唵?{})未包含商品"); for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 獲取商品分布式鎖 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("stock = stock - " + lockedSku.getQuantity()) .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 移除訂單鎖定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
原理跟釋放庫存基本一致,更新語句發(fā)生變化
更新語句:將商品庫存數(shù)進(jìn)行扣減,扣減數(shù)量為訂單商品數(shù),然后鎖定商品數(shù)也進(jìn)行扣 減,扣減數(shù)量為訂單商品數(shù)
4. 生成訂單
通過saveOrder創(chuàng)建訂單,返回result,打印日志。
返回訂單號
創(chuàng)建訂單
/** * 創(chuàng)建訂單 * * @param submitForm 訂單提交表單對象 * @return */ private boolean saveOrder(OrderSubmitForm submitForm) { //創(chuàng)建訂單詳情表(OmsOrder)對象 OmsOrder order = orderConverter.form2Entity(submitForm); //設(shè)置待支付狀態(tài) order.setStatus(OrderStatusEnum.UNPAID.getValue()); //設(shè)置訂單會員id order.setMemberId(SecurityUtils.getMemberId()); //設(shè)置訂單來源(0代表PC訂單,1代表APP訂單) order.setSource(submitForm.getOrderSource().getValue()); //保存到數(shù)據(jù)庫 boolean result = this.save(order); Long orderId = order.getId(); if (result) { // 保存訂單明細(xì) List<OmsOrderItem> orderItemEntities = orderItemConverter.item2Entity(submitForm.getOrderItems()); orderItemEntities.forEach(item -> item.setOrderId(orderId)); orderItemService.saveBatch(orderItemEntities); // 訂單超時未支付取消 //這行代碼使用 RabbitMQ 的 Java 客戶端庫來發(fā)送一條消息到 order.exchange 交換器, // 該消息會被路由到 order.close.delay 隊(duì)列中。消息的內(nèi)容是 submitForm.getOrderToken() 方法的返回結(jié)果。 rabbitTemplate.convertAndSend("order.exchange", "order.close.delay", submitForm.getOrderToken()); } return result; }
普通的保存到數(shù)據(jù)的操作就不做解釋了,主要看訂單超時未支付取消的功能。
訂單超時未支付取消功能通過rabbitMQ實(shí)現(xiàn)
訂單超時關(guān)單延時隊(duì)列
@Component @Slf4j public class OrderRabbitConfig { // 普通延遲隊(duì)列 private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue"; private static final String ORDER_EXCHANGE = "order.exchange"; private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay"; // 死信關(guān)單隊(duì)列 private static final String ORDER_ClOSE_QUEUE = "order.close.queue"; private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange"; private static final String ORDER_ClOSE_ROUTING_KEY = "order.close"; /** * 定義交換機(jī) */ @Bean public Exchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); } /** * 死信交換機(jī) */ @Bean public Exchange orderDlxExchange() { return new DirectExchange(ORDER_DLX_EXCHANGE, true, false); } /** * 延時隊(duì)列 */ @Bean public Queue orderDelayQueue() { // 延時隊(duì)列的消息過期了,會自動觸發(fā)消息的轉(zhuǎn)發(fā),根據(jù)routingKey發(fā)送到指定的exchange中,exchange路由到死信隊(duì)列 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE); args.put("x-dead-letter-routing-key", ORDER_ClOSE_ROUTING_KEY); // 死信路由Key args.put("x-message-ttl", 10 * 1000L); // 單位毫秒,10s用于測試 return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args); } /** * 延時隊(duì)列綁定交換機(jī) */ @Bean public Binding orderDelayQueueBinding() { return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE, ORDER_CLOSE_DELAY_ROUTING_KEY, null); } /** * 關(guān)單隊(duì)列 */ @Bean public Queue orderCloseQueue() { log.info("死信隊(duì)列(order.close.queue)創(chuàng)建"); return new Queue(ORDER_ClOSE_QUEUE, true, false, false); } /** * 關(guān)單隊(duì)列綁定死信交換機(jī) */ @Bean public Binding orderCloseQueueBinding() { return new Binding(ORDER_ClOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE, ORDER_ClOSE_ROUTING_KEY, null); } }
- 綁定死信交換機(jī)后,超過10s就會路由到死信隊(duì)列(order.close.queue)中。
訂單超時未支付系統(tǒng)自動取消監(jiān)聽器
/** * 訂單超時未支付系統(tǒng)自動取消監(jiān)聽器 * */ @Component @RequiredArgsConstructor @Slf4j public class OrderCloseListener { private final OrderService orderService; private final RabbitTemplate rabbitTemplate; @RabbitListener(queues = "order.close.queue") public void closeOrder(String orderSn, Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息序號(消息隊(duì)列中的位置) log.info("訂單({})超時未支付,系統(tǒng)自動關(guān)閉訂單", orderSn); try { boolean closeOrderResult = orderService.closeOrder(orderSn); log.info("關(guān)單結(jié)果:{}", closeOrderResult); if (closeOrderResult) { // 關(guān)單成功:釋放庫存 //發(fā)送訂單號 rabbitTemplate.convertAndSend("stock.exchange", "stock.unlock", orderSn); } else { // 關(guān)單失?。河唵我驯魂P(guān)閉,手動ACK確認(rèn)并從隊(duì)列移除消息 channel.basicAck(deliveryTag, false); // false: 不批量確認(rèn),僅確認(rèn)當(dāng)前單個消息 } } catch (Exception e) { // 關(guān)單異常:拒絕消息并重新入隊(duì) try { channel.basicReject(deliveryTag, true); // true: 重新放回隊(duì)列 // channel.basicReject(deliveryTag, false); // false: 直接丟棄消息 (TODO 定時任務(wù)補(bǔ)償) } catch (IOException ex) { log.error("訂單({})關(guān)閉失敗,原因:{}", orderSn, ex.getMessage()); } } } }
監(jiān)聽死信關(guān)單隊(duì)列,拿到消息序號deliveryTag(便于后續(xù)手動ACK和重新入隊(duì)的操作)
進(jìn)行關(guān)單操作orderService.closeOrder,主要修改訂單詳情表的狀態(tài)從待支付改為已關(guān)閉。
public boolean closeOrder(String orderSn) { return this.update(new LambdaUpdateWrapper<OmsOrder>() .eq(OmsOrder::getOrderSn, orderSn) .eq(OmsOrder::getStatus, OrderStatusEnum.UNPAID.getValue()) .set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue()) ); }
判斷關(guān)單是否成功,成功就釋放庫存,發(fā)送訂單號給釋放庫存的交換機(jī),路由鍵為stock.unlock。
/** *庫存釋放監(jiān)聽器 */ @Component @Slf4j @RequiredArgsConstructor public class StockReleaseListener { private final SkuService skuService; private static final String STOCK_UNLOCK_QUEUE = "stock.unlock.queue"; private static final String STOCK_EXCHANGE = "stock.exchange"; private static final String STOCK_UNLOCK_ROUTING_KEY = "stock.unlock"; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = STOCK_UNLOCK_QUEUE,durable = "true"), exchange = @Exchange(value = STOCK_EXCHANGE), key = {STOCK_UNLOCK_ROUTING_KEY} ), ackMode = "MANUAL")//手動ACK @RabbitHandler public void UnlockStock(String orderSn, Message message, Channel channel){ log.info("訂單{}取消釋放庫存",orderSn); long deliverTag=message.getMessageProperties().getDeliveryTag(); try{ skuService.unlockStock(orderSn); channel.basicAck(deliverTag,false); }catch (Exception e){ try { channel.basicAck(deliverTag,true); }catch (IOException ex){ log.error("訂單{}關(guān)閉失敗,原因:{}",orderSn,ex.getMessage()); } } } }
如果收到釋放庫存的消息,就會執(zhí)行釋放庫存的方法unlockStock,上面鎖定庫存方法那節(jié)已解釋。
訂單支付
@GlobalTransactional public <T> T payOrder(OrderPaymentForm paymentForm) { String orderSn = paymentForm.getOrderSn(); OmsOrder order = this.getOne(new LambdaQueryWrapper<OmsOrder>().eq(OmsOrder::getOrderSn, orderSn)); Assert.isTrue(order != null, "訂單不存在"); Assert.isTrue(OrderStatusEnum.UNPAID.getValue().equals(order.getStatus()), "訂單不可支付,請檢查訂單狀態(tài)"); RLock lock = redissonClient.getLock(OrderConstants.ORDER_LOCK_PREFIX + order.getOrderSn()); try { lock.lock(); T result; switch (paymentForm.getPaymentMethod()) { case WX_JSAPI: result = (T) wxJsapiPay(paymentForm.getAppId(), order.getOrderSn(), order.getPaymentAmount()); break; default: result = (T) balancePay(order); break; } return result; } finally { //釋放鎖 if (lock.isLocked()) { lock.unlock(); } } }
代碼摘自
到此這篇關(guān)于RabbitMQ+redis+Redisson分布式鎖+seata實(shí)現(xiàn)訂單服務(wù)的文章就介紹到這了,更多相關(guān)Redisson分布式鎖訂單服務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
高并發(fā)場景分析之redis+lua防重校驗(yàn)
這篇文章主要介紹了高并發(fā)場景分析之redis+lua防重校驗(yàn),本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07RedisDesktopManager遠(yuǎn)程連接redis的實(shí)現(xiàn)
本文主要介紹了RedisDesktopManager遠(yuǎn)程連接redis的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05Redis數(shù)據(jù)庫分布式設(shè)計(jì)方案介紹
大家好,本篇文章主要講的是Redis數(shù)據(jù)庫分布式設(shè)計(jì)方案介紹,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下2022-01-01redis?bitmap數(shù)據(jù)結(jié)構(gòu)之java對等操作詳解
bitmap是以其高性能出名。其基本原理是一位存儲一個標(biāo)識,其他衍生知道咱就不說了,而redis就是以這種原生格式存儲的,這篇文章主要介紹了redis?bitmap數(shù)據(jù)結(jié)構(gòu)之java對等操作,需要的朋友可以參考下2022-10-10淺析PHP分布式中Redis實(shí)現(xiàn)Session的方法
這篇文章主要介紹了PHP分布式中Redis實(shí)現(xiàn)Session的方法,文中詳細(xì)介紹了兩種方法的使用方法,并給出了測試的示例代碼,有需要的朋友可以參考借鑒,下面來一起看看吧,2016-12-12