SpringBoot+RabbitMQ+Redis實(shí)現(xiàn)商品秒殺的示例代碼
業(yè)務(wù)分析
一般而言,商品秒殺大概可以拆分成以下幾步:
用戶校驗(yàn)
校驗(yàn)是否多次搶單,保證每個(gè)商品每個(gè)用戶只能秒殺一次
下單
訂單信息進(jìn)入消息隊(duì)列,等待消費(fèi)
減少庫存
消費(fèi)訂單消息,減少商品庫存,增加訂單記錄
付款
十五分鐘內(nèi)完成支付,修改支付狀態(tài)
創(chuàng)建表
goods_info 商品庫存表
| 列 | 說明 |
|---|---|
| id | 主鍵(uuid) |
| goods_name | 商品名稱 |
| goods_stock | 商品庫存 |
package com.jason.seckill.order.entity;
/**
* 商品庫存
*/
public class GoodsInfo {
private String id;
private String goodsName;
private String goodsStock;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoodsName() {
return goodsName;
}
public void setGoodsName(String goodsName) {
this.goodsName = goodsName;
}
public String getGoodsStock() {
return goodsStock;
}
public void setGoodsStock(String goodsStock) {
this.goodsStock = goodsStock;
}
@Override
public String toString() {
return "GoodsInfo{" +
"id='" + id + '\'' +
", goodsName='" + goodsName + '\'' +
", goodsStock='" + goodsStock + '\'' +
'}';
}
}
order_info 訂單記錄表
| 列 | 說明 |
|---|---|
| id | 主鍵(uuid) |
| user_id | 用戶id |
| goods_id | 商品id |
| pay_status | 支付狀態(tài)(0-超時(shí)未支付 1-已支付 2-待支付) |
package com.jason.seckill.order.entity;
/**
* 下單記錄
*/
public class OrderRecord {
private String id;
private String userId;
private String goodsId;
/**
* 0-超時(shí)未支付 1-已支付 2-待支付
*/
private Integer payStatus;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getGoodsId() {
return goodsId;
}
public void setGoodsId(String goodsId) {
this.goodsId = goodsId;
}
public Integer getPayStatus() {
return payStatus;
}
public void setPayStatus(Integer payStatus) {
this.payStatus = payStatus;
}
@Override
public String toString() {
return "OrderRecord{" +
"id='" + id + '\'' +
", userId='" + userId + '\'' +
", goodsId='" + goodsId + '\'' +
'}';
}
}
功能實(shí)現(xiàn)
1.用戶校驗(yàn)
使用redis做用戶校驗(yàn),保證每個(gè)用戶每個(gè)商品只能搶一次,上代碼:
public boolean checkSeckillUser(OrderRequest order) {
String key = env.getProperty("seckill.redis.key.prefix") + order.getUserId() + order.getGoodsId();
return redisTemplate.opsForValue().setIfAbsent(key,"1");
}
userId+orderId的組合作為key,利用redis的setnx分布式鎖原理來實(shí)現(xiàn)。如果是限時(shí)秒殺,可以通過設(shè)置key的過期時(shí)間來實(shí)現(xiàn)。
2.下單
下單信息肯定是要先扔到消息隊(duì)列里的,這里采用RabbitMQ來做消息隊(duì)列,先來看一下消息隊(duì)列的模型圖:

rabbitmq的配置:
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消費(fèi)者數(shù)量
spring.rabbitmq.listener.simple.concurrency=5
#最大消費(fèi)者數(shù)量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量。寫多了,如果長時(shí)間得不到消費(fèi),數(shù)據(jù)就一直得不到處理
spring.rabbitmq.listener.simple.prefetch=1
#消費(fèi)接收確認(rèn)機(jī)制-手動(dòng)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
mq.env=local
#訂單處理隊(duì)列
#交換機(jī)名稱
order.mq.exchange.name=${mq.env}:order:mq:exchange
#隊(duì)列名稱
order.mq.queue.name=${mq.env}:order:mq:queue
#routingkey
order.mq.routing.key=${mq.env}:order:mq:routing:key
rabbitmq配置類OrderRabbitmqConfig:
/**
* rabbitmq配置
*/
@Configuration
public class OrderRabbitmqConfig {
private static final Logger logger = LoggerFactory.getLogger(OrderRabbitmqConfig.class);
@Autowired
private Environment env;
/**
* channel鏈接工廠
*/
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 監(jiān)聽器容器配置
*/
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 聲明rabbittemplate
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
//消息發(fā)送成功確認(rèn),對應(yīng)application.properties中的spring.rabbitmq.publisher-confirms=true
connectionFactory.setPublisherConfirms(true);
//消息發(fā)送失敗確認(rèn),對應(yīng)application.properties中的spring.rabbitmq.publisher-returns=true
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//設(shè)置消息發(fā)送格式為json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setMandatory(true);
//消息發(fā)送到exchange回調(diào) 需設(shè)置:spring.rabbitmq.publisher-confirms=true
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
//消息從exchange發(fā)送到queue失敗回調(diào) 需設(shè)置:spring.rabbitmq.publisher-returns=true
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
//---------------------------------------訂單隊(duì)列------------------------------------------------------
/**
* 聲明訂單隊(duì)列的交換機(jī)
* @return
*/
@Bean("orderTopicExchange")
public TopicExchange orderTopicExchange(){
//設(shè)置為持久化 不自動(dòng)刪除
return new TopicExchange(env.getProperty("order.mq.exchange.name"),true,false);
}
/**
* 聲明訂單隊(duì)列
* @return
*/
@Bean("orderQueue")
public Queue orderQueue(){
return new Queue(env.getProperty("order.mq.queue.name"),true);
}
/**
* 將隊(duì)列綁定到交換機(jī)
* @return
*/
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(env.getProperty("order.mq.routing.key"));
}
/**
* 注入訂單對列消費(fèi)監(jiān)聽器
*/
@Autowired
private OrderListener orderListener;
/**
* 聲明訂單隊(duì)列監(jiān)聽器配置容器
* @return
*/
@Bean("orderListenerContainer")
public SimpleMessageListenerContainer orderListenerContainer(){
//創(chuàng)建監(jiān)聽器容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//將配置信息和鏈接信息賦給容器工廠
factoryConfigurer.configure(factory,connectionFactory);
//容器工廠創(chuàng)建監(jiān)聽器容器
SimpleMessageListenerContainer container = factory.createListenerContainer();
//指定監(jiān)聽器
container.setMessageListener(orderListener);
//指定監(jiān)聽器監(jiān)聽的隊(duì)列
container.setQueues(orderQueue());
return container;
}
}
配置類聲明了訂單隊(duì)列,交換機(jī),通過指定的routingkey綁定了隊(duì)列與交換機(jī)。另外,rabbitTemplate用來發(fā)送消息,ListenerContainer指定監(jiān)聽器(消費(fèi)者)監(jiān)聽的隊(duì)列。
客戶下單,生產(chǎn)消息,上代碼:
@Service
public class SeckillService {
private static final Logger logger = LoggerFactory.getLogger(SeckillService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
/**
* 生產(chǎn)消息
* @param order
*/
public void seckill(OrderRequest order){
//設(shè)置交換機(jī)
rabbitTemplate.setExchange(env.getProperty("order.mq.exchange.name"));
//設(shè)置routingkey
rabbitTemplate.setRoutingKey(env.getProperty("order.mq.routing.key"));
//創(chuàng)建消息體
Message msg = MessageBuilder.withBody(JSON.toJSONString(order).getBytes()).build();
//發(fā)送消息
rabbitTemplate.convertAndSend(msg);
}
}
很簡單,操作rabbitTemplate,指定交換機(jī)和routingkey,發(fā)送消息到綁定的隊(duì)列,等待消費(fèi)處理。
3.減少庫存
消費(fèi)者消費(fèi)訂單消息,做業(yè)務(wù)處理。
看一下監(jiān)聽器(消費(fèi)者)OrderListener:
/**
* 消息監(jiān)聽器(消費(fèi)者)
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(OrderListener.class);
@Autowired
private OrderService orderService;
/**
* 處理接收到的消息
* @param message 消息體
* @param channel 通道,確認(rèn)消費(fèi)用
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
//獲取交付tag
long tag = message.getMessageProperties().getDeliveryTag();
String str = new String(message.getBody(),"utf-8");
logger.info("接收到的消息:{}",str);
JSONObject obj = JSONObject.parseObject(str);
//下單,操作數(shù)據(jù)庫
orderService.order(obj.getString("userId"),obj.getString("goodsId"));
//確認(rèn)消費(fèi)
channel.basicAck(tag,true);
}catch(Exception e){
logger.error("消息監(jiān)聽確認(rèn)機(jī)制發(fā)生異常:",e.fillInStackTrace());
}
}
}
業(yè)務(wù)處理 OrderService:
@Service
public class OrderService {
@Resource
private SeckillMapper seckillMapper;
/**
* 下單,操作數(shù)據(jù)庫
* @param userId
* @param goodsId
*/
@Transactional()
public void order(String userId,String goodsId){
//該商品庫存-1(當(dāng)庫存>0時(shí))
int count = seckillMapper.reduceGoodsStockById(goodsId);
//更新成功,表明搶單成功,插入下單記錄,支付狀態(tài)設(shè)為2-待支付
if(count > 0){
OrderRecord orderRecord = new OrderRecord();
orderRecord.setId(CommonUtils.createUUID());
orderRecord.setGoodsId(goodsId);
orderRecord.setUserId(userId);
orderRecord.setPayStatus(2);
seckillMapper.insertOrderRecord(orderRecord);
}
}
}
Dao接口和Mybatis文件就不往出貼了,這里的邏輯是,update goods_info set goods_stock = goods_stock-1 where goods_stock > 0 and id=#{goodsId},這條update相當(dāng)于將查詢庫存和減少庫存合并為一個(gè)原子操作,避免高并發(fā)問題,執(zhí)行成功,插入訂單記錄,執(zhí)行失敗,則庫存不夠搶單失敗。
4.支付
訂單處理完成后,如果庫存減少,也就是搶單成功,那么需要用戶在十五分鐘內(nèi)完成支付,這塊就要用到死信隊(duì)列(延遲隊(duì)列)來處理了,先看模型圖:

DLX:dead-letter Exchange 死信交換機(jī)
DLK:dead-letter RoutingKey 死信路由
ttl:time-to-live 超時(shí)時(shí)間
死信隊(duì)列中,消息到期后,會(huì)通過DLX和DLK進(jìn)入到pay-queue,進(jìn)行消費(fèi)。這是另一組消息隊(duì)列,和訂單消息隊(duì)列是分開的。這里注意他們的綁定關(guān)系,主交換機(jī)綁定死信隊(duì)列,死信交換機(jī)綁定的是主隊(duì)列(pay queue)。
接下來聲明圖中的一系列組件,首先application.properties中增加配置:
#支付處理隊(duì)列
#主交換機(jī)
pay.mq.exchange.name=${mq.env}:pay:mq:exchange
#死信交換機(jī)(DLX)
pay.dead-letter.mq.exchange.name=${mq.env}:pay:dead-letter:mq:exchange
#主隊(duì)列
pay.mq.queue.name=${mq.env}:pay:mq:queue
#死信隊(duì)列
pay.dead-letter.mq.queue.name=${mq.env}:pay:dead-letter:mq:queue
#主routingkey
pay.mq.routing.key=${mq.env}:pay:mq:routing:key
#死信routingkey(DLK)
pay.dead-letter.mq.routing.key=${mq.env}:pay:dead-letter:mq:routing:key
#支付超時(shí)時(shí)間(毫秒)(TTL),測試原因,這里模擬5秒,如果是生產(chǎn)環(huán)境,這里可以是15分鐘等
pay.mq.ttl=5000
配置類OrderRabbitmqConfig中增加支付隊(duì)列和死信隊(duì)列的聲明:
/**
* 死信隊(duì)列,十五分鐘超時(shí)
* @return
*/
@Bean
public Queue payDeadLetterQueue(){
Map args = new HashMap();
//聲明死信交換機(jī)
args.put("x-dead-letter-exchange",env.getProperty("pay.dead-letter.mq.exchange.name"));
//聲明死信routingkey
args.put("x-dead-letter-routing-key",env.getProperty("pay.dead-letter.mq.routing.key"));
//聲明死信隊(duì)列中的消息過期時(shí)間
args.put("x-message-ttl",env.getProperty("pay.mq.ttl",int.class));
//創(chuàng)建死信隊(duì)列
return new Queue(env.getProperty("pay.dead-letter.mq.queue.name"),true,false,false,args);
}
/**
* 支付隊(duì)列交換機(jī)(主交換機(jī))
* @return
*/
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(env.getProperty("pay.mq.exchange.name"),true,false);
}
/**
* 將主交換機(jī)綁定到死信隊(duì)列
* @return
*/
@Bean
public Binding payBinding(){
return BindingBuilder.bind(payDeadLetterQueue()).to(payTopicExchange()).with(env.getProperty("pay.mq.routing.key"));
}
/**
* 支付隊(duì)列(主隊(duì)列)
* @return
*/
@Bean
public Queue payQueue(){
return new Queue(env.getProperty("pay.mq.queue.name"),true);
}
/**
* 死信交換機(jī)
* @return
*/
@Bean
public TopicExchange payDeadLetterExchange(){
return new TopicExchange(env.getProperty("pay.dead-letter.mq.exchange.name"),true,false);
}
/**
* 將主隊(duì)列綁定到死信交換機(jī)
* @return
*/
@Bean
public Binding payDeadLetterBinding(){
return BindingBuilder.bind(payQueue()).to(payDeadLetterExchange()).with(env.getProperty("pay.dead-letter.mq.routing.key"));
}
/**
* 注入支付監(jiān)聽器
*/
@Autowired
private PayListener payListener;
/**
* 支付隊(duì)列監(jiān)聽器容器
* @return
*/
@Bean
public SimpleMessageListenerContainer payMessageListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
SimpleMessageListenerContainer listenerContainer = factory.createListenerContainer();
listenerContainer.setMessageListener(payListener);
listenerContainer.setQueues(payQueue());
return listenerContainer;
}
支付隊(duì)列和死信隊(duì)列的Queue、Exchange、routingkey都已就緒。
看生產(chǎn)者:
@Service
public class OrderService {
@Resource
private SeckillMapper seckillMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
/**
* 下單,操作數(shù)據(jù)庫
* @param userId
* @param goodsId
*/
@Transactional()
public void order(String userId,String goodsId){
//該商品庫存-1(當(dāng)庫存>0時(shí))
int count = seckillMapper.reduceGoodsStockById(goodsId);
//更新成功,表明搶單成功,插入下單記錄,支付狀態(tài)設(shè)為2-待支付
if(count > 0){
OrderRecord orderRecord = new OrderRecord();
orderRecord.setId(CommonUtils.createUUID());
orderRecord.setGoodsId(goodsId);
orderRecord.setUserId(userId);
orderRecord.setPayStatus(2);
seckillMapper.insertOrderRecord(orderRecord);
//將該訂單添加到支付隊(duì)列
rabbitTemplate.setExchange(env.getProperty("pay.mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("pay.mq.routing.key"));
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
String json = JSON.toJSONString(orderRecord);
Message msg = MessageBuilder.withBody(json.getBytes()).build();
rabbitTemplate.convertAndSend(msg);
}
}
}
在OrderService中,數(shù)據(jù)庫操作完成后,將訂單信息發(fā)送到死信隊(duì)列,死信隊(duì)列中的消息會(huì)在十五分鐘后進(jìn)入到支付隊(duì)列,等待消費(fèi)。
再看消費(fèi)者:
@Component
public class PayListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(PayListener.class);
@Autowired
private PayService payService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Long tag = message.getMessageProperties().getDeliveryTag();
try {
String str = new String(message.getBody(), "utf-8");
logger.info("接收到的消息:{}",str);
JSONObject json = JSON.parseObject(str);
String orderId = json.getString("id");
//確認(rèn)是否付款
payService.confirmPay(orderId);
//確認(rèn)消費(fèi)
channel.basicAck(tag, true);
}catch(Exception e){
logger.info("支付消息消費(fèi)出錯(cuò):{}",e.getMessage());
logger.info("出錯(cuò)的tag:{}",tag);
}
}
}
PayService:
@Service
public class PayService {
private static final Logger logger = LoggerFactory.getLogger(PayService.class);
@Resource
private SeckillMapper seckillMapper;
/**
* 確認(rèn)是否支付
* @param orderId
*/
public void confirmPay(String orderId){
OrderRecord orderRecord = seckillMapper.selectNoPayOrderById(orderId);
//根據(jù)訂單號校驗(yàn)該用戶是否已支付
if(checkPay(orderId)){
//已支付
orderRecord.setPayStatus(1);
seckillMapper.updatePayStatus(orderRecord);
logger.info("用戶{}已支付",orderId);
}else{
//未支付
orderRecord.setPayStatus(0);
seckillMapper.updatePayStatus(orderRecord);
//取消支付后,商品庫存+1
seckillMapper.returnStock(orderRecord.getGoodsId());
logger.info("用戶{}未支付",orderId);
}
}
/**
* 模擬判斷訂單支付成功或失敗,成功失敗隨機(jī)
* @param orderId
* @return
*/
public boolean checkPay(String orderId){
Random random = new Random();
int res = random.nextInt(2);
return res==0?false:true;
}
這里checkPay()方法模擬調(diào)用第三方支付接口來判斷用戶是否已支付。若支付成功,訂單改為已支付狀態(tài),支付失敗,改為已取消狀態(tài),庫存退回。
總結(jié)
整個(gè)demo,是兩組消息隊(duì)列撐起來的,一組訂單消息隊(duì)列,一組支付消息隊(duì)列,而每一組隊(duì)列都是由queue、exchange、routingkey、生產(chǎn)者以及消費(fèi)者組成。交換機(jī)通過routingkey綁定隊(duì)列,rabbitTemplate通過指定交換機(jī)和routingkey將消息發(fā)送到指定隊(duì)列,消費(fèi)者監(jiān)聽該隊(duì)列進(jìn)行消費(fèi)。不同的是第二組支付隊(duì)列里嵌入了死信隊(duì)列來做一個(gè)十五分鐘的延遲支付。
到此這篇關(guān)于SpringBoot+RabbitMQ+Redis實(shí)現(xiàn)商品秒殺的文章就介紹到這了,更多相關(guān)SpringBoot+RabbitMQ+Redis實(shí)現(xiàn)商品秒殺內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot使用注解進(jìn)行分頁的實(shí)現(xiàn)示例
分頁使用可以說非常普遍了,有時(shí)候會(huì)需要非常靈活的方式去開啟或關(guān)閉分頁,嘗試使用一下注解的方式來進(jìn)行分頁,本文主要介紹了SpringBoot使用注解進(jìn)行分頁的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
詳解使用@RequestBody取POST方式的json字符串
這篇文章主要介紹了詳解使用@RequestBody取POST方式的json字符串,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
SpringBoot統(tǒng)一響應(yīng)和統(tǒng)一異常處理詳解
在開發(fā)Spring Boot應(yīng)用時(shí),處理響應(yīng)結(jié)果和異常的方式對項(xiàng)目的可維護(hù)性、可擴(kuò)展性和團(tuán)隊(duì)協(xié)作有著至關(guān)重要的影響,統(tǒng)一結(jié)果返回和統(tǒng)一異常處理是提升項(xiàng)目質(zhì)量的關(guān)鍵策略之一,所以本文給大家詳細(xì)介紹了SpringBoot統(tǒng)一響應(yīng)和統(tǒng)一異常處理,需要的朋友可以參考下2024-08-08
基于Comparator對象集合實(shí)現(xiàn)多個(gè)條件按照優(yōu)先級的比較
這篇文章主要介紹了基于Comparator對象集合實(shí)現(xiàn)多個(gè)條件按照優(yōu)先級的比較,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07

