時(shí)序數(shù)據(jù)庫(kù)VictoriaMetrics源碼解析之寫入與索引
一. 存儲(chǔ)格式
下圖是向VictoriaMetrics寫入prometheus協(xié)議數(shù)據(jù)的示例:

VM在收到寫入請(qǐng)求時(shí),會(huì)對(duì)請(qǐng)求中包含的時(shí)序數(shù)據(jù)做轉(zhuǎn)換處理:
- 首先,根據(jù)metrics+labels組成的MetricName,生成一個(gè)唯一標(biāo)識(shí)TSID;
然后:
- metric(指標(biāo)名稱__name__) + labels + TSID作為索引index;
- TSID + timestamp + value作為數(shù)據(jù)data;
- 最后,索引index和數(shù)據(jù)data分別進(jìn)行存儲(chǔ)和檢索;

因此,VM的數(shù)據(jù)整體上分為索引和數(shù)據(jù)2個(gè)部分:
- 索引部分,用以支持按照l(shuí)abel或tag進(jìn)行多維檢索,得到TSID;
- 數(shù)據(jù)部分,用以支持按照TSID得到tv數(shù)據(jù);
二. 整體流程
VictoriaMetrics在寫入原始的rows數(shù)據(jù)時(shí),寫入過(guò)程分為兩個(gè)部分:
- 寫index;
- 寫tv;
寫入流程:
- 對(duì)于原始的rows數(shù)據(jù),根據(jù)其metricsName從cache和內(nèi)存索引中,查找其對(duì)應(yīng)的TSID;
- 若TSID找到,則寫入tv數(shù)據(jù),返回client;
否則:
寫index:
- 構(gòu)造TSID,構(gòu)造新的index items,然后將其寫入內(nèi)存shard;
- 內(nèi)存shard被異步的goroutine壓縮并保存到磁盤;
- 寫tv數(shù)據(jù);
- 返回client;

三. 寫入代碼
1.入口代碼
vmstorage監(jiān)聽tcp端口,收到vminsert的插入請(qǐng)求后,進(jìn)行處理:
// app/vmstorage/servers/vminsert.go
func (s *VMInsertServer) run() {
...
for {
c, err := s.ln.Accept()
...
go func() {
bc, err := handshake.VMInsertServer(c, compressionLevel)
...
err = clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
vminsertMetricsRead.Add(len(rows))
return s.storage.AddRows(rows, uint8(*precisionBits)) // 入口代碼
}, s.storage.IsReadOnly)
...
}()
}
}寫入時(shí),1次最多寫8K個(gè)rows:
func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
....
maxBlockLen := len(ic.rrs)
for len(mrs) > 0 {
mrsBlock := mrs
// 一次最多寫8K,maxBlockLen=8000
if len(mrs) > maxBlockLen {
mrsBlock = mrs[:maxBlockLen]
mrs = mrs[maxBlockLen:]
} else {
mrs = nil
}
// 寫入8K rows的數(shù)據(jù)
if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
}
....
}2.寫入流程的代碼
寫入過(guò)程主要分2步:
首先,為row查找或構(gòu)建TSID;
- 若該row的metricNameRaw與prevMetricNameRaw,則使用prevTSID;
- 若cache中有緩存的metricNameRaw,則使用緩存的metricNameRaw對(duì)應(yīng)的TSID;
若上述都不滿足,則去內(nèi)存索引中查找,或者創(chuàng)建一個(gè)新的TSID;
- 這一步是最耗時(shí)的;
- 然后,構(gòu)建TSID完畢后,插入tv數(shù)據(jù);
// lib/storage/storage.go
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
...
// 1.構(gòu)造r.TSID
// 若跟prevMetricNameRaw相同,則使用pervTSID;
// 若cache中有metricNameRaw,則使用cache.TSID;
for i := range mrs {
mr := &mrs[i]
...
dstMrs[j] = mr
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
if string(mr.MetricNameRaw) == string(prevMetricNameRaw) { // 使用prevTSID
// Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
// This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
r.TSID = prevTSID
continue
}
if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) { // 使用緩存的TSID
...
r.TSID = genTSID.TSID
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
...
continue
}
...
}
if pmrs != nil {
// Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
pendingMetricRows := pmrs.pmrs
sort.Slice(pendingMetricRows, func(i, j int) bool {
return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
})
prevMetricNameRaw = nil
var slowInsertsCount uint64
for i := range pendingMetricRows {
...
r := &rows[j]
j++
r.Timestamp = mr.Timestamp
r.Value = mr.Value
r.PrecisionBits = precisionBits
// 嘗試去index找查找,或者創(chuàng)建
if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
...
continue
}
genTSID.generation = idb.generation
genTSID.TSID = r.TSID
// 放回cache
s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
}
}
...
dstMrs = dstMrs[:j]
rows = rows[:j]
err := s.updatePerDateData(rows, dstMrs)
if err != nil {
err = fmt.Errorf("cannot update per-date data: %w", err)
} else {
// TSID構(gòu)造完畢,開始插入數(shù)據(jù)
err = s.tb.AddRows(rows)
...
}
...
return nil
}3.寫index
寫index是slow path,重點(diǎn)看一下:
- 首先,去內(nèi)存索引中找TSID,若找到,則返回;
- 否則,創(chuàng)建一個(gè)新的TSID;
// lib/storage/index_db.go
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
// 1.首先嘗試在index中查找
if is.tsidByNameMisses < 100 {
err := is.getTSIDByMetricName(dst, metricName)
// 在index中找到了
if err == nil {
// Fast path - the TSID for the given metricName has been found in the index.
is.tsidByNameMisses = 0
if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
return err
}
return nil
}
is.tsidByNameMisses++
} else {
is.tsidByNameSkips++
if is.tsidByNameSkips > 10000 {
is.tsidByNameSkips = 0
is.tsidByNameMisses = 0
}
}
// 2.沒有找到,那么創(chuàng)建一個(gè)
if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
userReadableMetricName := getUserReadableMetricName(metricNameRaw)
return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
}
return nil
}4. 生成TSID
具體生成TSID的邏輯:
- MetricGroupID: 由metricGroup hash而來(lái);
- JobID:由tags[0].Value hash而來(lái);
- InstanceID:由tags[1].Value hash而來(lái);
// lib/storage/index_db.go
func generateTSID(dst *TSID, mn *MetricName) {
dst.AccountID = mn.AccountID
dst.ProjectID = mn.ProjectID
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
if len(mn.Tags) > 0 {
dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
}
if len(mn.Tags) > 1 {
dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
}
dst.MetricID = generateUniqueMetricID()
}而TSID中的metricID是由啟動(dòng)時(shí)的時(shí)間戳+1產(chǎn)生:
// Returns local unique MetricID.
func generateUniqueMetricID() uint64 {
return atomic.AddUint64(&nextUniqueMetricID, 1)
}
var nextUniqueMetricID = uint64(time.Now().UnixNano())5. 創(chuàng)建index items
- 創(chuàng)建 MetricName -> TSID index;
- 創(chuàng)建 MetricID -> MetricName index;
- 創(chuàng)建 MetricID -> TSID index;
- 創(chuàng)建 tag -> MetricID 和 MetricGroup+tag -> MetricID index;
- 最后,將index items存入內(nèi)存shards;
// lib/storage/index_db.go
func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
// The order of index items is important.
// It guarantees index consistency.
ii := getIndexItems()
defer putIndexItems(ii)
// Create MetricName -> TSID index.
ii.B = append(ii.B, nsPrefixMetricNameToTSID)
ii.B = mn.Marshal(ii.B)
ii.B = append(ii.B, kvSeparatorChar)
ii.B = tsid.Marshal(ii.B)
ii.Next()
// Create MetricID -> MetricName index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = mn.Marshal(ii.B)
ii.Next()
// Create MetricID -> TSID index.
ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
ii.B = tsid.Marshal(ii.B)
ii.Next()
prefix := kbPool.Get()
prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
kbPool.Put(prefix)
is.db.tb.AddItems(ii.Items) // 將items存入內(nèi)存shards
}6. index items存入內(nèi)存shards
Index items構(gòu)造完成后,被寫入內(nèi)存的shards,會(huì)有異步的goroutine將其壓縮寫入disk。
寫內(nèi)存shards的方法: roundRobin
- 內(nèi)存中有若干個(gè)index shards;
- 寫入時(shí),輪轉(zhuǎn)寫入:idx++ % shards
// lib/mergeset/table.go
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
shards := riss.shards
shardsLen := uint32(len(shards))
for len(items) > 0 {
n := atomic.AddUint32(&riss.shardIdx, 1)
idx := n % shardsLen
items = shards[idx].addItems(tb, items)
}
}內(nèi)存中shards總數(shù),跟cpu核數(shù)有關(guān)系:
- shards總數(shù) = (cpu*cpu + 1) / 2
- 對(duì)于4C的機(jī)器,有8個(gè)shards;
// lib/mergeset/table.go
/ The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = func() int {
cpus := cgroup.AvailableCPUs()
multiplier := cpus
if multiplier > 16 {
multiplier = 16
}
return (cpus*multiplier + 1) / 2
}()以上就是時(shí)序數(shù)據(jù)庫(kù)VictoriaMetrics源碼解析之寫入與索引的詳細(xì)內(nèi)容,更多關(guān)于VictoriaMetrics寫入索引的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
dbeaver工具連接達(dá)夢(mèng)數(shù)據(jù)庫(kù)的完整步驟
DBeaver數(shù)據(jù)庫(kù)連接工具是我用了這么久最好用的一個(gè)數(shù)據(jù)庫(kù)連接工具,擁有的優(yōu)點(diǎn),支持的數(shù)據(jù)庫(kù)多、快捷鍵很贊、導(dǎo)入導(dǎo)出數(shù)據(jù)非常方便,下面這篇文章主要給大家介紹了關(guān)于dbeaver工具連接達(dá)夢(mèng)數(shù)據(jù)庫(kù)的完整步驟,需要的朋友可以參考下2023-05-05
mssql 區(qū)分大小寫的詳細(xì)說(shuō)明
mssql區(qū)分大小寫,沒想到mysql也區(qū)分大小寫。相關(guān)的文章稍后奉獻(xiàn)給大家2008-03-03
解決Navicat Premium 15連接數(shù)據(jù)庫(kù)閃退的問(wèn)題
這篇文章主要介紹了Navicat Premium 15連接數(shù)據(jù)庫(kù)閃退,本文給大家分享解決方法,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
通過(guò)DBeaver連接Phoenix操作hbase的方法
DBeaver?可通過(guò)?JDBC?連接到數(shù)據(jù)庫(kù),可以支持幾乎所有的數(shù)據(jù)庫(kù)產(chǎn)品,本文介紹常用一種通用數(shù)據(jù)庫(kù)工具Dbeaver,通過(guò)DBeaver連接Phoenix操作hbase的操作,需要的朋友跟隨小編一起看看吧2021-11-11
關(guān)系型數(shù)據(jù)庫(kù)和非關(guān)系型數(shù)據(jù)庫(kù)概述與優(yōu)缺點(diǎn)對(duì)比
這篇文章介紹了關(guān)系型數(shù)據(jù)庫(kù)和非關(guān)系型數(shù)據(jù)庫(kù)概述與優(yōu)缺點(diǎn)對(duì)比,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-03-03
關(guān)于navicat事務(wù)自動(dòng)提交問(wèn)題
這篇文章主要介紹了關(guān)于navicat事務(wù)自動(dòng)提交問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12

