RocketMQ之Consumer整體介紹啟動源碼分析
前言
從本篇文章開始,我們將逐步開始分析Consumer的源碼,首先我們將整體介紹Consumer的接口和相關實現(xiàn)類以及DefaultMQPushConsumer的主要API和關鍵屬性,然后我們將分析Consumer的啟動過程源碼,通過對啟動過程的分析,之前我們分析過Producer和Broker的啟動源碼,Consumer的啟動源碼與Producer還是有很多相似的地方。
Consumer整體介紹
Consumer實現(xiàn)類
RocketMQ給我們提供的Consumer實現(xiàn)類如下圖所示,包括推送式的DefaultMQPushConsumer和拉取式的DefaultMQPullConsumer、DefaultLitePullConsumer,從圖中可以看到DefaultMQPullConsumer已經(jīng)被標注為deprecated,如果需要使用拉取式的Consumer,官方推薦使用DefaultLitePullConsumer。
Consumer消費類型
- 拉取式消費
Consumer主動從Broker拉去消息,消費消息的主動權由Consumer控制。一旦獲取了批量消息,就會啟動消費過程。不過這種方式實時性較弱,即Broker中有了新的消息時消費者并不能及時發(fā)現(xiàn)并消費。
- 推送式消費
該模式下Broker收到數(shù)據(jù)后會主動推送給Consumer,這種方式一般實時性比較高。

RocketMQ官方更推薦我們在日常工作中使用DefaultMQPushConsumer,它已經(jīng)能夠滿足我們大多數(shù)使用場景。從技術上講,這個DefaultMQPushConsumer客戶端實際上是底層拉取服務的包裝器。當從代理中提取的消息到達時,它大致調(diào)用注冊的回調(diào)處理程序來饋送消息。本篇文章,我們將介紹DefaultMQPushConsumer的啟動流程
DefaultMQPushConsumer主要API
DefaultMQPushConsumer實現(xiàn)了MQConsumer和MQPushConsumer接口,DefaultMQPushConsumer的主要API都在這兩個接口中定義了,如下所示
// org.apache.rocketmq.client.consumer.MQConsumer
public interface MQConsumer extends MQAdmin {
// 如果消費失敗,消息將被發(fā)送回代理,并延遲消耗一些時間
void sendMessageBack(final MessageExt msg/*消息*/, final int delayLevel/*延遲級別*/, final String brokerName);
// 根據(jù)topic從使用者緩存中獲取消息隊列
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
}
// org.apache.rocketmq.client.consumer.MQPushConsumer
public interface MQPushConsumer extends MQConsumer {
// 啟動Consumer
void start() throws MQClientException;
// 關閉Consumer
void shutdown();
// 注冊并發(fā)消息Listener
void registerMessageListener(final MessageListenerConcurrently messageListener);
// 注冊順序消息Listener,將會有序地接收消息。一個隊列一個線程
void registerMessageListener(final MessageListenerOrderly messageListener);
// 訂閱Topic
void subscribe(final String topic, final String subExpression) throws MQClientException;
// 退訂topic
void unsubscribe(final String topic);
}
DefaultMQPushConsumer關鍵屬性
DefaultMQPushConsumer的關鍵屬性如下所示
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
// DefaultMQPushConsumer的默認實現(xiàn),DefaultMQPushConsumer中大部分功能都是對它的代理
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 相同角色的消費者需要具有完全相同的subscriptions和consumerGroup才能正確實現(xiàn)負載平衡,它需要全局唯一
private String consumerGroup;
// 消息模型定義了如何將消息傳遞到每個消費者客戶端的方式,默認是集群模式
private MessageModel messageModel = MessageModel.CLUSTERING;
// 第一次消費時指定的消費策略,默認是CONSUME_FROM_LAST_OFFSET
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 隊列分配算法,指定如何將消息隊列分配給每個使用者客戶端。
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// 訂閱關系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息監(jiān)聽器
private MessageListener messageListener;
// 消息消費進度存儲器
private OffsetStore offsetStore;
// 最小消費線程數(shù)
private int consumeThreadMin = 20;
// 最大消費線程數(shù)
private int consumeThreadMax = 20;
// 推送模式下拉去消息的間隔時間,默認一次拉取消息完成后立刻繼續(xù)拉取
private long pullInterval = 0;
// 批量消費數(shù)量
private int consumeMessageBatchMaxSize = 1;
// 批量拉取的數(shù)量
private int pullBatchSize = 32;
// 每次拉取時是否更新訂閱關系,默認是false
private boolean postSubscriptionWhenPull = false;
// 消息最大重試次數(shù),如果消息消費最大次數(shù)超過maxReconsumeTimes還未成功,則消息將被轉(zhuǎn)移到一個失敗隊列
private int maxReconsumeTimes = -1;
//延遲將該隊列的消息提交到消費者線程的等待時間,默認延遲1s
private long suspendCurrentQueueTimeMillis = 1000;
// 消息阻塞消費線程的最大超時時間,默認15分鐘
private long consumeTimeout = 15;
// 關閉使用者時等待消息的最長時間,0表示沒有等待。
private long awaitTerminationMillisWhenShutdown = 0;
}Consumer消費模式
Consumer提供下面兩種消費模式,由上面DefaultMQPushConsumer的messageModel定義
- 廣播模式(BROADCASTING)
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收同一個Topic的全量消息。即每條消息會被相同Consumer Group中的所有Consumer消費
- 集群模式(CLUSTERING)
集群模式是Consumer默認的消費模式,集群消費模式下,相同Consumer Group的每個Consumer按照負載均衡策略分攤同一個Topic消息,即每條消息只會被相同Consumer Group中的一個Consumer消費
Consumer消費策略
Consumer主要提了下面三種消費策略
- CONSUME_FROM_LAST_OFFSET
這是Consumer默認的消費策略,它分為兩種情況,如果Broker的磁盤消息未過期且未被刪除,則從最小偏移量開始消費。如果磁盤已過期,并被刪除,則從最大偏移量開始消費。
- CONSUME_FROM_FIRST_OFFSET
從最早可用的消息開始消費
- CONSUME_FROM_TIMESTAMP
從指定的時間戳開始消費,這意味著在consumeTimestamp之前生成的消息將被忽略
Consumer使用
要使用Consumer開始消費消息,至少需要下面5個步驟
public static void main(String[] args) throws MQClientException {
// 1. 傳入CONSUMER_GROUP,創(chuàng)建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 2. 設置namesrvAddr
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3. 訂閱Topic
consumer.subscribe(TOPIC, "*");
// 4.注冊消息Listener
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 5.啟動Consumer
consumer.start();
}
DefaultMQPushConsumer源碼分析
啟動源碼分析
DefaultMQPushConsumer只是設置屬性,Consumer的初始化實際是在DefaultMQPushConsumer#start中執(zhí)行的,DefaultMQPushConsumer#start實際調(diào)用了DefaultMQPushConsumerImpl#start執(zhí)行初始化。
// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
public void start() throws MQClientException {
// consumerGroup封裝namespace
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// DefaultMQPushConsumerImpl啟動
this.defaultMQPushConsumerImpl.start();
// 消息軌跡跟蹤服務,默認null
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}下面我們來分步驟分析DefaultMQPushConsumerImpl#start代碼
第一步:
- 先將Consumer的狀態(tài)更新為START_FAILED
- 校驗Consumer的配置。主要校驗ConsumerGroup,
- 消費模式校驗(MessageModel),消費開始位置(ConsumeFromWhere),消費時間戳(默認是半小時之前),隊列分配策略(默認是AllocateMessageQueueAveragely),訂閱Topic和Subscription關系校驗,消息監(jiān)聽器(MessageListener)校驗等。
- 將Consumer中的訂閱關系拷貝到RebalanceImpl中,Consumer中訂閱關系的來源主要包括
DefaultMQPushConsumerImpl#subscribe方法獲取,也會訂閱重試topic,其主題名為%RETRY%+消費者組名,消費者啟動時會自動訂閱該主題 - 如果是集群模式,則修改消費者名稱為
PID#時間戳
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
//...
// 狀態(tài)先設置為啟動失敗
this.serviceState = ServiceState.START_FAILED;
// 校驗配置,ConsumerGroup校驗,
this.checkConfig();
// 訂閱關系copy到RebalanceImpl中
this.copySubscription();
// 如果是集群模式,消費者名稱如果是DEFAULT,則會改成:PID#時間戳
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//...
}第二步:
主要是初始化MQClientInstance、RebalanceImpl和pullAPIWrapper。
**MQClientInstance:**是消息拉取服務,主要用于拉取消息,同一個進程內(nèi)的所有Consumer會使用同一個MQClientInstance
**RebalanceImpl:**是消費者負載均衡服務,用于確定消費者消費的消息隊列以及負載均衡。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 生成一個MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 設置消費者組
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 消息消費模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 設置消息消費模式
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 設置MQClientInstance
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 構建拉消息包裝器
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);第三步:
根據(jù)消息消費模式的不同設置不同的消息消費進度存儲器(OffsetStore),如果是廣播模式,則使用LocalFileOffsetStore作為消息進度存儲器,如果是集群模式則使用RemoteBrokerOffsetStore作為消息進度存儲器。創(chuàng)建完成之后調(diào)用load()方法加載偏移量,如果是LocalFileOffsetStore將會從本地加載。
廣播模式下:LocalFileOffsetStore將消費進度存儲在Consumer本地的${user.home}/.rocketmq_offsets/clientId/consumerGroup/offsets.json文件中
集群模式下:RemoteBrokerOffsetStore將消費進度存儲在Broker
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 如果是廣播模式,則使用LocalFileOffsetStore存儲偏移量
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
// 如果是集群模式,則使用RemoteBrokerOffsetStore存儲偏移量
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 如果是廣播模式,則從本地文件load偏移量,如果是集群模式則是一個空實現(xiàn)
this.offsetStore.load();
第四步:
根據(jù)消息監(jiān)聽器的類型不同創(chuàng)建不同的消息消費服務(并發(fā)/順序消息消費服務),并啟動。然后注冊消費者組和消費者信息到MQClientInstance中的consumerTable中,注冊成功后啟動MQClientInstance客戶端通信實例。
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
// 如果是順序消費
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
// 如果是并發(fā)消費
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 將自身注冊到MQClientInstance
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// ...
mQClientFactory.start();
第五步:
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start // 向Namesrv拉取并更新當前消費者訂閱topic路由信息 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 隨機選擇一個Broker,發(fā)送檢查客戶端tag配置的請求,主要是檢測Broker是否支持SQL92類型的tag過濾以及SQL92的tag語法是否正確 this.mQClientFactory.checkClientInBroker(); // 給所有Broker發(fā)送心跳 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 喚醒負載均衡服務rebalanceService,并進行rebalance this.mQClientFactory.rebalanceImmediately();
總結
本篇文章我們介紹了Consumer的API,屬性,接口和實現(xiàn)類,通過對這幾部分的了解,我們能夠?qū)onsumer有一個整體的認識。我們還分析了DefaultMQPushConsumer的啟動的源碼,通過對DefaultMQPushConsumer#start開始逐漸深入分析DefaultMQPushConsumer的啟動過程,能夠幫助我們對Consumer消費消息一些關鍵的類如MQClientInstance,OffsetStore,RebalanceImpl,ConsumeMessageService由一個初步的認識,由助于我們后續(xù)詳細了解這些服務的工作原理。
以上就是RocketMQ 源碼分析之Consumer整體介紹啟動分析的詳細內(nèi)容,更多關于RocketMQ Consumer源碼解析的資料請關注腳本之家其它相關文章!
相關文章
ElasticSearch學習之多條件組合查詢驗證及示例分析
這篇文章主要為大家介紹了ElasticSearch 多條件組合查詢驗證及示例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02
spring-boot-maven-plugin?配置有啥用
這篇文章主要介紹了spring-boot-maven-plugin?配置是干啥的,這個是SpringBoot的Maven插件,主要用來打包的,通常打包成jar或者war文件,本文通過示例代碼給大家介紹的非常詳細,需要的朋友可以參考下2022-08-08
Java?Float?保留小數(shù)位精度的實現(xiàn)
這篇文章主要介紹了Java?Float?保留小數(shù)位精度的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
Mybatis Plus整合PageHelper分頁的實現(xiàn)示例
這篇文章主要介紹了Mybatis Plus整合PageHelper分頁的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09
關于報錯IDEA Terminated with exit code
如果在IDEA構建項目時遇到下面這樣的報錯IDEA Terminated with exit code 1,那必然是Maven的設置參數(shù)重置了,導致下載錯誤引起的,本文給大家分享兩種解決方法,需要的朋友可以參考下2022-08-08

