SpringBoot+Redis實現(xiàn)不重復(fù)消費(fèi)的隊列的示例代碼
背景
最近我們新研發(fā)了一個“年夜飯訂購”功能(沒想到吧,雷襲在是一個程序猿的同時,也是一名優(yōu)秀的在廚子)。用戶使用系統(tǒng)選擇年夜飯,點(diǎn)擊“下單”時,后臺首先會生成一條訂單數(shù)據(jù),返回消息給用戶:“您已成功下單,后廚正在準(zhǔn)備菜品!”。同時,以線程的方式指揮各個廚子按菜單聯(lián)系供應(yīng)商準(zhǔn)備食材,制作菜品,最后打包寄給客戶。但是,用戶在使用這個功能時,系統(tǒng)卻有一定的機(jī)率卡死,這個問題極大的影響了用戶的體驗。年關(guān)將近,這個功能也顯得越發(fā)重要,客戶要求我們限期整改,三天內(nèi)必須解決該問題。
我首先對這個功能進(jìn)行了分析,很明顯,這是一個使用頻次不高,但是使用時間比較集中的功能。在大量用戶同時使用時,會導(dǎo)致后臺的廚師,食材,供應(yīng)商等全面告警(用程序員語言翻譯一下,這個功能耗CPU,耗內(nèi)存,耗IO)。但用戶對于實時性的要求并不高。下單之后,訂購的菜品是一天內(nèi)完成,還是兩天完成并沒有關(guān)系,只要年前能做完就可以。
因此,我們決定采用消息中間件的方式,以隊列的形式逐次的執(zhí)行“年夜飯制作”的操作, 來緩解服務(wù)器的各種資源的壓力。
之所以采用Redis來實現(xiàn)消息隊列,而不是使用更為成熟的ONS,Kafka。不是因為ONS用不起,而是Redis更有性價比(用戶只允許使用ONS中間件,但ONS會帶來額外的網(wǎng)絡(luò)開銷,學(xué)習(xí)成本和風(fēng)險都更大,這個功能使用頻度并不高,沒有必要為了它而引入一個重量級的中間件。)
代碼實踐
說干就干,咱們先看看源碼,如下:
// 訂單實體類
@Data
public class OrderEntity implements Serializable {
/**
* 客戶姓名
*/
private String customerName;
/**
* 訂單號
*/
private String orderCode;
/**
* 菜單
*/
List<String> menus;
}
@Slf4j
@Service
public class DinnerService {
/**
* 年夜飯下單
*
* @param req 訂單信息
* @return
*/
public Object orderNewYearEveDinner(OrderEntity entity) {
// 存儲訂單信息
saveOrder(entity);
// 異步開始做菜
CompletableFuture.runAsync(() -> doNewYearEveDinner(entity));
return "您已成功下單,后廚正在準(zhǔn)備預(yù)制菜!";
}
/**
* 這里模擬的是做年夜飯的過程方法,該方法用時較長,整個過程需要10秒。
* 這個過程中存在多種意外,可能導(dǎo)致該方法執(zhí)行失敗
*
* @param req 訂單信息
*/
public void doNewYearEveDinner(OrderEntity entity) {
System.out.println("開始做訂單 " + entity.getOrderCode() + " 的年夜飯");
try {
Thread.sleep(10000);
}catch (Exception e ) {
e.printStackTrace();
System.out.println("廚子跑了,廚房著火了,供應(yīng)商堵路上了");
}
System.out.println("訂單 " + entity.getOrderCode() + " 的年夜飯已經(jīng)完成");
}
private void saveOrder(OrderEntity req) {
//這里假設(shè)做的是訂單入庫操作
System.out.println("訂單 " + req.getOrderCode() + " 已經(jīng)入庫, 做飯開始時間為 "+ new Date());
}
}1、引入maven依賴,在application.yml中添加redis配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>spring:
redis:
database: 9
host: 127.0.0.1
port: 6379
password:
jedis:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 02、添加Redis隊列監(jiān)聽,添加Redis配置文件注冊監(jiān)聽
// 監(jiān)聽類
@Component
public class DinnerListener implements MessageListener {
@Autowired
private DinnerService service;
@Override
public void onMessage(Message message, byte[] pattern) {
OrderEntity entity= JSON.parseObject(message.toString(), OrderEntity.class);
service.doNewYearEveDinner(entity);
}
}
//配置類,用于注冊監(jiān)聽
@Configuration
public class RedisConfig {
@Bean
public ChannelTopic topic() {
return new ChannelTopic("NEW_YEAR_DINNER");
}
@Bean
public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) {
return new MessageListenerAdapter(listener);
}
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter messageListenerAdapter,
ChannelTopic topic) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(messageListenerAdapter, topic);
return container;
}
}3、修改原方法,以及Controller調(diào)用
// DinnerService中的方法修改
/**
* 年夜飯下單
*
* @param req 訂單信息
* @return
*/
public Object orderNewYearEveDinner(OrderEntity entity) {
// 存儲訂單信息
saveOrder(entity);
// 異步開始做菜
redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(entity));
return "您已成功下單,后廚正在準(zhǔn)備預(yù)制菜!";
}
@RestController
public class DinnerController {
private int i = 0;
@Autowired
private DinnerService service;
@GetMapping("/orderDinner")
public Object orderDinner() {
OrderEntity entity = new OrderEntity();
entity.setOrderCode("Order" + (++i));
entity.setCustomerName("第"+i+"位客戶");
return service.orderNewYearEveDinner(entity);
}
}
4、通過postman調(diào)用四次請求,測試結(jié)果如下:


5、Listener中添加同步鎖
細(xì)看上文中打出來的注釋,我發(fā)現(xiàn)這和我設(shè)想的不一樣啊。原定的計劃是先做完第一份年夜飯,再做第二份,做完第二份再做第三份,為什么第一次沒執(zhí)行完就開始執(zhí)行第二次了?
在網(wǎng)上查了些資料后我才知道,要達(dá)到我想要的效果,得在Listener中添加上同步鎖,如下:
@Component
public class DinnerListener implements MessageListener {
@Autowired
private DinnerService service;
private final Object lock = new Object();
@Override
public void onMessage(Message message, byte[] pattern) {
synchronized (lock) {
OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class);
service.doNewYearEveDinner(entity);
}
}
}再次執(zhí)行測試用例,結(jié)果如下:

6、多服務(wù)不重復(fù)消費(fèi)消息
上面的結(jié)果已經(jīng)滿足了我們的要求,但是,客戶考慮到我們只有一個廚房,的確影響效率,決定給我們擴(kuò)建一個廚房(添加服務(wù)器),希望能達(dá)到廚房A做第一份訂單,廚房B做第二份訂單,以上的代碼能實現(xiàn)嗎?我們把剛才的項目拷貝一份,修改端口,啟動后測試。結(jié)果如下:

從上面的日志可以看出來,兩個服務(wù)都做了訂單1的年夜飯,消息被重復(fù)消費(fèi)了。但是根據(jù)業(yè)務(wù)需求,我們不需要重復(fù)消費(fèi)消息,我們想達(dá)到的效果是多服務(wù)實現(xiàn)負(fù)載均衡,本服務(wù)在處理的數(shù)據(jù),其他服務(wù)不需要再處理了,應(yīng)該怎么實現(xiàn)呢?咱們依然可以運(yùn)用Redis,對代碼做如下調(diào)整:
@Component
public class DinnerListener implements MessageListener {
@Autowired
private DinnerService service;
@Autowired
private RedisTemplate<String, String> redisTemplate;
private final Object lock = new Object();
@Override
public void onMessage(Message message, byte[] pattern) {
synchronized (lock) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS);
// 加鎖失敗,已有消費(fèi)端在此時對此消息進(jìn)行處理,這里不再做處理
if (!flag) {
return;
}
OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class);
service.doNewYearEveDinner(entity);
}
}
}
從測試結(jié)果來看,這么調(diào)整解決達(dá)到了我們的效果。

7、添加日志監(jiān)控
仔細(xì)檢查,發(fā)現(xiàn)上面的代碼雖然滿足了我們的業(yè)務(wù)需求,但是在安全方面仍然沒有得到一定的保障,方法doNewYearEveDinner存在很多不可預(yù)見的隱患,如廚師跑了,廚房著了,供應(yīng)商堵路上了,這些都會導(dǎo)致方法執(zhí)行失敗,那么,我們怎么知道這個訂單執(zhí)行成功或者失敗了呢?看日志嗎?成百上千條數(shù)據(jù)堆起來,通過看日志來看結(jié)果多不方便?。吭蹅兪欠窨梢詫Υa做一下調(diào)整?基于這方面考慮,我對代碼做了以下調(diào)整
//訂單類進(jìn)行調(diào)整
@Data
public class OrderEntity implements Serializable {
/**
* 客戶姓名
*/
private String customerName;
/**
* 訂單號
*/
private String orderCode;
/**
* 菜單
*/
List<String> menus;
/**
* 出餐狀態(tài)
*/
private String dinnerState;
/**
* 做飯開始時間
*/
private String dinnerStartTime;
/**
* 做飯結(jié)束時間
*/
private String dinnerEndTime;
/**
* 備注
*/
private String remark;
}
// DinnerService做如下調(diào)整, 添加一個訂單信息更新的方法
@Slf4j
@Service
public class DinnerService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 年夜飯下單
*
* @param req 訂單信息
* @return
*/
public Object orderNewYearEveDinner(OrderEntity req) {
// 存儲訂單信息
saveOrder(req);
// 異步開始做菜
redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(req));
return "您已成功下單,訂單號為"+ req.getOrderCode()+",后廚正在準(zhǔn)備預(yù)制菜!";
}
/**
* 這里模擬的是做年夜飯的過程方法,該方法用時較長,整個過程需要10秒,但是,這個過程中存在多種意外,該方法可能失敗
*
* @param req 訂單信息
*/
public void doNewYearEveDinner(OrderEntity req) throws Exception {
System.out.println("開始做訂單 " + req.getOrderCode() + " 的年夜飯");
Thread.sleep(10000);
System.out.println("訂單 " + req.getOrderCode() + " 的年夜飯已經(jīng)完成");
}
private void saveOrder(OrderEntity req) {
//這里假設(shè)做的是訂單入庫操作
System.out.println("訂單 " + req.getOrderCode() + " 已經(jīng)入庫, 做飯開始時間為 "+ new Date());
}
/**
* 根據(jù)訂單編號修改訂單信息
*
* @param orderCode 訂單編號
* @param dinnerStatus
* @param remark
*/
public void updateOrder(String orderCode, String dinnerStatus, String remark) {
// 根據(jù)訂單編號修改訂單的出餐結(jié)束時間,出餐狀態(tài),備注等信息。
System.out.println("更新訂單 "+ orderCode +" 信息,做飯結(jié)束時間為 "+ new Date() + ", 出餐狀態(tài)為"+ dinnerStatus +", 備注為 " +remark);
}
}
// Listener中做如下調(diào)整
@Override
public void onMessage(Message message, byte[] pattern) {
synchronized (lock) {
Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS);
// 加鎖失敗,已有消費(fèi)端在此時對此消息進(jìn)行處理,這里不再做處理
if (!flag) {
return;
}
OrderEntity param = JSON.parseObject(message.toString(), OrderEntity.class);
try {
service.doNewYearEveDinner(param);
service.updateOrder(param.getOrderCode(), "SUCCESS", "成功");
}catch (Exception e) {
e.printStackTrace();
service.updateOrder(param.getOrderCode(), "FAIL", e.getMessage());
}
}
}這部分代碼就不貼測試結(jié)果了,與上一次的測試結(jié)果一致,只不過提升了功能的可測試性,擴(kuò)展一下,這個結(jié)果能否達(dá)到我們的要求呢?其實仍然沒有,對于執(zhí)行失敗的訂單,我們需要一個機(jī)制來處理,根據(jù)報錯信息決定是重新執(zhí)行還是直接報警,人為介入處理,由此才能實現(xiàn)整個事務(wù)的閉環(huán)。
這是一次簡單的SpringBoot+Redis實現(xiàn)隊列的實踐,個人覺得這個過程比較有趣,分析問題出現(xiàn)的原因,需求的潛在歸約,根據(jù)業(yè)務(wù)的需要、當(dāng)前的條件選擇合適的方法和組件,快而有效的解決問題,所以我將它記錄了下來,供大家參考。實際上,已經(jīng)有大神對于Redis實現(xiàn)隊列的方法進(jìn)行了完整細(xì)致的歸納,如果想深入的了解這部分的知識,推薦你們看看這篇博客: Redis隊列詳解(springboot實戰(zhàn))
到此這篇關(guān)于SpringBoot+Redis實現(xiàn)不重復(fù)消費(fèi)的隊列的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Redis不重復(fù)消費(fèi)隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java隨機(jī)數(shù)算法原理與實現(xiàn)方法實例詳解
這篇文章主要介紹了Java隨機(jī)數(shù)算法原理與實現(xiàn)方法,簡單分析了隨機(jī)數(shù)算法的原理并結(jié)合具體實例形式給出了java編程計算隨機(jī)數(shù)的具體操作技巧,需要的朋友可以參考下2017-09-09
Spring boot如何通過@Scheduled實現(xiàn)定時任務(wù)及多線程配置
這篇文章主要介紹了Spring boot如何通過@Scheduled實現(xiàn)定時任務(wù)及多線程配置,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12
java加載properties文件的六種方法總結(jié)
這篇文章主要介紹了java加載properties文件的六種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-05-05
mybatis動態(tài)新增(insert)和修改(update)方式
這篇文章主要介紹了mybatis動態(tài)新增(insert)和修改(update)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05
java GUI實現(xiàn)ATM機(jī)系統(tǒng)(3.0版)
這篇文章主要為大家詳細(xì)介紹了java GUI實現(xiàn)ATM機(jī)系統(tǒng)(3.0版),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-03-03
Javaweb監(jiān)聽器實例之統(tǒng)計在線人數(shù)
這篇文章主要為大家詳細(xì)介紹了Javaweb監(jiān)聽器實例之統(tǒng)計在線人數(shù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-11-11
spring/springboot整合dubbo詳細(xì)教程
今天教大家如何使用spring/springboot整合dubbo,文中有非常詳細(xì)的圖文介紹及代碼示例,對正在學(xué)習(xí)java的小伙伴有很好地幫助,需要的朋友可以參考下2021-05-05

