SpringBoot+RabbitMQ+Redis實現(xiàn)商品秒殺的示例代碼
業(yè)務分析
一般而言,商品秒殺大概可以拆分成以下幾步:
用戶校驗
校驗是否多次搶單,保證每個商品每個用戶只能秒殺一次
下單
訂單信息進入消息隊列,等待消費
減少庫存
消費訂單消息,減少商品庫存,增加訂單記錄
付款
十五分鐘內完成支付,修改支付狀態(tài)
創(chuàng)建表
goods_info 商品庫存表
列 | 說明 |
---|---|
id | 主鍵(uuid) |
goods_name | 商品名稱 |
goods_stock | 商品庫存 |
package com.jason.seckill.order.entity; /** * 商品庫存 */ public class GoodsInfo { private String id; private String goodsName; private String goodsStock; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getGoodsName() { return goodsName; } public void setGoodsName(String goodsName) { this.goodsName = goodsName; } public String getGoodsStock() { return goodsStock; } public void setGoodsStock(String goodsStock) { this.goodsStock = goodsStock; } @Override public String toString() { return "GoodsInfo{" + "id='" + id + '\'' + ", goodsName='" + goodsName + '\'' + ", goodsStock='" + goodsStock + '\'' + '}'; } }
order_info 訂單記錄表
列 | 說明 |
---|---|
id | 主鍵(uuid) |
user_id | 用戶id |
goods_id | 商品id |
pay_status | 支付狀態(tài)(0-超時未支付 1-已支付 2-待支付) |
package com.jason.seckill.order.entity; /** * 下單記錄 */ public class OrderRecord { private String id; private String userId; private String goodsId; /** * 0-超時未支付 1-已支付 2-待支付 */ private Integer payStatus; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getGoodsId() { return goodsId; } public void setGoodsId(String goodsId) { this.goodsId = goodsId; } public Integer getPayStatus() { return payStatus; } public void setPayStatus(Integer payStatus) { this.payStatus = payStatus; } @Override public String toString() { return "OrderRecord{" + "id='" + id + '\'' + ", userId='" + userId + '\'' + ", goodsId='" + goodsId + '\'' + '}'; } }
功能實現(xiàn)
1.用戶校驗
使用redis做用戶校驗,保證每個用戶每個商品只能搶一次,上代碼:
public boolean checkSeckillUser(OrderRequest order) { String key = env.getProperty("seckill.redis.key.prefix") + order.getUserId() + order.getGoodsId(); return redisTemplate.opsForValue().setIfAbsent(key,"1"); }
userId+orderId的組合作為key,利用redis的setnx分布式鎖原理來實現(xiàn)。如果是限時秒殺,可以通過設置key的過期時間來實現(xiàn)。
2.下單
下單信息肯定是要先扔到消息隊列里的,這里采用RabbitMQ來做消息隊列,先來看一下消息隊列的模型圖:
rabbitmq的配置:
#rabbitmq配置 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #消費者數(shù)量 spring.rabbitmq.listener.simple.concurrency=5 #最大消費者數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=10 #消費者每次從隊列獲取的消息數(shù)量。寫多了,如果長時間得不到消費,數(shù)據(jù)就一直得不到處理 spring.rabbitmq.listener.simple.prefetch=1 #消費接收確認機制-手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual mq.env=local #訂單處理隊列 #交換機名稱 order.mq.exchange.name=${mq.env}:order:mq:exchange #隊列名稱 order.mq.queue.name=${mq.env}:order:mq:queue #routingkey order.mq.routing.key=${mq.env}:order:mq:routing:key
rabbitmq配置類OrderRabbitmqConfig:
/** * rabbitmq配置 */ @Configuration public class OrderRabbitmqConfig { private static final Logger logger = LoggerFactory.getLogger(OrderRabbitmqConfig.class); @Autowired private Environment env; /** * channel鏈接工廠 */ @Autowired private CachingConnectionFactory connectionFactory; /** * 監(jiān)聽器容器配置 */ @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 聲明rabbittemplate * @return */ @Bean public RabbitTemplate rabbitTemplate(){ //消息發(fā)送成功確認,對應application.properties中的spring.rabbitmq.publisher-confirms=true connectionFactory.setPublisherConfirms(true); //消息發(fā)送失敗確認,對應application.properties中的spring.rabbitmq.publisher-returns=true connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //設置消息發(fā)送格式為json rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setMandatory(true); //消息發(fā)送到exchange回調 需設置:spring.rabbitmq.publisher-confirms=true rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); //消息從exchange發(fā)送到queue失敗回調 需設置:spring.rabbitmq.publisher-returns=true rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } //---------------------------------------訂單隊列------------------------------------------------------ /** * 聲明訂單隊列的交換機 * @return */ @Bean("orderTopicExchange") public TopicExchange orderTopicExchange(){ //設置為持久化 不自動刪除 return new TopicExchange(env.getProperty("order.mq.exchange.name"),true,false); } /** * 聲明訂單隊列 * @return */ @Bean("orderQueue") public Queue orderQueue(){ return new Queue(env.getProperty("order.mq.queue.name"),true); } /** * 將隊列綁定到交換機 * @return */ @Bean public Binding simpleBinding(){ return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(env.getProperty("order.mq.routing.key")); } /** * 注入訂單對列消費監(jiān)聽器 */ @Autowired private OrderListener orderListener; /** * 聲明訂單隊列監(jiān)聽器配置容器 * @return */ @Bean("orderListenerContainer") public SimpleMessageListenerContainer orderListenerContainer(){ //創(chuàng)建監(jiān)聽器容器工廠 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //將配置信息和鏈接信息賦給容器工廠 factoryConfigurer.configure(factory,connectionFactory); //容器工廠創(chuàng)建監(jiān)聽器容器 SimpleMessageListenerContainer container = factory.createListenerContainer(); //指定監(jiān)聽器 container.setMessageListener(orderListener); //指定監(jiān)聽器監(jiān)聽的隊列 container.setQueues(orderQueue()); return container; } }
配置類聲明了訂單隊列,交換機,通過指定的routingkey綁定了隊列與交換機。另外,rabbitTemplate用來發(fā)送消息,ListenerContainer指定監(jiān)聽器(消費者)監(jiān)聽的隊列。
客戶下單,生產(chǎn)消息,上代碼:
@Service public class SeckillService { private static final Logger logger = LoggerFactory.getLogger(SeckillService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; /** * 生產(chǎn)消息 * @param order */ public void seckill(OrderRequest order){ //設置交換機 rabbitTemplate.setExchange(env.getProperty("order.mq.exchange.name")); //設置routingkey rabbitTemplate.setRoutingKey(env.getProperty("order.mq.routing.key")); //創(chuàng)建消息體 Message msg = MessageBuilder.withBody(JSON.toJSONString(order).getBytes()).build(); //發(fā)送消息 rabbitTemplate.convertAndSend(msg); } }
很簡單,操作rabbitTemplate,指定交換機和routingkey,發(fā)送消息到綁定的隊列,等待消費處理。
3.減少庫存
消費者消費訂單消息,做業(yè)務處理。
看一下監(jiān)聽器(消費者)OrderListener:
/** * 消息監(jiān)聽器(消費者) */ @Component public class OrderListener implements ChannelAwareMessageListener { private static final Logger logger = LoggerFactory.getLogger(OrderListener.class); @Autowired private OrderService orderService; /** * 處理接收到的消息 * @param message 消息體 * @param channel 通道,確認消費用 * @throws Exception */ @Override public void onMessage(Message message, Channel channel) throws Exception { try{ //獲取交付tag long tag = message.getMessageProperties().getDeliveryTag(); String str = new String(message.getBody(),"utf-8"); logger.info("接收到的消息:{}",str); JSONObject obj = JSONObject.parseObject(str); //下單,操作數(shù)據(jù)庫 orderService.order(obj.getString("userId"),obj.getString("goodsId")); //確認消費 channel.basicAck(tag,true); }catch(Exception e){ logger.error("消息監(jiān)聽確認機制發(fā)生異常:",e.fillInStackTrace()); } } }
業(yè)務處理 OrderService:
@Service public class OrderService { @Resource private SeckillMapper seckillMapper; /** * 下單,操作數(shù)據(jù)庫 * @param userId * @param goodsId */ @Transactional() public void order(String userId,String goodsId){ //該商品庫存-1(當庫存>0時) int count = seckillMapper.reduceGoodsStockById(goodsId); //更新成功,表明搶單成功,插入下單記錄,支付狀態(tài)設為2-待支付 if(count > 0){ OrderRecord orderRecord = new OrderRecord(); orderRecord.setId(CommonUtils.createUUID()); orderRecord.setGoodsId(goodsId); orderRecord.setUserId(userId); orderRecord.setPayStatus(2); seckillMapper.insertOrderRecord(orderRecord); } } }
Dao接口和Mybatis文件就不往出貼了,這里的邏輯是,update goods_info set goods_stock = goods_stock-1 where goods_stock > 0 and id=#{goodsId},這條update相當于將查詢庫存和減少庫存合并為一個原子操作,避免高并發(fā)問題,執(zhí)行成功,插入訂單記錄,執(zhí)行失敗,則庫存不夠搶單失敗。
4.支付
訂單處理完成后,如果庫存減少,也就是搶單成功,那么需要用戶在十五分鐘內完成支付,這塊就要用到死信隊列(延遲隊列)來處理了,先看模型圖:
DLX:dead-letter Exchange 死信交換機
DLK:dead-letter RoutingKey 死信路由
ttl:time-to-live 超時時間
死信隊列中,消息到期后,會通過DLX和DLK進入到pay-queue,進行消費。這是另一組消息隊列,和訂單消息隊列是分開的。這里注意他們的綁定關系,主交換機綁定死信隊列,死信交換機綁定的是主隊列(pay queue)。
接下來聲明圖中的一系列組件,首先application.properties中增加配置:
#支付處理隊列 #主交換機 pay.mq.exchange.name=${mq.env}:pay:mq:exchange #死信交換機(DLX) pay.dead-letter.mq.exchange.name=${mq.env}:pay:dead-letter:mq:exchange #主隊列 pay.mq.queue.name=${mq.env}:pay:mq:queue #死信隊列 pay.dead-letter.mq.queue.name=${mq.env}:pay:dead-letter:mq:queue #主routingkey pay.mq.routing.key=${mq.env}:pay:mq:routing:key #死信routingkey(DLK) pay.dead-letter.mq.routing.key=${mq.env}:pay:dead-letter:mq:routing:key #支付超時時間(毫秒)(TTL),測試原因,這里模擬5秒,如果是生產(chǎn)環(huán)境,這里可以是15分鐘等 pay.mq.ttl=5000
配置類OrderRabbitmqConfig中增加支付隊列和死信隊列的聲明:
/** * 死信隊列,十五分鐘超時 * @return */ @Bean public Queue payDeadLetterQueue(){ Map args = new HashMap(); //聲明死信交換機 args.put("x-dead-letter-exchange",env.getProperty("pay.dead-letter.mq.exchange.name")); //聲明死信routingkey args.put("x-dead-letter-routing-key",env.getProperty("pay.dead-letter.mq.routing.key")); //聲明死信隊列中的消息過期時間 args.put("x-message-ttl",env.getProperty("pay.mq.ttl",int.class)); //創(chuàng)建死信隊列 return new Queue(env.getProperty("pay.dead-letter.mq.queue.name"),true,false,false,args); } /** * 支付隊列交換機(主交換機) * @return */ @Bean public TopicExchange payTopicExchange(){ return new TopicExchange(env.getProperty("pay.mq.exchange.name"),true,false); } /** * 將主交換機綁定到死信隊列 * @return */ @Bean public Binding payBinding(){ return BindingBuilder.bind(payDeadLetterQueue()).to(payTopicExchange()).with(env.getProperty("pay.mq.routing.key")); } /** * 支付隊列(主隊列) * @return */ @Bean public Queue payQueue(){ return new Queue(env.getProperty("pay.mq.queue.name"),true); } /** * 死信交換機 * @return */ @Bean public TopicExchange payDeadLetterExchange(){ return new TopicExchange(env.getProperty("pay.dead-letter.mq.exchange.name"),true,false); } /** * 將主隊列綁定到死信交換機 * @return */ @Bean public Binding payDeadLetterBinding(){ return BindingBuilder.bind(payQueue()).to(payDeadLetterExchange()).with(env.getProperty("pay.dead-letter.mq.routing.key")); } /** * 注入支付監(jiān)聽器 */ @Autowired private PayListener payListener; /** * 支付隊列監(jiān)聽器容器 * @return */ @Bean public SimpleMessageListenerContainer payMessageListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); SimpleMessageListenerContainer listenerContainer = factory.createListenerContainer(); listenerContainer.setMessageListener(payListener); listenerContainer.setQueues(payQueue()); return listenerContainer; }
支付隊列和死信隊列的Queue、Exchange、routingkey都已就緒。
看生產(chǎn)者:
@Service public class OrderService { @Resource private SeckillMapper seckillMapper; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; /** * 下單,操作數(shù)據(jù)庫 * @param userId * @param goodsId */ @Transactional() public void order(String userId,String goodsId){ //該商品庫存-1(當庫存>0時) int count = seckillMapper.reduceGoodsStockById(goodsId); //更新成功,表明搶單成功,插入下單記錄,支付狀態(tài)設為2-待支付 if(count > 0){ OrderRecord orderRecord = new OrderRecord(); orderRecord.setId(CommonUtils.createUUID()); orderRecord.setGoodsId(goodsId); orderRecord.setUserId(userId); orderRecord.setPayStatus(2); seckillMapper.insertOrderRecord(orderRecord); //將該訂單添加到支付隊列 rabbitTemplate.setExchange(env.getProperty("pay.mq.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("pay.mq.routing.key")); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); String json = JSON.toJSONString(orderRecord); Message msg = MessageBuilder.withBody(json.getBytes()).build(); rabbitTemplate.convertAndSend(msg); } } }
在OrderService中,數(shù)據(jù)庫操作完成后,將訂單信息發(fā)送到死信隊列,死信隊列中的消息會在十五分鐘后進入到支付隊列,等待消費。
再看消費者:
@Component public class PayListener implements ChannelAwareMessageListener { private static final Logger logger = LoggerFactory.getLogger(PayListener.class); @Autowired private PayService payService; @Override public void onMessage(Message message, Channel channel) throws Exception { Long tag = message.getMessageProperties().getDeliveryTag(); try { String str = new String(message.getBody(), "utf-8"); logger.info("接收到的消息:{}",str); JSONObject json = JSON.parseObject(str); String orderId = json.getString("id"); //確認是否付款 payService.confirmPay(orderId); //確認消費 channel.basicAck(tag, true); }catch(Exception e){ logger.info("支付消息消費出錯:{}",e.getMessage()); logger.info("出錯的tag:{}",tag); } } }
PayService:
@Service public class PayService { private static final Logger logger = LoggerFactory.getLogger(PayService.class); @Resource private SeckillMapper seckillMapper; /** * 確認是否支付 * @param orderId */ public void confirmPay(String orderId){ OrderRecord orderRecord = seckillMapper.selectNoPayOrderById(orderId); //根據(jù)訂單號校驗該用戶是否已支付 if(checkPay(orderId)){ //已支付 orderRecord.setPayStatus(1); seckillMapper.updatePayStatus(orderRecord); logger.info("用戶{}已支付",orderId); }else{ //未支付 orderRecord.setPayStatus(0); seckillMapper.updatePayStatus(orderRecord); //取消支付后,商品庫存+1 seckillMapper.returnStock(orderRecord.getGoodsId()); logger.info("用戶{}未支付",orderId); } } /** * 模擬判斷訂單支付成功或失敗,成功失敗隨機 * @param orderId * @return */ public boolean checkPay(String orderId){ Random random = new Random(); int res = random.nextInt(2); return res==0?false:true; }
這里checkPay()方法模擬調用第三方支付接口來判斷用戶是否已支付。若支付成功,訂單改為已支付狀態(tài),支付失敗,改為已取消狀態(tài),庫存退回。
總結
整個demo,是兩組消息隊列撐起來的,一組訂單消息隊列,一組支付消息隊列,而每一組隊列都是由queue、exchange、routingkey、生產(chǎn)者以及消費者組成。交換機通過routingkey綁定隊列,rabbitTemplate通過指定交換機和routingkey將消息發(fā)送到指定隊列,消費者監(jiān)聽該隊列進行消費。不同的是第二組支付隊列里嵌入了死信隊列來做一個十五分鐘的延遲支付。
到此這篇關于SpringBoot+RabbitMQ+Redis實現(xiàn)商品秒殺的文章就介紹到這了,更多相關SpringBoot+RabbitMQ+Redis實現(xiàn)商品秒殺內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring Cloud Hystrix入門和Hystrix命令原理分析
這篇文章主要介紹了Spring Cloud Hystrix入門和Hystrix命令原理分析,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-08-08因BigDecimal類型數(shù)據(jù)引出的問題詳析
Java在java.math包中提供的API類BigDecimal,用來對超過16位有效位的數(shù)進行精確的運算,下面這篇文章主要給大家介紹了因BigDecimal類型數(shù)據(jù)引出的問題的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考下2018-08-08使用@Validated 和 BindingResult 遇到的坑及解決
這篇文章主要介紹了使用@Validated 和 BindingResult 遇到的坑及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10java Servlet 實現(xiàn)動態(tài)驗證碼圖片示例
這篇文章主要介紹了java Servlet 實現(xiàn)動態(tài)驗證碼圖片示例的資料,這里整理了詳細的代碼,有需要的小伙伴可以參考下。2017-02-02SpringBoot實現(xiàn)mysql與clickhouse多數(shù)據(jù)源的項目實踐
本文主要介紹了SpringBoot實現(xiàn)mysql與clickhouse多數(shù)據(jù)源的項目實踐,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-11-11