RocketMQ?offset確認(rèn)機(jī)制示例詳解
消息存儲(chǔ)

offset?

本文要探討的offset指的是上圖中的Queue Offset。
為了保存消費(fèi)的消費(fèi)進(jìn)度,避免重復(fù)消費(fèi),我們需要將offset保存下來。
針對(duì)集群消費(fèi),offset保存在broker,在客戶端使用RemoteBrokerOffsetStore。
針對(duì)廣播消費(fèi),offset保存在本地,在客戶端使用LocalFileOffsetStore。
最后,比較重要的一點(diǎn)是,保存的offset指的是下一條消息的offset,而不是消費(fèi)完最后一條消息的offset。
比如,你消費(fèi)了上圖中第一個(gè)Queue的offset為0的消息,其實(shí)保存的offset為1,表示下次我從offset=1的位置進(jìn)行消費(fèi)。
broker端
在broker端,通過ConsumerOffsetManager中的offsetTable來保存Topic下各個(gè)ConsumerGroup的消費(fèi)進(jìn)度。
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);從offsetTable的雙層Map結(jié)構(gòu)也是能夠看出,我上面說的消費(fèi)進(jìn)度,細(xì)指為ConsumerGroup在Topic下每個(gè)queue的消費(fèi)進(jìn)度。
offsetTable畢竟只是內(nèi)存結(jié)構(gòu),因此ConsumerOffsetManager繼承了ConfigManager實(shí)現(xiàn)了持久化功能。
實(shí)現(xiàn)了encode,decode,configFilePath三個(gè)模板方法。用于指定序列化,反序列化的邏輯以及保存位置
public String encode() {
return this.encode(false);
}
@Override
public String configFilePath() {
return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}其中序列化,反序列化的邏輯很簡(jiǎn)單,就是使用到了我們的FastJson。
保存文件名為consumerOffset.json。
offset加載
broker啟動(dòng)時(shí)從本地文件加載
org.apache.rocketmq.broker.BrokerController#initialize
result = result && this.consumerOffsetManager.load();
offset持久化
定時(shí)觸發(fā),持久化到磁盤
org.apache.rocketmq.broker.BrokerController#initialize
//定期將consumeroffset持久化到磁盤
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);- shutdown觸發(fā)
BrokerController#shutdown
offset更新
- ConsumerManageProcessor#updateConsumerOffset
用于consumer定時(shí)同步offset
- PullMessageProcessor
拉取消息時(shí)會(huì)順帶確認(rèn)offset
- TransactionalMessageBridge
事務(wù)回查觸發(fā),暫不深入研究
consumer端
本文只討論P(yáng)USH模式的集群消費(fèi),本地的offset緩存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();因?yàn)閏onsumer每次重啟都會(huì)重新拉取offset,只是一個(gè)臨時(shí)存儲(chǔ),因此RemoteBrokerOffsetStore的offsetTable的設(shè)計(jì)沒有像ConsumerOffsetManager那么復(fù)雜。
offset拉取
consumer啟動(dòng)后會(huì)進(jìn)行第一次rebalance,并且之后都會(huì)定期rebalance。
在rebalance分配好messagequeue之后,會(huì)根據(jù)messagequeue生成processqueue進(jìn)行消息拉取。
而在進(jìn)行消息拉取前,有一個(gè)關(guān)鍵的操作,拉取對(duì)應(yīng)messagequeue的offset。
RebalanceImpl#updateProcessQueueTableInRebalance
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//如果是順序消費(fèi) 但是lock失敗 那么跳過
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
//從offsetstore刪除之前的數(shù)據(jù),可能之前有一段時(shí)間屬于該消費(fèi)者
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 獲取該mq應(yīng)該從哪里開始消費(fèi)
// pull模式 默認(rèn)是0
// push模式 動(dòng)態(tài)計(jì)算
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
//創(chuàng)建新的pullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}
return result;
}其中獲取消息拉取初始位置有三種策略
CONSUME_FROM_LAST_OFFSET 最新的offset
CONSUME_FROM_FIRST_OFFSET 第一個(gè)offset
CONSUME_FROM_TIMESTAMP 根據(jù)時(shí)間戳獲取offset
但是從源碼中可以看出來,實(shí)際上的邏輯和我們想象的有點(diǎn)不同,上面三個(gè)的邏輯的觸發(fā)前提是,從broker拉取不到offset進(jìn)度。
這應(yīng)該是為了防止重復(fù)消費(fèi)以及少消費(fèi),畢竟rocketmq是業(yè)務(wù)相關(guān)的mq。
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}offset更新
在consumer端,針對(duì)offsetTable的更新,當(dāng)然通過消費(fèi)消息觸發(fā)。
并發(fā)消費(fèi)
ConsumeMessageConcurrentlyService#processConsumeResult
//removeMessage會(huì)返回offset
//不管消費(fèi)成功還是失敗 都會(huì)確認(rèn)offset
//失敗的消息會(huì)在重試topic
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
//更新offsetstore
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
//increaseOnly 表示offset只能增加
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}針對(duì)并發(fā)消費(fèi)的offset,更新值來源于ProcessQueue#removeMessage方法
/**
* 移除pq中的msg,并且返回剩下第一條消息的offset
* 如果消息全部消費(fèi)完,返回this.queueOffsetMax + 1
* 如果msgs為空 返回-1
* @param msgs
* @return
*/
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
//通過map的key移除,也就是queueoffset
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
//如果消息沒有全部消費(fèi)完畢,剩下消息中最小的那個(gè)
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}removeMessage的邏輯,用到了滑動(dòng)窗口的算法。
比如10條消息,offset為 0 - 9。
在多線程并發(fā)消費(fèi)的場(chǎng)景下
比如我第一個(gè)線程消費(fèi)了offset為0的消息,那么offsetTable中的offset更新為1
然后我第二個(gè)線程消費(fèi)了offset為5的消息,removeMessage返回的offset還是為1
只有前面的消息全被消費(fèi)了,窗口才會(huì)滑動(dòng)
順序消費(fèi)
ConsumeMessageOrderlyService#processConsumeResult
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}順序消費(fèi),暫不研究。
offset持久化
最終的offset以broker為準(zhǔn),因此本地的offset要定期持久化到offset。
主要持久化邏輯在persistAll和persist方法。
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
public void persist(MessageQueue mq) {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
}
}
persistAll和persist邏輯大致相同,核心邏輯都是通過updateConsumeOffsetToBroker持久化到broker。
觸發(fā)持久化邏輯的時(shí)機(jī)有以下4個(gè)
定時(shí)同步
MQClientInstance#startScheduledTask
//定時(shí)同步offset到broker
//除了拉消息的時(shí)候會(huì)同步下 也有定時(shí)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//持久化消費(fèi)進(jìn)度
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
shutdown
DefaultMQPullConsumerImpl#shutdown
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.persistConsumerOffset();//here
this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
DefaultMQPushConsumerImpl#shutdown
public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown();
this.persistConsumerOffset();//here
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.destroy();
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}
Rebalance
當(dāng)一個(gè)queue不再屬于當(dāng)前consumer的時(shí)候,需要同步進(jìn)步給broker,以便于新拿到queue的consumer從最新未消費(fèi)的消息開始拉取
RebalancePullImpl#removeUnnecessaryMessageQueue
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
return true;
}
拉取消息
拉取消息的時(shí)候會(huì)順帶commit offset
DefaultMQPushConsumerImpl#pullMessage
// 拉取消息的時(shí)候順帶ack消息進(jìn)度
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
//READ_FROM_MEMORY 獲取本地緩存的消費(fèi)進(jìn)度
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),//需要拉取的下個(gè)offset ,拉取到了 ,不代表被消費(fèi)到
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,//確認(rèn)的offset
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC, //異步拉取消息
pullCallback //注意這個(gè)回調(diào)也傳過去了
);
PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend;
//需要hasCommitOffsetFlag=true
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
//需要當(dāng)前broker是master
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
//這邊應(yīng)該是保存改group在該topic下面的消費(fèi)進(jìn)度
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
總結(jié)
- 針對(duì)集群消費(fèi)offset保存在broker,針對(duì)廣播消費(fèi)offset保存在本地
- offset保存的值為消息下次開始消費(fèi)的位置,而不是上次消費(fèi)結(jié)束的位置
- 針對(duì)offset的提交采取基于TreeMap的滑動(dòng)窗口機(jī)制
- 消費(fèi)者啟動(dòng)會(huì)先從broker拉取對(duì)應(yīng)Topic的offset進(jìn)度,然后在進(jìn)行消息拉取
- offset持久化觸發(fā)的幾種方式
問題
消息消費(fèi)失敗是否影響窗口滑動(dòng)
正常情況下,消息消費(fèi)失敗不會(huì)影響窗口滑動(dòng),因?yàn)獒槍?duì)消費(fèi)失敗的消息,client會(huì)進(jìn)行sendback。
sendback之后,消息經(jīng)過延遲之后會(huì)發(fā)往Topic=%RETRY%{CONSUMERGROUP}的Retry隊(duì)列
每個(gè)ConsumerGroup會(huì)強(qiáng)制監(jiān)聽Retry隊(duì)列的消息
ConsumeMessageConcurrentlyService#processConsumeResult
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//因?yàn)閙sgsize=1,所以只有失敗的時(shí)候才會(huì)進(jìn)入下面的循環(huán)
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//消費(fèi)失敗的消息 重新插入到commitlog 發(fā)送到group對(duì)應(yīng)重試topic
boolean result = this.sendMessageBack(msg, context);
//如果發(fā)送請(qǐng)求失敗 那么本地再消費(fèi)一次試試
if (!result) {
//本地的重試也算重試次數(shù)?。?!
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
//發(fā)送到重試隊(duì)列失敗的消息,重新消費(fèi)
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//如果消費(fèi)失敗 重新消費(fèi)
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}而sendMessageBack失敗的消息,會(huì)重新封裝成另一個(gè)ConsumeRequest在本地再次消費(fèi)。
這些失敗的消息會(huì)從之前的consumeRequest移除,因此也就影響到了ProcessQueue#removeMessage的返回值。
但是這是一個(gè)優(yōu)化,重試之后窗口大概率上還是會(huì)正?;瑒?dòng)。
消費(fèi)者并發(fā)消費(fèi)如何保證提交位置偏移量正確,保證消費(fèi)消費(fèi)不丟失? 比如a線程消費(fèi)msgid=1,b線程消費(fèi)msgid=2,b線程消費(fèi)速度比a線程快。如何避免a線程消費(fèi)失敗,消息不丟失?
如何保證并發(fā)消費(fèi)提交偏移量正確?
基于TreeMap的滑動(dòng)窗口
如何保證消息消費(fèi)不丟失?
滑動(dòng)窗口+broker遠(yuǎn)端保存+sendback+本地重試兜底
應(yīng)用重啟,消息從哪里開始消費(fèi)
如果broker保存了offset
那么從對(duì)應(yīng)offset重新拉取消息
如果broker沒有保存offset,或者其他情況丟失
那么根據(jù)配置的策略,從對(duì)應(yīng)的offset開始拉取
以上就是RocketMQ offset確認(rèn)機(jī)制示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ offset確認(rèn)機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增)
本文主要介紹了MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Springboot mybais配置多數(shù)據(jù)源過程解析
這篇文章主要介紹了Springboot+mybais配置多數(shù)據(jù)源過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例
下面小編就為大家?guī)硪黄狹aven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09
Java微服務(wù)Filter過濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過程詳解
這篇文章主要介紹了Java微服務(wù)Filter過濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過程,首先Sentinel規(guī)則的存儲(chǔ)默認(rèn)是存儲(chǔ)在內(nèi)存的,應(yīng)用重啟之后規(guī)則會(huì)丟失。因此我們通過配置中心Nacos保存規(guī)則,然后通過定時(shí)拉取Nacos數(shù)據(jù)來獲取規(guī)則配置,可以做到動(dòng)態(tài)實(shí)時(shí)的刷新規(guī)則2023-02-02
Assert.assertEquals的使用方法及注意事項(xiàng)說明
這篇文章主要介紹了Assert.assertEquals的使用方法及注意事項(xiàng)說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05
java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解
這篇文章主要介紹了java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解的相關(guān)資料,希望通過本文能幫助到大家,需要的朋友可以參考下2017-09-09
SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)源的實(shí)現(xiàn)
本文主要介紹了SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)庫源的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12

