欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ設(shè)計(jì)之同步刷盤

 更新時(shí)間:2022年03月21日 10:17:40   作者:周杰倫本人  
這篇文章主要介紹了RocketMQ設(shè)計(jì)之同步刷盤,文章主要通過CommitLog的handleDiskFlush方法展開全文內(nèi)容,實(shí)現(xiàn)同步刷盤,下面文章詳細(xì)介紹,需要的小伙伴可以參考一下

同步刷盤方式:在返回寫成功狀態(tài)時(shí),消息已經(jīng)被寫入磁盤。具體流程是,消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。

在同步刷盤模式下,當(dāng)消息寫到內(nèi)存后,會等待數(shù)據(jù)寫到磁盤的CommitLog文件。

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
? ? // Synchronization flush
? ? if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
? ? ? ? final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
? ? ? ? if (messageExt.isWaitStoreMsgOK()) {
? ? ? ? ? ? GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
? ? ? ? ? ? service.putRequest(request);
? ? ? ? ? ? boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
? ? ? ? ? ? ? ? ? ? + " client address: " + messageExt.getBornHostString());
? ? ? ? ? ? ? ? putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? service.wakeup();
? ? ? ? }
? ? }
? ? // Asynchronous flush
? ? else {
? ? ? ? if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
? ? ? ? ? ? flushCommitLogService.wakeup();
? ? ? ? } else {
? ? ? ? ? ? commitLogService.wakeup();
? ? ? ? }
? ? }
}


class GroupCommitService extends FlushCommitLogService {
? ? ? ? private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
? ? ? ? private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

? ? ?? ?//提交刷盤任務(wù)到任務(wù)列表
? ? ? ? public synchronized void putRequest(final GroupCommitRequest request) {
? ? ? ? ? ? synchronized (this.requestsWrite) {
? ? ? ? ? ? ? ? this.requestsWrite.add(request);
? ? ? ? ? ? }
? ? ? ? ? ? if (hasNotified.compareAndSet(false, true)) {
? ? ? ? ? ? ? ? waitPoint.countDown(); // notify
? ? ? ? ? ? }
? ? ? ? }

? ? ? ? private void swapRequests() {
? ? ? ? ? ? List<GroupCommitRequest> tmp = this.requestsWrite;
? ? ? ? ? ? this.requestsWrite = this.requestsRead;
? ? ? ? ? ? this.requestsRead = tmp;
? ? ? ? }

? ? ? ? private void doCommit() {
? ? ? ? ? ? synchronized (this.requestsRead) {
? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) {
? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) {
? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of
? ? ? ? ? ? ? ? ? ? ? ? // two times the flush
? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false;
? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK);
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? this.requestsRead.clear();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it
? ? ? ? ? ? ? ? ? ? // will come to this process
? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }

? ? ? ? public void run() {
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service started");

? ? ? ? ? ? while (!this.isStopped()) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? this.waitForRunning(10);
? ? ? ? ? ? ? ? ? ? this.doCommit();
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }

? ? ? ? ? ? // Under normal circumstances shutdown, wait for the arrival of the
? ? ? ? ? ? // request, and then flush
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(10);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? CommitLog.log.warn("GroupCommitService Exception, ", e);
? ? ? ? ? ? }

? ? ? ? ? ? synchronized (this) {
? ? ? ? ? ? ? ? this.swapRequests();
? ? ? ? ? ? }

? ? ? ? ? ? this.doCommit();

? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service end");
? ? ? ? }

? ? ? ? @Override
? ? ? ? protected void onWaitEnd() {
? ? ? ? ? ? this.swapRequests();
? ? ? ? }

? ? ? ? @Override
? ? ? ? public String getServiceName() {
? ? ? ? ? ? return GroupCommitService.class.getSimpleName();
? ? ? ? }

? ? ? ? @Override
? ? ? ? public long getJointime() {
? ? ? ? ? ? return 1000 * 60 * 5;
? ? ? ? }
? ? }

GroupCommitRequest是刷盤任務(wù),提交刷盤任務(wù)后,會在刷盤隊(duì)列中等待刷盤,而刷盤線程

GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。之所以不直接寫是磁盤io壓力大,寫入性能低,每隔10毫秒寫一次可以提升磁盤io效率和寫入性能。

  • putRequest(request) 提交刷盤任務(wù)到任務(wù)列表
  • request.waitForFlush同步等待GroupCommitService將任務(wù)列表中的任務(wù)刷盤完成。

兩個(gè)隊(duì)列讀寫分離,requestsWrite是寫隊(duì)列,用戶保存添加進(jìn)來的刷盤任務(wù),requestsRead是讀隊(duì)列,在刷盤之前會把寫隊(duì)列的數(shù)據(jù)放入讀隊(duì)列。

CommitLog的doCommit方法:

private void doCommit() {
? ? ? ? ? ? synchronized (this.requestsRead) {
? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) {
? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) {
? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of
? ? ? ? ? ? ? ? ? ? ? ? // two times the flush
? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false;
? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? //根據(jù)offset確定是否已經(jīng)刷盤
? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK);
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? ? ? }
?? ??? ??? ??? ??? ?//清空已刷盤的列表
? ? ? ? ? ? ? ? ? ? this.requestsRead.clear();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it
? ? ? ? ? ? ? ? ? ? // will come to this process
? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
  • 刷盤的時(shí)候依次讀取requestsRead中的數(shù)據(jù)寫入磁盤,
  • 寫入完成后清空requestsRead。

讀寫分離設(shè)計(jì)的目的是在刷盤時(shí)不影響任務(wù)提交到列表。

CommitLog.this.mappedFileQueue.flush(0);是刷盤操作:

public boolean flush(final int flushLeastPages) {
? ? boolean result = true;
? ? MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
? ? if (mappedFile != null) {
? ? ? ? long tmpTimeStamp = mappedFile.getStoreTimestamp();
? ? ? ? int offset = mappedFile.flush(flushLeastPages);
? ? ? ? long where = mappedFile.getFileFromOffset() + offset;
? ? ? ? result = where == this.flushedWhere;
? ? ? ? this.flushedWhere = where;
? ? ? ? if (0 == flushLeastPages) {
? ? ? ? ? ? this.storeTimestamp = tmpTimeStamp;
? ? ? ? }
? ? }

? ? return result;
}

通過MappedFile映射的CommitLog文件寫入磁盤

這就是RocketMQ高可用設(shè)計(jì)之同步刷盤的基本情況了,大體思路就是一個(gè)讀寫分離的隊(duì)列來刷盤,同步刷盤任務(wù)提交后會在刷盤隊(duì)列中等待刷盤完成后再返回,而GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。

到此這篇關(guān)于RocketMQ設(shè)計(jì)之同步刷盤的文章就介紹到這了,更多相關(guān)RocketMQ同步刷盤內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • java 泛型的詳解及實(shí)例

    java 泛型的詳解及實(shí)例

    這篇文章主要介紹了java 泛型的詳解及實(shí)例的相關(guān)資料,希望通過本文大家能徹底掌握泛型的使用方法,需要的朋友可以參考下
    2017-08-08
  • Mybatis?saveAndUpdate空值不更新問題及解決

    Mybatis?saveAndUpdate空值不更新問題及解決

    這篇文章主要介紹了Mybatis?saveAndUpdate空值不更新問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • 淺談Java中Collection和Collections的區(qū)別

    淺談Java中Collection和Collections的區(qū)別

    下面小編就為大家?guī)硪黄獪\談Java中Collection和Collections的區(qū)別。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2016-08-08
  • MyBatis-Plus Generator配置詳解

    MyBatis-Plus Generator配置詳解

    這篇文章主要介紹了MyBatis-Plus Generator配置詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • 使用socket進(jìn)行服務(wù)端與客戶端傳文件的方法

    使用socket進(jìn)行服務(wù)端與客戶端傳文件的方法

    這篇文章主要介紹了使用socket進(jìn)行服務(wù)端與客戶端傳文件的方法,需要的朋友可以參考下
    2017-08-08
  • MyBatis全局映射文件實(shí)現(xiàn)原理解析

    MyBatis全局映射文件實(shí)現(xiàn)原理解析

    這篇文章主要介紹了MyBatis全局映射文件實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-08-08
  • Spring 中jdbcTemplate 實(shí)現(xiàn)執(zhí)行多條sql語句示例

    Spring 中jdbcTemplate 實(shí)現(xiàn)執(zhí)行多條sql語句示例

    本篇文章主要介紹了Spring 中jdbcTemplate 實(shí)現(xiàn)執(zhí)行多條sql語句示例,可以對多個(gè)表執(zhí)行多個(gè)sql語句,有興趣的可以了解一下。
    2017-01-01
  • 深入了解Java中Cookie和Session的區(qū)別

    深入了解Java中Cookie和Session的區(qū)別

    會話跟蹤是Web程序中常用的技術(shù),用來跟蹤用戶的整個(gè)會話,常用的會話跟蹤技術(shù)是Cookie與Session,本文就詳細(xì)的介紹一下Java中Cookie和Session的區(qū)別,感興趣的可以了解一下
    2023-06-06
  • Mybatis中的游標(biāo)查詢Cursor(滾動查詢)

    Mybatis中的游標(biāo)查詢Cursor(滾動查詢)

    這篇文章主要介紹了Mybatis中的游標(biāo)查詢Cursor(滾動查詢),具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • java中刪除 數(shù)組中的指定元素方法

    java中刪除 數(shù)組中的指定元素方法

    下面小編就為大家?guī)硪黄猨ava中刪除 數(shù)組中的指定元素方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-01-01

最新評論