詳解RocketMQ中的消費者啟動與消費流程分析
一、簡介
1.1 RocketMQ 簡介
RocketMQ是由阿里巴巴開源的分布式消息中間件,支持順序消息、定時消息、自定義過濾器、負(fù)載均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費消息,Broker 負(fù)責(zé)存儲消息。NameServer充當(dāng)名字路由服務(wù),整體架構(gòu)圖如下所示:
- **Producer:**負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)生產(chǎn)消息,可通過集群方式部署。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息,單向發(fā)送不需要。
- **Consumer:**負(fù)責(zé)消費消息,一般是后臺系統(tǒng)負(fù)責(zé)異步消費,可通過集群方式部署。一個消息消費者會從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。提供pull/push兩者消費模式。
- **Broker Server:**負(fù)責(zé)存儲消息、轉(zhuǎn)發(fā)消息。RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準(zhǔn)備,存儲消息相關(guān)的元數(shù)據(jù),包括消費者組、消費進(jìn)度偏移和主題和隊列消息等。
- **Name Server:**名字服務(wù),充當(dāng)路由消息的提供者。生產(chǎn)者或消費者能夠通過名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個NameServer實例組成集群,相互獨立,沒有信息交換。
本文基于Apache RocketMQ 最新版本主要講述RocketMQ的消費者機制,分析其啟動流程、pull/push機制,消息ack機制以及定時消息和順序消息的不同。
1.2 工作流程
(1)啟動NameServer。
NameServer起來后監(jiān)聽端口,等待Broker、Producer、Consumer連上來,相當(dāng)于一個路由控制中心。
(2)啟動Broker。
跟所有的NameServer保持長連接,定時發(fā)送心跳包。心跳包中包含當(dāng)前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關(guān)系。
(3)創(chuàng)建Topic。
創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發(fā)送消息時自動創(chuàng)建Topic。
(4)Producer發(fā)送消息。
啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當(dāng)前發(fā)送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發(fā)消息。
(5)Consumer消費消息。
跟其中一臺NameServer建立長連接,獲取當(dāng)前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
二、消費者啟動流程
官方給出的消費者實現(xiàn)代碼如下所示:
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer"); // 設(shè)置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 訂閱一個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe("Test", "*"); // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標(biāo)記該消息已經(jīng)被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者實例 consumer.start(); System.out.printf("Consumer Started.%n"); } }
下面讓我們來分析消費者在啟動中每一階段中做了什么吧,let’s go.
2.1 實例化消費者
第一步主要是實例化消費者,這里采取默認(rèn)的Push消費者模式,構(gòu)造器中參數(shù)為對應(yīng)的消費者分組,指定同一分組可以消費同一類型的消息,如果沒有指定,將會采取默認(rèn)的分組模式,這里實例化了一個DefaultMQPushConsumerImpl對象,它是后面消費功能的主要實現(xiàn)類。
// 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
主要通過DefaultMQPushConsumer實例化DefaultMQPushConsumerImpl,它是主要的消費功能實現(xiàn)類。
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); }
2.2 設(shè)置NameServer和訂閱topic過程
// 設(shè)置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息 consumer.subscribe("Test", "*");
2.2.1 添加tag
設(shè)置NameServer地址后,這個地址為你名字服務(wù)集群的地址,類似于zookeeper集群地址,樣例給出的是單機本地地址,搭建集群后,可以設(shè)置為集群地址,接下來我們需要訂閱一個主題topic下的消息,設(shè)置對應(yīng)的topic,可以進(jìn)行分類,通過設(shè)置不同的tag來實現(xiàn),但目前只支持"||"進(jìn)行連接,如:"tag1 || tag2 || tag3"。歸根在于構(gòu)造訂閱數(shù)據(jù)時,源碼通過"||"進(jìn)行了字符串的分割,如下所示:
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData; }
2.2.2 發(fā)送心跳至Broker
前面構(gòu)造好訂閱主題和分類后,將其放入了一個ConcurrentMap中,并調(diào)用sendHeartbeatToAllBrokerWithLock()方法,進(jìn)行心跳檢測和上傳過濾器類至broker集群(生產(chǎn)者啟動過程也會進(jìn)行此步驟)。如下所示:
public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { this.sendHeartbeatToAllBroker(); this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed."); } }
首先會對broker集群進(jìn)行心跳檢測,在此過程中會施加鎖,它會執(zhí)行sendHeartbeatToAllBroker方法,構(gòu)建心跳數(shù)據(jù)heartbeatData,然后遍歷消費和生產(chǎn)者table,將消費者和生產(chǎn)者信息加入到heartbeatData中,當(dāng)都存在消費者和生產(chǎn)者的情況下,會遍歷brokerAddrTable,往每個broker 地址發(fā)送心跳,相當(dāng)于往對應(yīng)地址發(fā)送一次http請求,用于探測當(dāng)前broker是否存活。
this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
2.2.3上傳過濾器類至FilterServer
之后會執(zhí)行uploadFilterClassSource()方法,只有push模式才會有此過程,在此模式下,它會循環(huán)遍歷訂閱數(shù)據(jù)SubscriptionData,如果此訂閱數(shù)據(jù)使用了類模式過濾,會調(diào)uploadFilterClassToAllFilterServer()方法:上傳用戶自定義的過濾消息實現(xiàn)類至過濾器服務(wù)器。
private void uploadFilterClassSource() { Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> next = it.next(); MQConsumerInner consumer = next.getValue(); if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) { Set<SubscriptionData> subscriptions = consumer.subscriptions(); for (SubscriptionData sub : subscriptions) { if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) { final String consumerGroup = consumer.groupName(); final String className = sub.getSubString(); final String topic = sub.getTopic(); final String filterClassSource = sub.getFilterClassSource(); try { this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource); } catch (Exception e) { log.error("uploadFilterClassToAllFilterServer Exception", e); } } } } } }
過濾器類的作用:消費端可以上傳一個Class類文件到 FilterServer,Consumer從FilterServer拉取消息時,F(xiàn)ilterServer會把請求轉(zhuǎn)發(fā)給Broker,F(xiàn)ilterServer收取到Broker消息后,根據(jù)上傳的過濾類中的邏輯做過濾操作,過濾完成后再把消息給到Consumer,用戶可以自定義過濾消息的實現(xiàn)類。
2.3 注冊回調(diào)實現(xiàn)類
接下來就是代碼中的注冊回調(diào)實現(xiàn)類了,當(dāng)然,如果你是pull模式的話就不需要實現(xiàn)它了,push模式需要定義,兩者區(qū)別后面會講到,它主要用于從broker實時獲取消息,這里有兩種消費上下文類型,用于不同的消費類型。
**ConsumeConcurrentlyContext:**延時類消息上下文,用于延時消息,即定時消息,默認(rèn)不延遲,可以設(shè)置延遲等級,每個等級對應(yīng)固定時間刻度,RocketMQ中不能自定義延遲時間,延遲等級從1開始,對應(yīng)的時間間隔如下所示:
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
**ConsumeOrderlyContext :**順序類消息上下文,控制發(fā)送消息的順序,生產(chǎn)者設(shè)置分片路由規(guī)則后,相同key只落到指定queue上,消費過程中會對順序消息所在的queue加鎖,保證消息的有序性。
2.4 消費者啟動
我們先來看下消費者啟動的過程,如下所示:
**(1)this.checkConfig():**首先是檢測消費配置項,包括消費分組group、消息模型(集群、廣播)、訂閱數(shù)據(jù)、消息監(jiān)聽器等是否存在,如果不存在的話,會拋出異常。
**(2)copySubscription():**構(gòu)建主題訂閱信息SubscriptionData并加入到RebalanceImpl負(fù)載均衡方法的訂閱信息中。
**(3)getAndCreateMQClientInstance():**初始化MQ客戶端實例。
**(4)offsetStore.load():**根據(jù)不同消息模式創(chuàng)建消費進(jìn)度offsetStore并加載:BROADCASTING-廣播模式,同一個消費group中的consumer都消費一次,CLUSTERING-集群模式,默認(rèn)方式,只被消費一次。
switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; }
可以通過setMessageModel方式設(shè)置不同模式;廣播模式下同消費組的消費者相互獨立,消費進(jìn)度在本地單獨進(jìn)行存儲;集群模式下,同一條消息只會被同一個消費組消費一次,消費進(jìn)度會參與到負(fù)載均衡中,消費進(jìn)度是共享在整個消費組中的。
**(5)consumeMessageService.start():**根據(jù)不同消息監(jiān)聽類型實例化并啟動。這里有延時消息和順序消息。
這里主要講下順序消息,RocketMQ也幫我們實現(xiàn)了,在啟動時,如果是集群模式并是順序類型,它會啟動定時任務(wù),定時向broker發(fā)送批量鎖,鎖住當(dāng)前順序消費發(fā)往的消息隊列,順序消息因為生產(chǎn)者生產(chǎn)消息時指定了分片策略和消息上下文,只會發(fā)往一個消費隊列。
定時任務(wù)發(fā)送批量鎖,鎖住當(dāng)前順序消息隊列。
public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } }
發(fā)送鎖住隊列的消息至broker,broker端返回鎖住成功的隊列集合lockOKMQSet,順序消息具體實現(xiàn)可查看后面第四節(jié)。
**(6)mQClientFactory.registerConsumer():**MQClientInstance注冊消費者,并啟動MQClientInstance,沒有注冊成功會結(jié)束消費服務(wù)。
**(7)mQClientFactory.start():**最后會啟動如下服務(wù):遠(yuǎn)程客戶端、定時任務(wù)、pull消息服務(wù)、負(fù)載均衡服務(wù)、push消息服務(wù),然后將狀態(tài)改為運行中。
switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; }
全部啟動完畢后,整個消費者也就啟動好了,接下來就可以對生產(chǎn)者發(fā)送過來的消息進(jìn)行消費了,那么是如何進(jìn)行消息消費的呢?不同的消息模式有何區(qū)別呢?
三、pull/push 模式消費
3.1 pull模式-DefaultMQPullConsumer
**pull拉取式消費:**應(yīng)用通常主動調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動權(quán)由應(yīng)用程序控制,可以指定消費的位移,【偽代碼】如下所示:
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer"); // 設(shè)置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 啟動消費者實例 consumer.start(); //獲取主題下所有的消息隊列,這里根據(jù)主題從nameserver獲取的 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test"); for (MessageQueue queue : mqs) { //獲取當(dāng)前隊列的消費位移,指定消費進(jìn)度offset,fromstore:從broker中獲取還是本地獲取,true-broker long offset = consumer.fetchConsumeOffset(queue, true); PullResult pullResult = null; while (offset < pullResult.getMaxOffset()) { //第二個參數(shù)為tag,獲取指定topic下的tag //第三個參數(shù)表示從哪個位移下開始消費消息 //第四個參數(shù)表示一次最大拉取多少個消息 try { pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32); } catch (Exception e) { e.printStackTrace(); System.out.println("pull拉取消息失敗"); } //代碼省略,記錄消息位移 offset = pullResult.getNextBeginOffset(); //代碼省略,這里為消費消息 } }
可以看到我們是主動拉取topic對應(yīng)下的消息隊列,然后遍歷它們,獲取當(dāng)前消費進(jìn)度并進(jìn)行消費。
3.2 push模式-DefaultMQPushConsumer
該模式下Broker收到數(shù)據(jù)后會主動推送給消費端,該消費模式一般實時性較高,現(xiàn)在一般推薦使用該方式,具體示例可以觀看第一章開頭的官方demo。
它也是通過實現(xiàn)pull方式來實現(xiàn)的,首先,前面2.4消費者啟動之后,最后會啟動拉取消息服務(wù)pullMessageService和負(fù)載均衡rebalanceService服務(wù),它們啟動后會一直有線程進(jìn)行消費。
case CREATE_JUST: //...... // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); //....... this.serviceState = ServiceState.RUNNING; break; case RUNNING:
這里面調(diào)用doRebalance()方法,進(jìn)行負(fù)載均衡,默認(rèn)每20s做一次,會輪詢所有訂閱該實例的topic。
public class RebalanceService extends ServiceThread { //初始化,省略.... @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); //做負(fù)載均衡 this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return RebalanceService.class.getSimpleName(); } }
然后根據(jù)每個topic,以及它是否順序消息模式來做rebalance。
具體做法就是先對Topic下的消息消費隊列、消費者Id進(jìn)行排序,然后用消息隊列的平均分配算法,計算出待拉取的消息隊列,將分配到的消息隊列集合與processQueueTable做一個過濾比對,新隊列不包含或已過期,則進(jìn)行移除 。
public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { /根據(jù) /每個topic,以及它是否順序消息模式來做rebalance this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
rebalanceByTopic中廣播和集群模式都會執(zhí)行updateProcessQueueTableInRebalance()方法,最后會分發(fā)請求dispatchPullRequest,通過executePullRequestImmediately()方法將pull請求放入pull請求隊列pullRequestQueue中,注意,pull模式下分發(fā)請求方法dispatchPullRequest()實際實現(xiàn)是一個空方法,這里兩者很大不同,push模式實現(xiàn)如下:
@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
然后再PullMessageService中,因為前面consumer啟動成功了,PullMessageService線程會實時去取pullRequestQueue中的pull請求。
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); if (pullRequest != null) { this.pullMessage(pullRequest); } } catch (InterruptedException e) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
取出來的pull請求又會經(jīng)由DefaultMQPushConsumerImpl的消息監(jiān)聽類,調(diào)用pullMessage()方法。
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
pullMessage()中pullKernelImpl()有一個Pullback方法用于執(zhí)行消息的回調(diào),它會通過submitConsumeRequest()這個方法來處理消息,總而言之就是通過線程回調(diào)的方式讓push模式下的監(jiān)聽器能夠感知到。
//Pull回調(diào) PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: //省略...消費位移更新 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispathToConsume);
這個方法對應(yīng)的不同消費模式有著不同實現(xiàn),但都是會構(gòu)建一個消費請求ConsumeRequest,里面有一個run()方法,構(gòu)建完畢后,會把它放入到listener監(jiān)聽器中。
//監(jiān)聽消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
還記得前面我們樣例給出的注冊監(jiān)聽器回調(diào)處理方法嗎?
我們可以點擊上面的consumeMessage方法,查看它在源碼中的實現(xiàn)位置,發(fā)現(xiàn)它就回到了我們前面的2.3注冊回調(diào)實現(xiàn)類里面了,整個流程是不是通順了呢?這個監(jiān)聽器中就會收到push的消息,拉取出來進(jìn)行業(yè)務(wù)消費邏輯,下面是我們自己定義的消息回調(diào)處理方法。
// 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 標(biāo)記該消息已經(jīng)被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
3.3 小結(jié)
push模式相比較于pull模式不同的是,做負(fù)載均衡時,pullRequest請求會放入pullRequestQueue,然后PullMessageService線程會實時去取出這個請求,將消息存入ProcessQueue,通過線程回調(diào)的方式讓push模式下的監(jiān)聽器能夠感知到,這樣消息從分發(fā)請求到接收都是實時的,而pull模式是消費端主動去拉取指定消息的,需要指定消費進(jìn)度。
對于我們開發(fā)者來說,選取哪種模式實現(xiàn)我們的業(yè)務(wù)邏輯比較合適呢?別急,先讓我們總結(jié)下他們的特點:
共同點:
兩者底層實際一樣,push模式也是基于pull模式來實現(xiàn)的。
pull模式需要我們通過程序主動通過consumer向broker拉消息,而消息的push模式則只需要我們提供一個listener監(jiān)聽,實時獲取消息。
優(yōu)點:
push模式采用長輪詢阻塞的方式獲取消息,實時性非常高;
push模式rocketMQ處理了獲取消息的細(xì)節(jié),使用起來比較簡單方便;
pull模式可以指定消費進(jìn)度,想消費多少就消費多少,靈活性大。
缺點:
push模式當(dāng)消費者能力遠(yuǎn)遠(yuǎn)低于生產(chǎn)者能力的時候,會產(chǎn)生一定的消費者消息堆積;
pull模式實時性很低,頻率不好設(shè)置;
拉取消息的間隔不好設(shè)置,太短則產(chǎn)生很多無效Pull請求的RPC開銷,影響MQ整體的網(wǎng)絡(luò)性能,太長則實時性差。
適用場景:
對于服務(wù)端生產(chǎn)消息數(shù)據(jù)比較大時,而消費端處理比較復(fù)雜,消費能力相對較低時,這種情況就適用pull模式;
對于數(shù)據(jù)實時性要求高的場景,就比較適用與push模式。
現(xiàn)在的你是否明確業(yè)務(wù)中該使用哪種模式了呢?
四、順序消息
4.1 實現(xiàn)MQ順序消息發(fā)送存在問題
(1)一般消息發(fā)送會采取輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊列);而消費消息的時候從多個queue上拉取消息,broker之間是無感知的,這種情況發(fā)送和消費是不能保證順序。
(2)異步方式發(fā)送消息時,發(fā)送的時候不是按著一條一條順序發(fā)送的,保證不了消息到達(dá)Broker的時間也是按照發(fā)送的順序來的。
消息發(fā)送到存儲,最后到消費要經(jīng)歷這么多步驟,我們該如何在業(yè)務(wù)中使用順序消息呢?讓咱們來一步步拆解下吧。
4.2 實現(xiàn)MQ順序消息關(guān)鍵點
既然分散到多個broker上無法追蹤順序,那么可以控制發(fā)送的順序消息只依次發(fā)送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。在發(fā)送時設(shè)置分片路由規(guī)則,讓相同key的消息只落到指定queue上,然后消費過程中對順序消息所在的queue加鎖,保證消息的有序性,讓這個queue上的消息就按照FIFO順序來進(jìn)行消費。因此我們滿足以下三個條件是否就可以呢?
**1)消息順序發(fā)送:**多線程發(fā)送的消息無法保證有序性,因此,需要業(yè)務(wù)方在發(fā)送時,針對同一個業(yè)務(wù)編號(如同一筆訂單)的消息需要保證在一個線程內(nèi)順序發(fā)送,在上一個消息發(fā)送成功后,在進(jìn)行下一個消息的發(fā)送。對應(yīng)到mq中,消息發(fā)送方法就得使用同步發(fā)送,異步發(fā)送無法保證順序性。
//采用的同步發(fā)送方式,在一個線程內(nèi)順序發(fā)送,異步發(fā)送方式為:producer.send(msg, new SendCallback() {...}) SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}
**2)消息順序存儲:**MQ 的topic下會存在多個queue,要保證消息的順序存儲,同一個業(yè)務(wù)編號的消息需要被發(fā)送到一個queue中。對應(yīng)到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue。即可以對業(yè)務(wù)編號設(shè)置路由規(guī)則,像根據(jù)隊列數(shù)量對業(yè)務(wù)字段hash取余,將消息發(fā)送到一個queue中。
//使用"%"操作,使得訂單id取余后相同的數(shù)據(jù)路由到同一個queue中,也可以自定義路由規(guī)則 long index = id % mqs.size(); return mqs.get((int) index);
3)消息順序消費:要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內(nèi)部,也只能有一個消費線程來消費該隊列。這里RocketMQ已經(jīng)為我們實現(xiàn)好了。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } //....省略 } }
消費者重新負(fù)載,并且分配完消費隊列后,需要向mq服務(wù)器發(fā)起消息拉取請求,代碼實現(xiàn)在RebalanceImpl#updateProcessQueueTableInRebalance()中,針對順序消息的消息拉取,mq做了以上判斷,即消費客戶端先向broker端發(fā)起對messageQueue的加鎖請求,只有加鎖成功時才創(chuàng)建pullRequest進(jìn)行消息拉取,這里的pullRequest就是前面pull和push模式消息體,而updateProcessQueueTableInRebalance這個方法也是在前面消費者啟動過程中有講到過哦。
具體加鎖邏輯如下:
public boolean lock(final MessageQueue mq) { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); requestBody.setClientId(this.mQClientFactory.getClientId()); requestBody.getMqSet().add(mq); try { Set<MessageQueue> lockedMq = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); if (processQueue != null) { processQueue.setLocked(true); processQueue.setLastLockTimestamp(System.currentTimeMillis()); } } boolean lockOK = lockedMq.contains(mq); log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq); return lockOK; } catch (Exception e) { log.error("lockBatchMQ exception, " + mq, e); } } return false; }
可以看到,就是調(diào)用lockBatchMQ方法發(fā)送了一個加鎖請求,成功獲取到消息處理隊列就設(shè)為獲取到鎖,返回鎖定成功,如果加鎖成功,同一時刻只有一個線程進(jìn)行消息消費。加鎖失敗,會延遲1000ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費請求。
怎么樣,這樣的加鎖方式是不是很像我們平時用到的分布式鎖呢?由你來設(shè)計實現(xiàn)你會怎么做呢?
五、消息ack機制
5.1 消息消費失敗處理
消息被消費者消費了,那么如何保證被消費成功呢?消息消費失敗會出現(xiàn)什么情況呢?
消息被消費,那么如何保證被消費成功呢?這里只有使用方控制,只有使用方確認(rèn)成功了,才會消費成功,否則會重新投遞。
RocketMQ其實是通過ACK機制來對失敗消息進(jìn)行重試和通知的,具體流程如下所示:
消息成功與否是由使用方控制,只有使用方確認(rèn)成功了,才會消費成功,否則會重新投遞,Consumer會通過監(jiān)聽器監(jiān)聽回調(diào)過來的消息,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消費成功,如果消費失敗,返回ConsumeConcurrentlyStatus.RECONSUME_LATER狀態(tài)(消費重試),RocketMQ就會默認(rèn)為這條消息失敗了,延遲一定時間后(默認(rèn)10s,可配置),會再次投送到ConsumerGroup,重試次數(shù)與間隔時間關(guān)系上圖所示。如果持續(xù)這樣,失敗到一定次數(shù)(默認(rèn)16次),就會進(jìn)入到DLQ死信隊列,不再投遞,此時可以通過監(jiān)控人工來干預(yù)。
5.2 消息重投帶來問題
RocketMQ 消費消息因為消息重投很大一個問題就是無法保證消息只被消費一次,因此需要開發(fā)人員在業(yè)務(wù)里面自己去處理。
六、總結(jié)
本文主要介紹了RocketMQ的消費者啟動流程,結(jié)合官方源碼和示例,一步步講述消費者在啟動和消息消費中的的工作原理及內(nèi)容,并結(jié)合平時業(yè)務(wù)工作中,對我們所熟悉的順序、push/pull模式等進(jìn)行詳細(xì)分析,以及對于消息消費失敗和重投帶來問題去進(jìn)行分析。
對于自己而言,希望通過主動學(xué)習(xí)源碼方式,能夠明白其中啟動的原理,學(xué)習(xí)里面優(yōu)秀的方案,像對于pull/push,順序消息這些,學(xué)習(xí)之后能夠了解到push模式是何如做到實時拉取消息的,順序消息是如何保證的,再就是能夠聯(lián)想到平時遇到這種問題該如何處理,像順序消息在消息被消費時保持和存儲的順序一致,這里自己施加分布式鎖寫能不能實現(xiàn)等,文中也有很多引導(dǎo)性問題,希望能引起讀者自己的思考,能夠?qū)φ麄€消費者啟動和消息消費流程有著較為直觀的認(rèn)知,但還有著一些技術(shù)細(xì)節(jié)由于篇幅原因沒做出詳細(xì)說明,也歡迎大家一起探討交流~
參考資料:
到此這篇關(guān)于RocketMQ中的消費者啟動與消費流程的文章就介紹到這了,更多相關(guān)RocketMQ消費者啟動流程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring中的@Value和@PropertySource注解詳解
這篇文章主要介紹了Spring中的@Value和@PropertySource注解詳解,@PropertySource:讀取外部配置文件中的key-value保存到運行的環(huán)境變量中,本文提供了部分實現(xiàn)代碼,需要的朋友可以參考下2023-11-11JAVA過濾標(biāo)簽實現(xiàn)將html內(nèi)容轉(zhuǎn)換為文本的方法示例
這篇文章主要介紹了JAVA過濾標(biāo)簽實現(xiàn)將html內(nèi)容轉(zhuǎn)換為文本的方法,涉及java針對HTML代碼的正則替換相關(guān)操作技巧,需要的朋友可以參考下2017-07-07Java 實戰(zhàn)練手項目之校園超市管理系統(tǒng)的實現(xiàn)流程
讀萬卷書不如行萬里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+Mysql+Maven+Bootstrap實現(xiàn)一個校園超市管理系統(tǒng),大家可以在過程中查缺補漏,提升水平2021-11-11mybatis-plus?執(zhí)行insert(),實體的id自動更新問題
這篇文章主要介紹了mybatis-plus?執(zhí)行insert(),實體的id自動更新問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12Java深入學(xué)習(xí)圖形用戶界面GUI之創(chuàng)建窗體
圖形編程中,窗口是一個重要的概念,窗口其實是一個矩形框,應(yīng)用程序可以使用其從而達(dá)到輸出結(jié)果和接受用戶輸入的效果,學(xué)習(xí)了GUI就讓我們用它來創(chuàng)建一個窗體2022-05-05java List.of()與Arrays.asList()方法對比分析
這篇文章主要為大家介紹了java List.of()與Arrays.asList()方法對比分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11