欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解RocketMQ中的消費(fèi)者啟動(dòng)與消費(fèi)流程分析

 更新時(shí)間:2022年07月13日 09:47:36   作者:vivo互聯(lián)網(wǎng)技術(shù)  
本文主要介紹了RocketMQ的消費(fèi)者啟動(dòng)流程,結(jié)合官方源碼和示例,一步步講述消費(fèi)者在啟動(dòng)和消息消費(fèi)中的的工作原理及內(nèi)容,并結(jié)合平時(shí)業(yè)務(wù)工作中,對(duì)我們所熟悉的順序、push/pull模式等進(jìn)行詳細(xì)分析,以及對(duì)于消息消費(fèi)失敗和重投帶來(lái)問(wèn)題去進(jìn)行分析,需要的朋友可以參考下

一、簡(jiǎn)介

1.1 RocketMQ 簡(jiǎn)介

RocketMQ是由阿里巴巴開源的分布式消息中間件,支持順序消息、定時(shí)消息、自定義過(guò)濾器、負(fù)載均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。NameServer充當(dāng)名字路由服務(wù),整體架構(gòu)圖如下所示:

  • **Producer:**負(fù)責(zé)生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)生產(chǎn)消息,可通過(guò)集群方式部署。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認(rèn)信息,單向發(fā)送不需要。
  • **Consumer:**負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi),可通過(guò)集群方式部署。一個(gè)消息消費(fèi)者會(huì)從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。提供pull/push兩者消費(fèi)模式。
  • **Broker Server:**負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。RocketMQ系統(tǒng)中負(fù)責(zé)接收從生產(chǎn)者發(fā)送來(lái)的消息并存儲(chǔ)、同時(shí)為消費(fèi)者的拉取請(qǐng)求作準(zhǔn)備,存儲(chǔ)消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組、消費(fèi)進(jìn)度偏移和主題和隊(duì)列消息等。
  • **Name Server:**名字服務(wù),充當(dāng)路由消息的提供者。生產(chǎn)者或消費(fèi)者能夠通過(guò)名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個(gè)NameServer實(shí)例組成集群,相互獨(dú)立,沒有信息交換。

本文基于Apache RocketMQ 最新版本主要講述RocketMQ的消費(fèi)者機(jī)制,分析其啟動(dòng)流程、pull/push機(jī)制,消息ack機(jī)制以及定時(shí)消息和順序消息的不同。

1.2 工作流程

(1)啟動(dòng)NameServer。

NameServer起來(lái)后監(jiān)聽端口,等待Broker、Producer、Consumer連上來(lái),相當(dāng)于一個(gè)路由控制中心。

(2)啟動(dòng)Broker。

跟所有的NameServer保持長(zhǎng)連接,定時(shí)發(fā)送心跳包。心跳包中包含當(dāng)前Broker信息(IP+端口等)以及存儲(chǔ)所有Topic信息。注冊(cè)成功后,NameServer集群中就有Topic跟Broker的映射關(guān)系。

(3)創(chuàng)建Topic。

創(chuàng)建Topic時(shí)需要指定該Topic要存儲(chǔ)在哪些Broker上,也可以在發(fā)送消息時(shí)自動(dòng)創(chuàng)建Topic。

(4)Producer發(fā)送消息。

啟動(dòng)時(shí)先跟NameServer集群中的其中一臺(tái)建立長(zhǎng)連接,并從NameServer中獲取當(dāng)前發(fā)送的Topic存在哪些Broker上,輪詢從隊(duì)列列表中選擇一個(gè)隊(duì)列,然后與隊(duì)列所在的Broker建立長(zhǎng)連接從而向Broker發(fā)消息。

(5)Consumer消費(fèi)消息。

跟其中一臺(tái)NameServer建立長(zhǎng)連接,獲取當(dāng)前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費(fèi)消息。

二、消費(fèi)者啟動(dòng)流程

官方給出的消費(fèi)者實(shí)現(xiàn)代碼如下所示:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 實(shí)例化消費(fèi)者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
        // 設(shè)置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱一個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息
        consumer.subscribe("Test", "*");
        // 注冊(cè)回調(diào)實(shí)現(xiàn)類來(lái)處理從broker拉取回來(lái)的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動(dòng)消費(fèi)者實(shí)例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

下面讓我們來(lái)分析消費(fèi)者在啟動(dòng)中每一階段中做了什么吧,let’s go.

2.1 實(shí)例化消費(fèi)者

第一步主要是實(shí)例化消費(fèi)者,這里采取默認(rèn)的Push消費(fèi)者模式,構(gòu)造器中參數(shù)為對(duì)應(yīng)的消費(fèi)者分組,指定同一分組可以消費(fèi)同一類型的消息,如果沒有指定,將會(huì)采取默認(rèn)的分組模式,這里實(shí)例化了一個(gè)DefaultMQPushConsumerImpl對(duì)象,它是后面消費(fèi)功能的主要實(shí)現(xiàn)類。

// 實(shí)例化消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");

主要通過(guò)DefaultMQPushConsumer實(shí)例化DefaultMQPushConsumerImpl,它是主要的消費(fèi)功能實(shí)現(xiàn)類。

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
       AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
       this.consumerGroup = consumerGroup;
       this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
       defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
   }

2.2 設(shè)置NameServer和訂閱topic過(guò)程

// 設(shè)置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個(gè)或者多個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息
consumer.subscribe("Test", "*");

2.2.1 添加tag

設(shè)置NameServer地址后,這個(gè)地址為你名字服務(wù)集群的地址,類似于zookeeper集群地址,樣例給出的是單機(jī)本地地址,搭建集群后,可以設(shè)置為集群地址,接下來(lái)我們需要訂閱一個(gè)主題topic下的消息,設(shè)置對(duì)應(yīng)的topic,可以進(jìn)行分類,通過(guò)設(shè)置不同的tag來(lái)實(shí)現(xiàn),但目前只支持"||"進(jìn)行連接,如:"tag1 || tag2 || tag3"。歸根在于構(gòu)造訂閱數(shù)據(jù)時(shí),源碼通過(guò)"||"進(jìn)行了字符串的分割,如下所示:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);
 
    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            throw new Exception("subString split error");
        }
    }
 
    return subscriptionData;
}

2.2.2 發(fā)送心跳至Broker

前面構(gòu)造好訂閱主題和分類后,將其放入了一個(gè)ConcurrentMap中,并調(diào)用sendHeartbeatToAllBrokerWithLock()方法,進(jìn)行心跳檢測(cè)和上傳過(guò)濾器類至broker集群(生產(chǎn)者啟動(dòng)過(guò)程也會(huì)進(jìn)行此步驟)。如下所示:

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

首先會(huì)對(duì)broker集群進(jìn)行心跳檢測(cè),在此過(guò)程中會(huì)施加鎖,它會(huì)執(zhí)行sendHeartbeatToAllBroker方法,構(gòu)建心跳數(shù)據(jù)heartbeatData,然后遍歷消費(fèi)和生產(chǎn)者table,將消費(fèi)者和生產(chǎn)者信息加入到heartbeatData中,當(dāng)都存在消費(fèi)者和生產(chǎn)者的情況下,會(huì)遍歷brokerAddrTable,往每個(gè)broker 地址發(fā)送心跳,相當(dāng)于往對(duì)應(yīng)地址發(fā)送一次http請(qǐng)求,用于探測(cè)當(dāng)前broker是否存活。

this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);

2.2.3上傳過(guò)濾器類至FilterServer

之后會(huì)執(zhí)行uploadFilterClassSource()方法,只有push模式才會(huì)有此過(guò)程,在此模式下,它會(huì)循環(huán)遍歷訂閱數(shù)據(jù)SubscriptionData,如果此訂閱數(shù)據(jù)使用了類模式過(guò)濾,會(huì)調(diào)uploadFilterClassToAllFilterServer()方法:上傳用戶自定義的過(guò)濾消息實(shí)現(xiàn)類至過(guò)濾器服務(wù)器。

private void uploadFilterClassSource() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> next = it.next();
        MQConsumerInner consumer = next.getValue();
        if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            for (SubscriptionData sub : subscriptions) {
                if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
                    final String consumerGroup = consumer.groupName();
                    final String className = sub.getSubString();
                    final String topic = sub.getTopic();
                    final String filterClassSource = sub.getFilterClassSource();
                    try {
                        this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
                    } catch (Exception e) {
                        log.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        }
    }
}

過(guò)濾器類的作用:消費(fèi)端可以上傳一個(gè)Class類文件到 FilterServer,Consumer從FilterServer拉取消息時(shí),F(xiàn)ilterServer會(huì)把請(qǐng)求轉(zhuǎn)發(fā)給Broker,F(xiàn)ilterServer收取到Broker消息后,根據(jù)上傳的過(guò)濾類中的邏輯做過(guò)濾操作,過(guò)濾完成后再把消息給到Consumer,用戶可以自定義過(guò)濾消息的實(shí)現(xiàn)類。

2.3 注冊(cè)回調(diào)實(shí)現(xiàn)類

接下來(lái)就是代碼中的注冊(cè)回調(diào)實(shí)現(xiàn)類了,當(dāng)然,如果你是pull模式的話就不需要實(shí)現(xiàn)它了,push模式需要定義,兩者區(qū)別后面會(huì)講到,它主要用于從broker實(shí)時(shí)獲取消息,這里有兩種消費(fèi)上下文類型,用于不同的消費(fèi)類型。

**ConsumeConcurrentlyContext:**延時(shí)類消息上下文,用于延時(shí)消息,即定時(shí)消息,默認(rèn)不延遲,可以設(shè)置延遲等級(jí),每個(gè)等級(jí)對(duì)應(yīng)固定時(shí)間刻度,RocketMQ中不能自定義延遲時(shí)間,延遲等級(jí)從1開始,對(duì)應(yīng)的時(shí)間間隔如下所示:

"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

**ConsumeOrderlyContext :**順序類消息上下文,控制發(fā)送消息的順序,生產(chǎn)者設(shè)置分片路由規(guī)則后,相同key只落到指定queue上,消費(fèi)過(guò)程中會(huì)對(duì)順序消息所在的queue加鎖,保證消息的有序性。

2.4 消費(fèi)者啟動(dòng)

我們先來(lái)看下消費(fèi)者啟動(dòng)的過(guò)程,如下所示:

**(1)this.checkConfig():**首先是檢測(cè)消費(fèi)配置項(xiàng),包括消費(fèi)分組group、消息模型(集群、廣播)、訂閱數(shù)據(jù)、消息監(jiān)聽器等是否存在,如果不存在的話,會(huì)拋出異常。

**(2)copySubscription():**構(gòu)建主題訂閱信息SubscriptionData并加入到RebalanceImpl負(fù)載均衡方法的訂閱信息中。

**(3)getAndCreateMQClientInstance():**初始化MQ客戶端實(shí)例。

**(4)offsetStore.load():**根據(jù)不同消息模式創(chuàng)建消費(fèi)進(jìn)度offsetStore并加載:BROADCASTING-廣播模式,同一個(gè)消費(fèi)group中的consumer都消費(fèi)一次,CLUSTERING-集群模式,默認(rèn)方式,只被消費(fèi)一次。

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
}

可以通過(guò)setMessageModel方式設(shè)置不同模式;廣播模式下同消費(fèi)組的消費(fèi)者相互獨(dú)立,消費(fèi)進(jìn)度在本地單獨(dú)進(jìn)行存儲(chǔ);集群模式下,同一條消息只會(huì)被同一個(gè)消費(fèi)組消費(fèi)一次,消費(fèi)進(jìn)度會(huì)參與到負(fù)載均衡中,消費(fèi)進(jìn)度是共享在整個(gè)消費(fèi)組中的。

**(5)consumeMessageService.start():**根據(jù)不同消息監(jiān)聽類型實(shí)例化并啟動(dòng)。這里有延時(shí)消息和順序消息。

這里主要講下順序消息,RocketMQ也幫我們實(shí)現(xiàn)了,在啟動(dòng)時(shí),如果是集群模式并是順序類型,它會(huì)啟動(dòng)定時(shí)任務(wù),定時(shí)向broker發(fā)送批量鎖,鎖住當(dāng)前順序消費(fèi)發(fā)往的消息隊(duì)列,順序消息因?yàn)樯a(chǎn)者生產(chǎn)消息時(shí)指定了分片策略和消息上下文,只會(huì)發(fā)往一個(gè)消費(fèi)隊(duì)列。

定時(shí)任務(wù)發(fā)送批量鎖,鎖住當(dāng)前順序消息隊(duì)列。

public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

發(fā)送鎖住隊(duì)列的消息至broker,broker端返回鎖住成功的隊(duì)列集合lockOKMQSet,順序消息具體實(shí)現(xiàn)可查看后面第四節(jié)。

**(6)mQClientFactory.registerConsumer():**MQClientInstance注冊(cè)消費(fèi)者,并啟動(dòng)MQClientInstance,沒有注冊(cè)成功會(huì)結(jié)束消費(fèi)服務(wù)。

**(7)mQClientFactory.start():**最后會(huì)啟動(dòng)如下服務(wù):遠(yuǎn)程客戶端、定時(shí)任務(wù)、pull消息服務(wù)、負(fù)載均衡服務(wù)、push消息服務(wù),然后將狀態(tài)改為運(yùn)行中。

switch (this.serviceState) {
    case CREATE_JUST:
        this.serviceState = ServiceState.START_FAILED;
        // If not specified,looking address from name server
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.mQClientAPIImpl.fetchNameServerAddr();
        }
        // Start request-response channel
        this.mQClientAPIImpl.start();
        // Start various schedule tasks
        this.startScheduledTask();
        // Start pull service
        this.pullMessageService.start();
        // Start rebalance service
        this.rebalanceService.start();
        // Start push service
        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
        log.info("the client factory [{}] start OK", this.clientId);
        this.serviceState = ServiceState.RUNNING;
        break;
    case RUNNING:
        break;
    case SHUTDOWN_ALREADY:
        break;
    case START_FAILED:
        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
    default:
        break;
}

全部啟動(dòng)完畢后,整個(gè)消費(fèi)者也就啟動(dòng)好了,接下來(lái)就可以對(duì)生產(chǎn)者發(fā)送過(guò)來(lái)的消息進(jìn)行消費(fèi)了,那么是如何進(jìn)行消息消費(fèi)的呢?不同的消息模式有何區(qū)別呢?

三、pull/push 模式消費(fèi)

3.1 pull模式-DefaultMQPullConsumer

**pull拉取式消費(fèi):**應(yīng)用通常主動(dòng)調(diào)用Consumer的拉消息方法從Broker服務(wù)器拉消息、主動(dòng)權(quán)由應(yīng)用程序控制,可以指定消費(fèi)的位移,【偽代碼】如下所示:

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");
// 設(shè)置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 啟動(dòng)消費(fèi)者實(shí)例
consumer.start();
//獲取主題下所有的消息隊(duì)列,這里根據(jù)主題從nameserver獲取的
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");
for (MessageQueue queue : mqs) {
    //獲取當(dāng)前隊(duì)列的消費(fèi)位移,指定消費(fèi)進(jìn)度offset,fromstore:從broker中獲取還是本地獲取,true-broker
    long offset = consumer.fetchConsumeOffset(queue, true);
    PullResult pullResult = null;
    while (offset < pullResult.getMaxOffset()) {
        //第二個(gè)參數(shù)為tag,獲取指定topic下的tag
        //第三個(gè)參數(shù)表示從哪個(gè)位移下開始消費(fèi)消息
        //第四個(gè)參數(shù)表示一次最大拉取多少個(gè)消息
        try {
            pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("pull拉取消息失敗");
        }
        //代碼省略,記錄消息位移
        offset = pullResult.getNextBeginOffset();
        //代碼省略,這里為消費(fèi)消息
    }
}

可以看到我們是主動(dòng)拉取topic對(duì)應(yīng)下的消息隊(duì)列,然后遍歷它們,獲取當(dāng)前消費(fèi)進(jìn)度并進(jìn)行消費(fèi)。

3.2 push模式-DefaultMQPushConsumer

該模式下Broker收到數(shù)據(jù)后會(huì)主動(dòng)推送給消費(fèi)端,該消費(fèi)模式一般實(shí)時(shí)性較高,現(xiàn)在一般推薦使用該方式,具體示例可以觀看第一章開頭的官方demo。

它也是通過(guò)實(shí)現(xiàn)pull方式來(lái)實(shí)現(xiàn)的,首先,前面2.4消費(fèi)者啟動(dòng)之后,最后會(huì)啟動(dòng)拉取消息服務(wù)pullMessageService和負(fù)載均衡rebalanceService服務(wù),它們啟動(dòng)后會(huì)一直有線程進(jìn)行消費(fèi)。

case CREATE_JUST:
               //......
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                //.......
                this.serviceState = ServiceState.RUNNING;
                break;
  case RUNNING:

這里面調(diào)用doRebalance()方法,進(jìn)行負(fù)載均衡,默認(rèn)每20s做一次,會(huì)輪詢所有訂閱該實(shí)例的topic。

public class RebalanceService extends ServiceThread {
    //初始化,省略....
 
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
 
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            //做負(fù)載均衡
            this.mqClientFactory.doRebalance();
        }
 
        log.info(this.getServiceName() + " service end");
    }
 
    @Override
    public String getServiceName() {
        return RebalanceService.class.getSimpleName();
    }
}

然后根據(jù)每個(gè)topic,以及它是否順序消息模式來(lái)做rebalance。

具體做法就是先對(duì)Topic下的消息消費(fèi)隊(duì)列、消費(fèi)者Id進(jìn)行排序,然后用消息隊(duì)列的平均分配算法,計(jì)算出待拉取的消息隊(duì)列,將分配到的消息隊(duì)列集合與processQueueTable做一個(gè)過(guò)濾比對(duì),新隊(duì)列不包含或已過(guò)期,則進(jìn)行移除 。

public void doRebalance(final boolean isOrder) {
      Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
      if (subTable != null) {
          for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
              final String topic = entry.getKey();
              try {
                  /根據(jù) /每個(gè)topic,以及它是否順序消息模式來(lái)做rebalance
                  this.rebalanceByTopic(topic, isOrder);
              } catch (Throwable e) {
                  if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                      log.warn("rebalanceByTopic Exception", e);
                  }
              }
          }
      }
 
      this.truncateMessageQueueNotMyTopic();
  }

rebalanceByTopic中廣播和集群模式都會(huì)執(zhí)行updateProcessQueueTableInRebalance()方法,最后會(huì)分發(fā)請(qǐng)求dispatchPullRequest,通過(guò)executePullRequestImmediately()方法將pull請(qǐng)求放入pull請(qǐng)求隊(duì)列pullRequestQueue中,注意,pull模式下分發(fā)請(qǐng)求方法dispatchPullRequest()實(shí)際實(shí)現(xiàn)是一個(gè)空方法,這里兩者很大不同,push模式實(shí)現(xiàn)如下

@Override
 public void dispatchPullRequest(List<PullRequest> pullRequestList) {
     for (PullRequest pullRequest : pullRequestList) {
         this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
         log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
     }
 }

然后再PullMessageService中,因?yàn)榍懊鎐onsumer啟動(dòng)成功了,PullMessageService線程會(huì)實(shí)時(shí)去取pullRequestQueue中的pull請(qǐng)求。

@Override
  public void run() {
      log.info(this.getServiceName() + " service started");
 
      while (!this.isStopped()) {
          try {
              PullRequest pullRequest = this.pullRequestQueue.take();
              if (pullRequest != null) {
                  this.pullMessage(pullRequest);
              }
          } catch (InterruptedException e) {
          } catch (Exception e) {
              log.error("Pull Message Service Run Method exception", e);
          }
      }
 
      log.info(this.getServiceName() + " service end");
  }

取出來(lái)的pull請(qǐng)求又會(huì)經(jīng)由DefaultMQPushConsumerImpl的消息監(jiān)聽類,調(diào)用pullMessage()方法。

private void pullMessage(final PullRequest pullRequest) {
     final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
     if (consumer != null) {
         DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
         impl.pullMessage(pullRequest);
     } else {
         log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
     }
 }

pullMessage()中pullKernelImpl()有一個(gè)Pullback方法用于執(zhí)行消息的回調(diào),它會(huì)通過(guò)submitConsumeRequest()這個(gè)方法來(lái)處理消息,總而言之就是通過(guò)線程回調(diào)的方式讓push模式下的監(jiān)聽器能夠感知到。

//Pull回調(diào)
PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
 
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                         //省略...消費(fèi)位移更新
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispathToConsume);

這個(gè)方法對(duì)應(yīng)的不同消費(fèi)模式有著不同實(shí)現(xiàn),但都是會(huì)構(gòu)建一個(gè)消費(fèi)請(qǐng)求ConsumeRequest,里面有一個(gè)run()方法,構(gòu)建完畢后,會(huì)把它放入到listener監(jiān)聽器中。

//監(jiān)聽消息
 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

還記得前面我們樣例給出的注冊(cè)監(jiān)聽器回調(diào)處理方法嗎?

我們可以點(diǎn)擊上面的consumeMessage方法,查看它在源碼中的實(shí)現(xiàn)位置,發(fā)現(xiàn)它就回到了我們前面的2.3注冊(cè)回調(diào)實(shí)現(xiàn)類里面了,整個(gè)流程是不是通順了呢?這個(gè)監(jiān)聽器中就會(huì)收到push的消息,拉取出來(lái)進(jìn)行業(yè)務(wù)消費(fèi)邏輯,下面是我們自己定義的消息回調(diào)處理方法。

// 注冊(cè)回調(diào)實(shí)現(xiàn)類來(lái)處理從broker拉取回來(lái)的消息
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
         // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

3.3 小結(jié)

push模式相比較于pull模式不同的是,做負(fù)載均衡時(shí),pullRequest請(qǐng)求會(huì)放入pullRequestQueue,然后PullMessageService線程會(huì)實(shí)時(shí)去取出這個(gè)請(qǐng)求,將消息存入ProcessQueue,通過(guò)線程回調(diào)的方式讓push模式下的監(jiān)聽器能夠感知到,這樣消息從分發(fā)請(qǐng)求到接收都是實(shí)時(shí)的,而pull模式是消費(fèi)端主動(dòng)去拉取指定消息的,需要指定消費(fèi)進(jìn)度。

對(duì)于我們開發(fā)者來(lái)說(shuō),選取哪種模式實(shí)現(xiàn)我們的業(yè)務(wù)邏輯比較合適呢?別急,先讓我們總結(jié)下他們的特點(diǎn):

共同點(diǎn):

兩者底層實(shí)際一樣,push模式也是基于pull模式來(lái)實(shí)現(xiàn)的。

pull模式需要我們通過(guò)程序主動(dòng)通過(guò)consumer向broker拉消息,而消息的push模式則只需要我們提供一個(gè)listener監(jiān)聽,實(shí)時(shí)獲取消息。

優(yōu)點(diǎn):

push模式采用長(zhǎng)輪詢阻塞的方式獲取消息,實(shí)時(shí)性非常高;

push模式rocketMQ處理了獲取消息的細(xì)節(jié),使用起來(lái)比較簡(jiǎn)單方便;

pull模式可以指定消費(fèi)進(jìn)度,想消費(fèi)多少就消費(fèi)多少,靈活性大。

缺點(diǎn):

push模式當(dāng)消費(fèi)者能力遠(yuǎn)遠(yuǎn)低于生產(chǎn)者能力的時(shí)候,會(huì)產(chǎn)生一定的消費(fèi)者消息堆積;

pull模式實(shí)時(shí)性很低,頻率不好設(shè)置;

拉取消息的間隔不好設(shè)置,太短則產(chǎn)生很多無(wú)效Pull請(qǐng)求的RPC開銷,影響MQ整體的網(wǎng)絡(luò)性能,太長(zhǎng)則實(shí)時(shí)性差。

適用場(chǎng)景:

對(duì)于服務(wù)端生產(chǎn)消息數(shù)據(jù)比較大時(shí),而消費(fèi)端處理比較復(fù)雜,消費(fèi)能力相對(duì)較低時(shí),這種情況就適用pull模式;

對(duì)于數(shù)據(jù)實(shí)時(shí)性要求高的場(chǎng)景,就比較適用與push模式。

現(xiàn)在的你是否明確業(yè)務(wù)中該使用哪種模式了呢?

四、順序消息

4.1 實(shí)現(xiàn)MQ順序消息發(fā)送存在問(wèn)題

(1)一般消息發(fā)送會(huì)采取輪詢方式把消息發(fā)送到不同的queue(分區(qū)隊(duì)列);而消費(fèi)消息的時(shí)候從多個(gè)queue上拉取消息,broker之間是無(wú)感知的,這種情況發(fā)送和消費(fèi)是不能保證順序。

(2)異步方式發(fā)送消息時(shí),發(fā)送的時(shí)候不是按著一條一條順序發(fā)送的,保證不了消息到達(dá)Broker的時(shí)間也是按照發(fā)送的順序來(lái)的。

消息發(fā)送到存儲(chǔ),最后到消費(fèi)要經(jīng)歷這么多步驟,我們?cè)撊绾卧跇I(yè)務(wù)中使用順序消息呢?讓咱們來(lái)一步步拆解下吧。

4.2 實(shí)現(xiàn)MQ順序消息關(guān)鍵點(diǎn)

既然分散到多個(gè)broker上無(wú)法追蹤順序,那么可以控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序。在發(fā)送時(shí)設(shè)置分片路由規(guī)則,讓相同key的消息只落到指定queue上,然后消費(fèi)過(guò)程中對(duì)順序消息所在的queue加鎖,保證消息的有序性,讓這個(gè)queue上的消息就按照FIFO順序來(lái)進(jìn)行消費(fèi)。因此我們滿足以下三個(gè)條件是否就可以呢?

**1)消息順序發(fā)送:**多線程發(fā)送的消息無(wú)法保證有序性,因此,需要業(yè)務(wù)方在發(fā)送時(shí),針對(duì)同一個(gè)業(yè)務(wù)編號(hào)(如同一筆訂單)的消息需要保證在一個(gè)線程內(nèi)順序發(fā)送,在上一個(gè)消息發(fā)送成功后,在進(jìn)行下一個(gè)消息的發(fā)送。對(duì)應(yīng)到mq中,消息發(fā)送方法就得使用同步發(fā)送,異步發(fā)送無(wú)法保證順序性。

//采用的同步發(fā)送方式,在一個(gè)線程內(nèi)順序發(fā)送,異步發(fā)送方式為:producer.send(msg, new SendCallback() {...})
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}

**2)消息順序存儲(chǔ):**MQ 的topic下會(huì)存在多個(gè)queue,要保證消息的順序存儲(chǔ),同一個(gè)業(yè)務(wù)編號(hào)的消息需要被發(fā)送到一個(gè)queue中。對(duì)應(yīng)到mq中,需要使用MessageQueueSelector來(lái)選擇要發(fā)送的queue。即可以對(duì)業(yè)務(wù)編號(hào)設(shè)置路由規(guī)則,像根據(jù)隊(duì)列數(shù)量對(duì)業(yè)務(wù)字段hash取余,將消息發(fā)送到一個(gè)queue中。

//使用"%"操作,使得訂單id取余后相同的數(shù)據(jù)路由到同一個(gè)queue中,也可以自定義路由規(guī)則
long index = id % mqs.size();  
return mqs.get((int) index);

3)消息順序消費(fèi):要保證消息順序消費(fèi),同一個(gè)queue就只能被一個(gè)消費(fèi)者所消費(fèi),因此對(duì)broker中消費(fèi)隊(duì)列加鎖是無(wú)法避免的。同一時(shí)刻,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者消費(fèi),消費(fèi)者內(nèi)部,也只能有一個(gè)消費(fèi)線程來(lái)消費(fèi)該隊(duì)列。這里RocketMQ已經(jīng)為我們實(shí)現(xiàn)好了。

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
 
         //....省略
        }
    }

消費(fèi)者重新負(fù)載,并且分配完消費(fèi)隊(duì)列后,需要向mq服務(wù)器發(fā)起消息拉取請(qǐng)求,代碼實(shí)現(xiàn)在RebalanceImpl#updateProcessQueueTableInRebalance()中,針對(duì)順序消息的消息拉取,mq做了以上判斷,即消費(fèi)客戶端先向broker端發(fā)起對(duì)messageQueue的加鎖請(qǐng)求,只有加鎖成功時(shí)才創(chuàng)建pullRequest進(jìn)行消息拉取,這里的pullRequest就是前面pull和push模式消息體,而updateProcessQueueTableInRebalance這個(gè)方法也是在前面消費(fèi)者啟動(dòng)過(guò)程中有講到過(guò)哦。

具體加鎖邏輯如下:

public boolean lock(final MessageQueue mq) {
     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
     if (findBrokerResult != null) {
         LockBatchRequestBody requestBody = new LockBatchRequestBody();
         requestBody.setConsumerGroup(this.consumerGroup);
         requestBody.setClientId(this.mQClientFactory.getClientId());
         requestBody.getMqSet().add(mq);
 
         try {
             Set<MessageQueue> lockedMq =
                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
             for (MessageQueue mmqq : lockedMq) {
                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                 if (processQueue != null) {
                     processQueue.setLocked(true);
                     processQueue.setLastLockTimestamp(System.currentTimeMillis());
                 }
             }
 
             boolean lockOK = lockedMq.contains(mq);
             log.info("the message queue lock {}, {} {}",
                 lockOK ? "OK" : "Failed",
                 this.consumerGroup,
                 mq);
             return lockOK;
         } catch (Exception e) {
             log.error("lockBatchMQ exception, " + mq, e);
         }
     }
 
     return false;
 }

可以看到,就是調(diào)用lockBatchMQ方法發(fā)送了一個(gè)加鎖請(qǐng)求,成功獲取到消息處理隊(duì)列就設(shè)為獲取到鎖,返回鎖定成功,如果加鎖成功,同一時(shí)刻只有一個(gè)線程進(jìn)行消息消費(fèi)。加鎖失敗,會(huì)延遲1000ms重新嘗試向broker端申請(qǐng)鎖定messageQueue,鎖定成功后重新提交消費(fèi)請(qǐng)求。

怎么樣,這樣的加鎖方式是不是很像我們平時(shí)用到的分布式鎖呢?由你來(lái)設(shè)計(jì)實(shí)現(xiàn)你會(huì)怎么做呢?

五、消息ack機(jī)制

5.1 消息消費(fèi)失敗處理

消息被消費(fèi)者消費(fèi)了,那么如何保證被消費(fèi)成功呢?消息消費(fèi)失敗會(huì)出現(xiàn)什么情況呢?

消息被消費(fèi),那么如何保證被消費(fèi)成功呢?這里只有使用方控制,只有使用方確認(rèn)成功了,才會(huì)消費(fèi)成功,否則會(huì)重新投遞。

RocketMQ其實(shí)是通過(guò)ACK機(jī)制來(lái)對(duì)失敗消息進(jìn)行重試和通知的,具體流程如下所示:

消息成功與否是由使用方控制,只有使用方確認(rèn)成功了,才會(huì)消費(fèi)成功,否則會(huì)重新投遞,Consumer會(huì)通過(guò)監(jiān)聽器監(jiān)聽回調(diào)過(guò)來(lái)的消息,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消費(fèi)成功,如果消費(fèi)失敗,返回ConsumeConcurrentlyStatus.RECONSUME_LATER狀態(tài)(消費(fèi)重試),RocketMQ就會(huì)默認(rèn)為這條消息失敗了,延遲一定時(shí)間后(默認(rèn)10s,可配置),會(huì)再次投送到ConsumerGroup,重試次數(shù)與間隔時(shí)間關(guān)系上圖所示。如果持續(xù)這樣,失敗到一定次數(shù)(默認(rèn)16次),就會(huì)進(jìn)入到DLQ死信隊(duì)列,不再投遞,此時(shí)可以通過(guò)監(jiān)控人工來(lái)干預(yù)。

5.2 消息重投帶來(lái)問(wèn)題

RocketMQ 消費(fèi)消息因?yàn)橄⒅赝逗艽笠粋€(gè)問(wèn)題就是無(wú)法保證消息只被消費(fèi)一次,因此需要開發(fā)人員在業(yè)務(wù)里面自己去處理。

六、總結(jié)

本文主要介紹了RocketMQ的消費(fèi)者啟動(dòng)流程,結(jié)合官方源碼和示例,一步步講述消費(fèi)者在啟動(dòng)和消息消費(fèi)中的的工作原理及內(nèi)容,并結(jié)合平時(shí)業(yè)務(wù)工作中,對(duì)我們所熟悉的順序、push/pull模式等進(jìn)行詳細(xì)分析,以及對(duì)于消息消費(fèi)失敗和重投帶來(lái)問(wèn)題去進(jìn)行分析。

對(duì)于自己而言,希望通過(guò)主動(dòng)學(xué)習(xí)源碼方式,能夠明白其中啟動(dòng)的原理,學(xué)習(xí)里面優(yōu)秀的方案,像對(duì)于pull/push,順序消息這些,學(xué)習(xí)之后能夠了解到push模式是何如做到實(shí)時(shí)拉取消息的,順序消息是如何保證的,再就是能夠聯(lián)想到平時(shí)遇到這種問(wèn)題該如何處理,像順序消息在消息被消費(fèi)時(shí)保持和存儲(chǔ)的順序一致,這里自己施加分布式鎖寫能不能實(shí)現(xiàn)等,文中也有很多引導(dǎo)性問(wèn)題,希望能引起讀者自己的思考,能夠?qū)φ麄€(gè)消費(fèi)者啟動(dòng)和消息消費(fèi)流程有著較為直觀的認(rèn)知,但還有著一些技術(shù)細(xì)節(jié)由于篇幅原因沒做出詳細(xì)說(shuō)明,也歡迎大家一起探討交流~

參考資料:

到此這篇關(guān)于RocketMQ中的消費(fèi)者啟動(dòng)與消費(fèi)流程的文章就介紹到這了,更多相關(guān)RocketMQ消費(fèi)者啟動(dòng)流程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 深入探究HashMap二次Hash原因

    深入探究HashMap二次Hash原因

    在java開發(fā)中,HashMap是最常用、最常見的集合容器類之一,文中通過(guò)示例代碼介紹HashMap為啥要二次Hash,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • Spring中的@Value和@PropertySource注解詳解

    Spring中的@Value和@PropertySource注解詳解

    這篇文章主要介紹了Spring中的@Value和@PropertySource注解詳解,@PropertySource:讀取外部配置文件中的key-value保存到運(yùn)行的環(huán)境變量中,本文提供了部分實(shí)現(xiàn)代碼,需要的朋友可以參考下
    2023-11-11
  • JAVA過(guò)濾標(biāo)簽實(shí)現(xiàn)將html內(nèi)容轉(zhuǎn)換為文本的方法示例

    JAVA過(guò)濾標(biāo)簽實(shí)現(xiàn)將html內(nèi)容轉(zhuǎn)換為文本的方法示例

    這篇文章主要介紹了JAVA過(guò)濾標(biāo)簽實(shí)現(xiàn)將html內(nèi)容轉(zhuǎn)換為文本的方法,涉及java針對(duì)HTML代碼的正則替換相關(guān)操作技巧,需要的朋友可以參考下
    2017-07-07
  • Java 實(shí)戰(zhàn)練手項(xiàng)目之校園超市管理系統(tǒng)的實(shí)現(xiàn)流程

    Java 實(shí)戰(zhàn)練手項(xiàng)目之校園超市管理系統(tǒng)的實(shí)現(xiàn)流程

    讀萬(wàn)卷書不如行萬(wàn)里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+SSM+Mysql+Maven+Bootstrap實(shí)現(xiàn)一個(gè)校園超市管理系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平
    2021-11-11
  • Java中的Sort排序問(wèn)題

    Java中的Sort排序問(wèn)題

    這篇文章主要介紹了Java中的Sort排序問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-08-08
  • 使用vscode搭建javaweb項(xiàng)目的詳細(xì)步驟

    使用vscode搭建javaweb項(xiàng)目的詳細(xì)步驟

    我個(gè)人是很喜歡VsCode的,開源免費(fèi)、功能全面,所以為了方便,我把我?guī)缀跛械倪\(yùn)行都集成到了VsCode上來(lái),JavaWeb也不例外,下面這篇文章主要給大家介紹了關(guān)于使用vscode搭建javaweb項(xiàng)目的相關(guān)資料,需要的朋友可以參考下
    2022-11-11
  • mybatis-plus?執(zhí)行insert(),實(shí)體的id自動(dòng)更新問(wèn)題

    mybatis-plus?執(zhí)行insert(),實(shí)體的id自動(dòng)更新問(wèn)題

    這篇文章主要介紹了mybatis-plus?執(zhí)行insert(),實(shí)體的id自動(dòng)更新問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • Java深入學(xué)習(xí)圖形用戶界面GUI之創(chuàng)建窗體

    Java深入學(xué)習(xí)圖形用戶界面GUI之創(chuàng)建窗體

    圖形編程中,窗口是一個(gè)重要的概念,窗口其實(shí)是一個(gè)矩形框,應(yīng)用程序可以使用其從而達(dá)到輸出結(jié)果和接受用戶輸入的效果,學(xué)習(xí)了GUI就讓我們用它來(lái)創(chuàng)建一個(gè)窗體
    2022-05-05
  • java List.of()與Arrays.asList()方法對(duì)比分析

    java List.of()與Arrays.asList()方法對(duì)比分析

    這篇文章主要為大家介紹了java List.of()與Arrays.asList()方法對(duì)比分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-11-11
  • java中final修飾符的使用方法

    java中final修飾符的使用方法

    這篇文章主要為大家詳細(xì)介紹了java中final修飾符的使用方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-07-07

最新評(píng)論