RocketMQ?Broker如何保存消息源碼解析
前言
前面我們介紹了RocketMQ是如何接收消息的,下面我們來介紹Broker是如何保存消息的。
消息存儲格式總覽
Broker消息存儲主要包括CommitLog,ConsumerQueue和Index三個(gè)部分。
- CommitLog
CommitLog主要用于消息存儲,所有topic的消息按順序都存儲在CommitLog中。
- ConsumerQueue
ConsumerQueue對應(yīng)消費(fèi)隊(duì)列,消息存儲到CommitLog后,會異步轉(zhuǎn)發(fā)到ConsumerQueue文件中
- Index
消息索引,只要存儲消息key與offset的關(guān)系
CommitLog介紹
CommitLog是消息和消息數(shù)據(jù)存儲的主體,CommitLog存儲的文件目錄在${user.home}/store/commitlog
中,它其實(shí)是一個(gè)目錄,消息并不是直接存儲在CommitLog中,而是存儲在由20位數(shù)字構(gòu)成的文件中。
MappedFile詳解
commitlog文件夾中文件單元是MappedFile,我們可以把MappedFile理解成一個(gè)文件管理的工具,如果需要將數(shù)據(jù)存儲到磁盤,或者快速查找數(shù)據(jù),都可以通過MappedFile。
每個(gè)MappedFile文件大小默認(rèn)是1GB,文件名是由20位數(shù)字構(gòu)成,文件名其實(shí)是MappedFile的起始偏移量,如果偏移量不足20位,則將偏移量的左邊補(bǔ)0。上圖中MappedFile的文件名是00000000000000000000
,它代表的是CommitLog中的第一個(gè)文件,由于每個(gè)MappedFile文件大小是1GB,因此第二個(gè)文件的偏移量為1024*1024*1024(1GB)
,計(jì)算后的結(jié)果為1073741824
,因此第二個(gè)文件的文件名為00000000001073741824
,可依此類推其他文件的文件名。
消息存儲格式介紹
消息在commitLog中存儲的格式如下所示
- totalSize
消息總長度,4字節(jié)
- magicCode
魔數(shù),4字節(jié),固定值十六進(jìn)制是0xdaa320a7
,10進(jìn)制是-875286124
- bodyCRC
消息體crc校驗(yàn)碼,4字節(jié)
- queueId
消息隊(duì)列id,4字節(jié)
- flag
消息標(biāo)記,RocketMQ不做處理,默認(rèn)4字節(jié)
- queueOffset
消息在ConsumeQueue文件中的物理偏移量,默認(rèn)8字節(jié)
- physicalOffset
消息在CommitLog文件中的物理偏移量,默認(rèn)8字節(jié)
- sysFlag
消息系統(tǒng)標(biāo)記,例如是否壓縮、是否是事務(wù)消息等,4字節(jié)
- bornTimestamp
消息生產(chǎn)者調(diào)用消息API的時(shí)間戳,8字節(jié)
- bornHost
BORNHOST 消息生產(chǎn)者IP和端口號,8字節(jié)
- storeTimestamp
消息存儲時(shí)間戳,8字節(jié)
- storeHostAddress
STOREHOSTADDRESS 消息存儲Broker的IP和端口號,8字節(jié)
- reconsumeTimes
消息重試次數(shù) 4字節(jié)
- Prepared Transaction Offset
事務(wù)消息偏移量,8字節(jié)
- bodyLength
消息體長度,4字節(jié)
- body
消息體內(nèi)容,它是變長的,長度為bodyLength中存儲的值
- TopicLength
topicLength表示topic占用的長度,topicLength占用1字節(jié),也就是255,也就是說topic長度最長不能超過255字節(jié)
- Topic
topic是消息主題名稱,topic是變長的,實(shí)際占用topicLength字節(jié)
- PropertiesLength
propertiesLength表示properties占用的長度,propertiesLength占用2字節(jié),也就是說properties長度最長不超過65536字節(jié)
- Properties
properties是消息屬性,properties是變長的,實(shí)際占用propertiesLength字節(jié)
DefaultMessageStore介紹
Broker保存消息是通過消息存儲默認(rèn)實(shí)現(xiàn)類org.apache.rocketmq.store.DefaultMessageStore
執(zhí)行的,它是Broker存儲模塊中最最最重要的一個(gè)類,提供了很多存儲文件的API。DefaultMessageStore中和消息存儲相關(guān)的屬性如下所示,
// 消息存儲配置 private final MessageStoreConfig messageStoreConfig; // CommitLog文件的存儲實(shí)現(xiàn)類 private final CommitLog commitLog; // 消息隊(duì)列存儲緩存表,key是topic private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; // MappedFile分配服務(wù) private final AllocateMappedFileService allocateMappedFileService; // 直接內(nèi)存暫存池 private final TransientStorePool transientStorePool; // broker狀態(tài)管理器 private final BrokerStatsManager brokerStatsManager; // 鎖文件 // 目錄: ${user.home}/store/lock private RandomAccessFile lockFile;
消息存儲源碼分析
發(fā)送消息存儲流程
發(fā)送消息存儲的入口函數(shù)是DefaultMessageStore#asyncPutMessage
,它主要分為下面三步
- 存儲狀態(tài)校驗(yàn)
- 校驗(yàn)消息存儲服務(wù)是否關(guān)閉,當(dāng)前Broker是否是從節(jié)點(diǎn),queue是否可寫
- 消息校驗(yàn)
- 校驗(yàn)topic名稱長度是否超過了127字節(jié)和property長度是否超過了32767
- 將消息保存到commitLog
// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) { // 1. 存儲狀態(tài)校驗(yàn) PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } // 2. 校驗(yàn)topic名稱和property長度 PutMessageStatus msgCheckStatus = this.checkMessage(msg); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } // ... long beginTime = this.getSystemClock().now(); // 3. 保存到commitLog CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); //... return putResultFuture; }
CommitLog#asyncPutMessage
保存消息
CommitLog#asyncPutMessage
保存消息可以分為三個(gè)階段
- 消息預(yù)處理階段
- 消息保存階段
- 消息保存結(jié)果處理階段
消息預(yù)處理階段
消息預(yù)處理階段可以分為下面三個(gè)步驟
- 設(shè)置消息存儲時(shí)間戳和消息體CSC32信息
- 如果是延遲消息,則設(shè)置延遲信息
如果是非事務(wù)消息或者是提交的事務(wù)消息,并且設(shè)置了消息的延遲級別,說明當(dāng)前消息是延遲消息,Broker在處理延遲消息時(shí)會將消息投遞到名為SCHEDULE_TOPIC_XXXX
的Topic。在消息預(yù)處理的階段,會先將當(dāng)前消息的topic設(shè)置為SCHEDULE_TOPIC_XXXX
,queueId設(shè)置為延遲級別-1
,并且將原來的Topic和queueId設(shè)置到消息的REAL_TOPIC
和REAL_QID
屬性中。
- 設(shè)置ip及構(gòu)建存儲消息上下文
// org.apache.rocketmq.store.CommitLog#asyncPutMessage public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // 1. 設(shè)置消息存儲時(shí)間戳和消息體CSC32信息 msg.setStoreTimestamp(System.currentTimeMillis()); // 設(shè)置消息存儲時(shí)間 msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // 設(shè)置消息體CRC32校驗(yàn)值 // 2. 如果是非事務(wù)消息,或者是事務(wù)提交消息,判斷是否是是否是延遲消息,如果是延遲消息則設(shè)置延遲相關(guān)信息 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery // 如果延遲級別>0,說明是延遲消息 if (msg.getDelayTimeLevel() > 0) { // 如果大于最大的延遲級別,則取最大的延遲級別 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 消息topic改成延遲消息topic(SCHEDULE_TOPIC_XXXX) topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; // 延遲topic的queueId:延遲級別-1 int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // 消息屬性中設(shè)置真實(shí)的QueueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 把SCHEDULE_TOPIC_XXXX設(shè)置為當(dāng)前消息的topic,消息先投遞到這個(gè)隊(duì)列中 msg.setTopic(topic); msg.setQueueId(queueId); } } // 3. 設(shè)置ip并構(gòu)建存儲消息上下文信息 msg.setBornHostV6Flag(); // 如果producer的ip是IpV6,則設(shè)置生產(chǎn)者IpV6 flag msg.setStoreHostAddressV6Flag(); // 如果如果broker的ip是IpV6,則設(shè)置BrokerIpV6 flag PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); // 構(gòu)建存消息上下文 PutMessageContext putMessageContext = new PutMessageContext(/*key值:topic-queueId*/generateKey(putMessageThreadLocal.getKeyBuilder()/*StringBuilder*/, msg)); // ... 省略部分代碼 }
消息保存階段
消息保存階段可以分為如下步驟
- 獲取消息保存鎖
- 獲取最新的mappedFile
獲取MappedFile調(diào)用的是MappedFileQueue中的方法,獲取最新的MappedFile
- 如果最新的mappedFile為空或者已經(jīng)滿了,則創(chuàng)建新的MappedFile
- 將消息保存的mappedFile中
- 處理消息保存結(jié)果
- 釋放消息保存鎖
// org.apache.rocketmq.store.CommitLog#asyncPutMessage public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // ... 省略部分代碼 // 1. 消息保存鎖,默認(rèn)是ReentrantLock互斥鎖 putMessageLock.lock(); try { // 2. 獲取最新的mappedFile MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 3. 如果獲取到的mappedFile是null說明之前沒有存儲消息 // 如果mappedFile滿了,說明需要?jiǎng)?chuàng)建一個(gè)新的MappedFile if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); } // 如果創(chuàng)建mappedFile失敗,則返回異常信息 if (null == mappedFile) { // 創(chuàng)建mappedFile失敗 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } // 4. 將消息保存的mappedFile中 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); // 5. 處理消息保存結(jié)果 switch (result.getStatus()) { case PUT_OK: break; // mappedFile滿了,重新創(chuàng)建mappedFile后再寫入消息 case END_OF_FILE: unlockMappedFile = mappedFile; // 創(chuàng)建一個(gè)新的文件,然后重新寫入 mappedFile = this.mappedFileQueue.getLastMappedFile(0); //... // 寫消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); break; // ... } } finally { // 6. 釋放鎖 putMessageLock.unlock(); } // ... 省略部分代碼 }
上面第4步MappedFile#appendMessage
邏輯主要有三步
- 獲取當(dāng)前寫文件位置
如果寫指針小于文件大小,則對消息進(jìn)行追加處理
獲取寫緩沖
調(diào)用AppendMessageCallback的doAppend將消息寫到內(nèi)存緩沖中
回調(diào)函數(shù)doAppend方法分為單條處理邏輯和批量消息處理邏輯,下面僅展示了單條消息處理邏輯
- 消息保存完成后會更新當(dāng)前寫文件的位置和消息保存時(shí)間戳
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, PutMessageContext putMessageContext) { // 獲取當(dāng)前寫文件位置 int currentPos = this.wrotePosition.get(); // 如果寫文件位置小于文件size if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; // 如果是單條消息 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件長度-當(dāng)前寫位置,可以寫的長度*/,(MessageExtBrokerInner) messageExt, putMessageContext); } //... // 更新當(dāng)前寫文件位置和消息保存時(shí)間戳 this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } }
上面保存消息回調(diào)函數(shù)中的doAppend實(shí)際調(diào)用的是CommitLog中內(nèi)部類DefaultAppendMessageCallback的doAppend方法,這里大致可以分為下面幾個(gè)步驟
- 獲取消息物理偏移量,并且創(chuàng)建消息id生成器,從topicQueueTable中獲取Queue的最大相對便宜量。
消息id的格式如下所示,它由ip,端口和消息偏移量公共構(gòu)成,長度是16字節(jié),為了保證消息的可讀性,返回給應(yīng)用程序的Id轉(zhuǎn)成了字符串。
消息id這么設(shè)計(jì)的原因是可以根據(jù)消息id快速找到broker的IP,端口,以及消息在的物理偏移量,通過它可以快速找到消息
- 如果消息長度加上消息結(jié)束符(8字節(jié))大于maxBlank,則表示該mappedFile已經(jīng)沒有足夠的空間保存該消息了,那么就會將消息結(jié)束符寫入緩沖中,并返回
END_OF_FILE
,mappedFile消息結(jié)束符如下所示
- 如果空間足夠,將queue的相對偏移量,物理偏移量,sysflag,消息創(chuàng)建時(shí)間,消息創(chuàng)建ip,消息保存時(shí)間及消息體等按照上面消息格式保存到緩沖中。
- 創(chuàng)建AppendMessageResult對象并返回,它包括消息追加狀態(tài)、消息寫入物理偏移量、消息寫入長度、消息ID生成器、消息開始追加的時(shí)間戳、消息隊(duì)列偏移量、消息開始寫入的時(shí)間戳等屬性。
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend public AppendMessageResult doAppend(final long fileFromOffset/*消息文件起始偏移量*/, final ByteBuffer byteBuffer, final int maxBlank/*文件可寫長度*/,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // 1. 物理offset,文件起始o(jì)ffset+寫offset long wroteOffset = fileFromOffset + byteBuffer.position(); // 創(chuàng)建消息id supplier Supplier<String> msgIdSupplier = () -> { int sysflag = msgInner.getSysFlag(); int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer msgIdBuffer.putLong(msgIdLen - 8, wroteOffset); return UtilAll.bytes2string(msgIdBuffer.array()); }; // topic-ququeId String key = putMessageContext.getTopicQueueTableKey(); // 獲取消息queue offset Long queueOffset = CommitLog.this.topicQueueTable.get(key); // 如果queueOffset是null,則將其置0 if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } // 獲取寫緩沖 ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); final int msgLen = preEncodeBuffer.getInt(0); // 2. 判斷空間是否足夠,如果剩余空間不足,則保存TOTAL+MAGICCODE之后,返回BLANK_MAGIC_CODE if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.msgStoreItemMemory.clear(); // 1 TOTALSIZE 寫消息總長度 this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE 寫魔數(shù) this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); return new AppendMessageResult(/*...*/); } int pos = 4/*totalSize*/ + 4/*magicCode*/ + 4/*bodyCRC*/ + 4/*queueId*/ + 4/*flag*/; // set隊(duì)列的offset, preEncodeBuffer.putLong(pos, queueOffset); pos += 8; // 設(shè)置物理offset: 文件起始o(jì)ffset+當(dāng)前文件寫消息的offset preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; // set 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP pos += 8 + 4 + 8 + ipLen; // 設(shè)置存儲消息ip地址 preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); // 寫消息到隊(duì)列緩沖 byteBuffer.put(preEncodeBuffer); msgInner.setEncodedBuff(null); // 4. 返回消息保存結(jié)果 AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); return result; }
消息保存結(jié)果處理階段
消息保存結(jié)果處理階段主要包括下面三個(gè)
- 提交刷盤請求
如果是同步刷盤,則會創(chuàng)建刷盤請求并返回CompleteFuture,如果是異步刷盤,則會喚醒刷盤服務(wù),然后返回消息保存成功的CompleteFuture
- 提交消息復(fù)制請求
如果是同步復(fù)制,則創(chuàng)建消息同步請求然后返回CompleteFuture,如果是異步復(fù)制則直接放回消息保存成功的CompleteFuture
- 合并提交刷盤請求和提交消息復(fù)制請求
CompleteFuture#thenCombine
是將兩個(gè)CompleteFuture(提交刷盤請求,提交消息復(fù)制請求)組合起來,等提交刷盤請求和提交消息復(fù)制請求都執(zhí)行完了之后再執(zhí)行后續(xù)任務(wù)
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // ... 省略部分代碼 PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // 1. 提交刷盤請求 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); // 2. 提交復(fù)制請求 CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg); // 3. 合并提交刷盤請求和提交復(fù)制請求結(jié)果 return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; }); }
總結(jié)
消息保存到commitLog實(shí)際上是保存到byteBuffer中,消息是在回調(diào)結(jié)果時(shí)根據(jù)配置決定同步/異步刷盤以及同步/異步同步到從節(jié)點(diǎn)。消息在這個(gè)階段也并不會將消息分發(fā)到comsumeQueue以及Index中。
以上就是RocketMQ | 源碼分析】Broker是如何保存消息的?的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ Broker保存消息的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot結(jié)合Redis配置工具類實(shí)現(xiàn)動態(tài)切換庫
本文主要介紹了SpringBoot結(jié)合Redis配置工具類實(shí)現(xiàn)動態(tài)切換庫,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08Java+Springboot搭建一個(gè)在線網(wǎng)盤文件分享系統(tǒng)
本主要介紹了通過springboot+freemark+jpa+MySQL實(shí)現(xiàn)的在線網(wǎng)盤文件分享系統(tǒng),其功能跟百度網(wǎng)盤非常類似,可以實(shí)現(xiàn)文件的上傳、移動、復(fù)制、下載等,需要的可以參考一下2021-11-11Java設(shè)計(jì)模式七大原則之單一職責(zé)原則詳解
單一職責(zé)原則(Single Responsibility Principle, SRP),有且僅有一個(gè)原因引起類的變更。簡單來說,就是針對一個(gè)java類,它應(yīng)該只負(fù)責(zé)一項(xiàng)職責(zé)。本文將詳細(xì)介紹一下Java設(shè)計(jì)模式七大原則之一的單一職責(zé)原則,需要的可以參考一下2022-02-02java基礎(chǔ)理論Stream的Filter與謂詞邏輯
這篇文章主要為大家介紹了java基礎(chǔ)理論Stream的Filter與謂詞邏輯,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03Java通過PropertyDescriptor反射調(diào)用set和get方法
這篇文章主要為大家詳細(xì)介紹了Java通過PropertyDescriptor反射調(diào)用set和get方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-03-03Java實(shí)現(xiàn)作業(yè)調(diào)度的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何利用Java實(shí)現(xiàn)SJF算法調(diào)度,要求測試數(shù)據(jù)可以隨即輸入或從文件中讀入,文中的示例代碼講解詳細(xì),需要的可以參考一下2023-04-04