欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解RocketMQ 消費端如何監(jiān)聽消息

 更新時間:2022年12月15日 14:51:30   作者:小郭的技術筆記  
這篇文章主要為大家介紹了RocketMQ 消費端如何監(jiān)聽消息示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

上一篇文章中我們主要來看RocketMQ消息消費者是如何啟動的,

那他有一個步驟是非常重要的,就是啟動消息的監(jiān)聽,通過不斷的拉取消息,來實現(xiàn)消息的監(jiān)聽,那具體怎么做,讓我們我們跟著源碼來學習一下~

流程地圖

源碼跟蹤

這一塊的代碼比較多,我自己對關鍵點的一些整理,這個圖我畫的不是很OK

核心模塊(消息拉取)

入口:this.pullMessageService.start();

  • 執(zhí)行線程池run方法,輪流從pullRequestQueue中獲取PullRequest

org.apache.rocketmq.client.impl.consumer.PullMessageService#run

聲明一個阻塞隊列用來存放 PullRequest 對象

PullRequest 用于消息拉取任務,如果 pullRequestQueue 為空則會阻塞,直到拉取任務被放入

private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

將 stopped 用volatile來修飾,每次執(zhí)行的時候都檢測stopped的狀態(tài),線程只要修改了這個狀態(tài),其余線程就會馬上知道

protected volatile boolean stopped = false;
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // 判斷啟動狀態(tài)
    while (!this.isStopped()) {
        try {
            // 取出一個PullRequest對象
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
  • 獲取消費隊列快照,判斷狀態(tài)是否正常,同時更新最后一次拉取時間

PullMessageService 從消息服務器默認拉取32條消息,按消息的偏移量順序存放在 ProcessQueue 隊列

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// 獲取消費隊列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}
// 設置最后一次拉取時間
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
  • 校驗客戶端運行狀態(tài)
// 校驗狀態(tài)
this.makeSureStateOK();
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The consumer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

如果消費者狀態(tài)不正確,則拋出異常,啟動定時線程池過段時間回收 PullRequest 對象,以便pullMessageService能及時喚醒并再次執(zhí)行消息拉取,這個邏輯在多個地方使用到了

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        // 最后將pullRequest放入pullRequestQueue中
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}
  • 校驗消費隊列中的消息數(shù)量和大小是否符合設置

如果觸發(fā)流量控制,則延遲拉取消息,先將 PullRequest 對象進行回收,以便pullMessageService能及時喚醒并再次執(zhí)行消息拉取

// 緩存消息條數(shù)
long cachedMessageCount = processQueue.getMsgCount().get();
// 緩存消息的大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 當隊列中的消息跳過,超過設置 則延遲拉取消息
if (cachedMessageCount &gt; this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
  • 根據(jù)主題獲取配置的訂閱關系

這里通過查詢 subscriptionInner Map容器,利用主題來獲取對應的訂閱關系,如果沒有找到對應的訂閱關系,則延遲拉取消息,先將 PullRequest 對象進行回收以便 pullMessageService 能及時喚醒并再次執(zhí)行消息拉取

protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
  • 如果為集群模式,則從內(nèi)存中讀取位置

通過消費者啟動的模塊中,我們知道RocketMQ是根據(jù)不同模式,將消息進度存儲在不同的地方

廣播模式:消息進度存儲在本地文件

集群模式:消息進度存儲在Broker 服務器上

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    // 從內(nèi)存中讀取位置
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}
  • 內(nèi)核中拉取消息(最重要的模塊)

入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

我們看到他有非常多的參數(shù)

拉取流程

  • 通過BrokerName找到對應的Broker
// step 1 通過BrokerName找到對應的Broker
FindBrokerResult findBrokerResult =
    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
        this.recalculatePullFromWhichNode(mq), false);
  • 如果沒有找到對應的,則更新路由信息
// step 2 如果沒有找到對應的,則更新路由信息
if (null == findBrokerResult) {
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
    findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
}
  • 檢查Broker版本和Tag信息
// check version
if (!ExpressionType.isTagType(expressionType)
    &amp;&amp; findBrokerResult.getBrokerVersion() &lt; MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
  • 設置PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
  • 調(diào)用pullMessage方法拉取消息,返回拉取結果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);

因為 CommunicationMode 傳遞的是ASYNC,我們著重來看一下這個方法

入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync

調(diào)用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()

這里我們就先不細看了

拉取消息處理

  • 如果PullCallback回調(diào)成功,則對結果進行處理
// 處理pullResult數(shù)據(jù)
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
    subscriptionData);

主要做了三件事,轉(zhuǎn)換消息格式、設置消息信息、放入msgFoundList

將pullResult 轉(zhuǎn)成 PullResultExt,轉(zhuǎn)換消息格式為List

PullResultExt pullResultExt = (PullResultExt) pullResult;
// 轉(zhuǎn)換消息格式為List
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

執(zhí)行消息過濾,匹配符合的tag

if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

設置消息的transactionId、擴展屬性、BrokerName名稱,放入List中

for (MessageExt msg : msgListFilterAgain) {
    String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(traFlag)) {
        msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    }
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
        Long.toString(pullResult.getMinOffset()));
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
        Long.toString(pullResult.getMaxOffset()));
    msg.setBrokerName(mq.getBrokerName());
}
pullResultExt.setMsgFoundList(msgListFilterAgain);

當pullStatus為FOUND,消息進行提交消費的請求

  • 獲取第一條消息的offset(偏移量)
// 獲取第一條消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
  • 將讀取消息List,更新到processQueue的TreeMap里面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());

主要做了兩件事,循環(huán)讀取消息list,存入msgTreeMap和計算此次讀取信息偏移量

public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        // 上鎖
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            // 循環(huán)讀取消息list,存入msgTreeMap
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            msgCount.addAndGet(validMsgCnt);
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
            if (!msgs.isEmpty()) {
                // 獲取最后一條消息
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                // 獲取最大偏移量
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                ...
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    }
    ...
}
  • 提交消費請求,消息提交到內(nèi)部的線程池
// 提交消費請求,消息提交到內(nèi)部的線程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest

獲取 ConsumeRequest對象,拿到當前主題的監(jiān)聽器

這里拿到的監(jiān)聽器,就是我們在啟動消費者的時候所注冊的,監(jiān)聽到消息后執(zhí)行相關的業(yè)務邏輯

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
               ...
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

在這里觸發(fā)我們在一開始重寫的consumeMessage方法,這里msgs用Collections.unmodifiableList進行包裝,意思就是不可以修改的,是一個只讀的List

ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  • ProcessQueue中移除已經(jīng)處理的消息,同時更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        if (consumeRequest.getMsgs().isEmpty())
            return;
        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            ...
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            ...
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }
                // 如果存在失敗消息,則過5秒在定時執(zhí)行
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
                ...
        }
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        // 更新Offset位置  
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
  • 最后pullRequest放入pullRequestQueue中

入口:

org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately

DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

消息消費進度提交

  • 成功消費一條消息后,更新本地緩存表
  • 每5s向Broker提交消息消費進度
  • Broker每5s將進度持久化到consumerOffset.json

總結

目前只是將整體的一個消費端監(jiān)聽消息的流程了解清楚,里面還有許多細節(jié)需要去推敲~

以上就是詳解RocketMQ 消費端如何監(jiān)聽消息的詳細內(nèi)容,更多關于RocketMQ 消費端監(jiān)聽消息的資料請關注腳本之家其它相關文章!

相關文章

  • SpringBoot實現(xiàn)圖片上傳及本地訪問

    SpringBoot實現(xiàn)圖片上傳及本地訪問

    在SpringBoot項目中,處理靜態(tài)文件訪問尤其是實時更新的文件如商品圖片,可通過配置WebMvcConfig將本地文件映射到URL路徑上,以解決重啟項目才能訪問文件的問題,本文詳解如何保存和訪問這些文件,幫助開發(fā)者優(yōu)化項目文件管理
    2022-09-09
  • RabbitMQ單機版部署安裝過程

    RabbitMQ單機版部署安裝過程

    RabbitMQ 是一個由 Erlang 語言開發(fā)的 AMQP 的開源實現(xiàn),在實現(xiàn)過程中需要注意由于rabbitmq是基于erlang語言開發(fā)的,所以必須先安裝erlang,本文給大家介紹的非常詳細,感興趣的朋友一起看看吧
    2022-03-03
  • Java的函數(shù)方法詳解(含漢諾塔問題)

    Java的函數(shù)方法詳解(含漢諾塔問題)

    漢諾塔問題是一個經(jīng)典的遞歸問題,下面這篇文章主要給大家介紹了關于Java函數(shù)方法(含漢諾塔問題)的相關資料,文中通過圖文以及代碼示例介紹的非常詳細,需要的朋友可以參考下
    2023-11-11
  • mybatis中resulthandler的用法

    mybatis中resulthandler的用法

    這篇文章主要介紹了mybatis中resulthandler的用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • java中常見的6種線程池示例詳解

    java中常見的6種線程池示例詳解

    這篇文章主要介紹了java中常見的6種線程池示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11
  • 解決IDEA2020控制臺亂碼的方法

    解決IDEA2020控制臺亂碼的方法

    這篇文章主要介紹了解決IDEA2020控制臺亂碼的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-07-07
  • springboot+chatgpt+chatUI Pro開發(fā)智能聊天工具的實踐

    springboot+chatgpt+chatUI Pro開發(fā)智能聊天工具的實踐

    本文主要介紹了springboot+chatgpt+chatUI Pro開發(fā)智能聊天工具的實踐,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-04-04
  • JSONObject?toJSONString錯誤的解決

    JSONObject?toJSONString錯誤的解決

    這篇文章主要介紹了JSONObject?toJSONString錯誤的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • java基于Apache FTP點斷續(xù)傳的文件上傳和下載

    java基于Apache FTP點斷續(xù)傳的文件上傳和下載

    本篇文章主要介紹了java基于Apache FTP點斷續(xù)傳的文件上傳和下載,利用FTP實現(xiàn)文件的上傳和下載,具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2016-11-11
  • SpringBoot利用隨機鹽值實現(xiàn)密碼的加密與驗證

    SpringBoot利用隨機鹽值實現(xiàn)密碼的加密與驗證

    這篇文章主要為大家詳細介紹了SpringBoot如何利用隨機鹽值實現(xiàn)密碼的加密與驗證,文中的示例代碼講解詳細,有需要的小伙伴可以參考下
    2024-02-02

最新評論