RocketMQ?Broker消息如何刷盤源碼解析
前言
我們在學習RocketMQ的時候,我們知道RocketMQ的刷盤策略有兩個刷盤策略
- 同步刷盤
同步刷盤即Broker消息已經被持久化到硬盤后才會向客戶端返回成功。同步刷盤的優(yōu)點是能保證消息不丟失,但是這是以犧牲寫入性能為代價的。
- 異步刷盤
異步刷盤是指Broker將信息存儲到pagecache后就立即向客戶端返回成功,然后會有一個異步線程定時將內存中的數據寫入磁盤,默認時間間隔為500ms。
Broker中的刷盤策略是通過Broker配置文件中flushDiskType
進行配置,可以配置ASYNC_FLUSH
(異步刷盤)和SYNC_FLUSH
(同步刷盤),默認配置是ASYNC_FLUSH
。
Broker的刷盤采用基于JDK NIO技術,消息首先會存儲到內存中,然后再根據不同的刷盤策略在不同時間刷盤,如果有不了解的小伙伴可以參考這篇文章《【NIO實戰(zhàn)】深入理解FileChannel》
刷盤相關類介紹
CommitLog中的內部類FlushCommitLogService及其子類CommitRealTimeService、GroupCommitService、FlushRealTimeService分別是用于不同場景下用于刷盤的刷盤行為,他們會單獨或者配合起來使用。具體類圖如下所示。
如果是同步刷盤會使用GroupCommitService。如果是異步刷盤,并且關閉了堆外緩存(TransientStorePool),則采用FlushRealTimeService刷盤。如果是異步刷盤,并且開啟了堆外緩存,則會使用FlushRealTimeService與CommitRealTimeService配合刷盤。
默認的輸盤策略是異步且關閉堆外緩存,因此默認是采用FlushRealTimeService進行刷盤
Broker刷盤源碼分析
消息刷盤相關邏輯都是圍繞在CommitLog,因此要想知道消息時如何刷盤的關鍵是研究CommitLog
CommitLog構造&屬性賦值
CommitLog中與刷盤相關的屬性有flushCommitLogService、commitLogService。如果是同步刷盤則在構造函數中會給flushCommitLogService賦值GroupCommitService,如果是異步刷盤則給flushCommitLogService賦值FlushRealTimeService。commitLogService的值是CommitRealTimeService,從上面我們可以很明顯的看出它只有在異步且開啟TransientStorePoolEnabled時才會被使用。
public class CommitLog { // 如果是同步刷盤,則是GroupCommitService。如果是異步刷盤則是FlushRealTimeService // 默認是異步刷盤,因此是CommitLog$FlushRealTimeService private final FlushCommitLogService flushCommitLogService; // 開啟TransientStorePoolEnable時使用CommitRealTimeService private final FlushCommitLogService commitLogService; // 構造函數 public CommitLog(final DefaultMessageStore defaultMessageStore) { // 默認是異步刷盤,因此這里是false if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { this.flushCommitLogService = new GroupCommitService(); } else { this.flushCommitLogService = new FlushRealTimeService(); } this.commitLogService = new CommitRealTimeService(); // 消息回調 this.appendMessageCallback = new DefaultAppendMessageCallback(); flushDiskWatcher = new FlushDiskWatcher(); } }
TransientStorePoolEnabled介紹
transientStorePoolEnabled配置的默認值為false,開啟transientStorePoolEnabled需要手動開啟。如果開啟transientStorePoolEnabled會開啟堆外內存存儲池,Broker在啟動時會申請5個與CommitLog大小(1GB)相同的堆外內存交給TransientStorePool,創(chuàng)建MappedFile時會向TransientStorePool“借”一個堆外內存ByteBuffer,保存消息時會先將消息保存到堆外內存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盤中。TransientStorePool屬性和一些核心方法源碼如下,堆外內存ByteBuffer都是由它來管理。
// org.apache.rocketmq.store.TransientStorePool public class TransientStorePool { // 存儲池大小,默認是5 private final int poolSize; // CommitLog MappedFile文件大小,默認1GB private final int fileSize; // 默認存5個ByteBuffer private final Deque<ByteBuffer> availableBuffers; // 消息存儲配置 private final MessageStoreConfig storeConfig; // TransientStorePool初始化 public void init() { // 默認是5 for (int i = 0; i < poolSize; i++) { // 分配1GB的直接內存 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); // 生成的緩存保存到隊列中 availableBuffers.offer(byteBuffer); } } // 歸還緩沖 public void returnBuffer(ByteBuffer byteBuffer) { // 修改position和limit,"清空"緩沖 byteBuffer.position(0); byteBuffer.limit(fileSize); // 緩沖入隊 this.availableBuffers.offerFirst(byteBuffer); } // 向TransientStorePool借緩沖 public ByteBuffer borrowBuffer() { // 緩沖出隊 ByteBuffer buffer = availableBuffers.pollFirst(); return buffer; } }
消息保存源碼分析
前面文章《【RocketMQ | 源碼分析】Broker是如何保存消息的? 》我們雖然介紹了消息的保存過程,但是開啟或者關閉TransientStorePoolEnabled時,消息保存的細節(jié)是不同的,我們再打開消息保存MappedFile的源碼如下,下面代碼中如果writeBuffer不空,則會將消息先追加到writeBuffer,否者直接寫入到MappedFile的內存映射文件中。
// org.apache.rocketmq.store.MappedFile#appendMessagesInner public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, PutMessageContext putMessageContext) { // 如果寫文件位置小于文件size if (currentPos < this.fileSize) { // 如果writeBuffer不空,則獲取writeBuffer的淺拷貝,否則獲取MappedFile的內存映射(MappedByteBuffer)的淺拷貝 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; // 如果是單條消息 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件長度-當前寫位置,可以寫的長度*/, (MessageExtBrokerInner) messageExt, putMessageContext); } // ...如果是批量消息 return result; } }
那么什么情況下MappedFile中的writeBuffer為空,什么情況下writeBuffer不為空呢?我們可以先來了解MappedFile是如何創(chuàng)建的,MappedFile是由AllocateMappedFileService創(chuàng)建的,具體源碼如下,如果開啟了TransientStorePoolEnabled,則在創(chuàng)建MappedFile時會向TransientStorePool“借”一個ByteBuffer,如果沒有開啟TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存數據時會將數據直接保存到MappedFile的直接內存映射(MappedByteBuffer)中。
private boolean mmapOperation() { // ... if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); // 初始化mappedFile會向TransientStorePool"借"一個writeBuffer mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } } else { // 創(chuàng)建MappedFile,沒有writeBuffer mappedFile = new MappedFile(req.getFilePath(), req.getFileSize()); } // ... }
由上可知,消息保存如下圖所示
消息刷盤入口方法源碼分析
消息保存和刷盤的入口方法CommitLog#asyncPutMessage
,消息保存到mappedFile的緩存后,最后會調用submitFlushRequest方法提交刷盤請求,Broker會根據刷盤策略進行刷盤。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { //... 保存消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); // ... // 提交刷盤請求 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); // 提交復制請求 CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg); // 合并提交刷盤請求和提交復制請求結果 return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); }
提交了刷盤請求后,根據刷盤策略,是否開啟堆外緩存,推送消息中是否要等待消息保存有如下四種刷盤方式
- 異步刷盤(關閉TransientStorePoolEnabled)
異步刷盤(關閉TransientStorePoolEnabled)是默認的刷盤方案,這個刷盤方案先會**異步喚醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于關閉了TransientStorePoolEnabled,消息是保存到MappedFile中的內存映射文件MappedByteBuffer,FlushRealTimeService將定時MappedByteBuffer刷到磁盤。
- 異步刷盤(開啟TransientStorePoolEnabled)
異步刷盤(開啟TransientStorePoolEnabled)會先**異步喚醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于開啟了TransientStorePoolEnabled,消息會保存到MappedFile中的內存映射文件ByteBuffer,CommitRealTimeService定時將ByteBuffer中的數據刷到FileChannel中。
- 同步刷盤(等待消息保存)
同步刷盤(等待消息保存)會先創(chuàng)建一個刷盤請求(GroupCommitRequest),然后向GroupCommitService提交刷盤請求,最后等待刷盤結果并返回
- 同步刷盤(不等待消息保存)
同步刷盤(不等待消息保存)也是通過GroupCommitService刷盤,與等待消息保存不同的是不等待的方式異步喚醒(wakeup)GroupCommitService后,直接返回消息保存成功。
四種刷盤方式源碼如下所示
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 同步刷盤 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 獲取同步刷盤Service final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { // 創(chuàng)建GroupCommitRequest 刷盤偏移量nextOffset = 當前寫入偏移量 + 當前消息寫入大小 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 向刷盤監(jiān)視器(flushDistWatch)提交刷盤請求 flushDiskWatcher.add(request); // 提交刷盤請求,并且喚醒同步刷盤線程 service.putRequest(request); return request.future(); } else { // 同步刷盤,但是不需要等待刷盤結果,那么喚醒同步刷盤線程 service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // 異步刷盤 else { // 是否啟動了堆外緩存 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 如果沒有啟動堆外緩存,則喚醒異步刷盤服務 flushRealTimeService flushCommitLogService.wakeup(); } else { // 如果啟動了堆外緩存,則喚醒異步轉存服務CommitRealTimeService commitLogService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }
將上面四種場景及調用關系如下圖所示
總結
本篇文章介紹了TransientStorePool機制以及開啟和管理隊消息保存的影響,我們還介紹了RocketMQ中四種刷盤策略
- 同步刷盤-等待消息保存到磁盤
- 同步刷盤-不等待消息保存到磁盤上
- 異步刷盤-開啟堆外緩存
- 異步刷盤-不開啟堆外緩存
以上就是RocketMQ Broker消息如何刷盤源碼解析的詳細內容,更多關于RocketMQ Broker消息刷盤的資料請關注腳本之家其它相關文章!
相關文章
一行命令同時修改maven項目中多個module的版本號的方法
這篇文章主要介紹了一行命令同時修改maven項目中多個module的版本號的方法,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-06-06利用HttpUrlConnection 上傳 接收文件的實現方法
下面小編就為大家?guī)硪黄肏ttpUrlConnection 上傳 接收文件的實現方法。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-11-11Java面向對象編程之繼承和多態(tài)以及包的解析與使用范例
繼承就是可以直接使用前輩的屬性和方法。自然界如果沒有繼承,那一切都是處于混沌狀態(tài)。多態(tài)是同一個行為具有多個不同表現形式或形態(tài)的能力。多態(tài)就是同一個接口,使用不同的實例而執(zhí)行不同操作2021-11-11在Spring Boot應用程序中使用Apache Kafka的方法步驟詳解
這篇文章主要介紹了在Spring Boot應用程序中使用Apache Kafka的方法步驟詳解,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2018-11-11