RocketMQ?源碼分析Broker消息刷盤服務(wù)
前言
上篇文章我們介紹了消息刷盤的四種方式,本篇文章我們來介紹Broker是如何實(shí)現(xiàn)這四種刷盤方式。
刷盤服務(wù)源碼分析
Broker中的四種刷盤分別是由CommitRealTimeService,F(xiàn)lushRealTimeService,GroupCommitService將消息從內(nèi)存中刷到磁盤上的。在了解刷盤這三個(gè)刷盤服務(wù)之前,我們先來了解MappedFile中下面幾個(gè)屬性
public class MappedFile extends ReferenceResource { // 當(dāng)前寫文件位置,即數(shù)據(jù)被寫入MappedFile的最新指針,可能存在ByteBuffer中,沒有提交 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 數(shù)據(jù)被寫入文件的最新指針(只是被寫入文件映射,不一定被刷盤) protected final AtomicInteger committedPosition = new AtomicInteger(0); // 刷盤位置,該指針之前的數(shù)據(jù)都持久化存儲(chǔ)到磁盤中 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 文件大小,默認(rèn)是1042*1024*4(4GB) protected int fileSize; // 起始偏移量,MappedFile創(chuàng)建時(shí)從文件名中解析 private long fileFromOffset; }
上面幾個(gè)屬性在MappedFile中的位置如下圖所示
上面幾個(gè)位置關(guān)系: flushedPosition ≤ commitedPosition ≤ wrotePosition
CommitRealTimeService刷盤源碼分析
CommitRealTimeService類的作用就是將上圖中紅色的消息(也就是committedPosition -> wrotePosition之間的消息)從直接內(nèi)存ByteBuffer提交到FileChannel,提交完成并不帶表刷盤完成,還需要將FileChannel將數(shù)據(jù)刷到硬盤中,才正式刷盤完成。CommitRealTimeService核心代碼邏輯是在run()中,在run()中是包含一個(gè)死循環(huán),死循環(huán)中每個(gè)200ms提交一次消息,每次最少提交4頁(yè)的消息,每頁(yè)大小是4kb,也就是說只有wrotePosition - committedPosition ≥ 4*4kb
,消息才會(huì)被提交。
// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run public void run() { // 死循環(huán) while (!this.isStopped()) { // 消息提交時(shí)間間隔,默認(rèn)200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 最少提交頁(yè)數(shù),默認(rèn)是4 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); try { // 提交消息 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 等待200ms this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } }
上面mappedFileQueue#commit
提交最終會(huì)調(diào)用MappedFile#commit0
,commit0代碼邏輯如下,將直接內(nèi)存ByteBuffer中的數(shù)據(jù)拷貝到fileChannel中。
// org.apache.rocketmq.store.MappedFile#commit0 protected void commit0() { // 寫指針 int writePos = this.wrotePosition.get(); // 最后提交指針 int lastCommittedPosition = this.committedPosition.get(); // byteBuffer的數(shù)據(jù)提交到FileChannel if (writePos - lastCommittedPosition > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
FlushRealTimeService刷盤源碼分析
FlushRealTimeService的代碼與CommitRealTimeService類似,核心代碼也帶run()中,run()中也是一個(gè)死循環(huán),每隔500ms調(diào)用mappedFileQueue#flush
刷盤。
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { while (!this.isStopped()) { // 定時(shí)刷盤時(shí)間間隔,默認(rèn)500ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // 一次刷盤頁(yè)數(shù),默認(rèn)是4頁(yè) int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); try { if (flushCommitLogTimed) { // sleep 500ms Thread.sleep(interval); } else { this.waitForRunning(interval); } // 消息刷盤 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); } catch (Throwable e) { this.printFlushProgress(); } } }
mappedFileQueue#flush
刷盤最終調(diào)用了MappedFile#flush
,代碼如下所示,可以看到如果MappedFile中有直接內(nèi)存寫緩存,則會(huì)調(diào)用fileChannel.force(false)
刷盤,如果沒有寫緩存,則消息直接提交到MappedFile的內(nèi)存映射文件mappedByteBuffer中,因此調(diào)用mappedByteBuffer.force()
刷盤。
// org.apache.rocketmq.store.MappedFile#flush public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果使用了堆外內(nèi)存,那么通過fileChannel強(qiáng)制刷盤,這是異步堆外內(nèi)存的邏輯 if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { // 如果沒有使用堆外內(nèi)存,那么通過fileChannel強(qiáng)制刷盤,這是同步或者異步刷盤走的邏輯 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 設(shè)置刷盤位置為寫入位置 this.flushedPosition.set(value); // 減少對(duì)該MappedFile的引用次數(shù) this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
GroupCommitService刷盤源碼分析
同步刷盤GroupCommitService代碼與上述代碼類似,都繼承了ServiceThread
,它的核心邏輯在GroupCommitService#run
,在run()中也是一個(gè)死循環(huán),每隔10ms調(diào)用一次doCommit()
,雖然這個(gè)方法的名字叫doCommit,實(shí)際底層也與FlushRealTimeService相同,都是調(diào)用的mappedFileQueue#flush
,將mappedByteBuffer
中的數(shù)據(jù)刷入磁盤。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#run public void run() { // 死循環(huán) while (!this.isStopped()) { try { // 間隔10ms this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } }
看到這里大家可能會(huì)有疑問,為什么同步刷盤也是定時(shí)刷盤,這與異步刷盤有什么區(qū)別呢?實(shí)際上這里有著相當(dāng)精妙的設(shè)計(jì),在上篇文章中我們了解到同步刷盤包括等待消息保存與不等待消息保存。
如果不等待消息保存,則調(diào)用了ServiceThread#wakeup
方法。
public void wakeup() { if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); } }
ServiceThread狀態(tài)如下所示,如果刷盤線程在10ms等待中,hasNotified屬性值為false,hastNotified更新成功,刷盤線程被喚醒,立即停止等待。如果刷盤線程正在執(zhí)行中,hasNotified更新失敗,刷盤線程喚醒失敗。只能等待下一次被喚醒或者下一次時(shí)間間隔后再次刷盤。
如果是要等待刷盤成功后才返回結(jié)果,就要利用到GroupCommitService屬性中兩個(gè)刷盤請(qǐng)求容器
- requestWrite
同步刷盤請(qǐng)求暫存容器
- requestsRead
處理中的刷盤請(qǐng)求容器
class GroupCommitService extends FlushCommitLogService { // 同步刷盤請(qǐng)求暫存容器 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 每次處理刷盤的request容器 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); }
提交刷盤請(qǐng)求首先會(huì)被放入到requestsWrite容器中,然后再喚醒刷盤線程。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest public synchronized void putRequest(final GroupCommitRequest request) { lock.lock(); try { // 寫請(qǐng)求 this.requestsWrite.add(request); } finally { lock.unlock(); } // 喚醒當(dāng)前線程 this.wakeup(); }
刷盤線程被喚醒或者線程結(jié)束等待時(shí)都會(huì)調(diào)用onWaitEnd()
方法,交換請(qǐng)求暫存容器和刷盤request容器
// org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd @Override protected void onWaitEnd() { this.swapRequests(); } // org.apache.rocketmq.store.CommitLog.GroupCommitService#swapRequests // 交換請(qǐng)求暫存容器和刷盤request容器 private void swapRequests() { lock.lock(); try { LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } }
線程被喚醒后會(huì)調(diào)用doCommit(),從下面代碼可以發(fā)現(xiàn),不管requestsRead是否包含要處理的刷盤請(qǐng)求,實(shí)際都是通過調(diào)用mappedFileQueue#flush
執(zhí)行刷盤。
- 如果requestsRead中包含刷盤請(qǐng)求
則有可能需要調(diào)用mappedFileQueue#flush
,確保當(dāng)前請(qǐng)求的消息能夠被刷盤,并返回刷盤結(jié)果給客戶端,如果包含請(qǐng)求,最多會(huì)調(diào)用兩次刷盤方法,確保消息能夠正確刷盤。
由于文件是固定大小,有可能刷盤位置在上一個(gè)MappedFile中,當(dāng)前消息請(qǐng)求在最新的MappedFile中,刷盤兩次,確保當(dāng)前消息能夠被刷入硬盤中
- 如果requestsRead中不包含刷盤請(qǐng)求
處理請(qǐng)求容器中包含request,直接調(diào)用MappedFileQueue#flush
,如果當(dāng)前消息不在flushPosition所在的mappedFile中,則本次刷盤有可能并不會(huì)將當(dāng)前消息持久化到磁盤中,需要等待下次刷盤。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // 如果處理Request不空 if (!this.requestsRead.isEmpty()) { // 遍歷處理Request for (GroupCommitRequest req : this.requestsRead) { // 如果刷盤指針大于刷盤請(qǐng)求中需要刷盤的offSet boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // 消息刷盤 for (int i = 0; i < 2 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // 喚醒客戶端 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { // 如果消息不等待刷盤成功就返回,則不會(huì)提交刷盤請(qǐng)求,調(diào)用這個(gè)方法 CommitLog.this.mappedFileQueue.flush(0); } }
總結(jié)
本次我們了解了RocketMQ中四種刷盤策略對(duì)應(yīng)的刷盤服務(wù)
- 同步刷盤-等待消息保存到磁盤
- 同步刷盤-不等待消息保存到磁盤上
上面兩個(gè)同步刷盤都是由GroupCommitService實(shí)現(xiàn)的,由GroupCommitService將MappedByteBuffer消息刷盤到磁盤上
- 異步刷盤-開啟堆外緩存
如果開啟了堆外緩存,刷盤時(shí)會(huì)先由CommitRealTimeService將消息從Bytebuffer拷貝到FileChannel,F(xiàn)lushRealTimeService再將消息從FileChannel刷到磁盤上
- 異步刷盤-不開啟堆外緩存
這種方式也是默認(rèn)的刷盤方式,由FlushRealTimeService將MappedByteBuffer消息刷盤到磁盤上
以上就是RocketMQ 源碼分析Broker消息刷盤服務(wù)的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Broker刷盤服務(wù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JVM加載class文件的原理機(jī)制實(shí)例詳解
Java是一種具有動(dòng)態(tài)性的解釋型語(yǔ)言,類(class)只有被加載到JVM后才能運(yùn)行,接下來通過本文給大家介紹JVM加載class文件的原理機(jī)制詳解,感興趣的朋友一起看看吧2022-04-04Java?將HTML轉(zhuǎn)為XML的詳細(xì)步驟
這篇文章主要介紹了Java?將HTML轉(zhuǎn)為XML,本文將以html轉(zhuǎn)為xml格式為例,介紹如何實(shí)現(xiàn)轉(zhuǎn)換,以下是詳細(xì)方法及步驟,需要的朋友可以參考下2022-06-06Java結(jié)構(gòu)型設(shè)計(jì)模式之組合模式詳解
組合模式,又叫部分整體模式,它創(chuàng)建了對(duì)象組的數(shù)據(jù)結(jié)構(gòu)組合模式使得用戶對(duì)單個(gè)對(duì)象和組合對(duì)象的訪問具有一致性。本文將通過示例為大家詳細(xì)介紹一下組合模式,需要的可以參考一下2022-09-09idea配置檢查XML中SQL語(yǔ)法及書寫sql語(yǔ)句智能提示的方法
idea連接了數(shù)據(jù)庫(kù),也可以執(zhí)行SQL查到數(shù)據(jù),但是無(wú)法識(shí)別sql語(yǔ)句中的表導(dǎo)致沒有提示,下面這篇文章主要給大家介紹了關(guān)于idea配置檢查XML中SQL語(yǔ)法及書寫sql語(yǔ)句智能提示的相關(guān)資料,需要的朋友可以參考下2023-03-03詳談java線程與線程、進(jìn)程與進(jìn)程間通信
下面小編就為大家?guī)硪黄斦刯ava線程與線程、進(jìn)程與進(jìn)程間通信。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-04-04學(xué)習(xí)C語(yǔ)言對(duì)后期java有幫助嗎
在本篇文章里小編給大家整理的是一篇關(guān)于學(xué)習(xí)C語(yǔ)言對(duì)后期java有幫助嗎的基礎(chǔ)文章,有興趣的朋友們可以參考下。2020-11-11java實(shí)現(xiàn)服務(wù)器文件打包zip并下載的示例(邊打包邊下載)
這篇文章主要介紹了java實(shí)現(xiàn)服務(wù)器文件打包zip并下載的示例,使用該方法,可以即時(shí)打包文件,一邊打包一邊傳輸,不使用任何的緩存,讓用戶零等待,需要的朋友可以參考下2014-04-04SpringBoot同時(shí)支持HTTPS與HTTP的實(shí)現(xiàn)示例
本文主要介紹了SpringBoot同時(shí)支持HTTPS與HTTP的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07