欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ設(shè)計(jì)之異步刷盤

 更新時(shí)間:2022年03月21日 10:29:20   作者:周杰倫本人  
本文介紹RocketMQ設(shè)計(jì)之異步刷盤,RocketMQ消息存儲(chǔ)到磁盤上,這樣既保證斷電后恢復(fù),也讓存儲(chǔ)消息量超出內(nèi)存限制,RocketMQ為了提高性能,會(huì)盡可能保證磁盤順序?qū)?消息通過Producer寫入RocketMQ的時(shí)候,有兩種方式,上篇介紹了同步刷盤,本文介紹異步刷盤,需要的朋友可以參考下

上一篇RocketMQ設(shè)計(jì)之同步刷盤

異步刷盤方式:在返回寫成功狀態(tài)時(shí),消息可能只是被寫入了內(nèi)存的PAGECACHE,寫操作的返回快,吞吐量大;當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫磁盤操作,快速寫入

RocketMQ默認(rèn)采用異步刷盤,異步刷盤兩種策略:開啟緩沖池,不開啟緩沖池

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
? ? // Synchronization flush
? ? if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
? ? ? ? final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
? ? ? ? if (messageExt.isWaitStoreMsgOK()) {
? ? ? ? ? ? GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
? ? ? ? ? ? service.putRequest(request);
? ? ? ? ? ? boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
? ? ? ? ? ? ? ? ? ? + " client address: " + messageExt.getBornHostString());
? ? ? ? ? ? ? ? putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? service.wakeup();
? ? ? ? }
? ? }
? ? // Asynchronous flush
? ? else {
? ? ? ? if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
? ? ? ? ? ? flushCommitLogService.wakeup();
? ? ? ? } else {
? ? ? ? ? ? commitLogService.wakeup();
? ? ? ? }
? ? }
}

不開啟緩沖池:默認(rèn)不開啟,刷盤線程FlushRealTimeService會(huì)每間隔500毫秒嘗試去刷盤。

class FlushRealTimeService extends FlushCommitLogService {
? ? private long lastFlushTimestamp = 0;
? ? private long printTimes = 0;

? ? public void run() {
? ? ? ? CommitLog.log.info(this.getServiceName() + " service started");

? ? ? ? while (!this.isStopped()) {
? ? ? ? ? ? boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

? ? ? ? ? ? //每次Flush間隔500毫秒
? ? ? ? ? ? int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
? ? ? ? ? ? //每次Flush最少4頁內(nèi)存數(shù)據(jù)(16KB)
? ? ? ? ? ? int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

? ? ? ? ? ??? ?//距離上次刷盤時(shí)間閾值為10秒
? ? ? ? ? ? int flushPhysicQueueThoroughInterval =
? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

? ? ? ? ? ? boolean printFlushProgress = false;

? ? ? ? ? ? // Print flush progress
? ? ? ? ? ? long currentTimeMillis = System.currentTimeMillis();
? ? ? ? ? ? if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
? ? ? ? ? ? ? ? this.lastFlushTimestamp = currentTimeMillis;
? ? ? ? ? ? ? ? flushPhysicQueueLeastPages = 0;
? ? ? ? ? ? ? ? printFlushProgress = (printTimes++ % 10) == 0;
? ? ? ? ? ? }

? ? ? ? ? ? try {
? ? ? ? ? ? ? ? if (flushCommitLogTimed) {
? ? ? ? ? ? ? ? ? ? Thread.sleep(interval);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? this.waitForRunning(interval);
? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (printFlushProgress) {
? ? ? ? ? ? ? ? ? ? this.printFlushProgress();
? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? long begin = System.currentTimeMillis();
? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? long past = System.currentTimeMillis() - begin;
? ? ? ? ? ? ? ? if (past > 500) {
? ? ? ? ? ? ? ? ? ? log.info("Flush data to disk costs {} ms", past);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (Throwable e) {
? ? ? ? ? ? ? ? CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
? ? ? ? ? ? ? ? this.printFlushProgress();
? ? ? ? ? ? }
? ? ? ? }

? ? ? ? // Normal shutdown, to ensure that all the flush before exit
? ? ? ? boolean result = false;
? ? ? ? for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
? ? ? ? ? ? result = CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
? ? ? ? }

? ? ? ? this.printFlushProgress();

? ? ? ? CommitLog.log.info(this.getServiceName() + " service end");
? ? }

? ? @Override
? ? public String getServiceName() {
? ? ? ? return FlushRealTimeService.class.getSimpleName();
? ? }

? ? private void printFlushProgress() {
? ? ? ? // CommitLog.log.info("how much disk fall behind memory, "
? ? ? ? // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
? ? }

? ? @Override
? ? public long getJointime() {
? ? ? ? return 1000 * 60 * 5;
? ? }
}
  • 判斷是否超過10秒沒刷盤了,如果超過強(qiáng)制刷盤
  • 等待Flush間隔500ms
  • 通過MappedFile刷盤
  • 設(shè)置StoreCheckpoint刷盤時(shí)間點(diǎn)
  • 超過500ms的刷盤記錄日志
  • Broker正常停止前,把內(nèi)存page中的數(shù)據(jù)刷盤

開啟緩沖池:

class CommitRealTimeService extends FlushCommitLogService {

? ? private long lastCommitTimestamp = 0;

? ? @Override
? ? public String getServiceName() {
? ? ? ? return CommitRealTimeService.class.getSimpleName();
? ? }

? ? @Override
? ? public void run() {
? ? ? ? CommitLog.log.info(this.getServiceName() + " service started");
? ? ? ? while (!this.isStopped()) {
? ? ? ? ? ? //每次提交間隔200毫秒
? ? ? ? ? ? int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

? ? ? ? ? ? //每次提交最少4頁內(nèi)存數(shù)據(jù)(16KB)
? ? ? ? ? ? int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

? ? ? ? ? ? //距離上次提交時(shí)間閾值為200毫秒
? ? ? ? ? ? int commitDataThoroughInterval =
? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

? ? ? ? ? ? long begin = System.currentTimeMillis();
? ? ? ? ? ? if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
? ? ? ? ? ? ? ? this.lastCommitTimestamp = begin;
? ? ? ? ? ? ? ? commitDataLeastPages = 0;
? ? ? ? ? ? }

? ? ? ? ? ? try {
? ? ? ? ? ? ? ? boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
? ? ? ? ? ? ? ? long end = System.currentTimeMillis();
? ? ? ? ? ? ? ? if (!result) {
? ? ? ? ? ? ? ? ? ? this.lastCommitTimestamp = end; // result = false means some data committed.
? ? ? ? ? ? ? ? ? ? //now wake up flush thread.
? ? ? ? ? ? ? ? ? ? flushCommitLogService.wakeup();
? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? if (end - begin > 500) {
? ? ? ? ? ? ? ? ? ? log.info("Commit data to file costs {} ms", end - begin);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? this.waitForRunning(interval);
? ? ? ? ? ? } catch (Throwable e) {
? ? ? ? ? ? ? ? CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
? ? ? ? ? ? }
? ? ? ? }

? ? ? ? boolean result = false;
? ? ? ? for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
? ? ? ? ? ? result = CommitLog.this.mappedFileQueue.commit(0);
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
? ? ? ? }
? ? ? ? CommitLog.log.info(this.getServiceName() + " service end");
? ? }
}

RocketMQ申請(qǐng)一塊和CommitLog文件相同大小的堆外內(nèi)存來做緩沖池,數(shù)據(jù)會(huì)先寫入緩沖池,提交線程CommitRealTimeService也每間隔500毫秒嘗試提交到文件通道等待刷盤,刷盤最終由FlushRealTimeService來完成,和不開啟緩沖池的處理一致。使用緩沖池的目的是多條數(shù)據(jù)合并寫入,從而提高io性能。

  • 判斷是否超過200毫秒沒提交,需要強(qiáng)制提交
  • 提交到MappedFile,此時(shí)還未刷盤
  • 然后喚醒刷盤線程
  • 在Broker正常停止前,提交內(nèi)存page中的數(shù)據(jù)

到此這篇關(guān)于RocketMQ設(shè)計(jì)之異步刷盤的文章就介紹到這了,更多相關(guān)RocketMQ異步刷盤內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java利用Dijkstra算法求解拓?fù)潢P(guān)系最短路徑

    Java利用Dijkstra算法求解拓?fù)潢P(guān)系最短路徑

    迪杰斯特拉算法(Dijkstra)是由荷蘭計(jì)算機(jī)科學(xué)迪家迪杰斯特拉于1959年提出的,因此又叫狄克斯特拉算法。本文將利用迪克斯特拉(Dijkstra)算法求拓?fù)潢P(guān)系最短路徑,感興趣的可以了解一下
    2022-07-07
  • Java定義隊(duì)列結(jié)構(gòu),并實(shí)現(xiàn)入隊(duì)、出隊(duì)操作完整示例

    Java定義隊(duì)列結(jié)構(gòu),并實(shí)現(xiàn)入隊(duì)、出隊(duì)操作完整示例

    這篇文章主要介紹了Java定義隊(duì)列結(jié)構(gòu),并實(shí)現(xiàn)入隊(duì)、出隊(duì)操作,結(jié)合完整實(shí)例形式分析了java數(shù)據(jù)結(jié)構(gòu)中隊(duì)列的定義、入隊(duì)、出隊(duì)、判斷隊(duì)列是否為空、打印隊(duì)列元素等相關(guān)操作技巧,需要的朋友可以參考下
    2020-02-02
  • java實(shí)現(xiàn)靜默加載Class示例代碼

    java實(shí)現(xiàn)靜默加載Class示例代碼

    這篇文章主要給大家介紹了關(guān)于java實(shí)現(xiàn)靜默加載Class的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-10-10
  • SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼

    SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼

    這篇文章主要介紹了SpringBoot中使用Cookie實(shí)現(xiàn)記住登錄的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • Java反應(yīng)式框架Reactor中的Mono和Flux

    Java反應(yīng)式框架Reactor中的Mono和Flux

    這篇文章主要介紹了Java反應(yīng)式框架Reactor中的Mono和Flux,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-07-07
  • JavaWeb dbutils執(zhí)行sql命令并遍歷結(jié)果集時(shí)不能查到內(nèi)容的原因分析

    JavaWeb dbutils執(zhí)行sql命令并遍歷結(jié)果集時(shí)不能查到內(nèi)容的原因分析

    這篇文章主要介紹了JavaWeb dbutils執(zhí)行sql命令并遍歷結(jié)果集時(shí)不能查到內(nèi)容的原因分析及簡單處理方法,文中給大家介紹了javaweb中dbutils的使用,需要的朋友可以參考下
    2017-12-12
  • 詳解Java中JSON數(shù)據(jù)的生成與解析

    詳解Java中JSON數(shù)據(jù)的生成與解析

    今天給大家?guī)淼氖顷P(guān)于Java的相關(guān)知識(shí),文章圍繞著Java中JSON數(shù)據(jù)的生成與解析展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • springboot @Controller和@RestController的區(qū)別及應(yīng)用詳解

    springboot @Controller和@RestController的區(qū)別及應(yīng)用詳解

    這篇文章主要介紹了springboot @Controller和@RestController的區(qū)別及應(yīng)用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Mybatis中注入執(zhí)行sql查詢、更新、新增及建表語句案例代碼

    Mybatis中注入執(zhí)行sql查詢、更新、新增及建表語句案例代碼

    這篇文章主要介紹了Mybatis中注入執(zhí)行sql查詢、更新、新增以及建表語句,主要說明一個(gè)另類的操作,注入sql,并使用mybatis執(zhí)行,結(jié)合案例代碼詳解講解,需要的朋友可以參考下
    2023-02-02
  • Java中的ReadWriteLock讀寫鎖詳解

    Java中的ReadWriteLock讀寫鎖詳解

    這篇文章主要介紹了Java中的ReadWriteLock讀寫鎖詳解,ReadWriteLock也是一個(gè)接口,提供了readLock和writeLock兩種鎖的操作機(jī)制,一個(gè)資源可以被多個(gè)線程同時(shí)讀,或者被一個(gè)線程寫,但是不能同時(shí)存在讀和寫線程,需要的朋友可以參考下
    2023-12-12

最新評(píng)論