詳解RocketMQ 消費(fèi)端如何監(jiān)聽消息
前言
上一篇文章中我們主要來看RocketMQ消息消費(fèi)者是如何啟動(dòng)的,
那他有一個(gè)步驟是非常重要的,就是啟動(dòng)消息的監(jiān)聽,通過不斷的拉取消息,來實(shí)現(xiàn)消息的監(jiān)聽,那具體怎么做,讓我們我們跟著源碼來學(xué)習(xí)一下~
流程地圖
源碼跟蹤
這一塊的代碼比較多,我自己對(duì)關(guān)鍵點(diǎn)的一些整理,這個(gè)圖我畫的不是很OK
核心模塊(消息拉?。?/h3>
入口:this.pullMessageService.start();
- 執(zhí)行線程池run方法,輪流從pullRequestQueue中獲取PullRequest
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
聲明一個(gè)阻塞隊(duì)列用來存放 PullRequest 對(duì)象
PullRequest 用于消息拉取任務(wù),如果 pullRequestQueue 為空則會(huì)阻塞,直到拉取任務(wù)被放入
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
將 stopped 用volatile來修飾,每次執(zhí)行的時(shí)候都檢測(cè)stopped的狀態(tài),線程只要修改了這個(gè)狀態(tài),其余線程就會(huì)馬上知道
protected volatile boolean stopped = false; @Override public void run() { log.info(this.getServiceName() + " service started"); // 判斷啟動(dòng)狀態(tài) while (!this.isStopped()) { try { // 取出一個(gè)PullRequest對(duì)象 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end"); }
- 獲取消費(fèi)隊(duì)列快照,判斷狀態(tài)是否正常,同時(shí)更新最后一次拉取時(shí)間
PullMessageService 從消息服務(wù)器默認(rèn)拉取32條消息,按消息的偏移量順序存放在 ProcessQueue 隊(duì)列
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
入口:org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// 獲取消費(fèi)隊(duì)列快照 final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } // 設(shè)置最后一次拉取時(shí)間 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
- 校驗(yàn)客戶端運(yùn)行狀態(tài)
// 校驗(yàn)狀態(tài) this.makeSureStateOK(); private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The consumer service state not OK, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } }
如果消費(fèi)者狀態(tài)不正確,則拋出異常,啟動(dòng)定時(shí)線程池過段時(shí)間回收 PullRequest 對(duì)象,以便pullMessageService能及時(shí)喚醒并再次執(zhí)行消息拉取,這個(gè)邏輯在多個(gè)地方使用到了
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { if (!isStopped()) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { PullMessageService.this.executePullRequestImmediately(pullRequest); } }, timeDelay, TimeUnit.MILLISECONDS); } else { log.warn("PullMessageServiceScheduledThread has shutdown"); } }
public void executePullRequestImmediately(final PullRequest pullRequest) { try { // 最后將pullRequest放入pullRequestQueue中 this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
- 校驗(yàn)消費(fèi)隊(duì)列中的消息數(shù)量和大小是否符合設(shè)置
如果觸發(fā)流量控制,則延遲拉取消息,先將 PullRequest 對(duì)象進(jìn)行回收,以便pullMessageService能及時(shí)喚醒并再次執(zhí)行消息拉取
// 緩存消息條數(shù) long cachedMessageCount = processQueue.getMsgCount().get(); // 緩存消息的大小 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // 當(dāng)隊(duì)列中的消息跳過,超過設(shè)置 則延遲拉取消息 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; }
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; }
- 根據(jù)主題獲取配置的訂閱關(guān)系
這里通過查詢 subscriptionInner Map容器,利用主題來獲取對(duì)應(yīng)的訂閱關(guān)系,如果沒有找到對(duì)應(yīng)的訂閱關(guān)系,則延遲拉取消息,先將 PullRequest 對(duì)象進(jìn)行回收以便 pullMessageService 能及時(shí)喚醒并再次執(zhí)行消息拉取
protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; }
- 如果為集群模式,則從內(nèi)存中讀取位置
通過消費(fèi)者啟動(dòng)的模塊中,我們知道RocketMQ是根據(jù)不同模式,將消息進(jìn)度存儲(chǔ)在不同的地方
廣播模式:消息進(jìn)度存儲(chǔ)在本地文件
集群模式:消息進(jìn)度存儲(chǔ)在Broker 服務(wù)器上
boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { // 從內(nèi)存中讀取位置 commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } }
- 內(nèi)核中拉取消息(最重要的模塊)
入口:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, final long subVersion, final long offset, final int maxNums, final int sysFlag, final long commitOffset, final long brokerSuspendMaxTimeMillis, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}
我們看到他有非常多的參數(shù)
拉取流程
- 通過BrokerName找到對(duì)應(yīng)的Broker
// step 1 通過BrokerName找到對(duì)應(yīng)的Broker FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
- 如果沒有找到對(duì)應(yīng)的,則更新路由信息
// step 2 如果沒有找到對(duì)應(yīng)的,則更新路由信息 if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); }
- 檢查Broker版本和Tag信息
// check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); }
- 設(shè)置PullMessageRequestHeader
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType);
- 調(diào)用pullMessage方法拉取消息,返回拉取結(jié)果
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
因?yàn)?CommunicationMode 傳遞的是ASYNC,我們著重來看一下這個(gè)方法
入口: org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
調(diào)用 this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()
這里我們就先不細(xì)看了
拉取消息處理
- 如果PullCallback回調(diào)成功,則對(duì)結(jié)果進(jìn)行處理
// 處理pullResult數(shù)據(jù) pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
主要做了三件事,轉(zhuǎn)換消息格式、設(shè)置消息信息、放入msgFoundList
將pullResult 轉(zhuǎn)成 PullResultExt,轉(zhuǎn)換消息格式為L(zhǎng)ist
PullResultExt pullResultExt = (PullResultExt) pullResult; // 轉(zhuǎn)換消息格式為L(zhǎng)ist ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
執(zhí)行消息過濾,匹配符合的tag
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } }
設(shè)置消息的transactionId、擴(kuò)展屬性、BrokerName名稱,放入List中
for (MessageExt msg : msgListFilterAgain) { String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(traFlag)) { msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); } MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, Long.toString(pullResult.getMaxOffset())); msg.setBrokerName(mq.getBrokerName()); } pullResultExt.setMsgFoundList(msgListFilterAgain);
當(dāng)pullStatus為FOUND,消息進(jìn)行提交消費(fèi)的請(qǐng)求
- 獲取第一條消息的offset(偏移量)
// 獲取第一條消息的offset firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
- 將讀取消息List,更新到processQueue的TreeMap里面
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
主要做了兩件事,循環(huán)讀取消息list,存入msgTreeMap和計(jì)算此次讀取信息偏移量
public boolean putMessage(final List<MessageExt> msgs) { boolean dispatchToConsume = false; try { // 上鎖 this.treeMapLock.writeLock().lockInterruptibly(); try { int validMsgCnt = 0; // 循環(huán)讀取消息list,存入msgTreeMap for (MessageExt msg : msgs) { MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); if (null == old) { validMsgCnt++; this.queueOffsetMax = msg.getQueueOffset(); msgSize.addAndGet(msg.getBody().length); } } msgCount.addAndGet(validMsgCnt); if (!msgTreeMap.isEmpty() && !this.consuming) { dispatchToConsume = true; this.consuming = true; } if (!msgs.isEmpty()) { // 獲取最后一條消息 MessageExt messageExt = msgs.get(msgs.size() - 1); // 獲取最大偏移量 String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); ... } } finally { this.treeMapLock.writeLock().unlock(); } } ... }
- 提交消費(fèi)請(qǐng)求,消息提交到內(nèi)部的線程池
// 提交消費(fèi)請(qǐng)求,消息提交到內(nèi)部的線程池 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#submitConsumeRequest
獲取 ConsumeRequest對(duì)象,拿到當(dāng)前主題的監(jiān)聽器
這里拿到的監(jiān)聽器,就是我們?cè)趩?dòng)消費(fèi)者的時(shí)候所注冊(cè)的,監(jiān)聽到消息后執(zhí)行相關(guān)的業(yè)務(wù)邏輯
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ... return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
在這里觸發(fā)我們?cè)谝婚_始重寫的consumeMessage方法,這里msgs用Collections.unmodifiableList進(jìn)行包裝,意思就是不可以修改的,是一個(gè)只讀的List
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
- ProcessQueue中移除已經(jīng)處理的消息,同時(shí)更新Offset位置
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; ... } switch (this.defaultMQPushConsumer.getMessageModel()) { ... case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } // 如果存在失敗消息,則過5秒在定時(shí)執(zhí)行 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; ... } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); // 更新Offset位置 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore() .updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
- 最后pullRequest放入pullRequestQueue中
入口:
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
消息消費(fèi)進(jìn)度提交
- 成功消費(fèi)一條消息后,更新本地緩存表
- 每5s向Broker提交消息消費(fèi)進(jìn)度
- Broker每5s將進(jìn)度持久化到consumerOffset.json
總結(jié)
目前只是將整體的一個(gè)消費(fèi)端監(jiān)聽消息的流程了解清楚,里面還有許多細(xì)節(jié)需要去推敲~
以上就是詳解RocketMQ 消費(fèi)端如何監(jiān)聽消息的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 消費(fèi)端監(jiān)聽消息的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot實(shí)現(xiàn)圖片上傳及本地訪問
在SpringBoot項(xiàng)目中,處理靜態(tài)文件訪問尤其是實(shí)時(shí)更新的文件如商品圖片,可通過配置WebMvcConfig將本地文件映射到URL路徑上,以解決重啟項(xiàng)目才能訪問文件的問題,本文詳解如何保存和訪問這些文件,幫助開發(fā)者優(yōu)化項(xiàng)目文件管理2022-09-09springboot+chatgpt+chatUI Pro開發(fā)智能聊天工具的實(shí)踐
本文主要介紹了springboot+chatgpt+chatUI Pro開發(fā)智能聊天工具的實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04JSONObject?toJSONString錯(cuò)誤的解決
這篇文章主要介紹了JSONObject?toJSONString錯(cuò)誤的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02java基于Apache FTP點(diǎn)斷續(xù)傳的文件上傳和下載
本篇文章主要介紹了java基于Apache FTP點(diǎn)斷續(xù)傳的文件上傳和下載,利用FTP實(shí)現(xiàn)文件的上傳和下載,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2016-11-11SpringBoot利用隨機(jī)鹽值實(shí)現(xiàn)密碼的加密與驗(yàn)證
這篇文章主要為大家詳細(xì)介紹了SpringBoot如何利用隨機(jī)鹽值實(shí)現(xiàn)密碼的加密與驗(yàn)證,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考下2024-02-02