欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于rocketmq的有序消費(fèi)模式和并發(fā)消費(fèi)模式的區(qū)別說(shuō)明

 更新時(shí)間:2021年06月22日 10:18:59   作者:從心歸零  
這篇文章主要介紹了基于rocketmq的有序消費(fèi)模式和并發(fā)消費(fèi)模式的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

rocketmq消費(fèi)者注冊(cè)監(jiān)聽(tīng)有兩種模式

有序消費(fèi)MessageListenerOrderly和并發(fā)消費(fèi)MessageListenerConcurrently,這兩種模式返回值不同。

MessageListenerOrderly

正確消費(fèi)返回

ConsumeOrderlyStatus.SUCCESS

稍后消費(fèi)返回

ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently

正確消費(fèi)返回

ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍后消費(fèi)返回

ConsumeConcurrentlyStatus.RECONSUME_LATER

顧名思義,有序消費(fèi)模式是按照消息的順序進(jìn)行消費(fèi),但是除此之外,在實(shí)踐過(guò)程中我發(fā)現(xiàn)和并發(fā)消費(fèi)模式還有很大的區(qū)別的。

第一,速度,下面我打算用實(shí)驗(yàn)來(lái)探究一下。

使用mq發(fā)送消息,消費(fèi)者使用有序消費(fèi)模式消費(fèi),具體的業(yè)務(wù)是阻塞100ms

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerOrderly() { 
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        logger.info("==========CONSUME_START===========");  
		logger.info(Thread.currentThread().getName()  
                            + " Receive New Messages: " + msgs.size());  
        try {
        	if(date1 == null)
        		date1 = new Date();//在第一次消費(fèi)時(shí)初始化
        	Thread.sleep(100);
       		logger.info("total:"+(++total));
        	date2 = new Date();
       		totalTime = (date2.getTime() - date1.getTime());
       		logger.info("totalTime:"+totalTime);
            logger.info("==========CONSUME_SUCCESS===========");  
            return ConsumeOrderlyStatus.SUCCESS;  
        }catch (Exception e) {
            logger.info("==========RECONSUME_LATER===========");  
            logger.error(e.getMessage(),e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}
}

消費(fèi)100條消息

速度挺快的,為了讓結(jié)果更準(zhǔn)確,將消息加到1000條

消費(fèi)1000條消息

可以看到每一條消息平均耗時(shí)25ms,然而業(yè)務(wù)是阻塞100ms,這說(shuō)明有序消費(fèi)模式和同步消費(fèi)可能并不是一回事,那如果不阻塞代碼我們?cè)賮?lái)看一下結(jié)果

不阻塞過(guò)后速度明顯提高了,那么我阻塞300ms會(huì)怎么樣呢?

時(shí)間相比阻塞100ms多了2倍

接下來(lái)我們測(cè)試并發(fā)消費(fèi)模式

Long totalTime = 0L;
Date date1 = null;
Date date2 = new Date();
new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(  
                       List< MessageExt > msgs, ConsumeConcurrentlyContext context) {  
 
    		logger.info(Thread.currentThread().getName()  
                                 + " Receive New Messages: " + msgs.size()); 
    		try {
    			if(date1 == null)
    				date1 = new Date();
            	Thread.sleep(100);
           		logger.info("total:"+(++total));
           		date2 = new Date();
           		totalTime = (date2.getTime() - date1.getTime());
           		logger.info("totalTime:"+totalTime);
                logger.info("==========CONSUME_SUCCESS===========");  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            } catch (Exception e) {
                logger.info("==========RECONSUME_LATER===========");  
                logger.error(e.getMessage(),e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
    }  
}

基于上次的經(jīng)驗(yàn),同樣測(cè)試三種情況,消費(fèi)1000條不阻塞,消費(fèi)1000條阻塞100ms,消費(fèi)1000條阻塞300ms

消費(fèi)1000條不阻塞的情況

和有序消費(fèi)模式差不多,快個(gè)一兩秒。

消費(fèi)1000條阻塞100ms

竟然比不阻塞的情況更快,可能是誤差把

消費(fèi)1000條阻塞300ms

速度稍慢,但是還是比有序消費(fèi)快得多。

結(jié)論是并發(fā)消費(fèi)的消費(fèi)速度要比有序消費(fèi)更快。

另一個(gè)區(qū)別是消費(fèi)失敗時(shí)的處理不同,有序消費(fèi)模式返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后,消費(fèi)者會(huì)立馬消費(fèi)這條消息,而使用并發(fā)消費(fèi)模式,返回ConsumeConcurrentlyStatus.RECONSUME_LATER后,要過(guò)好幾秒甚至十幾秒才會(huì)再次消費(fèi)。

我是在只有一條消息的情況下測(cè)試的。更重要的區(qū)別是,

返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT并不會(huì)增加消息的消費(fèi)次數(shù),mq消息有個(gè)默認(rèn)最大消費(fèi)次數(shù)16,消費(fèi)次數(shù)到了以后,這條消息會(huì)進(jìn)入死信隊(duì)列,這個(gè)最大消費(fèi)次數(shù)是可以在mqadmin中設(shè)置的。

mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g MonitorCumsumerGroupName -r 3

我測(cè)試后發(fā)現(xiàn),并發(fā)模式下返回ConsumeConcurrentlyStatus.RECONSUME_LATER,同一個(gè)消息到達(dá)最大消費(fèi)次數(shù)之后就不會(huì)再出現(xiàn)了。這說(shuō)明有序消費(fèi)模式可能并沒(méi)有這個(gè)機(jī)制,這意味著你再有序消費(fèi)模式下拋出固定異常,那么這條異常信息將會(huì)被永遠(yuǎn)消費(fèi),并且很可能會(huì)影響之后正常的消息。下面依然做個(gè)試驗(yàn)

Map<String, Integer> map = new HashMap<>();//保存消息錯(cuò)誤消費(fèi)次數(shù)
new MessageListenerOrderly() {
 
	@Override
	public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
			ConsumeOrderlyContext context) {
        try {
        	if(1 == 1)
        			throw new Exception();
            return ConsumeOrderlyStatus.SUCCESS;  
        }catch (Exception e) {
        	MessageExt msg = msgs.get(0);
			if(map.containsKey(msg.getKeys())) {//消息每消費(fèi)一次,加1
			    map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
			}else {
			    map.put(msg.getKeys(), 1);
			}
			logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
	}	
}

發(fā)送了十條消息

可以看到雖然我發(fā)了十條消息,但是一直在消費(fèi)同樣四條消息,這可能跟消息broker有默認(rèn)四條隊(duì)列有關(guān)系。同時(shí)從時(shí)間可以看到,消費(fèi)失敗后,會(huì)馬上拉這條信息。

至于并發(fā)消費(fèi)模式則不會(huì)無(wú)限消費(fèi),而且消費(fèi)失敗后不會(huì)馬上再消費(fèi)。具體的就不嘗試了。

結(jié)論是有序消費(fèi)模式MessageListenerOrderly要慎重地處理異常,我則是用全局變量記錄消息的錯(cuò)誤消費(fèi)次數(shù),只要消費(fèi)次數(shù)達(dá)到一定次數(shù),那么就直接返回ConsumeOrderlyStatus.SUCCESS。

突然想到之前測(cè)試有序消費(fèi)模式MessageListenerOrderly的時(shí)候?yàn)槭裁?000條消息阻塞100ms耗時(shí)25000ms了,因?yàn)橛行蛳M(fèi)模式是同時(shí)拉取四條隊(duì)列消息的,這就對(duì)上了。

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論