欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于RocketMQ推拉模式詳解

 更新時間:2021年07月03日 09:54:32   作者:mingxungu  
這篇文章主要介紹了RocketMQ推拉模式的使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

消費者客戶端有兩種方式從消息中間件獲取消息并消費。嚴(yán)格意義上來講,RocketMQ并沒有實現(xiàn)PUSH模式,而是對拉模式進(jìn)行一層包裝,名字雖然是 Push 開頭,實際在實現(xiàn)時,使用 Pull 方式實現(xiàn)。

通過 Pull 不斷輪詢 Broker 獲取消息。當(dāng)不存在新消息時,Broker 會掛起請求,直到有新消息產(chǎn)生,取消掛起,返回新消息。

1、概述

1.1、PULL方式

由消費者客戶端主動向消息中間件(MQ消息服務(wù)器代理)拉取消息;采用Pull方式,如何設(shè)置Pull消息的拉取頻率需要重點去考慮,舉個例子來說,可能1分鐘內(nèi)連續(xù)來了1000條消息,然后2小時內(nèi)沒有新消息產(chǎn)生(概括起來說就是“消息延遲與忙等待”)。

如果每次Pull的時間間隔比較久,會增加消息的延遲,即消息到達(dá)消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間內(nèi)MQ中并沒有任何消息可以消費,那么會產(chǎn)生很多無效的Pull請求的RPC開銷,影響MQ整體的網(wǎng)絡(luò)性能;

1.2、PUSH方式

由消息中間件(MQ消息服務(wù)器代理)主動地將消息推送給消費者;采用Push方式,可以盡可能實時地將消息發(fā)送給消費者進(jìn)行消費。

但是,在消費者的處理消息的能力較弱的時候(比如,消費者端的業(yè)務(wù)系統(tǒng)處理一條消息的流程比較復(fù)雜,其中的調(diào)用鏈路比較多導(dǎo)致消費時間比較久。

概括起來地說就是“慢消費問題”),而MQ不斷地向消費者Push消息,消費者端的緩沖區(qū)可能會溢出,導(dǎo)致異常;

2、PUSH模式

主動推送的模式實現(xiàn)起來簡單,避免了拉取的消費端業(yè)務(wù)邏輯的復(fù)雜度,消息的消費可以認(rèn)為是實時的,同時也存在一定的弊端,要求消費端要有很強的消費能力。

2.1、代碼實現(xiàn)

public class Consumer1 {	
	public static void main(String[] args){
		try {
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
			consumer.setConsumerGroup("consumer_push");
			consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
			consumer.subscribe("TopicTest", "*");
			consumer.registerMessageListener(new MessageListenerConcurrently(){
 
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
						ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
					try {
					    for(MessageExt msg : paramList){
					    	String msgbody = new String(msg.getBody(), "utf-8");
					    	SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
					    	Date date = new Date(msg.getStoreTimestamp());
					    	System.out.println("Consumer1===  存入時間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容
					    }
					} catch (Exception e) {
					    e.printStackTrace();
					    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功
				}
			});
			consumer.start();
			System.out.println("Consumer1===啟動成功!");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

PUSH消費方式,需要注冊一個監(jiān)聽器Listener,,用來監(jiān)聽最新的消息,進(jìn)行業(yè)務(wù)處理,同時反饋消息的消費狀態(tài),消費成功(CONSUME_SUCCESS)、消費重試(RECONSUME_LATER),消息重試會根據(jù)配置的消息的延遲等級的時間間隔,定時重新發(fā)送消費失敗的記錄。(PS:延遲消息中會重點討論)

PUSH消息方式由于返回了消息的狀態(tài),服務(wù)端會維護每個消費端的消費進(jìn)度,內(nèi)部會記錄消費進(jìn)度,消息發(fā)送成功后會更新消費進(jìn)度。

PUSH消息方式的局限性,是在HOLD住Consumer請求的時候需要占用資源,它適合用在消息隊列這種客戶端連接數(shù)可控的場景中。

上一個章節(jié)說明了服務(wù)端存儲的每個主題對應(yīng)的消費組的每個消息隊列的偏移量

查看服務(wù)器文件上的消費進(jìn)度信息:

/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

3、PULL模式

3.1、代碼實現(xiàn)(1)

public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullConsumer");
        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        consumer.start();
 
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            
            SINGLE_MQ: while (true) {
                try {
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println("=============================================================");
                    System.out.println("Consume from the queue: " + mq + "offset:" + getMessageQueueOffset(mq) + "結(jié)果:" + pullResult.getPullStatus());
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                    	List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.print(new String(m.getBody()) +" == ");
                        }
                        System.out.println("");
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset; 
        return 0;
    }
}

結(jié)果:

每次拉取消息的時候需要提供偏移量和拉取的消息的個數(shù),需要自己業(yè)務(wù)實現(xiàn)每個主題下的隊列的消費進(jìn)度。

代碼實現(xiàn)(1)這種方式只能拉取歷史的消息,最新的消息拉取不了,也可以進(jìn)行改造,來實現(xiàn)一直拉取。

3.2、代碼實現(xiàn)(2)

在MQPullConsumer這個類里面,有一個MessageQueueListener,它的目的就是當(dāng)queue發(fā)生變化的時候,通知Consumer。也正是這個借口,幫助我們在Pull模式里面,實現(xiàn)負(fù)載均衡。

注意,這個接口在MQPushConsumer里面是沒有的,那里面有的是上面代碼里的MessageListener。

 void registerMessageQueueListener(final String topic, final MessageQueueListener listener); 
public interface MessageQueueListener {
    void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                             final Set<MessageQueue> mqDivided);
}

有了這個Listener,我們就可以動態(tài)的知道當(dāng)前的Consumer分?jǐn)偟搅藥讉€MessageQueue。然后對這些MessageQueue,我們可以開個線程池來消費。

public class PullConsumerExtend {
	public static void main(String[] args) throws MQClientException {
	       //消費組
	        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("pullConsumer");
	       //MQ NameService地址
	        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
	       //負(fù)載均衡模式
	        scheduleService.setMessageModel(MessageModel.CLUSTERING);
	       //需要處理的消息topic
	        scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
 
	            @Override
	            public void doPullTask(MessageQueue mq, PullTaskContext context) {
	                MQPullConsumer consumer = context.getPullConsumer();
	                try {
	                	
	                    long offset = consumer.fetchConsumeOffset(mq, false);
	                    if (offset < 0)
	                        offset = 0;
	                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
	                    System.out.println("");
	                    System.out.println("Consume from the queue: " + mq + "offset:" + offset + "結(jié)果:" + pullResult.getPullStatus());
	                    switch (pullResult.getPullStatus()) {
	                        case FOUND:
	                        	List<MessageExt> messageExtList = pullResult.getMsgFoundList();
	                            for (MessageExt m : messageExtList) {
	                                System.out.print(new String(m.getBody()) +" == ");
	                            }
	                            break;
	                        case NO_MATCHED_MSG:
	                            break;
	                        case NO_NEW_MSG:
	                        case OFFSET_ILLEGAL:
	                            break;
	                        default:
	                            break;
	                    }
	                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
	                    //設(shè)置下一下拉取的間隔時間
	                    context.setPullNextDelayTimeMillis(10000);
	                } catch (Exception e) {
	                    e.printStackTrace();
	                }
	            }
	        }); 
	        scheduleService.start();
	}
}

結(jié)果:

比較**代碼實現(xiàn)(1)**這種方式改進(jìn)了很多,不需要業(yè)務(wù)維護每個消費隊列的消費進(jìn)度,可以更新到服務(wù)端的。

弊端也很明顯就是每次隊列拉取消息的時間間隔,時間長導(dǎo)致消息擠壓,時間段消息少,影響服務(wù)端性能。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 淺談HashMap中7種遍歷方式的性能分析

    淺談HashMap中7種遍歷方式的性能分析

    本文先從HashMap的遍歷方法講起,然后再從性能、原理以及安全性等方面,來分析HashMap各種遍歷方式的優(yōu)勢與不足
    2021-06-06
  • SpringMVC自定義參數(shù)綁定實現(xiàn)詳解

    SpringMVC自定義參數(shù)綁定實現(xiàn)詳解

    這篇文章主要介紹了SpringMVC自定義參數(shù)綁定實現(xiàn)詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • java實現(xiàn)學(xué)生成績錄入系統(tǒng)

    java實現(xiàn)學(xué)生成績錄入系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)學(xué)生成績錄入系統(tǒng),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-01-01
  • 如何兩步解決maven依賴導(dǎo)入失敗的問題

    如何兩步解決maven依賴導(dǎo)入失敗的問題

    這篇文章主要介紹了如何兩步解決maven依賴導(dǎo)入失敗的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • nohup運行Java tail查看日志方式

    nohup運行Java tail查看日志方式

    nohup命令允許程序在用戶退出賬戶或關(guān)閉終端后繼續(xù)運行,常與"&"結(jié)合使用以實現(xiàn)程序的后臺執(zhí)行,配合重定向操作,nohup可以將程序輸出保存到日志文件中,如nohup java -jar XXX.jar &> myout.log &,此外,tail命令可用于實時監(jiān)控日志文件的變化
    2024-09-09
  • MyBatis中不建議使用where?1=1原因詳解

    MyBatis中不建議使用where?1=1原因詳解

    這篇文章主要為大家介紹了MyBatis中不建議使用where?1=1的原因詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • 學(xué)java得這樣學(xué),學(xué)習(xí)確實也得這樣

    學(xué)java得這樣學(xué),學(xué)習(xí)確實也得這樣

    學(xué)java得這樣學(xué),學(xué)習(xí)東西確實也得這樣
    2008-02-02
  • SpringBoot日志配置簡單介紹

    SpringBoot日志配置簡單介紹

    這篇文章主要介紹了SpringBoot日志配置,需要的朋友可以參考下
    2017-09-09
  • JDBC+GUI實現(xiàn)簡單學(xué)生管理系統(tǒng)

    JDBC+GUI實現(xiàn)簡單學(xué)生管理系統(tǒng)

    這篇文章主要為大家詳細(xì)介紹了JDBC+GUI實現(xiàn)簡單學(xué)生管理系統(tǒng),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-02-02
  • Java使用過濾器防止SQL注入XSS腳本注入的實現(xiàn)

    Java使用過濾器防止SQL注入XSS腳本注入的實現(xiàn)

    這篇文章主要介紹了Java使用過濾器防止SQL注入XSS腳本注入,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01

最新評論