欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

時序數(shù)據(jù)庫VictoriaMetrics源碼解析之寫入與索引

 更新時間:2023年05月22日 09:43:36   作者:a朋  
這篇文章主要為大家介紹了VictoriaMetrics時序數(shù)據(jù)庫的寫入與索引源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

一. 存儲格式

下圖是向VictoriaMetrics寫入prometheus協(xié)議數(shù)據(jù)的示例:

VM在收到寫入請求時,會對請求中包含的時序數(shù)據(jù)做轉(zhuǎn)換處理:

  • 首先,根據(jù)metrics+labels組成的MetricName,生成一個唯一標(biāo)識TSID;
  • 然后:

    • metric(指標(biāo)名稱__name__) + labels + TSID作為索引index;
    • TSID + timestamp + value作為數(shù)據(jù)data;
  • 最后,索引index和數(shù)據(jù)data分別進行存儲和檢索;

因此,VM的數(shù)據(jù)整體上分為索引和數(shù)據(jù)2個部分:

  • 索引部分,用以支持按照label或tag進行多維檢索,得到TSID;
  • 數(shù)據(jù)部分,用以支持按照TSID得到tv數(shù)據(jù);

二. 整體流程

VictoriaMetrics在寫入原始的rows數(shù)據(jù)時,寫入過程分為兩個部分:

  • 寫index;
  • 寫tv;

寫入流程:

  • 對于原始的rows數(shù)據(jù),根據(jù)其metricsName從cache和內(nèi)存索引中,查找其對應(yīng)的TSID;
  • 若TSID找到,則寫入tv數(shù)據(jù),返回client;
  • 否則:

    • 寫index:

      • 構(gòu)造TSID,構(gòu)造新的index items,然后將其寫入內(nèi)存shard;
      • 內(nèi)存shard被異步的goroutine壓縮并保存到磁盤;
    • 寫tv數(shù)據(jù);
    • 返回client;

三. 寫入代碼

1.入口代碼

vmstorage監(jiān)聽tcp端口,收到vminsert的插入請求后,進行處理:

// app/vmstorage/servers/vminsert.go
func (s *VMInsertServer) run() {
    ...
    for {
        c, err := s.ln.Accept()
        ...
        go func() {
            bc, err := handshake.VMInsertServer(c, compressionLevel)
            ...
            err = clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {
                vminsertMetricsRead.Add(len(rows))
                return s.storage.AddRows(rows, uint8(*precisionBits))    // 入口代碼
            }, s.storage.IsReadOnly)
            ...
        }()
    }
}

寫入時,1次最多寫8K個rows:

func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
    ....
    maxBlockLen := len(ic.rrs)
    for len(mrs) > 0 {
        mrsBlock := mrs
        // 一次最多寫8K,maxBlockLen=8000
        if len(mrs) > maxBlockLen {
            mrsBlock = mrs[:maxBlockLen]
            mrs = mrs[maxBlockLen:]
        } else {
            mrs = nil
        }
        // 寫入8K rows的數(shù)據(jù)
        if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {
            if firstErr == nil {
                firstErr = err
            }
            continue
        }
        atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
    }
    ....
}

2.寫入流程的代碼

寫入過程主要分2步:

  • 首先,為row查找或構(gòu)建TSID;

    • 若該row的metricNameRaw與prevMetricNameRaw,則使用prevTSID;
    • 若cache中有緩存的metricNameRaw,則使用緩存的metricNameRaw對應(yīng)的TSID;
    • 若上述都不滿足,則去內(nèi)存索引中查找,或者創(chuàng)建一個新的TSID;

      • 這一步是最耗時的;
  • 然后,構(gòu)建TSID完畢后,插入tv數(shù)據(jù);
// lib/storage/storage.go
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
    ...
    // 1.構(gòu)造r.TSID
    // 若跟prevMetricNameRaw相同,則使用pervTSID;
    // 若cache中有metricNameRaw,則使用cache.TSID;
    for i := range mrs {
        mr := &mrs[i]
        ...
        dstMrs[j] = mr
        r := &rows[j]
        j++
        r.Timestamp = mr.Timestamp
        r.Value = mr.Value
        r.PrecisionBits = precisionBits
        if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {    // 使用prevTSID
            // Fast path - the current mr contains the same metric name as the previous mr, so it contains the same TSID.
            // This path should trigger on bulk imports when many rows contain the same MetricNameRaw.
            r.TSID = prevTSID
            continue
        }
        if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {        // 使用緩存的TSID
            ...
            r.TSID = genTSID.TSID
            prevTSID = r.TSID
            prevMetricNameRaw = mr.MetricNameRaw
            ...
            continue
        }
        ...
    }
    if pmrs != nil {
        // Sort pendingMetricRows by canonical metric name in order to speed up search via `is` in the loop below.
        pendingMetricRows := pmrs.pmrs
        sort.Slice(pendingMetricRows, func(i, j int) bool {
            return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)
        })
        prevMetricNameRaw = nil
        var slowInsertsCount uint64
        for i := range pendingMetricRows {
            ...
            r := &rows[j]
            j++
            r.Timestamp = mr.Timestamp
            r.Value = mr.Value
            r.PrecisionBits = precisionBits
            // 嘗試去index找查找,或者創(chuàng)建
          if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName, mr.MetricNameRaw, date); err != nil {
                ...
                continue
            }
            genTSID.generation = idb.generation
            genTSID.TSID = r.TSID
            // 放回cache
            s.putTSIDToCache(&genTSID, mr.MetricNameRaw)
            prevTSID = r.TSID
            prevMetricNameRaw = mr.MetricNameRaw
        }
    }
    ...
    dstMrs = dstMrs[:j]
    rows = rows[:j]
    err := s.updatePerDateData(rows, dstMrs)
    if err != nil {
        err = fmt.Errorf("cannot update per-date data: %w", err)
    } else {
        // TSID構(gòu)造完畢,開始插入數(shù)據(jù)
        err = s.tb.AddRows(rows)
        ...
    }
    ...
    return nil
}

3.寫index

寫index是slow path,重點看一下:

  • 首先,去內(nèi)存索引中找TSID,若找到,則返回;
  • 否則,創(chuàng)建一個新的TSID;
// lib/storage/index_db.go
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
    // 1.首先嘗試在index中查找
    if is.tsidByNameMisses < 100 {
        err := is.getTSIDByMetricName(dst, metricName)
        // 在index中找到了
        if err == nil {
            // Fast path - the TSID for the given metricName has been found in the index.
            is.tsidByNameMisses = 0
            if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
                return err
            }
            return nil
        }
        is.tsidByNameMisses++
    } else {
        is.tsidByNameSkips++
        if is.tsidByNameSkips > 10000 {
            is.tsidByNameSkips = 0
            is.tsidByNameMisses = 0
        }
    }
    // 2.沒有找到,那么創(chuàng)建一個
    if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
        userReadableMetricName := getUserReadableMetricName(metricNameRaw)
        return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
    }
    return nil
}

4. 生成TSID

具體生成TSID的邏輯:

  • MetricGroupID: 由metricGroup hash而來;
  • JobID:由tags[0].Value hash而來;
  • InstanceID:由tags[1].Value hash而來;
// lib/storage/index_db.go
func generateTSID(dst *TSID, mn *MetricName) {
    dst.AccountID = mn.AccountID
    dst.ProjectID = mn.ProjectID
    dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
    if len(mn.Tags) > 0 {
        dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
    }
    if len(mn.Tags) > 1 {
        dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
    }
    dst.MetricID = generateUniqueMetricID()
}

而TSID中的metricID是由啟動時的時間戳+1產(chǎn)生:

// Returns local unique MetricID.
func generateUniqueMetricID() uint64 {
    return atomic.AddUint64(&amp;nextUniqueMetricID, 1)
}
var nextUniqueMetricID = uint64(time.Now().UnixNano())

5. 創(chuàng)建index items

  • 創(chuàng)建 MetricName -> TSID index;
  • 創(chuàng)建 MetricID -> MetricName index;
  • 創(chuàng)建 MetricID -> TSID index;
  • 創(chuàng)建 tag -> MetricID 和 MetricGroup+tag -> MetricID index;
  • 最后,將index items存入內(nèi)存shards;
// lib/storage/index_db.go
func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
    // The order of index items is important.
    // It guarantees index consistency.
    ii := getIndexItems()
    defer putIndexItems(ii)
    // Create MetricName -> TSID index.
    ii.B = append(ii.B, nsPrefixMetricNameToTSID)
    ii.B = mn.Marshal(ii.B)
    ii.B = append(ii.B, kvSeparatorChar)
    ii.B = tsid.Marshal(ii.B)
    ii.Next()
    // Create MetricID -> MetricName index.
    ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)
    ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
    ii.B = mn.Marshal(ii.B)
    ii.Next()
    // Create MetricID -> TSID index.
    ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)
    ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
    ii.B = tsid.Marshal(ii.B)
    ii.Next()
    prefix := kbPool.Get()
    prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
    ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)
    kbPool.Put(prefix)
    is.db.tb.AddItems(ii.Items)     // 將items存入內(nèi)存shards
}

6. index items存入內(nèi)存shards

Index items構(gòu)造完成后,被寫入內(nèi)存的shards,會有異步的goroutine將其壓縮寫入disk。

寫內(nèi)存shards的方法: roundRobin

  • 內(nèi)存中有若干個index shards;
  • 寫入時,輪轉(zhuǎn)寫入:idx++ % shards
// lib/mergeset/table.go
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) {
   shards := riss.shards
   shardsLen := uint32(len(shards))
   for len(items) > 0 {
      n := atomic.AddUint32(&riss.shardIdx, 1)
      idx := n % shardsLen
      items = shards[idx].addItems(tb, items)
   }
}

內(nèi)存中shards總數(shù),跟cpu核數(shù)有關(guān)系:

  • shards總數(shù) = (cpu*cpu + 1) / 2
  • 對于4C的機器,有8個shards;
// lib/mergeset/table.go
/ The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = func() int {
   cpus := cgroup.AvailableCPUs()
   multiplier := cpus
   if multiplier > 16 {
      multiplier = 16
   }
   return (cpus*multiplier + 1) / 2
}()

以上就是時序數(shù)據(jù)庫VictoriaMetrics源碼解析之寫入與索引的詳細(xì)內(nèi)容,更多關(guān)于VictoriaMetrics寫入索引的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • dbeaver工具連接達(dá)夢數(shù)據(jù)庫的完整步驟

    dbeaver工具連接達(dá)夢數(shù)據(jù)庫的完整步驟

    DBeaver數(shù)據(jù)庫連接工具是我用了這么久最好用的一個數(shù)據(jù)庫連接工具,擁有的優(yōu)點,支持的數(shù)據(jù)庫多、快捷鍵很贊、導(dǎo)入導(dǎo)出數(shù)據(jù)非常方便,下面這篇文章主要給大家介紹了關(guān)于dbeaver工具連接達(dá)夢數(shù)據(jù)庫的完整步驟,需要的朋友可以參考下
    2023-05-05
  • neo4j安裝配置入門教程

    neo4j安裝配置入門教程

    這篇文章主要為大家詳細(xì)介紹了neo4j安裝配置入門教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-04-04
  • postgresql sql批量更新記錄

    postgresql sql批量更新記錄

    向postgresql中利用sql批量跟新記錄的實現(xiàn)代碼。
    2009-07-07
  • 問哭自己lsm?索引原理深入剖析

    問哭自己lsm?索引原理深入剖析

    這篇文章主要為大家介紹了問哭自己lsm?索引原理及剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-04-04
  • 一個提升PostgreSQL性能的小技巧

    一個提升PostgreSQL性能的小技巧

    這篇文章主要介紹了一個提升Postgres性能的小技巧,通過修改很少的代碼來優(yōu)化查詢,需要的朋友可以參考下
    2015-04-04
  • mssql 區(qū)分大小寫的詳細(xì)說明

    mssql 區(qū)分大小寫的詳細(xì)說明

    mssql區(qū)分大小寫,沒想到mysql也區(qū)分大小寫。相關(guān)的文章稍后奉獻給大家
    2008-03-03
  • 解決Navicat Premium 15連接數(shù)據(jù)庫閃退的問題

    解決Navicat Premium 15連接數(shù)據(jù)庫閃退的問題

    這篇文章主要介紹了Navicat Premium 15連接數(shù)據(jù)庫閃退,本文給大家分享解決方法,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03
  • 通過DBeaver連接Phoenix操作hbase的方法

    通過DBeaver連接Phoenix操作hbase的方法

    DBeaver?可通過?JDBC?連接到數(shù)據(jù)庫,可以支持幾乎所有的數(shù)據(jù)庫產(chǎn)品,本文介紹常用一種通用數(shù)據(jù)庫工具Dbeaver,通過DBeaver連接Phoenix操作hbase的操作,需要的朋友跟隨小編一起看看吧
    2021-11-11
  • 關(guān)系型數(shù)據(jù)庫和非關(guān)系型數(shù)據(jù)庫概述與優(yōu)缺點對比

    關(guān)系型數(shù)據(jù)庫和非關(guān)系型數(shù)據(jù)庫概述與優(yōu)缺點對比

    這篇文章介紹了關(guān)系型數(shù)據(jù)庫和非關(guān)系型數(shù)據(jù)庫概述與優(yōu)缺點對比,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-03-03
  • 關(guān)于navicat事務(wù)自動提交問題

    關(guān)于navicat事務(wù)自動提交問題

    這篇文章主要介紹了關(guān)于navicat事務(wù)自動提交問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12

最新評論