RocketMQ存儲(chǔ)文件的實(shí)現(xiàn)
RocketMQ存儲(chǔ)路徑默認(rèn)是${ROCKRTMQ_HOME}/store,主要存儲(chǔ)消息、主題對(duì)應(yīng)的消息隊(duì)列的索引等。
1、概述
查看其目錄文件
commitlog
:消息的存儲(chǔ)目錄config
:運(yùn)行期間一些配置信息
consumequeue
:消息消費(fèi)隊(duì)列存儲(chǔ)目錄index
:消息索引文件存儲(chǔ)目錄abort
:如果存在abort文件說(shuō)明Broker非正常關(guān)閉,該文件默認(rèn)啟動(dòng)時(shí)創(chuàng)建,正常退出時(shí)刪除checkpoint
:文件檢測(cè)點(diǎn)。存儲(chǔ)commitlog文件最后一次刷盤(pán)時(shí)間戳、consumequeue最后一次刷盤(pán)時(shí)間、index索引文件最后一次刷盤(pán)時(shí)間戳。
2、文件簡(jiǎn)介
2.1、commitlog文件
commitlog文件的存儲(chǔ)地址:$HOME\store\commitlog${fileName},每個(gè)文件的大小默認(rèn)1G =102410241024,commitlog的文件名fileName,名字長(zhǎng)度為20位,左邊補(bǔ)零,剩余為起始偏移量;比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)這個(gè)文件滿(mǎn)了,第二個(gè)文件名字為00000000001073741824,起始偏移量為1073741824,以此類(lèi)推,第三個(gè)文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲(chǔ)的時(shí)候會(huì)順序?qū)懭胛募?dāng)文件滿(mǎn)了,寫(xiě)入下一個(gè)文件。
commitlog目錄下的文件主要存儲(chǔ)消息,每條消息的長(zhǎng)度不同,查看其存儲(chǔ)的邏輯視圖,每條消息的前面4個(gè)字節(jié)存儲(chǔ)該條消息的總長(zhǎng)度。
文件的消息單元存儲(chǔ)詳細(xì)信息
編號(hào) | 字段簡(jiǎn)稱(chēng) | 字段大?。ㄗ止?jié)) | 字段含義 |
---|---|---|---|
1 | msgSize | 4 | 代表這個(gè)消息的大小 |
2 | MAGICCODE | 4 | MAGICCODE = daa320a7 |
3 | BODY CRC | 4 | 消息體BODY CRC 當(dāng)broker重啟recover時(shí)會(huì)校驗(yàn) |
4 | queueId | 4 | |
5 | flag | 4 | |
6 | QUEUEOFFSET | 8 | 這個(gè)值是個(gè)自增值不是真正的consume queue的偏移量,可以代表這個(gè)consumeQueue隊(duì)列或者tranStateTable隊(duì)列中消息的個(gè)數(shù),若是非事務(wù)消息或者commit事務(wù)消息,可以通過(guò)這個(gè)值查找到consumeQueue中數(shù)據(jù),QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事務(wù),則可以通過(guò)該值從tranStateTable中查找數(shù)據(jù) |
7 | PHYSICALOFFSET | 8 | 代表消息在commitLog中的物理起始地址偏移量 |
8 | SYSFLAG | 4 | 指明消息是事物事物狀態(tài)等消息特征,二進(jìn)制為四個(gè)字節(jié)從右往左數(shù):當(dāng)4個(gè)字節(jié)均為0(值為0)時(shí)表示非事務(wù)消息;當(dāng)?shù)?個(gè)字節(jié)為1(值為1)時(shí)表示表示消息是壓縮的(Compressed);當(dāng)?shù)?個(gè)字節(jié)為1(值為2)表示多消息(MultiTags);當(dāng)?shù)?個(gè)字節(jié)為1(值為4)時(shí)表示prepared消息;當(dāng)?shù)?個(gè)字節(jié)為1(值為8)時(shí)表示commit消息;當(dāng)?shù)?/4個(gè)字節(jié)均為1時(shí)(值為12)時(shí)表示rollback消息;當(dāng)?shù)?/4個(gè)字節(jié)均為0時(shí)表示非事務(wù)消息 |
9 | BORNTIMESTAMP | 8 | 消息產(chǎn)生端(producer)的時(shí)間戳 |
10 | BORNHOST | 8 | 消息產(chǎn)生端(producer)地址(address:port) |
11 | STORETIMESTAMP | 8 | 消息在broker存儲(chǔ)時(shí)間 |
12 | STOREHOSTADDRESS | 8 | 消息存儲(chǔ)到broker的地址(address:port) |
13 | RECONSUMETIMES | 8 | 消息被某個(gè)訂閱組重新消費(fèi)了幾次(訂閱組之間獨(dú)立計(jì)數(shù)),因?yàn)橹卦囅l(fā)送到了topic名字為%retry%groupName的隊(duì)列queueId=0的隊(duì)列中去了,成功消費(fèi)一次記錄為0; |
14 | PreparedTransaction Offset | 8 | 表示是prepared狀態(tài)的事物消息 |
15 | messagebodyLength | 4 | 消息體大小值 |
16 | messagebody | bodyLength | 消息體內(nèi)容 |
17 | topicLength | 1 | topic名稱(chēng)內(nèi)容大小 |
18 | topic | topicLength | topic的內(nèi)容值 |
19 | propertiesLength | 2 | 屬性值大小 |
20 | properties | propertiesLength | propertiesLength大小的屬性數(shù)據(jù) |
2.2、consumequeue
RocketMQ基于主題訂閱模式實(shí)現(xiàn)消息的消費(fèi),消費(fèi)者關(guān)心的是主題下的所有消息。
但是由于不同的主題的消息不連續(xù)的存儲(chǔ)在commitlog文件中,如果只是檢索該消息文件可想而知會(huì)有多慢,為了提高效率,對(duì)應(yīng)的主題的隊(duì)列建立了索引文件,為了加快消息的檢索和節(jié)省磁盤(pán)空間,每一個(gè)consumequeue條目存儲(chǔ)了消息的關(guān)鍵信息commitog文件中的偏移量、消息長(zhǎng)度、tag的hashcode值。
查看目錄結(jié)構(gòu):
單個(gè)consumequeue文件中默認(rèn)包含30萬(wàn)個(gè)條目,每個(gè)條目20個(gè)字節(jié),所以每個(gè)文件的大小是固定的20w x 20字節(jié),單個(gè)consumequeue文件可認(rèn)為是一個(gè)數(shù)組,下標(biāo)即為邏輯偏移量,消息的消費(fèi)進(jìn)度存儲(chǔ)的偏移量即邏輯偏移量。
2.3、IndexFile
IndexFile:用于為生成的索引文件提供訪(fǎng)問(wèn)服務(wù),通過(guò)消息Key值查詢(xún)消息真正的實(shí)體內(nèi)容。在實(shí)際的物理存儲(chǔ)上,文件名則是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引;
2.3.1、IndexFile結(jié)構(gòu)分析
IndexHead 數(shù)據(jù): beginTimestamp:該索引文件包含消息的最小存儲(chǔ)時(shí)間 endTimestamp:該索引文件包含消息的最大存儲(chǔ)時(shí)間 beginPhyoffset:該索引文件中包含消息的最小物理偏移量(commitlog 文件偏移量) endPhyoffset:該索引文件中包含消息的最大物理偏移量(commitlog 文件偏移量) hashSlotCount:hashslot個(gè)數(shù),并不是 hash 槽使用的個(gè)數(shù),在這里意義不大, indexCount:已使用的 Index 條目個(gè)數(shù)
Hash 槽: 一個(gè) IndexFile 默認(rèn)包含 500W 個(gè) Hash 槽,每個(gè) Hash 槽存儲(chǔ)的是落在該 Hash 槽的 hashcode 最新的 Index 的索引
Index 條目列表 hashcode:key 的 hashcode phyoffset:消息對(duì)應(yīng)的物理偏移量 timedif:該消息存儲(chǔ)時(shí)間與第一條消息的時(shí)間戳的差值,小于 0 表示該消息無(wú)效 preIndexNo:該條目的前一條記錄的 Index 索引,hash 沖突時(shí),根據(jù)該值構(gòu)建鏈表結(jié)構(gòu)
2.3.2、IndexFile條目存儲(chǔ)
RocketMQ將消息索引鍵與消息的偏移量映射關(guān)系寫(xiě)入IndexFile中,其核心的實(shí)現(xiàn)方法是public boolean putKey(final String key, final long phyOffset, final long storeTimestamp);參數(shù)含義分別是消息的索引、消息的物理偏移量、消息的存儲(chǔ)時(shí)間。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { //判斷當(dāng)前的條目數(shù)是否大于最大的允許的條目數(shù) if (this.indexHeader.getIndexCount() < this.indexNum) { //獲取KEY的hash值(正整數(shù)) int keyHash = indexKeyHashMethod(key); //計(jì)算hash槽的下標(biāo) int slotPos = keyHash % this.hashSlotNum; //獲取hash槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize, // false); //獲取hash槽中存儲(chǔ)的數(shù)據(jù) int slotValue = this.mappedByteBuffer.getInt(absSlotPos); //判斷值是否小于等于0或者 大于當(dāng)前索引文件的最大條目 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } //計(jì)算當(dāng)前消息存儲(chǔ)時(shí)間與第一條消息時(shí)間戳的時(shí)間差 long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); //秒 timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } //計(jì)算條目的物理地址 = 索引頭部大小(40字節(jié)) + hash槽的大小(4字節(jié))*槽的數(shù)量(500w) + 當(dāng)前索引最大條目的個(gè)數(shù)*每index的大?。?0字節(jié)) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; //依次存入 key的hash值(4字節(jié))+消息的物理偏移量(8字節(jié))+消息存儲(chǔ)時(shí)間戳和index文件的時(shí)間戳差(4字節(jié))+當(dāng)前hash槽的值(4字節(jié)) this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); //存儲(chǔ)當(dāng)前index中包含的條目數(shù)量存入hash槽中,覆蓋原先hash槽的值 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } //更新文件索引的頭信息,hash槽的總數(shù)、index條目的總數(shù)、最后消息的物理偏移量、最后消息的存儲(chǔ)時(shí)間 this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; }
以上詳細(xì)了分析了IndexFile條目存儲(chǔ)的業(yè)務(wù)邏輯
2.3.3、通過(guò)KEY查找消息
DefaultMessageStore類(lèi)中的
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end)
中其核心方法是
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
獲取消息的物理存儲(chǔ)地址,通過(guò)偏移量去commitLog中獲取消息集。
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end)
核心方法又是IndexFile類(lèi)中的
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock)
方法
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { //獲取key的hash信息 int keyHash = indexKeyHashMethod(key); //獲取hash槽的下標(biāo) int slotPos = keyHash % this.hashSlotNum; //獲取hash槽的物理地址 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); } //獲取hash槽的值 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // } //判斷值是否小于等于0或者 大于當(dāng)前索引文件的最大條目 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } //計(jì)算條目的物理地址 = 索引頭部大小(40字節(jié)) + hash槽的大小(4字節(jié))*槽的數(shù)量(500w) + 當(dāng)前索引最大條目的個(gè)數(shù)*每index的大?。?0字節(jié)) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; //獲取key的hash值 int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); //獲取消息的物理偏移量 long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); //獲取當(dāng)前消息的存儲(chǔ)時(shí)間戳與index文件的時(shí)間戳差值 long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); //獲取前一個(gè)條目的信息(鏈表結(jié)構(gòu)) int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); if (timeDiff < 0) { break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; //判斷該消息是否在查詢(xún)的區(qū)間 boolean timeMatched = (timeRead >= begin) && (timeRead <= end); //判斷key的hash值是否相等并且在查詢(xún)的時(shí)間區(qū)間內(nèi) if (keyHash == keyHashRead && timeMatched) { //加入到物理偏移量的List中 phyOffsets.add(phyOffsetRead); } if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } //繼續(xù)前一個(gè)條目信息獲取進(jìn)行匹配 nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }
1、根據(jù)查詢(xún)的 key 的 hashcode%slotNum 得到具體的槽的位置( slotNum 是一個(gè)索引文件里面包含的最大槽的數(shù)目,例如圖中所示 slotNum=5000000)。
2、根據(jù) slotValue( slot 位置對(duì)應(yīng)的值)查找到索引項(xiàng)列表的最后一項(xiàng)(倒序排列, slotValue 總是指向最新的一個(gè) 索引項(xiàng))。
3、遍歷索引項(xiàng)列表返回查詢(xún)時(shí)間范圍內(nèi)的結(jié)果集(默認(rèn)一次最大返回的 32 條記彔)
4、Hash 沖突;尋找 key 的 slot 位置時(shí)相當(dāng)于執(zhí)行了兩次散列函數(shù),一次 key 的 hash,一次 key 的 hash 值取模,因此返里存在兩次沖突的情況;第一種, key 的 hash 不同但模數(shù)相同,此時(shí)查詢(xún)的時(shí)候會(huì)在比較一次key 的hash 值(每個(gè)索引項(xiàng)保存了 key 的 hash 值),過(guò)濾掉 hash 值不相等的項(xiàng)。第二種, hash 值相等但 key 不等,出于性能的考慮沖突的檢測(cè)放到客戶(hù)端處理( key 的原始值是存儲(chǔ)在消息文件中的,避免對(duì)數(shù)據(jù)文件的解析),客戶(hù)端比較一次消息體的 key 是否相同
2.4、checkpoint
checkpoint文件的作用是記錄commitlog、consumequeue、index文件的刷盤(pán)時(shí)間點(diǎn),文件固定長(zhǎng)度4k,其中只用了該文件的前24個(gè)字節(jié)。查看其存儲(chǔ)格式
physicMsgTimestamp
:commitlog文件刷盤(pán)時(shí)間點(diǎn)
logicsMsgTimestamp
:消息的消費(fèi)隊(duì)列文件刷盤(pán)時(shí)間點(diǎn)
indexMsgTimestamp
:索引文件刷盤(pán)時(shí)間點(diǎn)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java發(fā)送郵件及打開(kāi)狀態(tài)判斷分析實(shí)例
這篇文章主要為大家介紹了java發(fā)送郵件及打開(kāi)狀態(tài)判斷分析實(shí)例2023-12-12mybatis-plus實(shí)現(xiàn)邏輯刪除的示例代碼
在大多數(shù)公司里,都會(huì)采用邏輯刪除的方式,本文主要介紹了mybatis-plus實(shí)現(xiàn)邏輯刪除的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05Java實(shí)現(xiàn)文件夾中內(nèi)容定時(shí)刪除
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)文件夾中內(nèi)容定時(shí)刪除,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08基于Java實(shí)現(xiàn)ssh命令登錄主機(jī)執(zhí)行shell命令過(guò)程解析
這篇文章主要介紹了基于Java實(shí)現(xiàn)ssh命令登錄主機(jī)執(zhí)行shell命令過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12Java多線(xiàn)程之ThreadLocal原理總結(jié)
這篇文章主要介紹了Java多線(xiàn)程ThreadLocal原理,同一個(gè)ThreadLocal所包含的對(duì)象,在不同的Thread中有不同的副本,文章中有詳細(xì)的代碼示例,需要的朋友參考一下2023-04-04Java日期工具類(lèi)操作字符串Date和LocalDate互轉(zhuǎn)
這篇文章主要介紹了Java日期工具類(lèi)操作字符串Date和LocalDate互轉(zhuǎn),文章首先通過(guò)需要先引入坐標(biāo)展開(kāi)主題的相關(guān)內(nèi)容介紹,需要的朋友可以參一下2022-06-06Spring擴(kuò)展之基于HandlerMapping實(shí)現(xiàn)接口灰度發(fā)布實(shí)例
這篇文章主要介紹了Spring擴(kuò)展之基于HandlerMapping實(shí)現(xiàn)接口灰度發(fā)布實(shí)例,灰度發(fā)布是指在黑與白之間,能夠平滑過(guò)渡的一種發(fā)布方式,灰度發(fā)布可以保證整體系統(tǒng)的穩(wěn)定,在初始灰度的時(shí)候就可以發(fā)現(xiàn)、調(diào)整問(wèn)題,以保證其影響度,需要的朋友可以參考下2023-08-08解決Mybatis的serverTimezone時(shí)區(qū)出現(xiàn)問(wèn)題
這篇文章主要介紹了解決Mybatis的serverTimezone時(shí)區(qū)出現(xiàn)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09