RocketMQ?broker文件清理源碼解析
1. broker 清理文件介紹
本系列RocketMQ4.8注釋github地址,希望對(duì)大家有所幫助,要是覺(jué)得可以的話麻煩給點(diǎn)一下Star哈
1.1 哪些文件需要清理
首先我們需要介紹下在RocketMQ中哪些文件需要清理,其實(shí)可以想一想,在RocketMQ中哪些文件是一直在往里面寫(xiě)入東西的,最容易想到的就是commitlog 了,因?yàn)樵谝粋€(gè)broker 進(jìn)程中,所有的普通消息,事務(wù)消息,系統(tǒng)消息啥的都往這個(gè)commitlog中寫(xiě),隨著時(shí)間的越來(lái)越長(zhǎng),然后commitlog就會(huì)越積攢越多,肯定會(huì)有磁盤(pán)放不下的那一天,而且我們消息消費(fèi)完成后,那些被消費(fèi)完成后的消息其實(shí)作用就很小了,可能會(huì)有這么一個(gè)場(chǎng)景,比如說(shuō)我線上出現(xiàn)了某個(gè)問(wèn)題,我想看下關(guān)于這個(gè)問(wèn)題的消息有沒(méi)有被消費(fèi)到,可能你會(huì)用到這個(gè)消息,但是這種問(wèn)題一般就是比較緊急的,最近實(shí)效的,之前那些消息其實(shí)作用就基本沒(méi)有了,所以就需要清理掉之前的消息。其實(shí)不光commitlog需要清理,還需要清理一下ConsumeQueue 與indexFile , 因?yàn)槟?code>commitlog里面的消息都被清理了,ConsumeQueue 與indexFile 再保存著之前的一些數(shù)據(jù),就是純粹浪費(fèi)空間了。
所以說(shuō) broker 文件清理主要是清理commitlog , ConsumeQueue , indexFile。
1.2 RocketMQ文件清理的機(jī)制
我們介紹下RocketMQ文件清理的機(jī)制,RocketMQ默認(rèn)是清理72小時(shí)之前的消息,然后它有幾個(gè)觸發(fā)條件, 默認(rèn)是凌晨4點(diǎn)觸發(fā)清理, 除非你你這個(gè)磁盤(pán)空間占用到75% 以上了。在清理commitlog 的時(shí)候,并不是一條消息一條消息的清理,拿到所有的MappedFile(拋去現(xiàn)在還在用著的,也就是最后一個(gè)) ,然后比對(duì)每個(gè)MappedFile的最后一條消息的時(shí)間,如果是72小時(shí)之前的就把MappedFile對(duì)應(yīng)的文件刪除了,銷(xiāo)毀對(duì)應(yīng)MappedFile,這種情況的話只要你MappedFile 最后一條消息還在存活實(shí)效內(nèi)的話,它就不會(huì)清理你這個(gè)MappedFile,就算你這個(gè)MappedFile 靠前的消息過(guò)期了。但是有一種情況它不管你消息超沒(méi)超過(guò)72小時(shí),直接就是刪,那就是磁盤(pán)空間不足的時(shí)候,也就是占了85%以上了,就會(huì)立即清理。
清理完成commitlog 之后,就會(huì)拿到commitlog中最小的offset ,然后去ConsumeQueue與indexFile中把小于offset 的記錄刪除掉。清理ConsumeQueue 的時(shí)候也是遍歷MappedFile ,然后它的最后一條消息(unit)小于commitlog中最小的offset 的話,就說(shuō)明這個(gè)MappedFile都小于offset ,因?yàn)樗麄兪琼樞蜃芳訉?xiě)的,這個(gè)MappedFile 就會(huì)清理掉,如果你MappedFile 最后一個(gè)unit不是小于offset 的話,這個(gè)MappedFile 就不刪了。
2. 源碼解析
我們來(lái)看下源碼是怎樣實(shí)現(xiàn)的: 在broker 存儲(chǔ)器DefaultMessageStore 啟動(dòng)(start)的時(shí)候,會(huì)添加幾個(gè)任務(wù)調(diào)度,其中有一個(gè)就是文件清理的:
private void addScheduleTask() {
// todo 清理過(guò)期文件 每隔10s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// todo
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
...
}
默認(rèn)是10s執(zhí)行一次,可以看到它調(diào)用了DefaultMessageStore 的cleanFilesPeriodically方法:
private void cleanFilesPeriodically() {
// todo 清除CommitLog文件
this.cleanCommitLogService.run();
// todo 清除ConsumeQueue文件
this.cleanConsumeQueueService.run();
}
2.1 清理commitlog
我們先來(lái)看下關(guān)于commitlog的清理工作:
public void run() {
try {
// todo 刪除過(guò)期文件
this.deleteExpiredFiles();
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
我們看下deleteExpiredFiles 方法的實(shí)現(xiàn):
private void deleteExpiredFiles() {
int deleteCount = 0;
// 文件保留時(shí)間,如果超過(guò)了該時(shí)間,則認(rèn)為是過(guò)期文件,可以被刪除
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 刪除物理文件的間隔時(shí)間,在一次清除過(guò)程中,可能需要被刪除的文件不止一個(gè),該值指定兩次刪除文件的間隔時(shí)間
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 在清除過(guò)期文件時(shí),如
//果該文件被其他線程占用(引用次數(shù)大于0,比如讀取消息),此時(shí)會(huì)
//阻止此次刪除任務(wù),同時(shí)在第一次試圖刪除該文件時(shí)記錄當(dāng)前時(shí)間
//戳,destroyMapedFileIntervalForcibly表示第一次拒絕刪除之后能
//保留文件的最大時(shí)間,在此時(shí)間內(nèi),同樣可以被拒絕刪除,超過(guò)該時(shí)
//間后,會(huì)將引用次數(shù)設(shè)置為負(fù)數(shù),文件將被強(qiáng)制刪除
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 指定刪除文件的時(shí)間點(diǎn),RocketMQ通過(guò)deleteWhen設(shè)置每天在
//固定時(shí)間執(zhí)行一次刪除過(guò)期文件操作,默認(rèn)凌晨4點(diǎn)
boolean timeup = this.isTimeToDelete();
// todo 檢查磁盤(pán)空間是否充足,如果磁盤(pán)空間不充足,則返回true,表示應(yīng)該觸發(fā)過(guò)期文件刪除操作
boolean spacefull = this.isSpaceToDelete();
// 預(yù)留手工觸發(fā)機(jī)制,可以通過(guò)調(diào)用excuteDeleteFilesManualy
//方法手工觸發(fā)刪除過(guò)期文件的操作,目前RocketMQ暫未封裝手工觸發(fā)
//文件刪除的命令
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
fileReservedTime,
timeup,
spacefull,
manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
// todo 文件的銷(xiāo)毀和刪除
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
開(kāi)始幾個(gè)參數(shù),一個(gè)是文件保留實(shí)效默認(rèn)是72小時(shí),你可以使用fileReservedTime來(lái)配置,一個(gè)是刪除文件的間隔100ms,再就是強(qiáng)行銷(xiāo)毀MappedFile的120s(這個(gè)為啥要強(qiáng)行銷(xiāo)毀,因?yàn)樗€害怕還有地方用著這個(gè)MappedFile,它有個(gè)專(zhuān)門(mén)的引用計(jì)數(shù)器,比如說(shuō)我還有地方要讀它的消息,這個(gè)時(shí)候計(jì)數(shù)器就是+1的)。
接著就是判斷到?jīng)]到刪除的那個(gè)時(shí)間,它默認(rèn)是凌晨4點(diǎn)才能刪除:
private boolean isTimeToDelete() {
// 什么時(shí)候刪除,默認(rèn)是凌晨4點(diǎn) -> 04
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
// 判斷是不是到點(diǎn)了 就是判斷的當(dāng)前小時(shí) 是不是等于 默認(rèn)的刪除時(shí)間
if (UtilAll.isItTimeToDo(when)) {
DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
return true;
}
return false;
}
再接著就是看看空間是不是充足,看看磁盤(pán)空間使用占比是什么樣子的:
private boolean isSpaceToDelete() {
// 表示CommitLog文件、ConsumeQueue文件所在磁盤(pán)分區(qū)的最大使用量,如果超過(guò)該值,則需要立即清除過(guò)期文件
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
// 表示是否需要立即執(zhí)行清除過(guò)期文件的操作
cleanImmediately = false;
{
// 當(dāng)前CommitLog目錄所在的磁盤(pán)分區(qū)的磁盤(pán)使用率,通過(guò)File#getTotalSpace方法獲取文件所在磁盤(pán)分區(qū)的總?cè)萘浚?
//通過(guò)File#getFreeSpace方法獲取文件所在磁盤(pán)分區(qū)的剩余容量
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
// diskSpaceWarningLevelRatio:默認(rèn)0.90。如果磁盤(pán)分區(qū)使用率超過(guò)該閾值,將設(shè)置磁盤(pán)為不可寫(xiě),此時(shí)會(huì)拒絕寫(xiě)入新消息
// 如果當(dāng)前磁盤(pán)分區(qū)使用率大于diskSpaceWarningLevelRatio,應(yīng)該立即啟動(dòng)過(guò)期文件刪除操作
if (physicRatio > diskSpaceWarningLevelRatio) {
// 設(shè)置 磁盤(pán)不可寫(xiě)
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
}
cleanImmediately = true;
//diskSpaceCleanForciblyRatio:默認(rèn)0.85 如果磁盤(pán)分區(qū)使用超過(guò)該閾值,建議立即執(zhí)行過(guò)期文件刪除,但不會(huì)拒絕寫(xiě)入新消息
// 如果當(dāng)前磁盤(pán)分區(qū)使用率大于diskSpaceCleanForciblyRatio,建議立即執(zhí)行過(guò)期文件清除
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
// 設(shè)置 磁盤(pán)可以寫(xiě)入
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
}
}
// 如果當(dāng)前磁盤(pán)使用率小于diskMaxUsedSpaceRatio,則返回false,表示磁盤(pán)使用率正常,
// 否則返回true,需要執(zhí)行刪除過(guò)期文件
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
return true;
}
}
/**
* 對(duì)consumeQueue 做同樣的判斷
*/
...
return false;
}
這里其實(shí)不光是判斷 commitlog的存儲(chǔ)區(qū)域,后面還有段判斷ConsumeQueue的存儲(chǔ)區(qū)域的,然后與這塊邏輯一樣,就沒(méi)有放上。這里就是獲取默認(rèn)的最大使用占比 就是75% ,接著就是看看commitlog 存儲(chǔ)的那地方使用了多少了,如果是使用90% 了,就設(shè)置runningFlag 說(shuō)磁盤(pán)滿了,立即清理設(shè)置成true,這個(gè)參數(shù)設(shè)置成true之后,就不會(huì)管你消息有沒(méi)有超過(guò)72小時(shí),如果你使用了85% 以上了,也是設(shè)置立即清理,如果超過(guò)75% 返回true。好了,磁盤(pán)占用空間這塊我們就看完了。
接著看上面deleteExpiredFiles方法實(shí)現(xiàn),還有一個(gè)手動(dòng)清除的,這塊我沒(méi)有找到哪里有用到的,如果后續(xù)找到,會(huì)補(bǔ)充上, 判斷 到了清理的點(diǎn) 或者是磁盤(pán)空間滿了 或者是手動(dòng)刪除了,滿足一個(gè)條件就ok了,如果是立即清除是個(gè)true,它這里這個(gè)cleanAtOnce 變量就是true了,因?yàn)榍懊婺莻€(gè)強(qiáng)制清理是默認(rèn)開(kāi)啟的。
接著計(jì)算了一下fileReservedTime 就是將小時(shí)轉(zhuǎn)成了毫秒,為了后面好比對(duì),最后就是調(diào)用commitlog的deleteExpiredFile 方法清理了:
/**
* 刪除過(guò)期的文件
* @param expiredTime 過(guò)期時(shí)間 默認(rèn)72小時(shí)
* @param deleteFilesInterval 刪除文件的間隔 100ms
* @param intervalForcibly 強(qiáng)制刪除 1000 * 120
* @param cleanImmediately 是不是要一次性清理了
* @return
*/
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
// todo
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
可以看到commitlog 對(duì)象調(diào)用mappedFileQueue 的deleteExpiredFileByTime 方法來(lái)處理的,這個(gè)mappedFileQueue 就是管理了一堆MappedFile:
/**
* 刪除文件
*
* 從倒數(shù)第二個(gè)文件開(kāi)始遍歷,計(jì)算文件的最大存活時(shí)間,即文件的最后一次更新時(shí)間+文件存活時(shí)間(默認(rèn)
* 72小時(shí)),如果當(dāng)前時(shí)間大于文件的最大存活時(shí)間或需要強(qiáng)制刪除文
* 件(當(dāng)磁盤(pán)使用超過(guò)設(shè)定的閾值)時(shí),執(zhí)行MappedFile#destory方
* 法,清除MappedFile占有的相關(guān)資源,如果執(zhí)行成功,將該文件加入
* 待刪除文件列表中,最后統(tǒng)一執(zhí)行File#delete方法將文件從物理磁盤(pán)
* 中刪除。
*/
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
// 拿到mappedFile的引用
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// 計(jì)算文件的最大存活時(shí)間,即文件的最后一次更新時(shí)間+文件存活時(shí)間(默認(rèn)72小時(shí))
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 如果當(dāng)前時(shí)間大于文件的最大存活時(shí)間 或 需要強(qiáng)制刪除文件(當(dāng)磁盤(pán)使用超過(guò)設(shè)定的閾值)時(shí)
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
// todo 執(zhí)行destroy方法
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
// 一批 最多刪除10 個(gè)
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
// 刪除間隔
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
// todo 統(tǒng)一執(zhí)行File#delete方法將文件從物理磁盤(pán)中刪除
deleteExpiredFile(files);
return deleteCount;
}
這里首先是拿到所有MappedFile的引用,然后就是遍歷了,可以看到它這個(gè)length是-1的,也就是最后一個(gè)MappedFile 是遍歷不到的,這個(gè)是肯定的,因?yàn)樽詈笠粋€(gè)MappedFile肯定是在用著的,如果你來(lái)個(gè)強(qiáng)制清理,一下清理了,就沒(méi)法提供服務(wù)了。
遍歷的時(shí)候,拿到對(duì)應(yīng)MappedFile 里面最后一條消息,看看它的寫(xiě)入時(shí)間是不是已經(jīng)過(guò)了這個(gè)過(guò)期時(shí)間了,或者直接強(qiáng)制刪除,就會(huì)執(zhí)行MappedFile的銷(xiāo)毀方法,而且?guī)еN(xiāo)毀時(shí)間:
/**
* 銷(xiāo)毀方法
* @param intervalForcibly 表示拒絕被銷(xiāo)毀的最大存活時(shí)間
* @return
*/
public boolean destroy(final long intervalForcibly) {
// todo
this.shutdown(intervalForcibly);
// 清理結(jié)束
if (this.isCleanupOver()) {
try {
// 關(guān)閉文件通道,
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
// 刪除物理文件
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
public void shutdown(final long intervalForcibly) {
// 關(guān)閉MappedFile
if (this.available) {
this.available = false;
// 初次關(guān)閉的時(shí)間戳
this.firstShutdownTimestamp = System.currentTimeMillis();
// todo 嘗試釋放資源
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
這里就不詳細(xì)說(shuō)了,其實(shí)就是shutdown,然后過(guò)了120s后強(qiáng)制把引用清了,之后就是關(guān)閉channel,刪除對(duì)應(yīng)文件。
接著往下說(shuō),就是銷(xiāo)毀成功了,會(huì)記錄刪除數(shù)量,判斷刪了多少了,一批是最多刪10個(gè)的,這塊應(yīng)該是怕影響性能的,你一直刪的的話,這東西很消耗磁盤(pán)性能,容易影響其他寫(xiě)入,讀取功能,如果你銷(xiāo)毀失敗,直接就停了。最后就是將刪除的這些MappedFile從MappedFileQueue中刪除掉。再回到commitlog clean service 的run方法:
public void run() {
try {
// todo 刪除過(guò)期文件
this.deleteExpiredFiles();
// todo
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
我們deleteExpiredFiles 方法已經(jīng)介紹完了,然后再來(lái)看看第二個(gè)方法是干嘛的,這個(gè)其實(shí)就是判斷第一個(gè)MappedFile 還可不可用了,如果不可用的話,就刪了,這塊有可能是上面 deleteExpiredFiles 方法MappedFile銷(xiāo)毀失敗,然后設(shè)置了不可用,但是沒(méi)有清理掉,所以這塊再來(lái)善后下:
private void redeleteHangedFile() {
// redeleteHangedFileInterval間隔 默認(rèn)1000*120
int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
// 當(dāng)前時(shí)間戳
long currentTimestamp = System.currentTimeMillis();
if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
this.lastRedeleteTimestamp = currentTimestamp;
// 獲取強(qiáng)制銷(xiāo)毀Mapped文件間隔
int destroyMapedFileIntervalForcibly =
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// todo 重新刪除第一個(gè)MappedFile
if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
}
}
}
public boolean retryDeleteFirstFile(final long intervalForcibly) {
// 獲取到 第一個(gè)mappedFile
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
// 不可用
if (!mappedFile.isAvailable()) {
log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
// 銷(xiāo)毀
boolean result = mappedFile.destroy(intervalForcibly);
if (result) {
log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
tmpFiles.add(mappedFile);
this.deleteExpiredFile(tmpFiles);
} else {
log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
}
return result;
}
}
return false;
}
這塊就是看第一個(gè)MappedFile 還可不可用,不可用的話,就銷(xiāo)毀掉。好了commitlog 文件清理源碼就解析完成了。接下來(lái)看下這個(gè)ConsumeQueue與indexFile的清理。
2.2 ConsumeQueue 清理
private void cleanFilesPeriodically() {
// todo 清除CommitLog文件
this.cleanCommitLogService.run();
// todo 清除ConsumeQueue文件
this.cleanConsumeQueueService.run();
}
DefaultMessageStore.CleanConsumeQueueService#run:
public void run() {
try {
// 刪除 過(guò)期的file
this.deleteExpiredFiles();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
接下來(lái)DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:
private void deleteExpiredFiles() {
// 刪除間隔 100
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 獲取 commitLog 的最小offset
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
// 上次 清理 到哪了
this.lastPhysicalMinOffset = minOffset;
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
// 遍歷刪除
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
// 進(jìn)行刪除
int deleteCount = logic.deleteExpiredFile(minOffset);
// 間隔
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
}
}
}
}
// todo 刪除 過(guò)期的 indexFile
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
首先是獲取刪除間隔,然后拿到commitlog中最小的那個(gè)offset ,接著就是判斷上次清理位置與最小offset 比較,如果offset 大于它上次清理的位置的話,就說(shuō)明 它得把最小offset之前的清理掉。先是記錄最后一次清理的offset是最小offset , 接著就是遍歷所有的ConsumeQueue ,調(diào)用每個(gè)ConsumeQueue 的 deleteExpiredFile 方法來(lái)清理,我們來(lái)看下這個(gè)方法:
public int deleteExpiredFile(long offset) {
// 進(jìn)行銷(xiāo)毀 然后得到銷(xiāo)毀個(gè)數(shù)
int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
// 糾正最小偏移量
this.correctMinOffset(offset);
return cnt;
}
CQ_STORE_UNIT_SIZE 這個(gè)就是每個(gè)unit 占20個(gè)字節(jié),見(jiàn)。
/**
* 刪除過(guò)期的file
* @param offset 最小offset
* @param unitSize 大小為20字節(jié)
* @return
*/
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
// 最后一個(gè)單元位置到這個(gè)MappedFile結(jié)束,其實(shí)就是獲取最后一個(gè)單元
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
// 獲取最大的offset
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
result.release();
// 判斷是否銷(xiāo)毀 如果小于offset 就要銷(xiāo)毀
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}
// 進(jìn)行銷(xiāo)毀
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
// 刪除引用
deleteExpiredFile(files);
return deleteCount;
}
它的刪除跟commitlog 的差不多,只不過(guò)commitlog 是根據(jù)時(shí)間來(lái)判斷的,它是根據(jù)commitlog 的offset 來(lái)判斷的,判斷要不要?jiǎng)h除這個(gè)MappedFile,如果這個(gè)MappedFile最后一個(gè)unit 存儲(chǔ)的offset 小于 commitlog 最小的offset 的話就要銷(xiāo)毀了。接著就是銷(xiāo)毀,超時(shí)時(shí)間是1分鐘,最后是刪除引用。
2.3 indexFile 清理
最后我們來(lái)看下 indexFile的清理工作: DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:
private void deleteExpiredFiles() {
// 刪除間隔 100
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 獲取 commitLog 的最小offset
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
...
// todo 刪除 過(guò)期的 indexFile
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
/**
* 刪除 過(guò)期文件
* @param offset 最小的offset 小于這個(gè)offset都要?jiǎng)h除
*/
public void deleteExpiredFile(long offset) {
Object[] files = null;
try {
// 獲取讀鎖
this.readWriteLock.readLock().lock();
if (this.indexFileList.isEmpty()) {
return;
}
// 獲取第一個(gè)indexFile 的一個(gè)offset
long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
if (endPhyOffset < offset) {
files = this.indexFileList.toArray();
}
} catch (Exception e) {
log.error("destroy exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
if (files != null) {
// 找到需要?jiǎng)h除的indexFile
List<IndexFile> fileList = new ArrayList<IndexFile>();
for (int i = 0; i < (files.length - 1); i++) {
IndexFile f = (IndexFile) files[i];
if (f.getEndPhyOffset() < offset) {
fileList.add(f);
} else {
break;
}
}
// 刪除
this.deleteExpiredFile(fileList);
}
}
可以看到,先是拿第一個(gè)indexFile 看看有沒(méi)有小于commitlog 最小offset 的情況發(fā)生,這里也是拿的indexFile最后一個(gè)offset 做的對(duì)比,因?yàn)檫@塊也是按照offset大小 前后順序處理的,最后一個(gè)的offest 肯定是這個(gè)indexFile中最大的了,如果第一個(gè)indexFile滿足了的話,就會(huì)拿到所有引用,然后遍歷找出符合條件的indexFile, 調(diào)用deleteExpiredFile方法遍歷銷(xiāo)毀:
private void deleteExpiredFile(List<IndexFile> files) {
if (!files.isEmpty()) {
try {
this.readWriteLock.writeLock().lock();
for (IndexFile file : files) {
// 銷(xiāo)毀
boolean destroyed = file.destroy(3000);
// 從index 集合中移除
destroyed = destroyed && this.indexFileList.remove(file);
if (!destroyed) {
log.error("deleteExpiredFile remove failed.");
break;
}
}
} catch (Exception e) {
log.error("deleteExpiredFile has exception.", e);
} finally {
this.readWriteLock.writeLock().unlock();
}
}
}
這里就是遍歷銷(xiāo)毀,然后移除對(duì)這個(gè)indexFile管理。
3. 總結(jié)
本文主要是介紹了RocketMQ broker 消息清理機(jī)制,介紹了主要清理哪些文件 :commitlog ,ConsumeQueue,indexFile
接著就是介紹了什么時(shí)候觸發(fā)清理,比如說(shuō)凌晨4點(diǎn) ,磁盤(pán)沒(méi)滿85% 以上的話,就是清理72小時(shí)之前的,如果是滿了85%就除了還在用著的那個(gè)先清10個(gè)看看, 還有就是磁盤(pán)使用空間75% 以上也是會(huì)觸發(fā)的, 低于85 % 清理72小時(shí)之前的,高于85% 先清理10個(gè)文件看看,這是commitlog的清理機(jī)制,關(guān)于ConsumeQueue與indexFile的話,就是與commitlog中最小的那個(gè)offset 有關(guān)了,小于commitlog中最小offset 的那些還是要清理掉的。 最后就是分別解析了一下commitlog 文件清理,ConsumeQueue 文件清理與indexFile 文件清理。
參考
以上就是RocketMQ broker文件清理源碼解析的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ broker文件清理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中你絕對(duì)沒(méi)用過(guò)的一個(gè)關(guān)鍵字Record的使用
這篇文章主要給大家介紹一個(gè)?Java?中的一個(gè)關(guān)鍵字?Record,那?Record?關(guān)鍵字跟不可變類(lèi)有什么關(guān)系呢?看完今天的文章你就知道了,快跟隨小編一起學(xué)習(xí)一下吧2022-11-11
解讀nextLine().split(“[\\s]“)的意思
這篇文章主要介紹了解讀nextLine().split(“[\\s]“)的意思,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解
這篇文章主要為大家介紹了SpringBoot中對(duì)應(yīng)2.0.x版本的Redis配置詳解,文中的實(shí)現(xiàn)步驟講解詳細(xì),感興趣的小伙伴們可以了解一下2022-06-06
Java利用EasyExcel實(shí)現(xiàn)合并單元格
在某些業(yè)務(wù)場(chǎng)景中可能會(huì)有合并單元格的需求,本文將詳細(xì)為大家講解Java如何利用EasyExcel實(shí)現(xiàn)合并單元格,感興趣的小伙伴可以了解一下2022-06-06
Java使用Arrays.sort()方法實(shí)現(xiàn)給對(duì)象排序
這篇文章主要介紹了Java使用Arrays.sort()方法實(shí)現(xiàn)給對(duì)象排序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
一篇文章帶你Java Spring開(kāi)發(fā)入門(mén)
這篇文章主要為大家詳細(xì)介紹了Java Spring開(kāi)發(fā)入門(mén)學(xué)習(xí)教程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來(lái)幫助2021-09-09
關(guān)于@Scheduled注解的任務(wù)為什么不執(zhí)行的問(wèn)題
這篇文章主要介紹了關(guān)于@Scheduled注解的任務(wù)為什么不執(zhí)行的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-09-09
java8新特性之stream流中reduce()求和知識(shí)總結(jié)
今天帶大家回顧Java8的新特性,文中對(duì)stream流中reduce()求和的相關(guān)知識(shí)作了詳細(xì)的介紹,對(duì)正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-05-05

