RocketMQ之Consumer整體介紹啟動(dòng)源碼分析
前言
從本篇文章開(kāi)始,我們將逐步開(kāi)始分析Consumer的源碼,首先我們將整體介紹Consumer的接口和相關(guān)實(shí)現(xiàn)類(lèi)以及DefaultMQPushConsumer的主要API和關(guān)鍵屬性,然后我們將分析Consumer的啟動(dòng)過(guò)程源碼,通過(guò)對(duì)啟動(dòng)過(guò)程的分析,之前我們分析過(guò)Producer和Broker的啟動(dòng)源碼,Consumer的啟動(dòng)源碼與Producer還是有很多相似的地方。
Consumer整體介紹
Consumer實(shí)現(xiàn)類(lèi)
RocketMQ給我們提供的Consumer實(shí)現(xiàn)類(lèi)如下圖所示,包括推送式的DefaultMQPushConsumer
和拉取式的DefaultMQPullConsumer
、DefaultLitePullConsumer
,從圖中可以看到DefaultMQPullConsumer
已經(jīng)被標(biāo)注為deprecated,如果需要使用拉取式的Consumer,官方推薦使用DefaultLitePullConsumer。
Consumer消費(fèi)類(lèi)型
- 拉取式消費(fèi)
Consumer主動(dòng)從Broker拉去消息,消費(fèi)消息的主動(dòng)權(quán)由Consumer控制。一旦獲取了批量消息,就會(huì)啟動(dòng)消費(fèi)過(guò)程。不過(guò)這種方式實(shí)時(shí)性較弱,即Broker中有了新的消息時(shí)消費(fèi)者并不能及時(shí)發(fā)現(xiàn)并消費(fèi)。
- 推送式消費(fèi)
該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給Consumer,這種方式一般實(shí)時(shí)性比較高。
RocketMQ官方更推薦我們?cè)谌粘9ぷ髦惺褂?code>DefaultMQPushConsumer,它已經(jīng)能夠滿足我們大多數(shù)使用場(chǎng)景。從技術(shù)上講,這個(gè)DefaultMQPushConsumer
客戶(hù)端實(shí)際上是底層拉取服務(wù)的包裝器。當(dāng)從代理中提取的消息到達(dá)時(shí),它大致調(diào)用注冊(cè)的回調(diào)處理程序來(lái)饋送消息。本篇文章,我們將介紹DefaultMQPushConsumer
的啟動(dòng)流程
DefaultMQPushConsumer主要API
DefaultMQPushConsumer實(shí)現(xiàn)了MQConsumer和MQPushConsumer接口,DefaultMQPushConsumer的主要API都在這兩個(gè)接口中定義了,如下所示
// org.apache.rocketmq.client.consumer.MQConsumer public interface MQConsumer extends MQAdmin { // 如果消費(fèi)失敗,消息將被發(fā)送回代理,并延遲消耗一些時(shí)間 void sendMessageBack(final MessageExt msg/*消息*/, final int delayLevel/*延遲級(jí)別*/, final String brokerName); // 根據(jù)topic從使用者緩存中獲取消息隊(duì)列 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException; } // org.apache.rocketmq.client.consumer.MQPushConsumer public interface MQPushConsumer extends MQConsumer { // 啟動(dòng)Consumer void start() throws MQClientException; // 關(guān)閉Consumer void shutdown(); // 注冊(cè)并發(fā)消息Listener void registerMessageListener(final MessageListenerConcurrently messageListener); // 注冊(cè)順序消息Listener,將會(huì)有序地接收消息。一個(gè)隊(duì)列一個(gè)線程 void registerMessageListener(final MessageListenerOrderly messageListener); // 訂閱Topic void subscribe(final String topic, final String subExpression) throws MQClientException; // 退訂topic void unsubscribe(final String topic); }
DefaultMQPushConsumer關(guān)鍵屬性
DefaultMQPushConsumer的關(guān)鍵屬性如下所示
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { // DefaultMQPushConsumer的默認(rèn)實(shí)現(xiàn),DefaultMQPushConsumer中大部分功能都是對(duì)它的代理 protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; // 相同角色的消費(fèi)者需要具有完全相同的subscriptions和consumerGroup才能正確實(shí)現(xiàn)負(fù)載平衡,它需要全局唯一 private String consumerGroup; // 消息模型定義了如何將消息傳遞到每個(gè)消費(fèi)者客戶(hù)端的方式,默認(rèn)是集群模式 private MessageModel messageModel = MessageModel.CLUSTERING; // 第一次消費(fèi)時(shí)指定的消費(fèi)策略,默認(rèn)是CONSUME_FROM_LAST_OFFSET private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; // 隊(duì)列分配算法,指定如何將消息隊(duì)列分配給每個(gè)使用者客戶(hù)端。 private AllocateMessageQueueStrategy allocateMessageQueueStrategy; // 訂閱關(guān)系 private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); // 消息監(jiān)聽(tīng)器 private MessageListener messageListener; // 消息消費(fèi)進(jìn)度存儲(chǔ)器 private OffsetStore offsetStore; // 最小消費(fèi)線程數(shù) private int consumeThreadMin = 20; // 最大消費(fèi)線程數(shù) private int consumeThreadMax = 20; // 推送模式下拉去消息的間隔時(shí)間,默認(rèn)一次拉取消息完成后立刻繼續(xù)拉取 private long pullInterval = 0; // 批量消費(fèi)數(shù)量 private int consumeMessageBatchMaxSize = 1; // 批量拉取的數(shù)量 private int pullBatchSize = 32; // 每次拉取時(shí)是否更新訂閱關(guān)系,默認(rèn)是false private boolean postSubscriptionWhenPull = false; // 消息最大重試次數(shù),如果消息消費(fèi)最大次數(shù)超過(guò)maxReconsumeTimes還未成功,則消息將被轉(zhuǎn)移到一個(gè)失敗隊(duì)列 private int maxReconsumeTimes = -1; //延遲將該隊(duì)列的消息提交到消費(fèi)者線程的等待時(shí)間,默認(rèn)延遲1s private long suspendCurrentQueueTimeMillis = 1000; // 消息阻塞消費(fèi)線程的最大超時(shí)時(shí)間,默認(rèn)15分鐘 private long consumeTimeout = 15; // 關(guān)閉使用者時(shí)等待消息的最長(zhǎng)時(shí)間,0表示沒(méi)有等待。 private long awaitTerminationMillisWhenShutdown = 0; }
Consumer消費(fèi)模式
Consumer提供下面兩種消費(fèi)模式,由上面DefaultMQPushConsumer的messageModel定義
- 廣播模式(BROADCASTING)
廣播消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer實(shí)例都接收同一個(gè)Topic的全量消息。即每條消息會(huì)被相同Consumer Group中的所有Consumer消費(fèi)
- 集群模式(CLUSTERING)
集群模式是Consumer默認(rèn)的消費(fèi)模式,集群消費(fèi)模式下,相同Consumer Group的每個(gè)Consumer按照負(fù)載均衡策略分?jǐn)偼粋€(gè)Topic消息,即每條消息只會(huì)被相同Consumer Group中的一個(gè)Consumer消費(fèi)
Consumer消費(fèi)策略
Consumer主要提了下面三種消費(fèi)策略
- CONSUME_FROM_LAST_OFFSET
這是Consumer默認(rèn)的消費(fèi)策略,它分為兩種情況,如果Broker的磁盤(pán)消息未過(guò)期且未被刪除,則從最小偏移量開(kāi)始消費(fèi)。如果磁盤(pán)已過(guò)期,并被刪除,則從最大偏移量開(kāi)始消費(fèi)。
- CONSUME_FROM_FIRST_OFFSET
從最早可用的消息開(kāi)始消費(fèi)
- CONSUME_FROM_TIMESTAMP
從指定的時(shí)間戳開(kāi)始消費(fèi),這意味著在consumeTimestamp之前生成的消息將被忽略
Consumer使用
要使用Consumer開(kāi)始消費(fèi)消息,至少需要下面5個(gè)步驟
public static void main(String[] args) throws MQClientException { // 1. 傳入CONSUMER_GROUP,創(chuàng)建DefaultMQPushConsumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); // 2. 設(shè)置namesrvAddr consumer.setNamesrvAddr("127.0.0.1:9876"); // 3. 訂閱Topic consumer.subscribe(TOPIC, "*"); // 4.注冊(cè)消息Listener consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 5.啟動(dòng)Consumer consumer.start(); }
DefaultMQPushConsumer源碼分析
啟動(dòng)源碼分析
DefaultMQPushConsumer只是設(shè)置屬性,Consumer的初始化實(shí)際是在DefaultMQPushConsumer#start
中執(zhí)行的,DefaultMQPushConsumer#start
實(shí)際調(diào)用了DefaultMQPushConsumerImpl#start
執(zhí)行初始化。
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start public void start() throws MQClientException { // consumerGroup封裝namespace setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); // DefaultMQPushConsumerImpl啟動(dòng) this.defaultMQPushConsumerImpl.start(); // 消息軌跡跟蹤服務(wù),默認(rèn)null if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
下面我們來(lái)分步驟分析DefaultMQPushConsumerImpl#start
代碼
第一步:
- 先將Consumer的狀態(tài)更新為START_FAILED
- 校驗(yàn)Consumer的配置。主要校驗(yàn)ConsumerGroup,
- 消費(fèi)模式校驗(yàn)(MessageModel),消費(fèi)開(kāi)始位置(ConsumeFromWhere),消費(fèi)時(shí)間戳(默認(rèn)是半小時(shí)之前),隊(duì)列分配策略(默認(rèn)是AllocateMessageQueueAveragely),訂閱Topic和Subscription關(guān)系校驗(yàn),消息監(jiān)聽(tīng)器(MessageListener)校驗(yàn)等。
- 將Consumer中的訂閱關(guān)系拷貝到RebalanceImpl中,Consumer中訂閱關(guān)系的來(lái)源主要包括
DefaultMQPushConsumerImpl#subscribe
方法獲取,也會(huì)訂閱重試topic,其主題名為%RETRY%+消費(fèi)者組名
,消費(fèi)者啟動(dòng)時(shí)會(huì)自動(dòng)訂閱該主題 - 如果是集群模式,則修改消費(fèi)者名稱(chēng)為
PID#時(shí)間戳
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start public synchronized void start() throws MQClientException { //... // 狀態(tài)先設(shè)置為啟動(dòng)失敗 this.serviceState = ServiceState.START_FAILED; // 校驗(yàn)配置,ConsumerGroup校驗(yàn), this.checkConfig(); // 訂閱關(guān)系copy到RebalanceImpl中 this.copySubscription(); // 如果是集群模式,消費(fèi)者名稱(chēng)如果是DEFAULT,則會(huì)改成:PID#時(shí)間戳 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } //... }
第二步:
主要是初始化MQClientInstance、RebalanceImpl和pullAPIWrapper。
**MQClientInstance:**是消息拉取服務(wù),主要用于拉取消息,同一個(gè)進(jìn)程內(nèi)的所有Consumer會(huì)使用同一個(gè)MQClientInstance
**RebalanceImpl:**是消費(fèi)者負(fù)載均衡服務(wù),用于確定消費(fèi)者消費(fèi)的消息隊(duì)列以及負(fù)載均衡。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start // 生成一個(gè)MQClientInstance this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); // 設(shè)置消費(fèi)者組 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 消息消費(fèi)模式 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 設(shè)置消息消費(fèi)模式 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); // 設(shè)置MQClientInstance this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); // 構(gòu)建拉消息包裝器 this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
第三步:
根據(jù)消息消費(fèi)模式的不同設(shè)置不同的消息消費(fèi)進(jìn)度存儲(chǔ)器(OffsetStore),如果是廣播模式,則使用LocalFileOffsetStore作為消息進(jìn)度存儲(chǔ)器,如果是集群模式則使用RemoteBrokerOffsetStore作為消息進(jìn)度存儲(chǔ)器。創(chuàng)建完成之后調(diào)用load()方法加載偏移量,如果是LocalFileOffsetStore將會(huì)從本地加載。
廣播模式下:LocalFileOffsetStore將消費(fèi)進(jìn)度存儲(chǔ)在Consumer本地的${user.home}/.rocketmq_offsets/clientId/consumerGroup/offsets.json
文件中
集群模式下:RemoteBrokerOffsetStore將消費(fèi)進(jìn)度存儲(chǔ)在Broker
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: // 如果是廣播模式,則使用LocalFileOffsetStore存儲(chǔ)偏移量 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: // 如果是集群模式,則使用RemoteBrokerOffsetStore存儲(chǔ)偏移量 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } // 如果是廣播模式,則從本地文件load偏移量,如果是集群模式則是一個(gè)空實(shí)現(xiàn) this.offsetStore.load();
第四步:
根據(jù)消息監(jiān)聽(tīng)器的類(lèi)型不同創(chuàng)建不同的消息消費(fèi)服務(wù)(并發(fā)/順序消息消費(fèi)服務(wù)),并啟動(dòng)。然后注冊(cè)消費(fèi)者組和消費(fèi)者信息到MQClientInstance中的consumerTable中,注冊(cè)成功后啟動(dòng)MQClientInstance客戶(hù)端通信實(shí)例。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start // 如果是順序消費(fèi) if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); // 如果是并發(fā)消費(fèi) } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); // 將自身注冊(cè)到MQClientInstance boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); // ... mQClientFactory.start();
第五步:
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start // 向Namesrv拉取并更新當(dāng)前消費(fèi)者訂閱topic路由信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 隨機(jī)選擇一個(gè)Broker,發(fā)送檢查客戶(hù)端tag配置的請(qǐng)求,主要是檢測(cè)Broker是否支持SQL92類(lèi)型的tag過(guò)濾以及SQL92的tag語(yǔ)法是否正確 this.mQClientFactory.checkClientInBroker(); // 給所有Broker發(fā)送心跳 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 喚醒負(fù)載均衡服務(wù)rebalanceService,并進(jìn)行rebalance this.mQClientFactory.rebalanceImmediately();
總結(jié)
本篇文章我們介紹了Consumer的API,屬性,接口和實(shí)現(xiàn)類(lèi),通過(guò)對(duì)這幾部分的了解,我們能夠?qū)onsumer有一個(gè)整體的認(rèn)識(shí)。我們還分析了DefaultMQPushConsumer的啟動(dòng)的源碼,通過(guò)對(duì)DefaultMQPushConsumer#start
開(kāi)始逐漸深入分析DefaultMQPushConsumer的啟動(dòng)過(guò)程,能夠幫助我們對(duì)Consumer消費(fèi)消息一些關(guān)鍵的類(lèi)如MQClientInstance,OffsetStore,RebalanceImpl,ConsumeMessageService由一個(gè)初步的認(rèn)識(shí),由助于我們后續(xù)詳細(xì)了解這些服務(wù)的工作原理。
以上就是RocketMQ 源碼分析之Consumer整體介紹啟動(dòng)分析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Consumer源碼解析的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
ElasticSearch學(xué)習(xí)之多條件組合查詢(xún)驗(yàn)證及示例分析
這篇文章主要為大家介紹了ElasticSearch 多條件組合查詢(xún)驗(yàn)證及示例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02spring-boot-maven-plugin?配置有啥用
這篇文章主要介紹了spring-boot-maven-plugin?配置是干啥的,這個(gè)是SpringBoot的Maven插件,主要用來(lái)打包的,通常打包成jar或者war文件,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08Java?Float?保留小數(shù)位精度的實(shí)現(xiàn)
這篇文章主要介紹了Java?Float?保留小數(shù)位精度的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Mybatis Plus整合PageHelper分頁(yè)的實(shí)現(xiàn)示例
這篇文章主要介紹了Mybatis Plus整合PageHelper分頁(yè)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09java實(shí)現(xiàn)即時(shí)通信的完整步驟分享
這篇文章主要給大家介紹了關(guān)于java實(shí)現(xiàn)即時(shí)通信的完整步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09關(guān)于報(bào)錯(cuò)IDEA Terminated with exit code
如果在IDEA構(gòu)建項(xiàng)目時(shí)遇到下面這樣的報(bào)錯(cuò)IDEA Terminated with exit code 1,那必然是Maven的設(shè)置參數(shù)重置了,導(dǎo)致下載錯(cuò)誤引起的,本文給大家分享兩種解決方法,需要的朋友可以參考下2022-08-08