RocketMQ設(shè)計(jì)之異步刷盤
異步刷盤方式:在返回寫成功狀態(tài)時(shí),消息可能只是被寫入了內(nèi)存的PAGECACHE
,寫操作的返回快,吞吐量大;當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤操作,快速寫入
RocketMQ
默認(rèn)采用異步刷盤,異步刷盤兩種策略:開啟緩沖池,不開啟緩沖池
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { ? ? // Synchronization flush ? ? if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { ? ? ? ? final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; ? ? ? ? if (messageExt.isWaitStoreMsgOK()) { ? ? ? ? ? ? GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); ? ? ? ? ? ? service.putRequest(request); ? ? ? ? ? ? boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); ? ? ? ? ? ? if (!flushOK) { ? ? ? ? ? ? ? ? log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() ? ? ? ? ? ? ? ? ? ? + " client address: " + messageExt.getBornHostString()); ? ? ? ? ? ? ? ? putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); ? ? ? ? ? ? } ? ? ? ? } else { ? ? ? ? ? ? service.wakeup(); ? ? ? ? } ? ? } ? ? // Asynchronous flush ? ? else { ? ? ? ? if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { ? ? ? ? ? ? flushCommitLogService.wakeup(); ? ? ? ? } else { ? ? ? ? ? ? commitLogService.wakeup(); ? ? ? ? } ? ? } }
不開啟緩沖池:默認(rèn)不開啟,刷盤線程FlushRealTimeService
會(huì)每間隔500毫秒嘗試去刷盤。
class FlushRealTimeService extends FlushCommitLogService { ? ? private long lastFlushTimestamp = 0; ? ? private long printTimes = 0; ? ? public void run() { ? ? ? ? CommitLog.log.info(this.getServiceName() + " service started"); ? ? ? ? while (!this.isStopped()) { ? ? ? ? ? ? boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); ? ? ? ? ? ? //每次Flush間隔500毫秒 ? ? ? ? ? ? int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); ? ? ? ? ? ? //每次Flush最少4頁內(nèi)存數(shù)據(jù)(16KB) ? ? ? ? ? ? int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); ? ? ? ? ? ??? ?//距離上次刷盤時(shí)間閾值為10秒 ? ? ? ? ? ? int flushPhysicQueueThoroughInterval = ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); ? ? ? ? ? ? boolean printFlushProgress = false; ? ? ? ? ? ? // Print flush progress ? ? ? ? ? ? long currentTimeMillis = System.currentTimeMillis(); ? ? ? ? ? ? if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { ? ? ? ? ? ? ? ? this.lastFlushTimestamp = currentTimeMillis; ? ? ? ? ? ? ? ? flushPhysicQueueLeastPages = 0; ? ? ? ? ? ? ? ? printFlushProgress = (printTimes++ % 10) == 0; ? ? ? ? ? ? } ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? if (flushCommitLogTimed) { ? ? ? ? ? ? ? ? ? ? Thread.sleep(interval); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? this.waitForRunning(interval); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? if (printFlushProgress) { ? ? ? ? ? ? ? ? ? ? this.printFlushProgress(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? long begin = System.currentTimeMillis(); ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); ? ? ? ? ? ? ? ? if (storeTimestamp > 0) { ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? long past = System.currentTimeMillis() - begin; ? ? ? ? ? ? ? ? if (past > 500) { ? ? ? ? ? ? ? ? ? ? log.info("Flush data to disk costs {} ms", past); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } catch (Throwable e) { ? ? ? ? ? ? ? ? CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); ? ? ? ? ? ? ? ? this.printFlushProgress(); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? // Normal shutdown, to ensure that all the flush before exit ? ? ? ? boolean result = false; ? ? ? ? for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { ? ? ? ? ? ? result = CommitLog.this.mappedFileQueue.flush(0); ? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); ? ? ? ? } ? ? ? ? this.printFlushProgress(); ? ? ? ? CommitLog.log.info(this.getServiceName() + " service end"); ? ? } ? ? @Override ? ? public String getServiceName() { ? ? ? ? return FlushRealTimeService.class.getSimpleName(); ? ? } ? ? private void printFlushProgress() { ? ? ? ? // CommitLog.log.info("how much disk fall behind memory, " ? ? ? ? // + CommitLog.this.mappedFileQueue.howMuchFallBehind()); ? ? } ? ? @Override ? ? public long getJointime() { ? ? ? ? return 1000 * 60 * 5; ? ? } }
- 判斷是否超過10秒沒刷盤了,如果超過強(qiáng)制刷盤
- 等待Flush間隔500ms
- 通過
MappedFile
刷盤 - 設(shè)置
StoreCheckpoint
刷盤時(shí)間點(diǎn) - 超過500ms的刷盤記錄日志
- Broker正常停止前,把內(nèi)存page中的數(shù)據(jù)刷盤
開啟緩沖池:
class CommitRealTimeService extends FlushCommitLogService { ? ? private long lastCommitTimestamp = 0; ? ? @Override ? ? public String getServiceName() { ? ? ? ? return CommitRealTimeService.class.getSimpleName(); ? ? } ? ? @Override ? ? public void run() { ? ? ? ? CommitLog.log.info(this.getServiceName() + " service started"); ? ? ? ? while (!this.isStopped()) { ? ? ? ? ? ? //每次提交間隔200毫秒 ? ? ? ? ? ? int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); ? ? ? ? ? ? //每次提交最少4頁內(nèi)存數(shù)據(jù)(16KB) ? ? ? ? ? ? int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); ? ? ? ? ? ? //距離上次提交時(shí)間閾值為200毫秒 ? ? ? ? ? ? int commitDataThoroughInterval = ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); ? ? ? ? ? ? long begin = System.currentTimeMillis(); ? ? ? ? ? ? if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { ? ? ? ? ? ? ? ? this.lastCommitTimestamp = begin; ? ? ? ? ? ? ? ? commitDataLeastPages = 0; ? ? ? ? ? ? } ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); ? ? ? ? ? ? ? ? long end = System.currentTimeMillis(); ? ? ? ? ? ? ? ? if (!result) { ? ? ? ? ? ? ? ? ? ? this.lastCommitTimestamp = end; // result = false means some data committed. ? ? ? ? ? ? ? ? ? ? //now wake up flush thread. ? ? ? ? ? ? ? ? ? ? flushCommitLogService.wakeup(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? if (end - begin > 500) { ? ? ? ? ? ? ? ? ? ? log.info("Commit data to file costs {} ms", end - begin); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? this.waitForRunning(interval); ? ? ? ? ? ? } catch (Throwable e) { ? ? ? ? ? ? ? ? CommitLog.log.error(this.getServiceName() + " service has exception. ", e); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? boolean result = false; ? ? ? ? for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { ? ? ? ? ? ? result = CommitLog.this.mappedFileQueue.commit(0); ? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); ? ? ? ? } ? ? ? ? CommitLog.log.info(this.getServiceName() + " service end"); ? ? } }
RocketMQ
申請(qǐng)一塊和CommitLog
文件相同大小的堆外內(nèi)存來做緩沖池,數(shù)據(jù)會(huì)先寫入緩沖池,提交線程CommitRealTimeService也每間隔500毫秒嘗試提交到文件通道等待刷盤,刷盤最終由FlushRealTimeService
來完成,和不開啟緩沖池的處理一致。使用緩沖池的目的是多條數(shù)據(jù)合并寫入,從而提高io性能。
- 判斷是否超過200毫秒沒提交,需要強(qiáng)制提交
- 提交到
MappedFile
,此時(shí)還未刷盤 - 然后喚醒刷盤線程
- 在Broker正常停止前,提交內(nèi)存page中的數(shù)據(jù)
到此這篇關(guān)于RocketMQ設(shè)計(jì)之異步刷盤的文章就介紹到這了,更多相關(guān)RocketMQ異步刷盤內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java利用Dijkstra算法求解拓?fù)潢P(guān)系最短路徑
迪杰斯特拉算法(Dijkstra)是由荷蘭計(jì)算機(jī)科學(xué)迪家迪杰斯特拉于1959年提出的,因此又叫狄克斯特拉算法。本文將利用迪克斯特拉(Dijkstra)算法求拓?fù)潢P(guān)系最短路徑,感興趣的可以了解一下2022-07-07Java定義隊(duì)列結(jié)構(gòu),并實(shí)現(xiàn)入隊(duì)、出隊(duì)操作完整示例
這篇文章主要介紹了Java定義隊(duì)列結(jié)構(gòu),并實(shí)現(xiàn)入隊(duì)、出隊(duì)操作,結(jié)合完整實(shí)例形式分析了java數(shù)據(jù)結(jié)構(gòu)中隊(duì)列的定義、入隊(duì)、出隊(duì)、判斷隊(duì)列是否為空、打印隊(duì)列元素等相關(guān)操作技巧,需要的朋友可以參考下2020-02-02java實(shí)現(xiàn)靜默加載Class示例代碼
這篇文章主要給大家介紹了關(guān)于java實(shí)現(xiàn)靜默加載Class的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼
這篇文章主要介紹了SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07Java反應(yīng)式框架Reactor中的Mono和Flux
這篇文章主要介紹了Java反應(yīng)式框架Reactor中的Mono和Flux,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-07-07JavaWeb dbutils執(zhí)行sql命令并遍歷結(jié)果集時(shí)不能查到內(nèi)容的原因分析
這篇文章主要介紹了JavaWeb dbutils執(zhí)行sql命令并遍歷結(jié)果集時(shí)不能查到內(nèi)容的原因分析及簡單處理方法,文中給大家介紹了javaweb中dbutils的使用,需要的朋友可以參考下2017-12-12springboot @Controller和@RestController的區(qū)別及應(yīng)用詳解
這篇文章主要介紹了springboot @Controller和@RestController的區(qū)別及應(yīng)用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Mybatis中注入執(zhí)行sql查詢、更新、新增及建表語句案例代碼
這篇文章主要介紹了Mybatis中注入執(zhí)行sql查詢、更新、新增以及建表語句,主要說明一個(gè)另類的操作,注入sql,并使用mybatis執(zhí)行,結(jié)合案例代碼詳解講解,需要的朋友可以參考下2023-02-02