欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ源碼分析之Broker過(guò)期消息清理機(jī)制

 更新時(shí)間:2023年05月09日 10:54:44   作者:林師傅  
這篇文章主要為大家介紹了RocketMQ源碼分析之Broker過(guò)期消息清理機(jī)制示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

前言

前面文章講了消息是如何保存的以及consumeQueue與Index文件更新機(jī)制。隨著消息的增加,Broker不可能一直保存所有消息,Broker是按照什么規(guī)則清理消息的呢?被消費(fèi)過(guò)后的消息就會(huì)被清理掉嗎?下面我們來(lái)介紹Broker消息清理機(jī)制。

Broker消息清理機(jī)制簡(jiǎn)介

消息是被順序存儲(chǔ)在CommitLog文件中的,且消息長(zhǎng)度不定長(zhǎng),因此消息的清理不是以消息為單位進(jìn)行的,而是以CommitLog為單位進(jìn)行的。默認(rèn)情況下,Broker會(huì)清理單個(gè)CommitLog文件中最后一條消息超過(guò)72小時(shí)的CommitLog文件,除了用戶手動(dòng)清理為,下面幾種情況會(huì)被默認(rèn)清理。

  • CommitLog清理機(jī)制

    • CommitLog文件過(guò)期(72小時(shí)),且達(dá)到清理時(shí)間點(diǎn)(默認(rèn)為04:00~05:00),自動(dòng)清理過(guò)期的CommitLog文件

    • CommitLog文件過(guò)期(72小時(shí)),且CommitLog所在磁盤(pán)分區(qū)占用率已經(jīng)達(dá)到過(guò)期清理警戒線(默認(rèn)75%),無(wú)論是否到達(dá)清理時(shí)間點(diǎn)都會(huì)自動(dòng)清理過(guò)期文件

    • CommitLog所在磁盤(pán)分區(qū)占用率已經(jīng)達(dá)到清理警戒線(默認(rèn)85%),無(wú)論是否過(guò)期,都會(huì)從最早的文件開(kāi)始清理,一次最多清理10個(gè)文件

    • CommitLog所在磁盤(pán)分區(qū)占用率已經(jīng)達(dá)到系統(tǒng)危險(xiǎn)警戒線(默認(rèn)90%),Broker將拒絕消息寫(xiě)入

    • Broker至少會(huì)保留最新的CommitLog文件

  • ConsumeQueue清理機(jī)制

    • 如果ConsumeQueue文件關(guān)聯(lián)CommitLog都被清理,則清理此ConsumeQueue文件
    • Broker每個(gè)Topic-QueueId至少會(huì)保留最新的文件
  • IndexFile清理機(jī)制

    • 如果IndexFile所有索引單元關(guān)聯(lián)CommitLog都被清理,則清理此IndexFile

Broker與消息清理相關(guān)配置

# 文件自動(dòng)清理時(shí)間,單位H,默認(rèn)72
fileReservedTime=72
# CommitLog物理文件刪除間隔,但是ms,默認(rèn)100
deleteCommitLogFilesInterval = 100
# 文件自動(dòng)清理時(shí)間,默認(rèn)04,即凌晨4點(diǎn)
deleteWhen = "04"
# 硬盤(pán)占用率所在分區(qū)過(guò)期清理警戒線,超過(guò)這個(gè)值,無(wú)論是否到達(dá)清理時(shí)間,都會(huì)自動(dòng)清理過(guò)期文件
diskMaxUsedSpaceRatio = 75

消息清理機(jī)制源碼分析

消息定時(shí)清理的是由DefaultMessageStore類負(fù)責(zé)的,它在啟動(dòng)時(shí)(start)會(huì)調(diào)用DefaultMessageStore#addScheduleTask添加和消息存儲(chǔ)相關(guān)的定時(shí)任務(wù),其中就包括消息刪除相關(guān)的定時(shí)任務(wù)DefaultMessageStore.this.cleanFilesPeriodically(),這個(gè)定時(shí)任務(wù)在Broker啟動(dòng)后60s開(kāi)始,每隔10秒執(zhí)行一次

// org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // commitLog、consumeQueue和IndexFile定時(shí)刪除
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval()/*10s*/, TimeUnit.MILLISECONDS);
  	// ...
}

在cleanFilesPeriodically()中有兩個(gè)方法,cleanCommitLogService.run()負(fù)責(zé)清理CommitLog,cleanConsumeQueueService.run()負(fù)責(zé)清理ConsumeQueue和IndexFile。

// org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() {
    // 清理CommitLog
    this.cleanCommitLogService.run();
    // 清理ConsumeQueue和IndexFile
    this.cleanConsumeQueueService.run();
}

CommitLog清理源碼分析

CommitLog清理方法CleanCommitLogService#run調(diào)用了CleanCommitLogService#deleteExpiredFiles,deleteExpiredFiles方法的核心代碼邏輯如下,以下三種情況會(huì)觸發(fā)CommitLog文件的刪除

  • 當(dāng)前時(shí)間是凌晨4點(diǎn)
  • CommitLog所在磁盤(pán)分區(qū)硬盤(pán)占用率超過(guò)75%
  • 手動(dòng)刪除CommitLog
// org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles
private void deleteExpiredFiles() {
    // 是否是凌晨4點(diǎn),用小時(shí)匹配[04:00,05:00)
    boolean timeup = this.isTimeToDelete();
    // >75%就會(huì)返回true,如果大于85%,則觸發(fā)強(qiáng)制刪除
    boolean spacefull = this.isSpaceToDelete();
    // 手動(dòng)刪除次數(shù)是否>0
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    if (timeup/*凌晨4點(diǎn)*/ || spacefull/*空間滿了*/ || manualDelete/*手動(dòng)刪除*/) {
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable()/*默認(rèn)true*/ && this.cleanImmediately/*空間占用超過(guò)85%,觸發(fā)強(qiáng)制刪除*/;
				// 刪除CommitLog
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

CommitLog的清理邏輯在MappedFileQueue#deleteExpiredFileByTime,其核心代碼如下所示,主要分為下面幾個(gè)步驟

  • 復(fù)制MappedFileQueue中的mappedFiles,循環(huán)處理刪除邏輯,循環(huán)mfsLength-1次,也就是無(wú)論如何都會(huì)保留最新的MappedFile
  • 如果MappedFile的最后修改時(shí)間超過(guò)72小時(shí)或CommitLog所在磁盤(pán)分區(qū)硬盤(pán)占用率超過(guò)85%觸發(fā)強(qiáng)制刪除MappedFile,則會(huì)刪除MappedFile,每次刪除最多刪除10個(gè)MappedFile,相鄰MappedFile刪除時(shí)間間隔默認(rèn)100ms
  • 刪除MappedFileQueue的mappedFiles數(shù)組中已刪除的MappedFile,并返回刪除MappedFile的數(shù)量
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {
    // 復(fù)制一份當(dāng)前mappedFile
    Object[] mfs = this.copyMappedFiles(0);
		// 會(huì)保留最后一個(gè)MappedFile
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 最后修改時(shí)間+過(guò)期時(shí)間
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // 如果commitLog所在磁盤(pán)分區(qū)總?cè)萘砍^(guò)85%,觸發(fā)立即刪除,或者超過(guò)了72小時(shí)的mappedFile
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                // 刪除mappedFile
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;
                    // 一次最多刪除10個(gè)mappedFile
                    if (files.size() >= DELETE_FILES_BATCH_MAX/*10*/) {
                        break;
                    }
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            // 刪除文件時(shí)間間隔,默認(rèn)100ms
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }
    // 從MappedFileQueue的mappedFiles中刪除這個(gè)mappedFile
    deleteExpiredFile(files);
    return deleteCount;
}

ConsumeQueue和IndexFile清理源碼分析

ConsumeQueue和IndexFile清理方法CleanConsumeQueueService#run調(diào)用了CleanConsumeQueueService#deleteExpiredFiles方法清理ConsumeQueue和IndexFile。CleanConsumeQueueService#deleteExpiredFiles核心代碼如下,包括兩個(gè)主要邏輯

  • 遍歷consumeQueueTable中的ConsumeQueue,調(diào)用ConsumeQueue#deleteExpiredFile刪除過(guò)期ConsumeQueue
  • 調(diào)用IndexService#deleteExpiredFile刪除過(guò)期IndexFile
// org.apache.rocketmq.store.DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles
private void deleteExpiredFiles() {
    if (minOffset > this.lastPhysicalMinOffset) {
        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
				// 遍歷ConsumeQueue
        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // 刪除consumeQueue
                int deleteCount = logic.deleteExpiredFile(minOffset);
								// ... 間隔100ms
            }
        }
				// 刪除indexFile
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}

ConsumeQueue文件清理

ConsumeQueue文件底層也是MappedFile,清理ConsumeQueue調(diào)用MappedFileQueue#deleteExpiredFileByOffset清理ConsumeQueue的過(guò)期MappedFile,源碼如下,核心邏輯

  • 復(fù)制MappedFileQueue中的mappedFiles,循環(huán)處理清理邏輯,循環(huán)mfsLength-1次,也就是無(wú)論如何都會(huì)保留最新的MappedFile
  • 如果ConsumeQueue的MappedFile最后一個(gè)存儲(chǔ)單元對(duì)應(yīng)消息在CommitLog中的偏移量小于CommitLog的最小偏移量,說(shuō)明當(dāng)前MappedFile所有存儲(chǔ)單元對(duì)應(yīng)所有CommitLog的消息都已經(jīng)被清理,因此調(diào)用MappedFile#destroy清理當(dāng)前MappedFile
  • 刪除MappedFileQueue緩存的mappedFiles列表中已經(jīng)被清理MappedFile
// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByOffset
public int deleteExpiredFileByOffset(long offset, int unitSize) {
    // 復(fù)制一份mappedFiles
    Object[] mfs = this.copyMappedFiles(0);
    List<MappedFile> files = new ArrayList<MappedFile>();
    int deleteCount = 0;
    if (null != mfs) {
        int mfsLength = mfs.length - 1;
        for (int i = 0; i < mfsLength; i++) {
            boolean destroy;
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 取consumeQueue最后一條消息Buffer切片
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
            if (result != null) {
                // consumeQueue最后一個(gè)存儲(chǔ)單元消息在commitLog的偏移量
                long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                result.release();
                // 如果consumeQueue最后一條消息已經(jīng)小于commitLog的最小offset,則說(shuō)明要?jiǎng)h除了
                destroy = maxOffsetInLogicQueue < offset;
                if (destroy) {
                    log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                        + maxOffsetInLogicQueue + ", delete it");
                }
            } 
            // 刪除ConsumeQueue的MappedFile
            if (destroy && mappedFile.destroy(1000 * 60)) {
                files.add(mappedFile);
                deleteCount++;
            } else {
                break;
            }
        }
    }
    // 刪除MappedFileQueue的mappedFiles列表中已經(jīng)刪除的MappedFile
    deleteExpiredFile(files);
    return deleteCount;
}

IndexFile清理

IndexFile清理邏輯與ConsumeQueue類似,都是刪除文件中關(guān)聯(lián)的CommitLog消息全部被刪除的文件。核心邏輯包括下面兩個(gè)

  • 獲取IndexFileList中所有最大offset小于CommitLog最小offset的IndexFile
  • 刪除過(guò)期的IndexFile
// org.apache.rocketmq.store.index.IndexService#deleteExpiredFile(long)
public void deleteExpiredFile(long offset) {
    Object[] files = null;
    try {
        // indexFileList的第一個(gè)索引文件的最后一個(gè)offset
        long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
        if (endPhyOffset < offset) {
            files = this.indexFileList.toArray();
        }
    } 
    if (files != null) {
        List<IndexFile> fileList = new ArrayList<IndexFile>();
        for (int i = 0; i < (files.length - 1); i++) {
            IndexFile f = (IndexFile) files[i];
          	// IndexFile中最大的offset小于CommitLog最小offset,說(shuō)明文件可以被刪除
            if (f.getEndPhyOffset() < offset) {
                fileList.add(f);
            } else {
                break;
            }
        }
        // 刪除過(guò)期的IndexFile,并將其從indexFileList緩存中刪除
        this.deleteExpiredFile(fileList);
    }
}

總結(jié)

Broker消息清理機(jī)制由DefaultMessageStore負(fù)責(zé),CommitLog、ConsumeQueue和IndexFile的清理都是按照文件顆粒度進(jìn)行。

每10s檢查一次,通常情況下每天凌晨4點(diǎn)刪除超過(guò)72小時(shí)的CommitLog;如果CommitLog所在磁盤(pán)分區(qū)的磁盤(pán)占用率超過(guò)75%,則會(huì)觸發(fā)CommitLog文件清理;如果CommitLog所在磁盤(pán)分區(qū)的磁盤(pán)占用率超過(guò)85%,則會(huì)強(qiáng)制刪除CommitLog文件;

如果ConsumeQueue和IndexFile關(guān)聯(lián)CommitLog都被刪除,ConsumeQueue文件和IndexFile也會(huì)被清理。

以上就是RocketMQ | 源碼分析】Broker過(guò)期消息清理機(jī)制的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ | 源碼分析】Broker過(guò)期消息清理機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 深入理解SpringBoot?最大連接數(shù)及最大并發(fā)數(shù)

    深入理解SpringBoot?最大連接數(shù)及最大并發(fā)數(shù)

    SpringBoot能支持的最大并發(fā)量主要看其對(duì)Tomcat的設(shè)置,可以在配置文件中對(duì)其進(jìn)行更改,本文就來(lái)介紹一下SpringBoot?最大連接數(shù)及最大并發(fā)數(shù),感興趣的可以了解一下
    2023-08-08
  • springBoo3.0集成knife4j4.1.0的詳細(xì)教程(swagger3)

    springBoo3.0集成knife4j4.1.0的詳細(xì)教程(swagger3)

    這篇文章主要介紹了springBoo3.0集成knife4j4.1.0的詳細(xì)教程(swagger3),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-07-07
  • Spring中的Schedule動(dòng)態(tài)添加修改定時(shí)任務(wù)詳解

    Spring中的Schedule動(dòng)態(tài)添加修改定時(shí)任務(wù)詳解

    這篇文章主要介紹了Spring中的Schedule動(dòng)態(tài)添加修改定時(shí)任務(wù)詳解,可能有人會(huì)問(wèn),為啥不用Quartz,Quartz自然是非常方便強(qiáng)大的,但不是本篇要講的內(nèi)容,本篇就偏要使用SpringSchedule來(lái)實(shí)現(xiàn)動(dòng)態(tài)的cron表達(dá)式任務(wù),需要的朋友可以參考下
    2023-11-11
  • java調(diào)用webservice的.asmx接口的使用步驟

    java調(diào)用webservice的.asmx接口的使用步驟

    這篇文章主要介紹了java調(diào)用webservice的.asmx接口的使用步驟,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-09-09
  • MyBatis連接池的深入和動(dòng)態(tài)SQL詳解

    MyBatis連接池的深入和動(dòng)態(tài)SQL詳解

    這篇文章主要介紹了MyBatis連接池的深入和動(dòng)態(tài)SQL詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • 詳解Java數(shù)據(jù)庫(kù)連接JDBC基礎(chǔ)知識(shí)(操作數(shù)據(jù)庫(kù):增刪改查)

    詳解Java數(shù)據(jù)庫(kù)連接JDBC基礎(chǔ)知識(shí)(操作數(shù)據(jù)庫(kù):增刪改查)

    這篇文章主要介紹了詳解Java數(shù)據(jù)庫(kù)連接JDBC基礎(chǔ)知識(shí)(操作數(shù)據(jù)庫(kù):增刪改查),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-01-01
  • Java實(shí)現(xiàn)大數(shù)運(yùn)算的實(shí)例代碼

    Java實(shí)現(xiàn)大數(shù)運(yùn)算的實(shí)例代碼

    這篇文章主要介紹了Java實(shí)現(xiàn)大數(shù)運(yùn)算的實(shí)例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-06-06
  • Java上傳文件到服務(wù)器指定文件夾實(shí)現(xiàn)過(guò)程圖解

    Java上傳文件到服務(wù)器指定文件夾實(shí)現(xiàn)過(guò)程圖解

    這篇文章主要介紹了Java上傳文件到服務(wù)器指定文件夾實(shí)現(xiàn)過(guò)程圖解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-08-08
  • 一文帶你搞懂Java中i++ 和 ++i的區(qū)別

    一文帶你搞懂Java中i++ 和 ++i的區(qū)別

    在Java中,i++和++i都用于遞增變量i的值,但它們之間有一個(gè)細(xì)微的區(qū)別,i++是后綴遞增操作符,++i是前綴遞增操作符,在大多數(shù)情況下,這兩種遞增操作的結(jié)果都是一樣的,但在某些特定的表達(dá)式和邏輯中,它們可能會(huì)產(chǎn)生不同的效果,本文將帶大家搞清Java中i++ 和 ++i的區(qū)別
    2023-09-09
  • Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之二叉堆

    Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之二叉堆

    二叉堆是一種特殊的堆,其實(shí)質(zhì)是完全二叉樹(shù)。二叉堆有兩種:最大堆和最小堆。最大堆是指父節(jié)點(diǎn)鍵值總是大于或等于任何一個(gè)子節(jié)點(diǎn)的鍵值。而最小堆恰恰相反,指的是父節(jié)點(diǎn)鍵值總是小于任何一個(gè)子節(jié)點(diǎn)的鍵值
    2022-02-02

最新評(píng)論