RocketMQ消息拉取過程詳解
前言
在上一篇文章中,我們講述了DefaultMQPushConsumer
拉消息的原理,它是通過重平衡觸發(fā)pullRequest
的創(chuàng)建,通過阻塞隊(duì)列作為pullRequest
的存儲(chǔ)容器,另一端通過定時(shí)任務(wù)從阻塞隊(duì)列中取出pullRequest
來向Broker
發(fā)送拉消息的請(qǐng)求,無論消息拉取成功還是失敗,都會(huì)重新把pullRequest
放回阻塞隊(duì)列中,這樣就能保證持續(xù)不斷地向Broker
拉消息了;
今天這篇文章我們繼續(xù)講述DefaultLitePullConsumer
是如何實(shí)現(xiàn)消息拉取的;
DefaultLitePullConsumer拉消息代碼示例
我們?cè)谑褂?code>DefaultLitePullConsumer時(shí)都是主動(dòng)去poll
消息,并不是像DefaultMQPushConsumer
那樣設(shè)置一個(gè)消息監(jiān)聽器:
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(group); consumer.setNamesrvAddr(nameSrv); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe(topic, subExpression); try { consumer.start(); } catch (Exception e) { e.printStackTrace(); } try { while (true) { List<MessageExt> messageExts = consumer.poll(5000); // 處理業(yè)務(wù)邏輯 System.out.println("消息數(shù)量:" + messageExts.size()); System.out.println("消息內(nèi)容:"); for (MessageExt messageExt : messageExts) { System.out.println(new String(messageExt.getBody(), StandardCharsets.UTF_8)); } } } catch (Exception e) { e.printStackTrace(); }
一般拿到消息后都會(huì)交給業(yè)務(wù)線程池去處理,上述代碼我只簡單地打印了一下消息內(nèi)容;
消息消費(fèi)
跟著poll()
方法,我們最終定位到DefaultLitePullConsumerImpl.poll()
這個(gè)方法:
public synchronized List<MessageExt> poll(long timeout) { try { this.checkServiceState(); if (timeout < 0L) { throw new IllegalArgumentException("Timeout must not be negative"); } ? if (this.defaultLitePullConsumer.isAutoCommit()) { this.maybeAutoCommit(); } ? long endTime = System.currentTimeMillis() + timeout; // 從阻塞隊(duì)列中取ConsumeRequest DefaultLitePullConsumerImpl.ConsumeRequest consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() > 0L) { while(consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { consumeRequest = (DefaultLitePullConsumerImpl.ConsumeRequest)this.consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() <= 0L) { break; } } } ? if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { List<MessageExt> messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); // 取到消息后直接更新消費(fèi)點(diǎn)位 this.assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); this.resetTopic(messages); // 下面是調(diào)用consumeMessageHook if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(this.defaultLitePullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(consumeRequest.getMessageQueue()); consumeMessageContext.setMsgList(messages); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); // 默認(rèn)是消費(fèi)成功 consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } ? return messages; } } catch (InterruptedException var10) { } ? return Collections.emptyList(); }
1.直接從阻塞隊(duì)列consumeRequestCache
中取出消息對(duì)象ConsumeRequest
,這里面就包含了消息內(nèi)容;
2.取出來后直接更新消費(fèi)點(diǎn)位,默認(rèn)為此次消息消費(fèi)成功;
這里跟DefaultMQPushConsumer
不同的是,DefaultLitePullConsumerImpl.poll()
默認(rèn)的是消息消費(fèi)一定成功,如果消費(fèi)失敗的話,需要開發(fā)人員自己處理,消費(fèi)失敗的消息不會(huì)再次發(fā)送給消費(fèi)者;
那么咱們的疑問就出來了,poll()
方法光顧著從consumeRequestCache
中取消息,那消息是啥時(shí)候放進(jìn)去的呢?
消息拉取入口
我們可以重新了解一下消費(fèi)者重平衡過程,在MessageQueue
分配完畢后,會(huì)對(duì)比被分配的MessageQueue
是否和分配前的不一致,大部分情況下是會(huì)發(fā)生改變的,那么就會(huì)觸發(fā)messageQueueChanged()
方法的調(diào)用:
@Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { // 取出所有的MessageQueueListener MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener(); if (messageQueueListener != null) { try { // 依次調(diào)用messageQueueChanged方法 messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); } catch (Throwable e) { log.error("messageQueueChanged exception", e); } } }
MessageQueueListener
是什么時(shí)候被放進(jìn)去的呢?可以看一下subscribe()
方法:
public synchronized void subscribe(String topic, String subExpression) throws MQClientException { try { if (topic == null || "".equals(topic)) { throw new IllegalArgumentException("Topic can not be null or empty."); } setSubscriptionType(SubscriptionType.SUBSCRIBE); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); // 每個(gè)subscribe()都會(huì)設(shè)置MessageQueueListener this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); if (serviceState == ServiceState.RUNNING) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); updateTopicSubscribeInfoWhenSubscriptionChanged(); } } catch (Exception e) { throw new MQClientException("subscribe exception", e); } }
可以發(fā)現(xiàn),每個(gè)subscribe()都會(huì)設(shè)置MessageQueueListener
,MessageQueueListenerImpl
里面只干了一件事情:更新MessageQueue
并且創(chuàng)建pullTask
;
class MessageQueueListenerImpl implements MessageQueueListener { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { updateAssignQueueAndStartPullTask(topic, mqAll, mqDivided); } } ? public void updateAssignQueueAndStartPullTask(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); switch (messageModel) { case BROADCASTING: updateAssignedMessageQueue(topic, mqAll); // 更新拉消息任務(wù) updatePullTask(topic, mqAll); break; case CLUSTERING: updateAssignedMessageQueue(topic, mqDivided); // 更新拉消息任務(wù) updatePullTask(topic, mqDivided); break; default: break; } }
現(xiàn)在終于快找到這個(gè)消息拉取的入口了:
private void startPullTask(Collection<MessageQueue> mqSet) { for (MessageQueue messageQueue : mqSet) { if (!this.taskTable.containsKey(messageQueue)) { // 創(chuàng)建消息拉取任務(wù) PullTaskImpl pullTask = new PullTaskImpl(messageQueue); this.taskTable.put(messageQueue, pullTask); // 這個(gè)就是任務(wù)執(zhí)行的入口 this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); } } }
消息拉取的入口尋找起來還是有點(diǎn)困難的,但是主要思路還是從【重平衡】開始,另外就是觸發(fā)了MessageQueueListener
,此時(shí)才會(huì)創(chuàng)建pullTask
;
雖然DefaultMQPushConsumer
也是【重平衡】觸發(fā)pullRequest
的創(chuàng)建,但是它是將pullRequest
放進(jìn)阻塞隊(duì)列,另一端由消息拉取任務(wù)去取pullRequest
向Broker
發(fā)送請(qǐng)求;而DefaultLitePullConsumer
是直接創(chuàng)建pullTask
去拉消息;
PullTaskImpl拉消息
很顯然,PullTaskImpl
就是一個(gè)Runnable
,那么最重要的就是它的run()
方法,這個(gè)方法就是負(fù)責(zé)從Broker
拉消息并放進(jìn)consumeRequestCache
阻塞隊(duì)列中,這樣poll()
方法才能從consumeRequestCache
阻塞隊(duì)列中取到消息;
messageQueue
暫停
if (DefaultLitePullConsumerImpl.this.assignedMessageQueue.isPaused(this.messageQueue)) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 1000L, TimeUnit.MILLISECONDS); DefaultLitePullConsumerImpl.this.log.debug("Message Queue: {} has been paused!", this.messageQueue); return; }
如果messageQueue
處于暫停狀態(tài),那么延遲1秒重新執(zhí)行這個(gè)任務(wù);
ProcessQueue
被移除
ProcessQueue processQueue = DefaultLitePullConsumerImpl.this.assignedMessageQueue.getProcessQueue(this.messageQueue); if (null == processQueue || processQueue.isDropped()) { DefaultLitePullConsumerImpl.this.log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumerGroup(), this.messageQueue); return; }
如果processQueue
不存在或者已經(jīng)被移除了,那么這個(gè)任務(wù)也不用執(zhí)行了;
- 流量控制
if ((long)DefaultLitePullConsumerImpl.this.consumeRequestCache.size() * (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize() > DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForAll()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", DefaultLitePullConsumerImpl.this.consumeRequestCache.size(), DefaultLitePullConsumerImpl.this.consumeRequestFlowControlTimes); } return; }
如果consumeRequestCache
中的消息數(shù)量超過了PullThresholdForAll
閾值,那么觸發(fā)限流機(jī)制,當(dāng)前任務(wù)將不會(huì)繼續(xù)拉消息,并且50毫秒后才會(huì)重新執(zhí)行該任務(wù);
long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L; // 單個(gè)processQueue上面消息數(shù)量限制 if (cachedMessageCount > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes}); } return; } // 單個(gè)processQueue中消息總大小限制 if (cachedMessageSizeInMiB > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.queueFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", new Object[]{DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, DefaultLitePullConsumerImpl.this.queueFlowControlTimes}); } return; }
- 如果當(dāng)前
processQueue
中消息的數(shù)量大于PullThresholdForQueue
閾值,也同樣觸發(fā)限流機(jī)制,當(dāng)前任務(wù)不再執(zhí)行,50毫秒后重新執(zhí)行該任務(wù); - 如果當(dāng)前
processQueue
中消息的總大小超過PullThresholdSizeForQueue
(單位:MB
)閾值,將觸發(fā)限流機(jī)制,當(dāng)前任務(wù)不再執(zhí)行,50毫秒后重新執(zhí)行該任務(wù);
if (processQueue.getMaxSpan() > (long)DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getConsumeMaxSpan()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 50L, TimeUnit.MILLISECONDS); if (DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) { DefaultLitePullConsumerImpl.this.log.warn("The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), DefaultLitePullConsumerImpl.this.queueMaxSpanFlowControlTimes}); } return; }
如果processQueue
中的maxSpan
大于消費(fèi)者的ConsumeMaxSpan
,也就是第一個(gè)消息與最后一個(gè)消息的點(diǎn)位偏差大于ConsumeMaxSpan
(默認(rèn)是2000),將觸發(fā)限流機(jī)制,當(dāng)前任務(wù)不執(zhí)行,50毫秒后重新執(zhí)行該任務(wù);
- 計(jì)算拉取點(diǎn)位
long offset = 0L; try { offset = DefaultLitePullConsumerImpl.this.nextPullOffset(this.messageQueue); } catch (Exception var17) { DefaultLitePullConsumerImpl.this.log.error("Failed to get next pull offset", var17); DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, 3000L, TimeUnit.MILLISECONDS); return; }
計(jì)算消息拉取的點(diǎn)位,如果產(chǎn)生異常,那么簡隔3秒后再來重新開始任務(wù);
- 拉消息
PullResult pullResult = DefaultLitePullConsumerImpl.this.pull(this.messageQueue, subscriptionData, offset, DefaultLitePullConsumerImpl.this.defaultLitePullConsumer.getPullBatchSize());
這個(gè)就是發(fā)請(qǐng)求給Broker
拉消息;
- 放進(jìn)消息緩存區(qū)
switch(pullResult.getPullStatus()) { case FOUND: Object objLock = DefaultLitePullConsumerImpl.this.messageQueueLock.fetchLockObject(this.messageQueue); synchronized(objLock) { if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && DefaultLitePullConsumerImpl.this.assignedMessageQueue.getSeekOffset(this.messageQueue) == -1L) { processQueue.putMessage(pullResult.getMsgFoundList()); DefaultLitePullConsumerImpl.this.submitConsumeRequest(DefaultLitePullConsumerImpl.this.new ConsumeRequest(pullResult.getMsgFoundList(), this.messageQueue, processQueue)); } break; }
找到消息的情況下,將調(diào)用submitConsumeRequest()
方法把消息放進(jìn)阻塞隊(duì)列中,等待poll()
方法來消費(fèi);
- 重新開啟拉取任務(wù)
if (!this.isCancelled()) { DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS); } else { DefaultLitePullConsumerImpl.this.log.warn("The Pull Task is cancelled after doPullTask, {}", this.messageQueue); }
如果當(dāng)前任務(wù)還沒有被取消的話,那么重新開啟下一個(gè)輪回,準(zhǔn)備下一次消息拉??;
以上就是RocketMQ消息拉取過程詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ消息拉取過程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實(shí)現(xiàn)Kruskal算法的示例代碼
Kruskal算法是一種用來尋找最小生成樹的算法,由Joseph Kruskal在1956年發(fā)表。用來解決同樣問題的還有Prim算法和Boruvka算法等。本文將介紹用Java語言實(shí)現(xiàn)Kruskal算法的示例代碼,需要的可以參考一下2022-07-07java在linux系統(tǒng)下開機(jī)啟動(dòng)無法使用sudo命令的原因及解決辦法
每次開機(jī)自動(dòng)啟動(dòng)的java進(jìn)程,頁面上的關(guān)機(jī)按鈕都無法實(shí)現(xiàn)關(guān)機(jī)功能,但是此時(shí)如果以chb賬號(hào)通過ssh登錄該服務(wù)器,手動(dòng)殺掉tomcat進(jìn)程,然后再重新啟動(dòng)tomcat,頁面上的關(guān)機(jī)按鈕就有效了2013-08-08Java中使用回調(diào)函數(shù)的方法實(shí)例
本文主要介紹了Java中使用回調(diào)函數(shù)的方法實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05分析java中全面的單例模式多種實(shí)現(xiàn)方式
單例模式是一種常用的軟件設(shè)計(jì)模式,單例對(duì)象的類只能允許一個(gè)實(shí)例存在。許多時(shí)候整個(gè)系統(tǒng)只需要擁有一個(gè)的全局對(duì)象,有利于協(xié)調(diào)系統(tǒng)整體的行為。比如在某個(gè)服務(wù)器程序中,該服務(wù)器的配置信息存放在一個(gè)文件中。本文將介紹它的思想和多種實(shí)現(xiàn)方式2021-06-06Springboot 讀取 yml 配置文件里的參數(shù)值
本文主要介紹了Springboot 讀取 yml 配置文件里的參數(shù)值,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12