RocketMQ源碼分析之Broker過期消息清理機制
前言
前面文章講了消息是如何保存的以及consumeQueue與Index文件更新機制。隨著消息的增加,Broker不可能一直保存所有消息,Broker是按照什么規(guī)則清理消息的呢?被消費過后的消息就會被清理掉嗎?下面我們來介紹Broker消息清理機制。
Broker消息清理機制簡介
消息是被順序存儲在CommitLog文件中的,且消息長度不定長,因此消息的清理不是以消息為單位進行的,而是以CommitLog為單位進行的。默認情況下,Broker會清理單個CommitLog文件中最后一條消息超過72小時的CommitLog文件,除了用戶手動清理為,下面幾種情況會被默認清理。
CommitLog清理機制
CommitLog文件過期(72小時),且達到清理時間點(默認為04:00~05:00),自動清理過期的CommitLog文件
CommitLog文件過期(72小時),且CommitLog所在磁盤分區(qū)占用率已經達到過期清理警戒線(默認75%),無論是否到達清理時間點都會自動清理過期文件
CommitLog所在磁盤分區(qū)占用率已經達到清理警戒線(默認85%),無論是否過期,都會從最早的文件開始清理,一次最多清理10個文件
CommitLog所在磁盤分區(qū)占用率已經達到系統(tǒng)危險警戒線(默認90%),Broker將拒絕消息寫入
Broker至少會保留最新的CommitLog文件
ConsumeQueue清理機制
- 如果ConsumeQueue文件關聯(lián)CommitLog都被清理,則清理此ConsumeQueue文件
- Broker每個Topic-QueueId至少會保留最新的文件
IndexFile清理機制
- 如果IndexFile所有索引單元關聯(lián)CommitLog都被清理,則清理此IndexFile
Broker與消息清理相關配置
# 文件自動清理時間,單位H,默認72 fileReservedTime=72 # CommitLog物理文件刪除間隔,但是ms,默認100 deleteCommitLogFilesInterval = 100 # 文件自動清理時間,默認04,即凌晨4點 deleteWhen = "04" # 硬盤占用率所在分區(qū)過期清理警戒線,超過這個值,無論是否到達清理時間,都會自動清理過期文件 diskMaxUsedSpaceRatio = 75
消息清理機制源碼分析
消息定時清理的是由DefaultMessageStore類負責的,它在啟動時(start)會調用DefaultMessageStore#addScheduleTask添加和消息存儲相關的定時任務,其中就包括消息刪除相關的定時任務DefaultMessageStore.this.cleanFilesPeriodically(),這個定時任務在Broker啟動后60s開始,每隔10秒執(zhí)行一次。
// org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// commitLog、consumeQueue和IndexFile定時刪除
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval()/*10s*/, TimeUnit.MILLISECONDS);
// ...
}
在cleanFilesPeriodically()中有兩個方法,cleanCommitLogService.run()負責清理CommitLog,cleanConsumeQueueService.run()負責清理ConsumeQueue和IndexFile。
// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() {
// 清理CommitLog
this.cleanCommitLogService.run();
// 清理ConsumeQueue和IndexFile
this.cleanConsumeQueueService.run();
}
CommitLog清理源碼分析
CommitLog清理方法CleanCommitLogService#run調用了CleanCommitLogService#deleteExpiredFiles,deleteExpiredFiles方法的核心代碼邏輯如下,以下三種情況會觸發(fā)CommitLog文件的刪除
- 當前時間是凌晨4點
- CommitLog所在磁盤分區(qū)硬盤占用率超過75%
- 手動刪除CommitLog
// org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles
private void deleteExpiredFiles() {
// 是否是凌晨4點,用小時匹配[04:00,05:00)
boolean timeup = this.isTimeToDelete();
// >75%就會返回true,如果大于85%,則觸發(fā)強制刪除
boolean spacefull = this.isSpaceToDelete();
// 手動刪除次數(shù)是否>0
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup/*凌晨4點*/ || spacefull/*空間滿了*/ || manualDelete/*手動刪除*/) {
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable()/*默認true*/ && this.cleanImmediately/*空間占用超過85%,觸發(fā)強制刪除*/;
// 刪除CommitLog
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
CommitLog的清理邏輯在MappedFileQueue#deleteExpiredFileByTime,其核心代碼如下所示,主要分為下面幾個步驟
- 復制MappedFileQueue中的mappedFiles,循環(huán)處理刪除邏輯,循環(huán)
mfsLength-1次,也就是無論如何都會保留最新的MappedFile - 如果MappedFile的最后修改時間超過72小時或CommitLog所在磁盤分區(qū)硬盤占用率超過85%觸發(fā)強制刪除MappedFile,則會刪除MappedFile,每次刪除最多刪除10個MappedFile,相鄰MappedFile刪除時間間隔默認100ms
- 刪除MappedFileQueue的mappedFiles數(shù)組中已刪除的MappedFile,并返回刪除MappedFile的數(shù)量
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {
// 復制一份當前mappedFile
Object[] mfs = this.copyMappedFiles(0);
// 會保留最后一個MappedFile
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 最后修改時間+過期時間
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 如果commitLog所在磁盤分區(qū)總容量超過85%,觸發(fā)立即刪除,或者超過了72小時的mappedFile
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// 刪除mappedFile
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
// 一次最多刪除10個mappedFile
if (files.size() >= DELETE_FILES_BATCH_MAX/*10*/) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
// 刪除文件時間間隔,默認100ms
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
// 從MappedFileQueue的mappedFiles中刪除這個mappedFile
deleteExpiredFile(files);
return deleteCount;
}
ConsumeQueue和IndexFile清理源碼分析
ConsumeQueue和IndexFile清理方法CleanConsumeQueueService#run調用了CleanConsumeQueueService#deleteExpiredFiles方法清理ConsumeQueue和IndexFile。CleanConsumeQueueService#deleteExpiredFiles核心代碼如下,包括兩個主要邏輯
- 遍歷consumeQueueTable中的ConsumeQueue,調用
ConsumeQueue#deleteExpiredFile刪除過期ConsumeQueue - 調用
IndexService#deleteExpiredFile刪除過期IndexFile
// org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles
private void deleteExpiredFiles() {
if (minOffset > this.lastPhysicalMinOffset) {
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
// 遍歷ConsumeQueue
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
// 刪除consumeQueue
int deleteCount = logic.deleteExpiredFile(minOffset);
// ... 間隔100ms
}
}
// 刪除indexFile
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}ConsumeQueue文件清理
ConsumeQueue文件底層也是MappedFile,清理ConsumeQueue調用MappedFileQueue#deleteExpiredFileByOffset清理ConsumeQueue的過期MappedFile,源碼如下,核心邏輯
- 復制MappedFileQueue中的mappedFiles,循環(huán)處理清理邏輯,循環(huán)
mfsLength-1次,也就是無論如何都會保留最新的MappedFile - 如果ConsumeQueue的MappedFile最后一個存儲單元對應消息在CommitLog中的偏移量小于CommitLog的最小偏移量,說明當前MappedFile所有存儲單元對應所有CommitLog的消息都已經被清理,因此調用
MappedFile#destroy清理當前MappedFile - 刪除MappedFileQueue緩存的mappedFiles列表中已經被清理MappedFile
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByOffset
public int deleteExpiredFileByOffset(long offset, int unitSize) {
// 復制一份mappedFiles
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
// 取consumeQueue最后一條消息Buffer切片
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
// consumeQueue最后一個存儲單元消息在commitLog的偏移量
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
result.release();
// 如果consumeQueue最后一條消息已經小于commitLog的最小offset,則說明要刪除了
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
}
// 刪除ConsumeQueue的MappedFile
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
// 刪除MappedFileQueue的mappedFiles列表中已經刪除的MappedFile
deleteExpiredFile(files);
return deleteCount;
}IndexFile清理
IndexFile清理邏輯與ConsumeQueue類似,都是刪除文件中關聯(lián)的CommitLog消息全部被刪除的文件。核心邏輯包括下面兩個
- 獲取IndexFileList中所有最大offset小于CommitLog最小offset的IndexFile
- 刪除過期的IndexFile
// org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(long)
public void deleteExpiredFile(long offset) {
Object[] files = null;
try {
// indexFileList的第一個索引文件的最后一個offset
long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
if (endPhyOffset < offset) {
files = this.indexFileList.toArray();
}
}
if (files != null) {
List<IndexFile> fileList = new ArrayList<IndexFile>();
for (int i = 0; i < (files.length - 1); i++) {
IndexFile f = (IndexFile) files[i];
// IndexFile中最大的offset小于CommitLog最小offset,說明文件可以被刪除
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
break;
}
}
// 刪除過期的IndexFile,并將其從indexFileList緩存中刪除
this.deleteExpiredFile(fileList);
}
}
總結
Broker消息清理機制由DefaultMessageStore負責,CommitLog、ConsumeQueue和IndexFile的清理都是按照文件顆粒度進行。
每10s檢查一次,通常情況下每天凌晨4點刪除超過72小時的CommitLog;如果CommitLog所在磁盤分區(qū)的磁盤占用率超過75%,則會觸發(fā)CommitLog文件清理;如果CommitLog所在磁盤分區(qū)的磁盤占用率超過85%,則會強制刪除CommitLog文件;
如果ConsumeQueue和IndexFile關聯(lián)CommitLog都被刪除,ConsumeQueue文件和IndexFile也會被清理。
以上就是RocketMQ | 源碼分析】Broker過期消息清理機制的詳細內容,更多關于RocketMQ | 源碼分析】Broker過期消息清理機制的資料請關注腳本之家其它相關文章!
相關文章
深入理解SpringBoot?最大連接數(shù)及最大并發(fā)數(shù)
SpringBoot能支持的最大并發(fā)量主要看其對Tomcat的設置,可以在配置文件中對其進行更改,本文就來介紹一下SpringBoot?最大連接數(shù)及最大并發(fā)數(shù),感興趣的可以了解一下2023-08-08
springBoo3.0集成knife4j4.1.0的詳細教程(swagger3)
這篇文章主要介紹了springBoo3.0集成knife4j4.1.0的詳細教程(swagger3),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-07-07
Spring中的Schedule動態(tài)添加修改定時任務詳解
這篇文章主要介紹了Spring中的Schedule動態(tài)添加修改定時任務詳解,可能有人會問,為啥不用Quartz,Quartz自然是非常方便強大的,但不是本篇要講的內容,本篇就偏要使用SpringSchedule來實現(xiàn)動態(tài)的cron表達式任務,需要的朋友可以參考下2023-11-11
詳解Java數(shù)據庫連接JDBC基礎知識(操作數(shù)據庫:增刪改查)
這篇文章主要介紹了詳解Java數(shù)據庫連接JDBC基礎知識(操作數(shù)據庫:增刪改查),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01

