RocketMQ消息存儲文件的加載與恢復(fù)機(jī)制源碼分析
前言
前面文章我們介紹了Broker是如何將消息全量存儲到CommitLog文件中,并異步生成dispatchRequest任務(wù)更新ConsumeQueue,IndexFile的過程以及ConsumeQueue和IndexFile的文件結(jié)構(gòu)。由于是異步轉(zhuǎn)發(fā)消息,就可能出現(xiàn)消息成功存儲到CommitLog文件,轉(zhuǎn)發(fā)請求任務(wù)執(zhí)行失敗,Broker宕機(jī)了,此時CommitLog和Index消息并未處理完,導(dǎo)致CommitLog與ConsumeQueue和IndexFile文件中的數(shù)據(jù)不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么這部分消息Consumer將永遠(yuǎn)無法消費到了,那么Broker是如何保證數(shù)據(jù)一致性的呢?
StoreCheckPoint介紹
StoreCheckPoint的作用是記錄CommitLog,ConsumeQueue和IndexFile的刷盤點,當(dāng)Broker異常結(jié)束時會根據(jù)StoreCheckPoint的數(shù)據(jù)恢復(fù),StoreCheckPoint屬性如下
public class StoreCheckpoint { // commitLog最后一條信息的刷盤時間戳 private volatile long physicMsgTimestamp = 0; // consumeQueue最后一個存儲單元刷盤時間戳 private volatile long logicsMsgTimestamp = 0; // 最近一個已經(jīng)寫完IndexFile的最后一條記錄刷盤時間戳 private volatile long indexMsgTimestamp = 0; }
StoreCheckPoint文件的存儲位置是${user.home}/store/checkpoint
,文件的固定長度為4K,但StoreCheckPoint只占用了前24個字節(jié),存儲格式如下圖所示
StoreCheckPoint時間戳更新時機(jī)
physicMsgTimestamp
FlushRealTimeService刷盤時更新
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { // ... // 更新CommitLog刷盤時間戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
GroupCommitService刷盤時更新
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // ... // 更新CommitLog刷盤時間戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
logicsMsgTimestamp
ConsumeQueue保存消息存儲單元時更新
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) { // ... // 如果consumeQueue保存成功,則更新ConsumeQueue存儲點信息 if (result) { this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); } }
ConsumeQueue刷盤時更新并觸發(fā)StoreCheckPoint刷盤
// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush private void doFlush(int retryTimes) { // ... // 更新ConsumeQueue存儲時間戳,并刷盤 if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } // 更新存儲點 DefaultMessageStore.this.getStoreCheckpoint().flush(); } }
indexMsgTimestamp
// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile public IndexFile getAndCreateLastIndexFile() { // 獲取最新IndexFile,如果IndexFile已經(jīng)滿了,需要創(chuàng)建一個新的IndexFile if (indexFile == null) { indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, lastUpdateIndexTimestamp); // 如果創(chuàng)建新的IndexFile成功,原IndexFile刷盤 if (indexFile != null) { final IndexFile flushThisFile = prevIndexFile; Thread flushThread = new Thread(new Runnable() { @Override public void run() { // indexFile刷盤 IndexService.this.flush(flushThisFile); } }, "FlushIndexFileThread"); flushThread.setDaemon(true); flushThread.start(); } } return indexFile; } // org.apache.rocketmq.store.index.IndexService#flush public void flush(final IndexFile f) { if (null == f) return; long indexMsgTimestamp = 0; if (f.isWriteFull()) { indexMsgTimestamp = f.getEndTimestamp(); } f.flush(); if (indexMsgTimestamp > 0) { // 更新checkPoint的indexMsgTimestamp并觸發(fā)刷盤 this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp); this.defaultMessageStore.getStoreCheckpoint().flush(); } }
- 保存消息Index,獲取最新的IndexFile如果滿了,則會創(chuàng)建一個新的IndexFile,并且更新IndexMsgTimestamp并觸發(fā)StoreCheckPoint刷盤
StoreCheckPoint刷盤源碼
StoreCheckPoint刷盤源碼如下所示,就是將CommitLog,ConsumeQueue和IndexFile刷盤時間戳持久化到硬盤上,由上面源碼可知它的刷盤觸發(fā)時機(jī)
- ConsumeQueue刷盤時觸發(fā)
- 創(chuàng)建新IndexFile文件時觸發(fā)
StoreCheckPoint刷盤源碼如下
// org.apache.rocketmq.store.StoreCheckpoint#flush public void flush() { this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp); this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp); this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp); this.mappedByteBuffer.force(); }
消息加載源碼分析
在BrokerController啟動時會調(diào)用DefaultMessageStore#load
加載存儲文件加載和恢復(fù)過程主要分為下面幾步
- 判斷Broker上次是否正常退出。這個判斷邏輯是根據(jù)
${user.home}/store/abort
是否存在。如果文件存在,說明上次是異常退出,如果文件不存在,則說明是正常退出。 - 加載CommitLog
- 加載ConsumeQueue
- 加載StoreCheckPoint
- 加載IndexFile
- 恢復(fù)ConsumeQueue與IndexFile
- 加載延遲隊列服務(wù)
// org.apache.rocketmq.store.DefaultMessageStore#load public boolean load() { boolean result = true; try { // 1. Broker上次是否正常退出 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); // 2. 加載commitLog result = result && this.commitLog.load(); // 3. 加載consumeQueue result = result && this.loadConsumeQueue(); if (result) { // 4. 加載StoreCheckPoint this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); // 5. 加載IndexFile this.indexService.load(lastExitOK); // 6. 恢復(fù)ConsumeQueue與IndexFile this.recover(lastExitOK); // 7. 延遲隊列服務(wù)加載 if (null != scheduleMessageService) { result = this.scheduleMessageService.load(); } } } return result; }
CommitLog加載
前面文章介紹過,CommitLog文件的存儲目錄是${user.home}/store/commitlog/
,并且CommitLog文件的底層是MappedFile,由MappedFileQueue管理。
CommitLog文件的加載其實調(diào)用的是MappedFileQueue#load
方法,代碼如下所示,load()中首先加載CommitLog文件目錄下的所有文件,并調(diào)用doLoad()方法加載CommitLog。
// org.apache.rocketmq.store.MappedFileQueue#load public boolean load() { File dir = new File(this.storePath/*${user.home}/store/commitlog/*/); File[] ls = dir.listFiles(); if (ls != null) { return doLoad(Arrays.asList(ls)); } return true; }
MappedFile的加載過程如下所示,核心邏輯主要分為下面三步
- 按照文件名稱將文件排序,排序好的文件就會按照消息保存的先后順序存放在列表中
- 校驗文件大小與mappedFile是否一致,如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改
- 創(chuàng)建mappedFile,并且設(shè)置wrotePosition,flushedPosition,committedPosition為mappedFileSize
public boolean doLoad(List<File> files) { // 按照文件名稱排序 files.sort(Comparator.comparing(File::getName)); for (File file : files) { // 如果commitLog文件大小與mappedFileSize不一致,則說明配置被改了,或者CommitLog文件被修改 if (file.length() != this.mappedFileSize) { return false; } try { // 創(chuàng)建MappedFile MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); } } return true; }
看到這里肯定會有疑問,加載后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都為mappedFileSize,如果最后一個MappedFile沒有使用完,Broker啟動后還會從最后一個MappedFile開始寫么?我們可以在后面消息文件恢復(fù)源碼分析找到答案。
ConsumeQueue加載
從前面文章我們知道,ConsumeQueue文件底層其實也是MappedFile,因此ConsumeQueue文件的加載與CommitLog加載差別不大。ConsumeQueue加載邏輯為
- 獲取ConsumeQueue目錄下存儲的所有Topic目錄,遍歷Topic目錄
- 遍歷每個Topic目錄下的所有queueId目錄,逐個加載ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue private boolean loadConsumeQueue() { // 獲取consumeQueue目錄 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */)); // topic文件夾數(shù)組 File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { // 遍歷topic for (File fileTopic : fileTopicList) { // 獲取topic名稱 String topic = fileTopic.getName(); // 獲取queueId文件夾數(shù)組 File[] fileQueueIdList = fileTopic.listFiles(); // 遍歷queueId if (fileQueueIdList != null) { for (File fileQueueId : fileQueueIdList) { int queueId; // 文件夾名稱就是queueId queueId = Integer.parseInt(fileQueueId.getName()); // 構(gòu)建consumeQueue ConsumeQueue logic = new ConsumeQueue(/* ... */); this.putConsumeQueue(topic, queueId, logic); // ConsumeQueue加載 if (!logic.load()) { return false; } } } } } return true; }
IndexFile加載
IndexFile文件加載過程調(diào)用的是IndexService#load
,首先獲取${user.home}/store/index
目錄下的所有文件,遍歷所有文件,如果IndexFile最后存儲時間大于StoreCheckPoint中indexMsgTimestamp,則會先刪除IndexFile
// org.apache.rocketmq.store.index.IndexService#load public boolean load(final boolean lastExitOK) { // indexFile文件目錄 File dir = new File(this.storePath); // indexFile文件列表 File[] files = dir.listFiles(); if (files != null) { // 文件排序 Arrays.sort(files); for (File file : files) { try { IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0); f.load(); if (!lastExitOK) { // 文件最后存儲時間戳大于刷盤點,則摧毀indexFile,重建 if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存儲點時間*/ .getIndexMsgTimestamp()) { f.destroy(0); continue; } } this.indexFileList.add(f); } } } return true; }
ConsumeQueue與IndexFile恢復(fù)
如果是正常退出,數(shù)據(jù)都已經(jīng)正常刷盤,前面我們說到CommitLog在加載時的wrotePosition,flushedPosition,committedPosition都設(shè)置為mappedFileSize,
因此即使是正常退出,也會調(diào)用CommitLog#recoverNormally
找到最后一條消息的位置,更新這三個屬性。
// org.apache.rocketmq.store.DefaultMessageStore#recover private void recover(final boolean lastExitOK) { // consumeQueue中最大物理偏移量 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { // 正常退出文件恢復(fù) this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { // 異常退出文件恢復(fù) this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } // 恢復(fù)topicQueueTable this.recoverTopicQueueTable(); }
正?;謴?fù)的源碼如下,由于Broker是正常關(guān)閉,因此CommitLog,ConsumeQueue與IndexFile都已經(jīng)正確刷盤,并且三者的消息是一致的。正?;謴?fù)的主要目的是找到找到最后一條消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盤點(flushWhere)和提交點(committedWhere),
- 從最后3個mappedFile開始恢復(fù),如果mappedFile總數(shù)不足3個,則從第0個mappedFile開始恢復(fù)
- 逐個遍歷mappedFile,找到每個MappedFile的最后一條消息的偏移量,并將其更新到CommitLog中MappedFileQueue的刷盤點和提交點中
- 清除ConsumeQueue冗余數(shù)據(jù)
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // 確認(rèn)消息是否完整,默認(rèn)是true boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 默認(rèn)從最后3個mappedFile開始恢復(fù) int index = mappedFiles.size() - 3; // 如果commitLog不足三個,則從第一個文件開始恢復(fù) if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); // 最后一個MappedFile的文件起始偏移量 long processOffset = mappedFile.getFileFromOffset(); // mappedFileOffset偏移量 long mappedFileOffset = 0; // 遍歷CommitLog文件 while (true) { // 校驗消息完整性 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); // 獲取消息size int size = dispatchRequest.getMsgSize(); // 返回結(jié)果為true并且消息size>0,說明消息是完整的 if (dispatchRequest.isSuccess() && size > 0) { mappedFileOffset += size; } } // 最大物理偏移量 processOffset += mappedFileOffset; // 更新flushedWhere和committedPosition指針 this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue冗余數(shù)據(jù) if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/); } } }
異?;謴?fù)源碼如下,由于上次Broker沒有正常關(guān)閉,因此由可能存在CommitLog、ConsumeQueue與IndexFile不一致的情況,因此在異?;謴?fù)時可能需要恢復(fù)ConsumeQueue和IndexFile,異?;謴?fù)核心邏輯主要包括
- 倒序查CommitLog的mappedFile文件,找到第一條消息存儲的時間戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,該mappedFile至少有一部分消息是被正常轉(zhuǎn)發(fā),正常存儲,正常刷盤的
- 從該mappedFile開始逐條轉(zhuǎn)發(fā)消息,重新恢復(fù)ConsumeQueue和IndexFile
- 當(dāng)遍歷到最后一條消息,將其偏移量更新到CommitLog中MappedFileQueue的刷盤點和提交點中
- 清除ConsumeQueue冗余數(shù)據(jù)
// org.apache.rocketmq.store.CommitLog#recoverAbnormally public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // 是否CRC校驗 boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 最后一個mappedFile的index int index = mappedFiles.size() - 1; MappedFile mappedFile = null; // 倒序遍歷mappedFile數(shù)組, for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); // 1. 如果第一條消息的時間戳小于存儲點時間戳 if (this.isMappedFileMatchedRecover(mappedFile)) { break; } } long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { mappedFileOffset += size; // 2. 轉(zhuǎn)發(fā)消息 if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重復(fù),默認(rèn)是false*/) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); } } else { this.defaultMessageStore.doDispatch(dispatchRequest); } } } // 3. 更新MappedFileQueue中的刷盤位置和提交位置 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue中的冗余數(shù)據(jù) if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } }
總結(jié)
Broker啟動時會分別加載CommitLog、ConsumeQueue與IndexFile。加載完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一條消息,并更新刷盤點和提交點。如果Broker上次是異常退出,就有可能出現(xiàn)ConsumeQueue、IndexFile與CommitLog不一致的情況,需要根據(jù)StoreCheckPoint存儲的時間戳從CommitLog找到消息,逐條恢復(fù)ConsumeQueue與IndexFile。
以上就是RocketMQ | 源碼分析】消息存儲文件的加載與恢復(fù)機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ 消息存儲文件加載恢復(fù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java實現(xiàn)UDP通信過程實例分析【服務(wù)器端與客戶端】
這篇文章主要介紹了Java實現(xiàn)UDP通信過程,結(jié)合實例形式分析了java實現(xiàn)UDP服務(wù)器端與客戶端相關(guān)操作技巧與注意事項,需要的朋友可以參考下2020-05-05利用java實現(xiàn)一個客戶信息管理系統(tǒng)
這篇文章主要給大家介紹了關(guān)于利用java實現(xiàn)一個客戶信息管理系統(tǒng)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04淺談SpringBoot實現(xiàn)異步調(diào)用的幾種方式
本文主要介紹了淺談SpringBoot實現(xiàn)異步調(diào)用的幾種方式,主要包括CompletableFuture異步任務(wù),基于@Async異步任務(wù), TaskExecutor異步任務(wù),感興趣的可以了解一下2023-11-11IntelliJ IDEA 創(chuàng)建 Java 項目及創(chuàng)建 Java 文件并運行的詳細(xì)步驟
這篇文章主要介紹了IntelliJ IDEA 創(chuàng)建 Java 項目及創(chuàng)建 Java 文件并運行的詳細(xì)步驟,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11SpringBoot增強(qiáng)Controller方法@ControllerAdvice注解的使用詳解
這篇文章主要介紹了SpringBoot增強(qiáng)Controller方法@ControllerAdvice注解的使用詳解,@ControllerAdvice,是Spring3.2提供的新注解,它是一個Controller增強(qiáng)器,可對controller進(jìn)行增強(qiáng)處理,需要的朋友可以參考下2023-10-10Java數(shù)據(jù)結(jié)構(gòu)之堆(優(yōu)先隊列)的實現(xiàn)
堆(優(yōu)先隊列)是一種典型的數(shù)據(jù)結(jié)構(gòu),其形狀是一棵完全二叉樹,一般用于求解topk問題。本文將利用Java語言實現(xiàn)堆,感興趣的可以學(xué)習(xí)一下2022-05-05