SpringBoot+RabbitMQ+Redis實(shí)現(xiàn)商品秒殺的示例代碼
業(yè)務(wù)分析
一般而言,商品秒殺大概可以拆分成以下幾步:
用戶校驗(yàn)
校驗(yàn)是否多次搶單,保證每個(gè)商品每個(gè)用戶只能秒殺一次
下單
訂單信息進(jìn)入消息隊(duì)列,等待消費(fèi)
減少庫(kù)存
消費(fèi)訂單消息,減少商品庫(kù)存,增加訂單記錄
付款
十五分鐘內(nèi)完成支付,修改支付狀態(tài)
創(chuàng)建表
goods_info 商品庫(kù)存表
列 | 說(shuō)明 |
---|---|
id | 主鍵(uuid) |
goods_name | 商品名稱 |
goods_stock | 商品庫(kù)存 |
package com.jason.seckill.order.entity; /** * 商品庫(kù)存 */ public class GoodsInfo { private String id; private String goodsName; private String goodsStock; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getGoodsName() { return goodsName; } public void setGoodsName(String goodsName) { this.goodsName = goodsName; } public String getGoodsStock() { return goodsStock; } public void setGoodsStock(String goodsStock) { this.goodsStock = goodsStock; } @Override public String toString() { return "GoodsInfo{" + "id='" + id + '\'' + ", goodsName='" + goodsName + '\'' + ", goodsStock='" + goodsStock + '\'' + '}'; } }
order_info 訂單記錄表
列 | 說(shuō)明 |
---|---|
id | 主鍵(uuid) |
user_id | 用戶id |
goods_id | 商品id |
pay_status | 支付狀態(tài)(0-超時(shí)未支付 1-已支付 2-待支付) |
package com.jason.seckill.order.entity; /** * 下單記錄 */ public class OrderRecord { private String id; private String userId; private String goodsId; /** * 0-超時(shí)未支付 1-已支付 2-待支付 */ private Integer payStatus; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getGoodsId() { return goodsId; } public void setGoodsId(String goodsId) { this.goodsId = goodsId; } public Integer getPayStatus() { return payStatus; } public void setPayStatus(Integer payStatus) { this.payStatus = payStatus; } @Override public String toString() { return "OrderRecord{" + "id='" + id + '\'' + ", userId='" + userId + '\'' + ", goodsId='" + goodsId + '\'' + '}'; } }
功能實(shí)現(xiàn)
1.用戶校驗(yàn)
使用redis做用戶校驗(yàn),保證每個(gè)用戶每個(gè)商品只能搶一次,上代碼:
public boolean checkSeckillUser(OrderRequest order) { String key = env.getProperty("seckill.redis.key.prefix") + order.getUserId() + order.getGoodsId(); return redisTemplate.opsForValue().setIfAbsent(key,"1"); }
userId+orderId的組合作為key,利用redis的setnx分布式鎖原理來(lái)實(shí)現(xiàn)。如果是限時(shí)秒殺,可以通過(guò)設(shè)置key的過(guò)期時(shí)間來(lái)實(shí)現(xiàn)。
2.下單
下單信息肯定是要先扔到消息隊(duì)列里的,這里采用RabbitMQ來(lái)做消息隊(duì)列,先來(lái)看一下消息隊(duì)列的模型圖:
rabbitmq的配置:
#rabbitmq配置 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.concurrency=5 #最大消費(fèi)者數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=10 #消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量。寫多了,如果長(zhǎng)時(shí)間得不到消費(fèi),數(shù)據(jù)就一直得不到處理 spring.rabbitmq.listener.simple.prefetch=1 #消費(fèi)接收確認(rèn)機(jī)制-手動(dòng)確認(rèn) spring.rabbitmq.listener.simple.acknowledge-mode=manual mq.env=local #訂單處理隊(duì)列 #交換機(jī)名稱 order.mq.exchange.name=${mq.env}:order:mq:exchange #隊(duì)列名稱 order.mq.queue.name=${mq.env}:order:mq:queue #routingkey order.mq.routing.key=${mq.env}:order:mq:routing:key
rabbitmq配置類OrderRabbitmqConfig:
/** * rabbitmq配置 */ @Configuration public class OrderRabbitmqConfig { private static final Logger logger = LoggerFactory.getLogger(OrderRabbitmqConfig.class); @Autowired private Environment env; /** * channel鏈接工廠 */ @Autowired private CachingConnectionFactory connectionFactory; /** * 監(jiān)聽器容器配置 */ @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 聲明rabbittemplate * @return */ @Bean public RabbitTemplate rabbitTemplate(){ //消息發(fā)送成功確認(rèn),對(duì)應(yīng)application.properties中的spring.rabbitmq.publisher-confirms=true connectionFactory.setPublisherConfirms(true); //消息發(fā)送失敗確認(rèn),對(duì)應(yīng)application.properties中的spring.rabbitmq.publisher-returns=true connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //設(shè)置消息發(fā)送格式為json rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setMandatory(true); //消息發(fā)送到exchange回調(diào) 需設(shè)置:spring.rabbitmq.publisher-confirms=true rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); //消息從exchange發(fā)送到queue失敗回調(diào) 需設(shè)置:spring.rabbitmq.publisher-returns=true rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } //---------------------------------------訂單隊(duì)列------------------------------------------------------ /** * 聲明訂單隊(duì)列的交換機(jī) * @return */ @Bean("orderTopicExchange") public TopicExchange orderTopicExchange(){ //設(shè)置為持久化 不自動(dòng)刪除 return new TopicExchange(env.getProperty("order.mq.exchange.name"),true,false); } /** * 聲明訂單隊(duì)列 * @return */ @Bean("orderQueue") public Queue orderQueue(){ return new Queue(env.getProperty("order.mq.queue.name"),true); } /** * 將隊(duì)列綁定到交換機(jī) * @return */ @Bean public Binding simpleBinding(){ return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(env.getProperty("order.mq.routing.key")); } /** * 注入訂單對(duì)列消費(fèi)監(jiān)聽器 */ @Autowired private OrderListener orderListener; /** * 聲明訂單隊(duì)列監(jiān)聽器配置容器 * @return */ @Bean("orderListenerContainer") public SimpleMessageListenerContainer orderListenerContainer(){ //創(chuàng)建監(jiān)聽器容器工廠 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //將配置信息和鏈接信息賦給容器工廠 factoryConfigurer.configure(factory,connectionFactory); //容器工廠創(chuàng)建監(jiān)聽器容器 SimpleMessageListenerContainer container = factory.createListenerContainer(); //指定監(jiān)聽器 container.setMessageListener(orderListener); //指定監(jiān)聽器監(jiān)聽的隊(duì)列 container.setQueues(orderQueue()); return container; } }
配置類聲明了訂單隊(duì)列,交換機(jī),通過(guò)指定的routingkey綁定了隊(duì)列與交換機(jī)。另外,rabbitTemplate用來(lái)發(fā)送消息,ListenerContainer指定監(jiān)聽器(消費(fèi)者)監(jiān)聽的隊(duì)列。
客戶下單,生產(chǎn)消息,上代碼:
@Service public class SeckillService { private static final Logger logger = LoggerFactory.getLogger(SeckillService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; /** * 生產(chǎn)消息 * @param order */ public void seckill(OrderRequest order){ //設(shè)置交換機(jī) rabbitTemplate.setExchange(env.getProperty("order.mq.exchange.name")); //設(shè)置routingkey rabbitTemplate.setRoutingKey(env.getProperty("order.mq.routing.key")); //創(chuàng)建消息體 Message msg = MessageBuilder.withBody(JSON.toJSONString(order).getBytes()).build(); //發(fā)送消息 rabbitTemplate.convertAndSend(msg); } }
很簡(jiǎn)單,操作rabbitTemplate,指定交換機(jī)和routingkey,發(fā)送消息到綁定的隊(duì)列,等待消費(fèi)處理。
3.減少庫(kù)存
消費(fèi)者消費(fèi)訂單消息,做業(yè)務(wù)處理。
看一下監(jiān)聽器(消費(fèi)者)OrderListener:
/** * 消息監(jiān)聽器(消費(fèi)者) */ @Component public class OrderListener implements ChannelAwareMessageListener { private static final Logger logger = LoggerFactory.getLogger(OrderListener.class); @Autowired private OrderService orderService; /** * 處理接收到的消息 * @param message 消息體 * @param channel 通道,確認(rèn)消費(fèi)用 * @throws Exception */ @Override public void onMessage(Message message, Channel channel) throws Exception { try{ //獲取交付tag long tag = message.getMessageProperties().getDeliveryTag(); String str = new String(message.getBody(),"utf-8"); logger.info("接收到的消息:{}",str); JSONObject obj = JSONObject.parseObject(str); //下單,操作數(shù)據(jù)庫(kù) orderService.order(obj.getString("userId"),obj.getString("goodsId")); //確認(rèn)消費(fèi) channel.basicAck(tag,true); }catch(Exception e){ logger.error("消息監(jiān)聽確認(rèn)機(jī)制發(fā)生異常:",e.fillInStackTrace()); } } }
業(yè)務(wù)處理 OrderService:
@Service public class OrderService { @Resource private SeckillMapper seckillMapper; /** * 下單,操作數(shù)據(jù)庫(kù) * @param userId * @param goodsId */ @Transactional() public void order(String userId,String goodsId){ //該商品庫(kù)存-1(當(dāng)庫(kù)存>0時(shí)) int count = seckillMapper.reduceGoodsStockById(goodsId); //更新成功,表明搶單成功,插入下單記錄,支付狀態(tài)設(shè)為2-待支付 if(count > 0){ OrderRecord orderRecord = new OrderRecord(); orderRecord.setId(CommonUtils.createUUID()); orderRecord.setGoodsId(goodsId); orderRecord.setUserId(userId); orderRecord.setPayStatus(2); seckillMapper.insertOrderRecord(orderRecord); } } }
Dao接口和Mybatis文件就不往出貼了,這里的邏輯是,update goods_info set goods_stock = goods_stock-1 where goods_stock > 0 and id=#{goodsId},這條update相當(dāng)于將查詢庫(kù)存和減少庫(kù)存合并為一個(gè)原子操作,避免高并發(fā)問題,執(zhí)行成功,插入訂單記錄,執(zhí)行失敗,則庫(kù)存不夠搶單失敗。
4.支付
訂單處理完成后,如果庫(kù)存減少,也就是搶單成功,那么需要用戶在十五分鐘內(nèi)完成支付,這塊就要用到死信隊(duì)列(延遲隊(duì)列)來(lái)處理了,先看模型圖:
DLX:dead-letter Exchange 死信交換機(jī)
DLK:dead-letter RoutingKey 死信路由
ttl:time-to-live 超時(shí)時(shí)間
死信隊(duì)列中,消息到期后,會(huì)通過(guò)DLX和DLK進(jìn)入到pay-queue,進(jìn)行消費(fèi)。這是另一組消息隊(duì)列,和訂單消息隊(duì)列是分開的。這里注意他們的綁定關(guān)系,主交換機(jī)綁定死信隊(duì)列,死信交換機(jī)綁定的是主隊(duì)列(pay queue)。
接下來(lái)聲明圖中的一系列組件,首先application.properties中增加配置:
#支付處理隊(duì)列 #主交換機(jī) pay.mq.exchange.name=${mq.env}:pay:mq:exchange #死信交換機(jī)(DLX) pay.dead-letter.mq.exchange.name=${mq.env}:pay:dead-letter:mq:exchange #主隊(duì)列 pay.mq.queue.name=${mq.env}:pay:mq:queue #死信隊(duì)列 pay.dead-letter.mq.queue.name=${mq.env}:pay:dead-letter:mq:queue #主routingkey pay.mq.routing.key=${mq.env}:pay:mq:routing:key #死信routingkey(DLK) pay.dead-letter.mq.routing.key=${mq.env}:pay:dead-letter:mq:routing:key #支付超時(shí)時(shí)間(毫秒)(TTL),測(cè)試原因,這里模擬5秒,如果是生產(chǎn)環(huán)境,這里可以是15分鐘等 pay.mq.ttl=5000
配置類OrderRabbitmqConfig中增加支付隊(duì)列和死信隊(duì)列的聲明:
/** * 死信隊(duì)列,十五分鐘超時(shí) * @return */ @Bean public Queue payDeadLetterQueue(){ Map args = new HashMap(); //聲明死信交換機(jī) args.put("x-dead-letter-exchange",env.getProperty("pay.dead-letter.mq.exchange.name")); //聲明死信routingkey args.put("x-dead-letter-routing-key",env.getProperty("pay.dead-letter.mq.routing.key")); //聲明死信隊(duì)列中的消息過(guò)期時(shí)間 args.put("x-message-ttl",env.getProperty("pay.mq.ttl",int.class)); //創(chuàng)建死信隊(duì)列 return new Queue(env.getProperty("pay.dead-letter.mq.queue.name"),true,false,false,args); } /** * 支付隊(duì)列交換機(jī)(主交換機(jī)) * @return */ @Bean public TopicExchange payTopicExchange(){ return new TopicExchange(env.getProperty("pay.mq.exchange.name"),true,false); } /** * 將主交換機(jī)綁定到死信隊(duì)列 * @return */ @Bean public Binding payBinding(){ return BindingBuilder.bind(payDeadLetterQueue()).to(payTopicExchange()).with(env.getProperty("pay.mq.routing.key")); } /** * 支付隊(duì)列(主隊(duì)列) * @return */ @Bean public Queue payQueue(){ return new Queue(env.getProperty("pay.mq.queue.name"),true); } /** * 死信交換機(jī) * @return */ @Bean public TopicExchange payDeadLetterExchange(){ return new TopicExchange(env.getProperty("pay.dead-letter.mq.exchange.name"),true,false); } /** * 將主隊(duì)列綁定到死信交換機(jī) * @return */ @Bean public Binding payDeadLetterBinding(){ return BindingBuilder.bind(payQueue()).to(payDeadLetterExchange()).with(env.getProperty("pay.dead-letter.mq.routing.key")); } /** * 注入支付監(jiān)聽器 */ @Autowired private PayListener payListener; /** * 支付隊(duì)列監(jiān)聽器容器 * @return */ @Bean public SimpleMessageListenerContainer payMessageListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); SimpleMessageListenerContainer listenerContainer = factory.createListenerContainer(); listenerContainer.setMessageListener(payListener); listenerContainer.setQueues(payQueue()); return listenerContainer; }
支付隊(duì)列和死信隊(duì)列的Queue、Exchange、routingkey都已就緒。
看生產(chǎn)者:
@Service public class OrderService { @Resource private SeckillMapper seckillMapper; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; /** * 下單,操作數(shù)據(jù)庫(kù) * @param userId * @param goodsId */ @Transactional() public void order(String userId,String goodsId){ //該商品庫(kù)存-1(當(dāng)庫(kù)存>0時(shí)) int count = seckillMapper.reduceGoodsStockById(goodsId); //更新成功,表明搶單成功,插入下單記錄,支付狀態(tài)設(shè)為2-待支付 if(count > 0){ OrderRecord orderRecord = new OrderRecord(); orderRecord.setId(CommonUtils.createUUID()); orderRecord.setGoodsId(goodsId); orderRecord.setUserId(userId); orderRecord.setPayStatus(2); seckillMapper.insertOrderRecord(orderRecord); //將該訂單添加到支付隊(duì)列 rabbitTemplate.setExchange(env.getProperty("pay.mq.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("pay.mq.routing.key")); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); String json = JSON.toJSONString(orderRecord); Message msg = MessageBuilder.withBody(json.getBytes()).build(); rabbitTemplate.convertAndSend(msg); } } }
在OrderService中,數(shù)據(jù)庫(kù)操作完成后,將訂單信息發(fā)送到死信隊(duì)列,死信隊(duì)列中的消息會(huì)在十五分鐘后進(jìn)入到支付隊(duì)列,等待消費(fèi)。
再看消費(fèi)者:
@Component public class PayListener implements ChannelAwareMessageListener { private static final Logger logger = LoggerFactory.getLogger(PayListener.class); @Autowired private PayService payService; @Override public void onMessage(Message message, Channel channel) throws Exception { Long tag = message.getMessageProperties().getDeliveryTag(); try { String str = new String(message.getBody(), "utf-8"); logger.info("接收到的消息:{}",str); JSONObject json = JSON.parseObject(str); String orderId = json.getString("id"); //確認(rèn)是否付款 payService.confirmPay(orderId); //確認(rèn)消費(fèi) channel.basicAck(tag, true); }catch(Exception e){ logger.info("支付消息消費(fèi)出錯(cuò):{}",e.getMessage()); logger.info("出錯(cuò)的tag:{}",tag); } } }
PayService:
@Service public class PayService { private static final Logger logger = LoggerFactory.getLogger(PayService.class); @Resource private SeckillMapper seckillMapper; /** * 確認(rèn)是否支付 * @param orderId */ public void confirmPay(String orderId){ OrderRecord orderRecord = seckillMapper.selectNoPayOrderById(orderId); //根據(jù)訂單號(hào)校驗(yàn)該用戶是否已支付 if(checkPay(orderId)){ //已支付 orderRecord.setPayStatus(1); seckillMapper.updatePayStatus(orderRecord); logger.info("用戶{}已支付",orderId); }else{ //未支付 orderRecord.setPayStatus(0); seckillMapper.updatePayStatus(orderRecord); //取消支付后,商品庫(kù)存+1 seckillMapper.returnStock(orderRecord.getGoodsId()); logger.info("用戶{}未支付",orderId); } } /** * 模擬判斷訂單支付成功或失敗,成功失敗隨機(jī) * @param orderId * @return */ public boolean checkPay(String orderId){ Random random = new Random(); int res = random.nextInt(2); return res==0?false:true; }
這里checkPay()方法模擬調(diào)用第三方支付接口來(lái)判斷用戶是否已支付。若支付成功,訂單改為已支付狀態(tài),支付失敗,改為已取消狀態(tài),庫(kù)存退回。
總結(jié)
整個(gè)demo,是兩組消息隊(duì)列撐起來(lái)的,一組訂單消息隊(duì)列,一組支付消息隊(duì)列,而每一組隊(duì)列都是由queue、exchange、routingkey、生產(chǎn)者以及消費(fèi)者組成。交換機(jī)通過(guò)routingkey綁定隊(duì)列,rabbitTemplate通過(guò)指定交換機(jī)和routingkey將消息發(fā)送到指定隊(duì)列,消費(fèi)者監(jiān)聽該隊(duì)列進(jìn)行消費(fèi)。不同的是第二組支付隊(duì)列里嵌入了死信隊(duì)列來(lái)做一個(gè)十五分鐘的延遲支付。
到此這篇關(guān)于SpringBoot+RabbitMQ+Redis實(shí)現(xiàn)商品秒殺的文章就介紹到這了,更多相關(guān)SpringBoot+RabbitMQ+Redis實(shí)現(xiàn)商品秒殺內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java 如何使用Feign發(fā)送HTTP請(qǐng)求
這篇文章主要介紹了Java 如何使用Feign發(fā)送HTTP請(qǐng)求,幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下2020-11-11Spring Cloud Hystrix入門和Hystrix命令原理分析
這篇文章主要介紹了Spring Cloud Hystrix入門和Hystrix命令原理分析,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-08-08因BigDecimal類型數(shù)據(jù)引出的問題詳析
Java在java.math包中提供的API類BigDecimal,用來(lái)對(duì)超過(guò)16位有效位的數(shù)進(jìn)行精確的運(yùn)算,下面這篇文章主要給大家介紹了因BigDecimal類型數(shù)據(jù)引出的問題的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2018-08-08淺談Map集合中g(shù)et不存在的key值,會(huì)拋出異常嗎?
這篇文章主要介紹了淺談Map集合中g(shù)et不存在的key值,會(huì)拋出異常嗎?具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09使用@Validated 和 BindingResult 遇到的坑及解決
這篇文章主要介紹了使用@Validated 和 BindingResult 遇到的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10IDEA2022中部署Tomcat Web項(xiàng)目的流程分析
這篇文章主要介紹了IDEA2022中部署Tomcat Web項(xiàng)目,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-03-03java Servlet 實(shí)現(xiàn)動(dòng)態(tài)驗(yàn)證碼圖片示例
這篇文章主要介紹了java Servlet 實(shí)現(xiàn)動(dòng)態(tài)驗(yàn)證碼圖片示例的資料,這里整理了詳細(xì)的代碼,有需要的小伙伴可以參考下。2017-02-02SpringBoot Starter依賴原理與實(shí)例詳解
SpringBoot中的starter是一種非常重要的機(jī)制,能夠拋棄以前繁雜的配置,將其統(tǒng)一集成進(jìn)starter,應(yīng)用者只需要在maven中引入starter依賴,SpringBoot就能自動(dòng)掃描到要加載的信息并啟動(dòng)相應(yīng)的默認(rèn)配置。starter讓我們擺脫了各種依賴庫(kù)的處理,需要配置各種信息的困擾2022-09-09SpringBoot實(shí)現(xiàn)mysql與clickhouse多數(shù)據(jù)源的項(xiàng)目實(shí)踐
本文主要介紹了SpringBoot實(shí)現(xiàn)mysql與clickhouse多數(shù)據(jù)源的項(xiàng)目實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-11-11