基于rocketmq的有序消費(fèi)模式和并發(fā)消費(fèi)模式的區(qū)別說(shuō)明
rocketmq消費(fèi)者注冊(cè)監(jiān)聽(tīng)有兩種模式
有序消費(fèi)MessageListenerOrderly和并發(fā)消費(fèi)MessageListenerConcurrently,這兩種模式返回值不同。
MessageListenerOrderly
正確消費(fèi)返回
ConsumeOrderlyStatus.SUCCESS
稍后消費(fèi)返回
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently
正確消費(fèi)返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍后消費(fèi)返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
顧名思義,有序消費(fèi)模式是按照消息的順序進(jìn)行消費(fèi),但是除此之外,在實(shí)踐過(guò)程中我發(fā)現(xiàn)和并發(fā)消費(fèi)模式還有很大的區(qū)別的。
第一,速度,下面我打算用實(shí)驗(yàn)來(lái)探究一下。
使用mq發(fā)送消息,消費(fèi)者使用有序消費(fèi)模式消費(fèi),具體的業(yè)務(wù)是阻塞100ms
Long totalTime = 0L; Date date1 = null; Date date2 = new Date(); new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { logger.info("==========CONSUME_START==========="); logger.info(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); try { if(date1 == null) date1 = new Date();//在第一次消費(fèi)時(shí)初始化 Thread.sleep(100); logger.info("total:"+(++total)); date2 = new Date(); totalTime = (date2.getTime() - date1.getTime()); logger.info("totalTime:"+totalTime); logger.info("==========CONSUME_SUCCESS==========="); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e) { logger.info("==========RECONSUME_LATER==========="); logger.error(e.getMessage(),e); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }
消費(fèi)100條消息
速度挺快的,為了讓結(jié)果更準(zhǔn)確,將消息加到1000條
消費(fèi)1000條消息
可以看到每一條消息平均耗時(shí)25ms,然而業(yè)務(wù)是阻塞100ms,這說(shuō)明有序消費(fèi)模式和同步消費(fèi)可能并不是一回事,那如果不阻塞代碼我們?cè)賮?lái)看一下結(jié)果
不阻塞過(guò)后速度明顯提高了,那么我阻塞300ms會(huì)怎么樣呢?
時(shí)間相比阻塞100ms多了2倍
接下來(lái)我們測(cè)試并發(fā)消費(fèi)模式
Long totalTime = 0L; Date date1 = null; Date date2 = new Date(); new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List< MessageExt > msgs, ConsumeConcurrentlyContext context) { logger.info(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); try { if(date1 == null) date1 = new Date(); Thread.sleep(100); logger.info("total:"+(++total)); date2 = new Date(); totalTime = (date2.getTime() - date1.getTime()); logger.info("totalTime:"+totalTime); logger.info("==========CONSUME_SUCCESS==========="); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { logger.info("==========RECONSUME_LATER==========="); logger.error(e.getMessage(),e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }
基于上次的經(jīng)驗(yàn),同樣測(cè)試三種情況,消費(fèi)1000條不阻塞,消費(fèi)1000條阻塞100ms,消費(fèi)1000條阻塞300ms
消費(fèi)1000條不阻塞的情況
和有序消費(fèi)模式差不多,快個(gè)一兩秒。
消費(fèi)1000條阻塞100ms
竟然比不阻塞的情況更快,可能是誤差把
消費(fèi)1000條阻塞300ms
速度稍慢,但是還是比有序消費(fèi)快得多。
結(jié)論是并發(fā)消費(fèi)的消費(fèi)速度要比有序消費(fèi)更快。
另一個(gè)區(qū)別是消費(fèi)失敗時(shí)的處理不同,有序消費(fèi)模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消費(fèi)者會(huì)立馬消費(fèi)這條消息,而使用并發(fā)消費(fèi)模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要過(guò)好幾秒甚至十幾秒才會(huì)再次消費(fèi)。
我是在只有一條消息的情況下測(cè)試的。更重要的區(qū)別是,
返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不會(huì)增加消息的消費(fèi)次數(shù),mq消息有個(gè)默認(rèn)最大消費(fèi)次數(shù)16,消費(fèi)次數(shù)到了以后,這條消息會(huì)進(jìn)入死信隊(duì)列,這個(gè)最大消費(fèi)次數(shù)是可以在mqadmin中設(shè)置的。
mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3
我測(cè)試后發(fā)現(xiàn),并發(fā)模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一個(gè)消息到達(dá)最大消費(fèi)次數(shù)之后就不會(huì)再出現(xiàn)了。這說(shuō)明有序消費(fèi)模式可能并沒(méi)有這個(gè)機(jī)制,這意味著你再有序消費(fèi)模式下拋出固定異常,那么這條異常信息將會(huì)被永遠(yuǎn)消費(fèi),并且很可能會(huì)影響之后正常的消息。下面依然做個(gè)試驗(yàn)
Map<String, Integer> map = new HashMap<>();//保存消息錯(cuò)誤消費(fèi)次數(shù) new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { try { if(1 == 1) throw new Exception(); return ConsumeOrderlyStatus.SUCCESS; }catch (Exception e) { MessageExt msg = msgs.get(0); if(map.containsKey(msg.getKeys())) {//消息每消費(fèi)一次,加1 map.put(msg.getKeys(), map.get(msg.getKeys()) + 1); }else { map.put(msg.getKeys(), 1); } logger.info(msg.getKeys()+":"+map.get(msg.getKeys())); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }
發(fā)送了十條消息
可以看到雖然我發(fā)了十條消息,但是一直在消費(fèi)同樣四條消息,這可能跟消息broker有默認(rèn)四條隊(duì)列有關(guān)系。同時(shí)從時(shí)間可以看到,消費(fèi)失敗后,會(huì)馬上拉這條信息。
至于并發(fā)消費(fèi)模式則不會(huì)無(wú)限消費(fèi),而且消費(fèi)失敗后不會(huì)馬上再消費(fèi)。具體的就不嘗試了。
結(jié)論是有序消費(fèi)模式MessageListenerOrderly要慎重地處理異常,我則是用全局變量記錄消息的錯(cuò)誤消費(fèi)次數(shù),只要消費(fèi)次數(shù)達(dá)到一定次數(shù),那么就直接返回ConsumeOrderlyStatus.SUCCESS。
突然想到之前測(cè)試有序消費(fèi)模式MessageListenerOrderly的時(shí)候?yàn)槭裁?000條消息阻塞100ms耗時(shí)25000ms了,因?yàn)橛行蛳M(fèi)模式是同時(shí)拉取四條隊(duì)列消息的,這就對(duì)上了。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot框架實(shí)現(xiàn)自動(dòng)裝配詳解
在使用springboot時(shí),很多配置我們都沒(méi)有做,都是springboot在幫我們完成,這很大一部分歸功于springboot自動(dòng)裝配。本文將詳細(xì)為大家講解SpringBoot的自動(dòng)裝配原理,需要的可以參考一下2022-08-08SpringBoot使用Validator進(jìn)行參數(shù)校驗(yàn)實(shí)戰(zhàn)教程(自定義校驗(yàn),分組校驗(yàn))
這篇文章主要介紹了SpringBoot使用Validator進(jìn)行參數(shù)校驗(yàn)(自定義校驗(yàn),分組校驗(yàn))的實(shí)戰(zhàn)教程,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2023-07-07Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例
這篇文章主要為大家介紹了Java 獲取原始請(qǐng)求域名實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12Java設(shè)計(jì)模式之簡(jiǎn)單工廠(chǎng) 工廠(chǎng)方法 抽象工廠(chǎng)深度總結(jié)
設(shè)計(jì)模式(Design Pattern)是前輩們對(duì)代碼開(kāi)發(fā)經(jīng)驗(yàn)的總結(jié),是解決特定問(wèn)題的一系列套路。它不是語(yǔ)法規(guī)定,而是一套用來(lái)提高代碼可復(fù)用性、可維護(hù)性、可讀性、穩(wěn)健性以及安全性的解決方案2021-09-09淺談springboot一個(gè)service內(nèi)組件的加載順序
這篇文章主要介紹了springboot一個(gè)service內(nèi)組件的加載順序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家2021-08-08Java編程實(shí)現(xiàn)springMVC簡(jiǎn)單登錄實(shí)例
這篇文章主要介紹了Java編程實(shí)現(xiàn)springMVC簡(jiǎn)單登錄實(shí)例,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11mybatis-plus中更新null值的問(wèn)題解決
本文主要介紹 mybatis-plus 中常使用的 update 相關(guān)方法的區(qū)別,以及更新 null 的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-04-04spring-core組件詳解——PropertyResolver屬性解決器
這篇文章主要介紹了spring-core組件詳解——PropertyResolver屬性解決器,需要的朋友可以參考下2016-05-05