RocketMQ設(shè)計(jì)之同步刷盤
同步刷盤方式:在返回寫成功狀態(tài)時(shí),消息已經(jīng)被寫入磁盤。具體流程是,消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。
在同步刷盤模式下,當(dāng)消息寫到內(nèi)存后,會等待數(shù)據(jù)寫到磁盤的CommitLog
文件。
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { ? ? // Synchronization flush ? ? if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { ? ? ? ? final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; ? ? ? ? if (messageExt.isWaitStoreMsgOK()) { ? ? ? ? ? ? GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); ? ? ? ? ? ? service.putRequest(request); ? ? ? ? ? ? boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); ? ? ? ? ? ? if (!flushOK) { ? ? ? ? ? ? ? ? log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() ? ? ? ? ? ? ? ? ? ? + " client address: " + messageExt.getBornHostString()); ? ? ? ? ? ? ? ? putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); ? ? ? ? ? ? } ? ? ? ? } else { ? ? ? ? ? ? service.wakeup(); ? ? ? ? } ? ? } ? ? // Asynchronous flush ? ? else { ? ? ? ? if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { ? ? ? ? ? ? flushCommitLogService.wakeup(); ? ? ? ? } else { ? ? ? ? ? ? commitLogService.wakeup(); ? ? ? ? } ? ? } } class GroupCommitService extends FlushCommitLogService { ? ? ? ? private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); ? ? ? ? private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); ? ? ?? ?//提交刷盤任務(wù)到任務(wù)列表 ? ? ? ? public synchronized void putRequest(final GroupCommitRequest request) { ? ? ? ? ? ? synchronized (this.requestsWrite) { ? ? ? ? ? ? ? ? this.requestsWrite.add(request); ? ? ? ? ? ? } ? ? ? ? ? ? if (hasNotified.compareAndSet(false, true)) { ? ? ? ? ? ? ? ? waitPoint.countDown(); // notify ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? private void swapRequests() { ? ? ? ? ? ? List<GroupCommitRequest> tmp = this.requestsWrite; ? ? ? ? ? ? this.requestsWrite = this.requestsRead; ? ? ? ? ? ? this.requestsRead = tmp; ? ? ? ? } ? ? ? ? private void doCommit() { ? ? ? ? ? ? synchronized (this.requestsRead) { ? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) { ? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) { ? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of ? ? ? ? ? ? ? ? ? ? ? ? // two times the flush ? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false; ? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0); ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); ? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) { ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? this.requestsRead.clear(); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it ? ? ? ? ? ? ? ? ? ? // will come to this process ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? public void run() { ? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service started"); ? ? ? ? ? ? while (!this.isStopped()) { ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? this.waitForRunning(10); ? ? ? ? ? ? ? ? ? ? this.doCommit(); ? ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? // Under normal circumstances shutdown, wait for the arrival of the ? ? ? ? ? ? // request, and then flush ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? Thread.sleep(10); ? ? ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? ? ? CommitLog.log.warn("GroupCommitService Exception, ", e); ? ? ? ? ? ? } ? ? ? ? ? ? synchronized (this) { ? ? ? ? ? ? ? ? this.swapRequests(); ? ? ? ? ? ? } ? ? ? ? ? ? this.doCommit(); ? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service end"); ? ? ? ? } ? ? ? ? @Override ? ? ? ? protected void onWaitEnd() { ? ? ? ? ? ? this.swapRequests(); ? ? ? ? } ? ? ? ? @Override ? ? ? ? public String getServiceName() { ? ? ? ? ? ? return GroupCommitService.class.getSimpleName(); ? ? ? ? } ? ? ? ? @Override ? ? ? ? public long getJointime() { ? ? ? ? ? ? return 1000 * 60 * 5; ? ? ? ? } ? ? }
GroupCommitRequest
是刷盤任務(wù),提交刷盤任務(wù)后,會在刷盤隊(duì)列中等待刷盤,而刷盤線程
GroupCommitService
每隔10毫秒寫一批數(shù)據(jù)到磁盤。之所以不直接寫是磁盤io壓力大,寫入性能低,每隔10毫秒寫一次可以提升磁盤io效率和寫入性能。
- putRequest(request) 提交刷盤任務(wù)到任務(wù)列表
- request.waitForFlush同步等待
GroupCommitService
將任務(wù)列表中的任務(wù)刷盤完成。
兩個(gè)隊(duì)列讀寫分離,requestsWrite
是寫隊(duì)列,用戶保存添加進(jìn)來的刷盤任務(wù),requestsRead
是讀隊(duì)列,在刷盤之前會把寫隊(duì)列的數(shù)據(jù)放入讀隊(duì)列。
CommitLog的doCommit方法:
private void doCommit() { ? ? ? ? ? ? synchronized (this.requestsRead) { ? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) { ? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) { ? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of ? ? ? ? ? ? ? ? ? ? ? ? // two times the flush ? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false; ? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? //根據(jù)offset確定是否已經(jīng)刷盤 ? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0); ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); ? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) { ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); ? ? ? ? ? ? ? ? ? ? } ?? ??? ??? ??? ??? ?//清空已刷盤的列表 ? ? ? ? ? ? ? ? ? ? this.requestsRead.clear(); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it ? ? ? ? ? ? ? ? ? ? // will come to this process ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? }
- 刷盤的時(shí)候依次讀取
requestsRead
中的數(shù)據(jù)寫入磁盤, - 寫入完成后清空
requestsRead
。
讀寫分離設(shè)計(jì)的目的是在刷盤時(shí)不影響任務(wù)提交到列表。
CommitLog.this.mappedFileQueue.flush(0);是刷盤操作:
public boolean flush(final int flushLeastPages) { ? ? boolean result = true; ? ? MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); ? ? if (mappedFile != null) { ? ? ? ? long tmpTimeStamp = mappedFile.getStoreTimestamp(); ? ? ? ? int offset = mappedFile.flush(flushLeastPages); ? ? ? ? long where = mappedFile.getFileFromOffset() + offset; ? ? ? ? result = where == this.flushedWhere; ? ? ? ? this.flushedWhere = where; ? ? ? ? if (0 == flushLeastPages) { ? ? ? ? ? ? this.storeTimestamp = tmpTimeStamp; ? ? ? ? } ? ? } ? ? return result; }
通過MappedFile映射的CommitLog文件寫入磁盤
這就是RocketMQ高可用設(shè)計(jì)之同步刷盤的基本情況了,大體思路就是一個(gè)讀寫分離的隊(duì)列來刷盤,同步刷盤任務(wù)提交后會在刷盤隊(duì)列中等待刷盤完成后再返回,而GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。
到此這篇關(guān)于RocketMQ設(shè)計(jì)之同步刷盤的文章就介紹到這了,更多相關(guān)RocketMQ同步刷盤內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis?saveAndUpdate空值不更新問題及解決
這篇文章主要介紹了Mybatis?saveAndUpdate空值不更新問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02淺談Java中Collection和Collections的區(qū)別
下面小編就為大家?guī)硪黄獪\談Java中Collection和Collections的區(qū)別。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-08-08使用socket進(jìn)行服務(wù)端與客戶端傳文件的方法
這篇文章主要介紹了使用socket進(jìn)行服務(wù)端與客戶端傳文件的方法,需要的朋友可以參考下2017-08-08MyBatis全局映射文件實(shí)現(xiàn)原理解析
這篇文章主要介紹了MyBatis全局映射文件實(shí)現(xiàn)原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08Spring 中jdbcTemplate 實(shí)現(xiàn)執(zhí)行多條sql語句示例
本篇文章主要介紹了Spring 中jdbcTemplate 實(shí)現(xiàn)執(zhí)行多條sql語句示例,可以對多個(gè)表執(zhí)行多個(gè)sql語句,有興趣的可以了解一下。2017-01-01深入了解Java中Cookie和Session的區(qū)別
會話跟蹤是Web程序中常用的技術(shù),用來跟蹤用戶的整個(gè)會話,常用的會話跟蹤技術(shù)是Cookie與Session,本文就詳細(xì)的介紹一下Java中Cookie和Session的區(qū)別,感興趣的可以了解一下2023-06-06Mybatis中的游標(biāo)查詢Cursor(滾動查詢)
這篇文章主要介紹了Mybatis中的游標(biāo)查詢Cursor(滾動查詢),具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01