SpringBoot+Redis實(shí)現(xiàn)不重復(fù)消費(fèi)的隊(duì)列的示例代碼
背景
最近我們新研發(fā)了一個(gè)“年夜飯訂購(gòu)”功能(沒(méi)想到吧,雷襲在是一個(gè)程序猿的同時(shí),也是一名優(yōu)秀的在廚子)。用戶使用系統(tǒng)選擇年夜飯,點(diǎn)擊“下單”時(shí),后臺(tái)首先會(huì)生成一條訂單數(shù)據(jù),返回消息給用戶:“您已成功下單,后廚正在準(zhǔn)備菜品!”。同時(shí),以線程的方式指揮各個(gè)廚子按菜單聯(lián)系供應(yīng)商準(zhǔn)備食材,制作菜品,最后打包寄給客戶。但是,用戶在使用這個(gè)功能時(shí),系統(tǒng)卻有一定的機(jī)率卡死,這個(gè)問(wèn)題極大的影響了用戶的體驗(yàn)。年關(guān)將近,這個(gè)功能也顯得越發(fā)重要,客戶要求我們限期整改,三天內(nèi)必須解決該問(wèn)題。
我首先對(duì)這個(gè)功能進(jìn)行了分析,很明顯,這是一個(gè)使用頻次不高,但是使用時(shí)間比較集中的功能。在大量用戶同時(shí)使用時(shí),會(huì)導(dǎo)致后臺(tái)的廚師,食材,供應(yīng)商等全面告警(用程序員語(yǔ)言翻譯一下,這個(gè)功能耗CPU,耗內(nèi)存,耗IO)。但用戶對(duì)于實(shí)時(shí)性的要求并不高。下單之后,訂購(gòu)的菜品是一天內(nèi)完成,還是兩天完成并沒(méi)有關(guān)系,只要年前能做完就可以。
因此,我們決定采用消息中間件的方式,以隊(duì)列的形式逐次的執(zhí)行“年夜飯制作”的操作, 來(lái)緩解服務(wù)器的各種資源的壓力。
之所以采用Redis來(lái)實(shí)現(xiàn)消息隊(duì)列,而不是使用更為成熟的ONS,Kafka。不是因?yàn)镺NS用不起,而是Redis更有性價(jià)比(用戶只允許使用ONS中間件,但ONS會(huì)帶來(lái)額外的網(wǎng)絡(luò)開(kāi)銷,學(xué)習(xí)成本和風(fēng)險(xiǎn)都更大,這個(gè)功能使用頻度并不高,沒(méi)有必要為了它而引入一個(gè)重量級(jí)的中間件。)
代碼實(shí)踐
說(shuō)干就干,咱們先看看源碼,如下:
// 訂單實(shí)體類 @Data public class OrderEntity implements Serializable { /** * 客戶姓名 */ private String customerName; /** * 訂單號(hào) */ private String orderCode; /** * 菜單 */ List<String> menus; } @Slf4j @Service public class DinnerService { /** * 年夜飯下單 * * @param req 訂單信息 * @return */ public Object orderNewYearEveDinner(OrderEntity entity) { // 存儲(chǔ)訂單信息 saveOrder(entity); // 異步開(kāi)始做菜 CompletableFuture.runAsync(() -> doNewYearEveDinner(entity)); return "您已成功下單,后廚正在準(zhǔn)備預(yù)制菜!"; } /** * 這里模擬的是做年夜飯的過(guò)程方法,該方法用時(shí)較長(zhǎng),整個(gè)過(guò)程需要10秒。 * 這個(gè)過(guò)程中存在多種意外,可能導(dǎo)致該方法執(zhí)行失敗 * * @param req 訂單信息 */ public void doNewYearEveDinner(OrderEntity entity) { System.out.println("開(kāi)始做訂單 " + entity.getOrderCode() + " 的年夜飯"); try { Thread.sleep(10000); }catch (Exception e ) { e.printStackTrace(); System.out.println("廚子跑了,廚房著火了,供應(yīng)商堵路上了"); } System.out.println("訂單 " + entity.getOrderCode() + " 的年夜飯已經(jīng)完成"); } private void saveOrder(OrderEntity req) { //這里假設(shè)做的是訂單入庫(kù)操作 System.out.println("訂單 " + req.getOrderCode() + " 已經(jīng)入庫(kù), 做飯開(kāi)始時(shí)間為 "+ new Date()); } }
1、引入maven依賴,在application.yml中添加redis配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
spring: redis: database: 9 host: 127.0.0.1 port: 6379 password: jedis: pool: max-active: 8 max-wait: -1 max-idle: 8 min-idle: 0
2、添加Redis隊(duì)列監(jiān)聽(tīng),添加Redis配置文件注冊(cè)監(jiān)聽(tīng)
// 監(jiān)聽(tīng)類 @Component public class DinnerListener implements MessageListener { @Autowired private DinnerService service; @Override public void onMessage(Message message, byte[] pattern) { OrderEntity entity= JSON.parseObject(message.toString(), OrderEntity.class); service.doNewYearEveDinner(entity); } } //配置類,用于注冊(cè)監(jiān)聽(tīng) @Configuration public class RedisConfig { @Bean public ChannelTopic topic() { return new ChannelTopic("NEW_YEAR_DINNER"); } @Bean public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) { return new MessageListenerAdapter(listener); } @Bean public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter, ChannelTopic topic) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.addMessageListener(messageListenerAdapter, topic); return container; } }
3、修改原方法,以及Controller調(diào)用
// DinnerService中的方法修改 /** * 年夜飯下單 * * @param req 訂單信息 * @return */ public Object orderNewYearEveDinner(OrderEntity entity) { // 存儲(chǔ)訂單信息 saveOrder(entity); // 異步開(kāi)始做菜 redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(entity)); return "您已成功下單,后廚正在準(zhǔn)備預(yù)制菜!"; } @RestController public class DinnerController { private int i = 0; @Autowired private DinnerService service; @GetMapping("/orderDinner") public Object orderDinner() { OrderEntity entity = new OrderEntity(); entity.setOrderCode("Order" + (++i)); entity.setCustomerName("第"+i+"位客戶"); return service.orderNewYearEveDinner(entity); } }
4、通過(guò)postman調(diào)用四次請(qǐng)求,測(cè)試結(jié)果如下:
5、Listener中添加同步鎖
細(xì)看上文中打出來(lái)的注釋,我發(fā)現(xiàn)這和我設(shè)想的不一樣啊。原定的計(jì)劃是先做完第一份年夜飯,再做第二份,做完第二份再做第三份,為什么第一次沒(méi)執(zhí)行完就開(kāi)始執(zhí)行第二次了?
在網(wǎng)上查了些資料后我才知道,要達(dá)到我想要的效果,得在Listener中添加上同步鎖,如下:
@Component public class DinnerListener implements MessageListener { @Autowired private DinnerService service; private final Object lock = new Object(); @Override public void onMessage(Message message, byte[] pattern) { synchronized (lock) { OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class); service.doNewYearEveDinner(entity); } } }
再次執(zhí)行測(cè)試用例,結(jié)果如下:
6、多服務(wù)不重復(fù)消費(fèi)消息
上面的結(jié)果已經(jīng)滿足了我們的要求,但是,客戶考慮到我們只有一個(gè)廚房,的確影響效率,決定給我們擴(kuò)建一個(gè)廚房(添加服務(wù)器),希望能達(dá)到廚房A做第一份訂單,廚房B做第二份訂單,以上的代碼能實(shí)現(xiàn)嗎?我們把剛才的項(xiàng)目拷貝一份,修改端口,啟動(dòng)后測(cè)試。結(jié)果如下:
從上面的日志可以看出來(lái),兩個(gè)服務(wù)都做了訂單1的年夜飯,消息被重復(fù)消費(fèi)了。但是根據(jù)業(yè)務(wù)需求,我們不需要重復(fù)消費(fèi)消息,我們想達(dá)到的效果是多服務(wù)實(shí)現(xiàn)負(fù)載均衡,本服務(wù)在處理的數(shù)據(jù),其他服務(wù)不需要再處理了,應(yīng)該怎么實(shí)現(xiàn)呢?咱們依然可以運(yùn)用Redis,對(duì)代碼做如下調(diào)整:
@Component public class DinnerListener implements MessageListener { @Autowired private DinnerService service; @Autowired private RedisTemplate<String, String> redisTemplate; private final Object lock = new Object(); @Override public void onMessage(Message message, byte[] pattern) { synchronized (lock) { Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS); // 加鎖失敗,已有消費(fèi)端在此時(shí)對(duì)此消息進(jìn)行處理,這里不再做處理 if (!flag) { return; } OrderEntity entity = JSON.parseObject(message.toString(), OrderEntity.class); service.doNewYearEveDinner(entity); } } }
從測(cè)試結(jié)果來(lái)看,這么調(diào)整解決達(dá)到了我們的效果。
7、添加日志監(jiān)控
仔細(xì)檢查,發(fā)現(xiàn)上面的代碼雖然滿足了我們的業(yè)務(wù)需求,但是在安全方面仍然沒(méi)有得到一定的保障,方法doNewYearEveDinner存在很多不可預(yù)見(jiàn)的隱患,如廚師跑了,廚房著了,供應(yīng)商堵路上了,這些都會(huì)導(dǎo)致方法執(zhí)行失敗,那么,我們?cè)趺粗肋@個(gè)訂單執(zhí)行成功或者失敗了呢?看日志嗎?成百上千條數(shù)據(jù)堆起來(lái),通過(guò)看日志來(lái)看結(jié)果多不方便???咱們是否可以對(duì)代碼做一下調(diào)整?基于這方面考慮,我對(duì)代碼做了以下調(diào)整
//訂單類進(jìn)行調(diào)整 @Data public class OrderEntity implements Serializable { /** * 客戶姓名 */ private String customerName; /** * 訂單號(hào) */ private String orderCode; /** * 菜單 */ List<String> menus; /** * 出餐狀態(tài) */ private String dinnerState; /** * 做飯開(kāi)始時(shí)間 */ private String dinnerStartTime; /** * 做飯結(jié)束時(shí)間 */ private String dinnerEndTime; /** * 備注 */ private String remark; } // DinnerService做如下調(diào)整, 添加一個(gè)訂單信息更新的方法 @Slf4j @Service public class DinnerService { @Autowired private RedisTemplate<String, String> redisTemplate; /** * 年夜飯下單 * * @param req 訂單信息 * @return */ public Object orderNewYearEveDinner(OrderEntity req) { // 存儲(chǔ)訂單信息 saveOrder(req); // 異步開(kāi)始做菜 redisTemplate.convertAndSend("NEW_YEAR_DINNER", JSON.toJSONString(req)); return "您已成功下單,訂單號(hào)為"+ req.getOrderCode()+",后廚正在準(zhǔn)備預(yù)制菜!"; } /** * 這里模擬的是做年夜飯的過(guò)程方法,該方法用時(shí)較長(zhǎng),整個(gè)過(guò)程需要10秒,但是,這個(gè)過(guò)程中存在多種意外,該方法可能失敗 * * @param req 訂單信息 */ public void doNewYearEveDinner(OrderEntity req) throws Exception { System.out.println("開(kāi)始做訂單 " + req.getOrderCode() + " 的年夜飯"); Thread.sleep(10000); System.out.println("訂單 " + req.getOrderCode() + " 的年夜飯已經(jīng)完成"); } private void saveOrder(OrderEntity req) { //這里假設(shè)做的是訂單入庫(kù)操作 System.out.println("訂單 " + req.getOrderCode() + " 已經(jīng)入庫(kù), 做飯開(kāi)始時(shí)間為 "+ new Date()); } /** * 根據(jù)訂單編號(hào)修改訂單信息 * * @param orderCode 訂單編號(hào) * @param dinnerStatus * @param remark */ public void updateOrder(String orderCode, String dinnerStatus, String remark) { // 根據(jù)訂單編號(hào)修改訂單的出餐結(jié)束時(shí)間,出餐狀態(tài),備注等信息。 System.out.println("更新訂單 "+ orderCode +" 信息,做飯結(jié)束時(shí)間為 "+ new Date() + ", 出餐狀態(tài)為"+ dinnerStatus +", 備注為 " +remark); } } // Listener中做如下調(diào)整 @Override public void onMessage(Message message, byte[] pattern) { synchronized (lock) { Boolean flag = redisTemplate.opsForValue().setIfAbsent(message.toString(), "1", 1, TimeUnit.DAYS); // 加鎖失敗,已有消費(fèi)端在此時(shí)對(duì)此消息進(jìn)行處理,這里不再做處理 if (!flag) { return; } OrderEntity param = JSON.parseObject(message.toString(), OrderEntity.class); try { service.doNewYearEveDinner(param); service.updateOrder(param.getOrderCode(), "SUCCESS", "成功"); }catch (Exception e) { e.printStackTrace(); service.updateOrder(param.getOrderCode(), "FAIL", e.getMessage()); } } }
這部分代碼就不貼測(cè)試結(jié)果了,與上一次的測(cè)試結(jié)果一致,只不過(guò)提升了功能的可測(cè)試性,擴(kuò)展一下,這個(gè)結(jié)果能否達(dá)到我們的要求呢?其實(shí)仍然沒(méi)有,對(duì)于執(zhí)行失敗的訂單,我們需要一個(gè)機(jī)制來(lái)處理,根據(jù)報(bào)錯(cuò)信息決定是重新執(zhí)行還是直接報(bào)警,人為介入處理,由此才能實(shí)現(xiàn)整個(gè)事務(wù)的閉環(huán)。
這是一次簡(jiǎn)單的SpringBoot+Redis實(shí)現(xiàn)隊(duì)列的實(shí)踐,個(gè)人覺(jué)得這個(gè)過(guò)程比較有趣,分析問(wèn)題出現(xiàn)的原因,需求的潛在歸約,根據(jù)業(yè)務(wù)的需要、當(dāng)前的條件選擇合適的方法和組件,快而有效的解決問(wèn)題,所以我將它記錄了下來(lái),供大家參考。實(shí)際上,已經(jīng)有大神對(duì)于Redis實(shí)現(xiàn)隊(duì)列的方法進(jìn)行了完整細(xì)致的歸納,如果想深入的了解這部分的知識(shí),推薦你們看看這篇博客: Redis隊(duì)列詳解(springboot實(shí)戰(zhàn))
到此這篇關(guān)于SpringBoot+Redis實(shí)現(xiàn)不重復(fù)消費(fèi)的隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Redis不重復(fù)消費(fèi)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot利用redis集成消息隊(duì)列的方法
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot集成Redisson實(shí)現(xiàn)延遲隊(duì)列的場(chǎng)景分析
- springboot整合redis之消息隊(duì)列
- SpringBoot+Redis隊(duì)列實(shí)現(xiàn)Java版秒殺的示例代碼
- SpringBoot實(shí)現(xiàn)redis延遲隊(duì)列的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- SpringBoot中Redisson延遲隊(duì)列的示例
- springboot使用Redis隊(duì)列實(shí)戰(zhàn)
相關(guān)文章
如何批量測(cè)試Mybatis項(xiàng)目中的Sql是否正確詳解
這篇文章主要給大家介紹了關(guān)于如何批量測(cè)試Mybatis項(xiàng)目中Sql是否正確的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12Java隨機(jī)數(shù)算法原理與實(shí)現(xiàn)方法實(shí)例詳解
這篇文章主要介紹了Java隨機(jī)數(shù)算法原理與實(shí)現(xiàn)方法,簡(jiǎn)單分析了隨機(jī)數(shù)算法的原理并結(jié)合具體實(shí)例形式給出了java編程計(jì)算隨機(jī)數(shù)的具體操作技巧,需要的朋友可以參考下2017-09-09Spring boot如何通過(guò)@Scheduled實(shí)現(xiàn)定時(shí)任務(wù)及多線程配置
這篇文章主要介紹了Spring boot如何通過(guò)@Scheduled實(shí)現(xiàn)定時(shí)任務(wù)及多線程配置,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12java加載properties文件的六種方法總結(jié)
這篇文章主要介紹了java加載properties文件的六種方法總結(jié)的相關(guān)資料,需要的朋友可以參考下2017-05-05mybatis動(dòng)態(tài)新增(insert)和修改(update)方式
這篇文章主要介紹了mybatis動(dòng)態(tài)新增(insert)和修改(update)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05java GUI實(shí)現(xiàn)ATM機(jī)系統(tǒng)(3.0版)
這篇文章主要為大家詳細(xì)介紹了java GUI實(shí)現(xiàn)ATM機(jī)系統(tǒng)(3.0版),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-03-03Javaweb監(jiān)聽(tīng)器實(shí)例之統(tǒng)計(jì)在線人數(shù)
這篇文章主要為大家詳細(xì)介紹了Javaweb監(jiān)聽(tīng)器實(shí)例之統(tǒng)計(jì)在線人數(shù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-11-11C++/java 繼承類的多態(tài)詳解及實(shí)例代碼
這篇文章主要介紹了C++/java 繼承類的多態(tài)詳解及實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2017-02-02spring/springboot整合dubbo詳細(xì)教程
今天教大家如何使用spring/springboot整合dubbo,文中有非常詳細(xì)的圖文介紹及代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴有很好地幫助,需要的朋友可以參考下2021-05-05