欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Lucene詞向量索引文件構(gòu)建源碼解析

 更新時間:2022年11月16日 10:36:03   作者:滄叔解碼  
這篇文章主要為大家介紹了Lucene詞向量索引文件構(gòu)建源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

背景

詞向量存儲的信息內(nèi)容其實和倒排(Posting)是一樣的,也是每個term所出現(xiàn)的文檔列表以及在文檔中的位置信息,區(qū)別在于存儲結(jié)構(gòu)的不同:

Posting的存儲結(jié)構(gòu)是:field -> term -> doc -> freq/pos/offset

也就是說Posting是從字段定位到term,再定位到文檔,獲取位置信息。

TermVector的存儲結(jié)構(gòu)是:doc -> field -> term -> freq/pos/offset

TermVector是從文檔定位到字段,再定位term,獲取位置信息。

從上面的介紹中,我們可以看出一些基本的規(guī)律:

  • query查詢是通過Posting來查找匹配的文檔的,因為query就是從field中查找匹配的term,順著Posting的結(jié)構(gòu),下一步就能得到所有匹配的文檔了。
  • 從指定文檔中獲取指定字段單個term的匹配位置,則TermVector(doc查找1次,field查找1次,term查找1次,)和Posting(field查找1次,term查找1次,doc查找1次)效率差不多。
  • 從指定文檔中獲取指定字段多個term的匹配位置,則TermVector(doc查找1次,field查找1次,term查找n次)性能比Posting(field查找1次,term查找n次,doc查找n次)好。
  • 多字段多term查詢位置信息也是TermVector性能比較好,大家可以自行分析。

從上面的規(guī)律可以看出,檢索過程確實使用的Posting,畢竟這是真的倒排索引。TermVector適用于從特定文檔中獲取某些字段中term的位置信息,典型應(yīng)用就是高亮:獲取特定的文檔中相關(guān)term的位置。

特殊說明

詞向量構(gòu)建涉及到幾個類之間的關(guān)系

詞向量的構(gòu)建中主要有3個類:

  • TermVectorsConsumer:調(diào)度詞向量的構(gòu)建的最上層邏輯,負責(zé)創(chuàng)建TermVectorsConsumerPerField和Lucene90CompressingTermVectorsWriter。
  • TermVectorsConsumerPerField:每個開啟詞向量構(gòu)建的字段都對應(yīng)一個TermVectorsConsumerPerField,在TermVectorsConsumerPerField中是把詞向量的倒排信息臨時存在內(nèi)存buffer中,在完成一個document的處理之后,會把這個document的所有詞向量數(shù)據(jù)都序列化到Lucene90CompressingTermVectorsWriter的緩存中,然后重置buffer,等到處理下一個document。
  • Lucene90CompressingTermVectorsWriter:負責(zé)組織詞向量的索引文件格式并持久化。

term的存儲

在詞向量中的term是按序存儲的,但是每個Field中的所有的term,除了完整的存儲第一個term之外,其他term都是存儲除了跟前一個term的最長公共前綴的剩余的后綴部分。

chunk的生成條件

詞向量是使用chunk來劃分數(shù)據(jù)的,生成chunk的條件滿足以下二者其一即可:

  • 緩存中的doc數(shù)量超出設(shè)置的閾值
  • suffix的數(shù)據(jù)總量超出了設(shè)置的閾值

詞向量的索引文件

詞向量最終構(gòu)建生成3個索引文件:

  • tvd:按chunk存儲的term,freq,position,offset,payload信息
  • tvx:chunk的索引文件,記錄的是每個chunk的起始docID,以及每個chunk的起始位置,方便根據(jù)docID快速定位到chunk。
  • tvm:詞向量索引文件的元信息,用來讀取詞向量使用的。

源碼解讀

注意:本文源碼基于lucene-9.1.0版本

工具類

FieldsIndexWriter

FieldsIndexWriter這個工具類,以后介紹正排索引文件也會用到,它主要是用來生成所有chunk中的起始doc編號和chunk在數(shù)據(jù)文件中的位置信息,方便讀取的時候快速定位doc所屬的chunk,并從文件中讀取chunk??傮w邏輯是先使用臨時文件存儲前面說的兩個信息,在真正生成索引文件的時候,使用DirectMonotonicWriter進行壓縮存儲,減小索引文件大小。

public final class FieldsIndexWriter implements Closeable {
  static final int VERSION_START = 0;
  static final int VERSION_CURRENT = 0;
  private final Directory dir;
  // 下面這些信息都是用來創(chuàng)建真正索引文件名的  
  private final String name;
  private final String suffix;
  private final String extension;
  private final String codecName;
  private final byte[] id;
  // DirectMonotonicWriter 所需的參數(shù)
  private final int blockShift;
  private final IOContext ioContext;
  // 臨時文件,用來保存所有chunk中的文檔數(shù)  
  private IndexOutput docsOut;
  // 臨時文件,用來保存所有chunk在數(shù)據(jù)文件中的起始位置  
  private IndexOutput filePointersOut;
  // doc總數(shù)  
  private int totalDocs;
  // chunk總數(shù)  
  private int totalChunks;
  // 前一個chunk在tvd索引文件中的起始位置  
  private long previousFP;
  // 添加一個新的index,index就是用來定位doc屬于哪個chunk,以及chunk在數(shù)據(jù)文件中的起始位置。
  // numDocs是chunk中的文檔總數(shù),后面真正序列化到正式的索引文件會通過換算,得到的是每個chunk的起始docID。  
  void writeIndex(int numDocs, long startPointer) throws IOException {
    assert startPointer >= previousFP;
    docsOut.writeVInt(numDocs);
    filePointersOut.writeVLong(startPointer - previousFP);
    previousFP = startPointer;
    totalDocs += numDocs;
    totalChunks++;
  }
  // metaOut是元信息索引文件  
  void finish(int numDocs, long maxPointer, IndexOutput metaOut) throws IOException {
    if (numDocs != totalDocs) {
      throw new IllegalStateException("Expected " + numDocs + " docs, but got " + totalDocs);
    }
    // 完成臨時文件的寫入  
    CodecUtil.writeFooter(docsOut);
    CodecUtil.writeFooter(filePointersOut);
    IOUtils.close(docsOut, filePointersOut);
    // 創(chuàng)建真正的索引文件  
    try (IndexOutput dataOut =
        dir.createOutput(IndexFileNames.segmentFileName(name, suffix, extension), ioContext)) {
      CodecUtil.writeIndexHeader(dataOut, codecName + "Idx", VERSION_CURRENT, id, suffix);
      // chunk中的doc總數(shù)
      metaOut.writeInt(numDocs);
      // 后面使用DirectMonotonicWriter壓縮的時候需要的參數(shù) 
      metaOut.writeInt(blockShift);
      // 使用DirectMonotonicWriter寫入的數(shù)據(jù)總數(shù),為什么加1,看下面的具體寫入邏輯。
      metaOut.writeInt(totalChunks + 1);
      // chunk索引文件中ChunkStartDocIDs的起始位置  
      metaOut.writeLong(dataOut.getFilePointer());
      try (ChecksumIndexInput docsIn =
          dir.openChecksumInput(docsOut.getName(), IOContext.READONCE)) {
        CodecUtil.checkHeader(docsIn, codecName + "Docs", VERSION_CURRENT, VERSION_CURRENT);
        Throwable priorE = null;
        try {
          // 壓縮存儲所有chunk的起始doc編號  
          final DirectMonotonicWriter docs =
              DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks + 1, blockShift);
          long doc = 0;
          docs.add(doc); // 第一個chunk的起始doc編號肯定是0,這也是上面totalChunks + 1的原因之一。
          for (int i = 0; i < totalChunks; ++i) {
            doc += docsIn.readVInt();
            docs.add(doc);
          }
          docs.finish();
          if (doc != totalDocs) {
            throw new CorruptIndexException("Docs don't add up", docsIn);
          }
        } catch (Throwable e) {
          priorE = e;
        } finally {
          CodecUtil.checkFooter(docsIn, priorE);
        }
      }
      // 刪除臨時文件  
      dir.deleteFile(docsOut.getName());
      docsOut = null;
      // chunk索引文件中ChunkOffsets的起始位置
      metaOut.writeLong(dataOut.getFilePointer());
      try (ChecksumIndexInput filePointersIn =
          dir.openChecksumInput(filePointersOut.getName(), IOContext.READONCE)) {
        CodecUtil.checkHeader(
            filePointersIn, codecName + "FilePointers", VERSION_CURRENT, VERSION_CURRENT);
        Throwable priorE = null;
        try {
          // 壓縮存儲所有chunk在tvd文件中的起始位置  
          final DirectMonotonicWriter filePointers =
              DirectMonotonicWriter.getInstance(metaOut, dataOut, totalChunks + 1, blockShift);
          long fp = 0;
          for (int i = 0; i < totalChunks; ++i) {
            fp += filePointersIn.readVLong();
            filePointers.add(fp);
          }
          if (maxPointer < fp) {
            throw new CorruptIndexException("File pointers don't add up", filePointersIn);
          }
          filePointers.add(maxPointer); // 上面totalChunks + 1的原因之二。
          filePointers.finish();
        } catch (Throwable e) {
          priorE = e;
        } finally {
          CodecUtil.checkFooter(filePointersIn, priorE);
        }
      }
      dir.deleteFile(filePointersOut.getName());
      filePointersOut = null;
      metaOut.writeLong(dataOut.getFilePointer());
      metaOut.writeLong(maxPointer);
      CodecUtil.writeFooter(dataOut);
    }
  }
}

核心類

TermVectorsConsumer

class TermVectorsConsumer extends TermsHash {
  protected final Directory directory;
  protected final SegmentInfo info;
  protected final Codec codec;
  // 詞向量持久化  
  TermVectorsWriter writer;
  /** Scratch term used by TermVectorsConsumerPerField.finishDocument. */
  final BytesRef flushTerm = new BytesRef();
  // 用來從TermVectorsConsumerPerField的bytepool中讀取position信息
  final ByteSliceReader vectorSliceReaderPos = new ByteSliceReader();
  final ByteSliceReader vectorSliceReaderOff = new ByteSliceReader();
  private boolean hasVectors;
  private int numVectorFields;
  int lastDocID;
  // 每一個開啟詞向量構(gòu)建的Field,都有一個TermVectorsConsumerPerField,當(dāng)一個doc處理完之后會把所有的
  // TermVectorsConsumerPerField都序列化到Lucene90CompressingTermVectorsWriter中,然后重置等待處理下一個doc
  private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];
  Accountable accountable = Accountable.NULL_ACCOUNTABLE;
  TermVectorsConsumer(
      final IntBlockPool.Allocator intBlockAllocator,
      final ByteBlockPool.Allocator byteBlockAllocator,
      Directory directory,
      SegmentInfo info,
      Codec codec) {
    super(intBlockAllocator, byteBlockAllocator, Counter.newCounter(), null);
    this.directory = directory;
    this.info = info;
    this.codec = codec;
  }
  @Override
  void flush(
      Map<String, TermsHashPerField> fieldsToFlush,
      final SegmentWriteState state,
      Sorter.DocMap sortMap,
      NormsProducer norms)
      throws IOException {
    if (writer != null) {
      int numDocs = state.segmentInfo.maxDoc();
      try {
        // 把不存在詞向量Filed的文檔填充下  
        fill(numDocs);
        assert state.segmentInfo != null;
        // 觸發(fā)詞向量索引文件持久化落盤   
        writer.finish(numDocs);
      } finally {
        IOUtils.close(writer);
      }
    }
  }
  /**
   * Fills in no-term-vectors for all docs we haven't seen since the last doc that had term vectors.
   */
  void fill(int docID) throws IOException {
    while (lastDocID < docID) {
      writer.startDocument(0);
      writer.finishDocument();
      lastDocID++;
    }
  }
  // 創(chuàng)建  Lucene90CompressingTermVectorsWriter
  void initTermVectorsWriter() throws IOException {
    if (writer == null) {
      IOContext context = new IOContext(new FlushInfo(lastDocID, bytesUsed.get()));
      writer = codec.termVectorsFormat().vectorsWriter(directory, info, context);
      lastDocID = 0;
      accountable = writer;
    }
  }
  void setHasVectors() {
    hasVectors = true;
  }
  @Override
  void finishDocument(int docID) throws IOException {
    // 不存在詞向量,直接返回
    if (!hasVectors) {
      return;
    }
    // 按字段名排序TermVectorsConsumerPerField
    ArrayUtil.introSort(perFields, 0, numVectorFields);
    initTermVectorsWriter();
    // 為了確保doc是連續(xù),則把lastDocID到docID的空白填充下
    fill(docID);
    // 開始序列化
    writer.startDocument(numVectorFields);
    // 處理document中的所有Field,會把相關(guān)的詞向量數(shù)據(jù)寫到writer的緩存中
    for (int i = 0; i < numVectorFields; i++) {
      perFields[i].finishDocument();
    }
    // 結(jié)束一個document詞向量的序列化  
    writer.finishDocument();
    assert lastDocID == docID : "lastDocID=" + lastDocID + " docID=" + docID;
    lastDocID++;
    super.reset();
    // 重置TermVectorsConsumerPerField 數(shù)組,等待處理下一個document  
    resetFields();
  }
  @Override
  public void abort() {
    try {
      super.abort();
    } finally {
      IOUtils.closeWhileHandlingException(writer);
      reset();
    }
  }
  void resetFields() {
    Arrays.fill(perFields, null); // don't hang onto stuff from previous doc
    numVectorFields = 0;
  }
  @Override
  public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) {
    return new TermVectorsConsumerPerField(invertState, this, fieldInfo);
  }
  // 當(dāng)結(jié)束一個Field的所有term的處理之后,就把TermVectorsConsumerPerField存在perFields中,
  // 等待把數(shù)據(jù)都序列化到Lucene90CompressingTermVectorsWriter中
  void addFieldToFlush(TermVectorsConsumerPerField fieldToFlush) {
    if (numVectorFields == perFields.length) {
      int newSize = ArrayUtil.oversize(numVectorFields + 1, RamUsageEstimator.NUM_BYTES_OBJECT_REF);
      TermVectorsConsumerPerField[] newArray = new TermVectorsConsumerPerField[newSize];
      System.arraycopy(perFields, 0, newArray, 0, numVectorFields);
      perFields = newArray;
    }
    perFields[numVectorFields++] = fieldToFlush;
  }
  @Override
  void startDocument() {
    resetFields();
    numVectorFields = 0;
  }
}

Lucene90CompressingTermVectorsWriter

Lucene90CompressingTermVectorsWriter是生成詞向量索引文件的核心類,主要負責(zé)按照特定的索引文件格式組織數(shù)據(jù)并持久化。

父類

TermVectorsWriter

Lucene90CompressingTermVectorsWriter的父類有大量的抽象方法,剩下一個模板方法addProx用來添加term在field中的所有的位置信息。

public abstract class TermVectorsWriter implements Closeable, Accountable {
  /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */
  protected TermVectorsWriter() {}
  // 開始持久化一個doc的所有詞向量
  public abstract void startDocument(int numVectorFields) throws IOException;
  // 結(jié)束一個文檔持久化的時候調(diào)用
  public void finishDocument() throws IOException {};
  // 開始持久化一個Field
  public abstract void startField(
      FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads)
      throws IOException;
  // 結(jié)束一個Field的處理
  public void finishField() throws IOException {};
  // 開始持久化一個term的倒排信息
  public abstract void startTerm(BytesRef term, int freq) throws IOException;
  // 結(jié)束一個term的處理
  public void finishTerm() throws IOException {}
  // 構(gòu)建term的一個position信息
  public abstract void addPosition(int position, int startOffset, int endOffset, BytesRef payload)
      throws IOException;
  // 所有文檔處理完成,在close方法調(diào)用之前,調(diào)用finish,numDoc是處理的文檔總數(shù)
  public abstract void finish(int numDocs) throws IOException;
  // 從positions和offsets中讀取所有的位置信息,其實就是從TermVectorsConsumerPerField#bytePool中讀取
  public void addProx(int numProx, DataInput positions, DataInput offsets) throws IOException {
    int position = 0;
    int lastOffset = 0;
    BytesRefBuilder payload = null;
    for (int i = 0; i < numProx; i++) {
      final int startOffset;
      final int endOffset;
      final BytesRef thisPayload;
      if (positions == null) {
        position = -1;
        thisPayload = null;
      } else {
        int code = positions.readVInt();
        position += code >>> 1;
        if ((code & 1) != 0) {
          final int payloadLength = positions.readVInt();
          if (payload == null) {
            payload = new BytesRefBuilder();
          }
          payload.grow(payloadLength);
          positions.readBytes(payload.bytes(), 0, payloadLength);
          payload.setLength(payloadLength);
          thisPayload = payload.get();
        } else {
          thisPayload = null;
        }
      }
      if (offsets == null) {
        startOffset = endOffset = -1;
      } else {
        startOffset = lastOffset + offsets.readVInt();
        endOffset = startOffset + offsets.readVInt();
        lastOffset = endOffset;
      }
      // 子類實現(xiàn)的真正的添加  
      addPosition(position, startOffset, endOffset, thisPayload);
    }
  }
    // 刪除了一些跟merge相關(guān)的方法,以后介紹merge的時候再說
  @Override
  public abstract void close() throws IOException;
}

成員變量

  // sement的名稱
  private final String segment;
  // 生成tvx索引文件
  private FieldsIndexWriter indexWriter;
  // metaStream:生成tvm索引文件
  // vectorStream:生成tvd索引文件
  private IndexOutput metaStream, vectorsStream;
  // 壓縮算法
  private final CompressionMode compressionMode;
  private final Compressor compressor;
  // tvx中的chunk大小是2^chunkSize
  private final int chunkSize;
  // chunk總數(shù)
  private long numChunks; 
  // 如果一個chunk中包含的doc信息是不完整的,則算一次
  private long numDirtyChunks; 
  // 在dirtyChunk中的doc總數(shù)
  private long numDirtyDocs; 
  // 處理的doc總數(shù)
  private int numDocs;
  // 構(gòu)建過程中暫時存儲的DocData,觸發(fā)flush的話就會持久化
  private final Deque<DocData> pendingDocs;
  // 當(dāng)前正在處理的doc
  private DocData curDoc;
  // 當(dāng)前正在處理的field
  private FieldData curField;
  // 上一個處理的term
  private final BytesRef lastTerm;
  // 全局臨時存儲的buf
  private int[] positionsBuf, startOffsetsBuf, lengthsBuf, payloadLengthsBuf;
  // 存儲后綴
  private final ByteBuffersDataOutput termSuffixes;
  // 存儲payload信息
  private final ByteBuffersDataOutput payloadBytes;
  // 批量整型的壓縮工具
  private final BlockPackedWriter writer;
  // 一個chunk中最多的文檔數(shù)
  private final int maxDocsPerChunk; 
  private final ByteBuffersDataOutput scratchBuffer = ByteBuffersDataOutput.newResettableInstance();

內(nèi)部類

DocData

表示當(dāng)前要序列化的一個doc的所有的詞向量數(shù)據(jù)信息。

private class DocData {
  // doc中有多少個field  
  final int numFields;
  // 每個field的詞向量信息  
  final Deque<FieldData> fields;
  // 當(dāng)前doc在全局buffer(positionsBuf, startOffsetsBuf, payloadLengthsBuf)中的起始位置  
  final int posStart, offStart, payStart;
  DocData(int numFields, int posStart, int offStart, int payStart) {
    this.numFields = numFields;
    this.fields = new ArrayDeque<>(numFields);
    this.posStart = posStart;
    this.offStart = offStart;
    this.payStart = payStart;
  }
  // 新增一個field  
  FieldData addField(
      int fieldNum, int numTerms, boolean positions, boolean offsets, boolean payloads) {
    final FieldData field;
    if (fields.isEmpty()) {
      field =
          new FieldData(
              fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart);
    } else {
      final FieldData last = fields.getLast();
      // 計算當(dāng)前field的一些起始位置,也就是前一個field的起始位置+前一個field的所有的數(shù)據(jù)量  
      final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0);
      final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0);
      final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0);
      field =
          new FieldData(
              fieldNum, numTerms, positions, offsets, payloads, posStart, offStart, payStart);
    }
    fields.add(field);
    return field;
  }
}

FieldData

存儲一個field的所有的詞向量的所需的數(shù)據(jù)。

private class FieldData {
  final boolean hasPositions, hasOffsets, hasPayloads;
  // flags是個混合標(biāo)記位,標(biāo)記是否需要構(gòu)建position,offset,payload  
  final int fieldNum, flags, numTerms;
  // freqs:存儲的是每個term的頻率
  // prefixLengths:存儲的是當(dāng)前term和前一個term的公共前綴的長度
  // suffixLengths:存儲的是除了當(dāng)前term和前一個term的公共前綴的剩余部分的長度
  final int[] freqs, prefixLengths, suffixLengths;
  // 當(dāng)前Field的position,offset,payload數(shù)據(jù)在全局buf中的起始位置  
  final int posStart, offStart, payStart;
  int totalPositions;
  // 當(dāng)前處理的是第幾個term  
  int ord;
  FieldData(
      int fieldNum,
      int numTerms,
      boolean positions,
      boolean offsets,
      boolean payloads,
      int posStart,
      int offStart,
      int payStart) {
    this.fieldNum = fieldNum;
    this.numTerms = numTerms;
    this.hasPositions = positions;
    this.hasOffsets = offsets;
    this.hasPayloads = payloads;
    this.flags =
        (positions ? POSITIONS : 0) | (offsets ? OFFSETS : 0) | (payloads ? PAYLOADS : 0);
    this.freqs = new int[numTerms];
    this.prefixLengths = new int[numTerms];
    this.suffixLengths = new int[numTerms];
    this.posStart = posStart;
    this.offStart = offStart;
    this.payStart = payStart;
    totalPositions = 0;
    ord = 0;
  }
  // 新增一個term
  // prefixLength:和前一個term的最長公共前綴
  // suffixLength:除了prefix剩下的就是suffix  
  void addTerm(int freq, int prefixLength, int suffixLength) {
    freqs[ord] = freq;
    prefixLengths[ord] = prefixLength;
    suffixLengths[ord] = suffixLength;
    ++ord;
  }
  // 為當(dāng)前處理的term新增一個位置信息數(shù)據(jù),數(shù)據(jù)都是暫存在全局的buffer中
  void addPosition(int position, int startOffset, int length, int payloadLength) {
    if (hasPositions) {
      if (posStart + totalPositions == positionsBuf.length) {
        positionsBuf = ArrayUtil.grow(positionsBuf);
      }
      positionsBuf[posStart + totalPositions] = position;
    }
    if (hasOffsets) {
      if (offStart + totalPositions == startOffsetsBuf.length) {
        final int newLength = ArrayUtil.oversize(offStart + totalPositions, 4);
        startOffsetsBuf = ArrayUtil.growExact(startOffsetsBuf, newLength);
        lengthsBuf = ArrayUtil.growExact(lengthsBuf, newLength);
      }
      startOffsetsBuf[offStart + totalPositions] = startOffset;
      lengthsBuf[offStart + totalPositions] = length;
    }
    if (hasPayloads) {
      if (payStart + totalPositions == payloadLengthsBuf.length) {
        payloadLengthsBuf = ArrayUtil.grow(payloadLengthsBuf);
      }
      payloadLengthsBuf[payStart + totalPositions] = payloadLength;
    }
    ++totalPositions;
  }
}

構(gòu)造方法

Lucene90CompressingTermVectorsWriter(
    Directory directory,
    SegmentInfo si,
    String segmentSuffix,
    IOContext context,
    String formatName,
    CompressionMode compressionMode,
    int chunkSize,
    int maxDocsPerChunk,
    int blockShift)
    throws IOException {
  assert directory != null;
  this.segment = si.name;
  this.compressionMode = compressionMode;
  this.compressor = compressionMode.newCompressor();
  this.chunkSize = chunkSize;
  this.maxDocsPerChunk = maxDocsPerChunk;
  numDocs = 0;
  pendingDocs = new ArrayDeque<>();
  termSuffixes = ByteBuffersDataOutput.newResettableInstance();
  payloadBytes = ByteBuffersDataOutput.newResettableInstance();
  lastTerm = new BytesRef(ArrayUtil.oversize(30, 1));
  boolean success = false;
  try {
    metaStream =
        directory.createOutput(
            IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_META_EXTENSION),
            context);
    CodecUtil.writeIndexHeader(
        metaStream,
        VECTORS_INDEX_CODEC_NAME + "Meta",
        VERSION_CURRENT,
        si.getId(),
        segmentSuffix);
    assert CodecUtil.indexHeaderLength(VECTORS_INDEX_CODEC_NAME + "Meta", segmentSuffix)
        == metaStream.getFilePointer();
    vectorsStream =
        directory.createOutput(
            IndexFileNames.segmentFileName(segment, segmentSuffix, VECTORS_EXTENSION), context);
    CodecUtil.writeIndexHeader(
        vectorsStream, formatName, VERSION_CURRENT, si.getId(), segmentSuffix);
    assert CodecUtil.indexHeaderLength(formatName, segmentSuffix)
        == vectorsStream.getFilePointer();
    // 生成tvx索引文件
    indexWriter =
        new FieldsIndexWriter(
            directory,
            segment,
            segmentSuffix,
            VECTORS_INDEX_EXTENSION,
            VECTORS_INDEX_CODEC_NAME,
            si.getId(),
            blockShift,
            context);
    // 記錄PackedInts的版本
    metaStream.writeVInt(PackedInts.VERSION_CURRENT);
    // 記錄chunkSize  
    metaStream.writeVInt(chunkSize);
    writer = new BlockPackedWriter(vectorsStream, PACKED_BLOCK_SIZE);
    // 全局buffer,用來臨時存儲數(shù)據(jù)  
    positionsBuf = new int[1024];
    startOffsetsBuf = new int[1024];
    lengthsBuf = new int[1024];
    payloadLengthsBuf = new int[1024];
    success = true;
  } finally {
    if (!success) {
      IOUtils.closeWhileHandlingException(metaStream, vectorsStream, indexWriter, indexWriter);
    }
  }
}

核心方法

startDocument

要開始處理一個doc了,創(chuàng)建一個DocData來存儲這個doc所有的數(shù)據(jù)信息。

  @Override
  public void startDocument(int numVectorFields) throws IOException {
    curDoc = addDocData(numVectorFields);
  }  
  private DocData addDocData(int numVectorFields) {
    FieldData last = null;
    // 逆序遍歷pendingDocs列表,獲取最后一個DocData,需要根據(jù)它來計算在下一個DocData在全局buffer中的起始o(jì)ffset
    for (Iterator<DocData> it = pendingDocs.descendingIterator(); it.hasNext(); ) {
      final DocData doc = it.next();
      if (!doc.fields.isEmpty()) {
        last = doc.fields.getLast();
        break;
      }
    }
    final DocData doc;
    if (last == null) {
      doc = new DocData(numVectorFields, 0, 0, 0); // 第一個doc
    } else {
      final int posStart = last.posStart + (last.hasPositions ? last.totalPositions : 0);
      final int offStart = last.offStart + (last.hasOffsets ? last.totalPositions : 0);
      final int payStart = last.payStart + (last.hasPayloads ? last.totalPositions : 0);
      doc = new DocData(numVectorFields, posStart, offStart, payStart);
    }
    pendingDocs.add(doc);
    return doc;
  }

startField

開始處理當(dāng)前doc中的一個新的Field,創(chuàng)建FieldData,用來存儲field的所有的數(shù)據(jù)信息。

  public void startField(
      FieldInfo info, int numTerms, boolean positions, boolean offsets, boolean payloads)
      throws IOException {
    curField = curDoc.addField(info.number, numTerms, positions, offsets, payloads);
    lastTerm.length = 0;
  }

startTerm

計算當(dāng)前term和前一個term的最長公共前綴。

  public void startTerm(BytesRef term, int freq) throws IOException {
    // 和前一個term的最長公共前綴  
    final int prefix;
    if (lastTerm.length == 0) {
      prefix = 0;
    } else {
      prefix = StringHelper.bytesDifference(lastTerm, term);
    }
    // FieldData新增term  
    curField.addTerm(freq, prefix, term.length - prefix);
    // 存儲suffix  
    termSuffixes.writeBytes(term.bytes, term.offset + prefix, term.length - prefix);
    // 更新lastTerm
    if (lastTerm.bytes.length < term.length) {
      lastTerm.bytes = new byte[ArrayUtil.oversize(term.length, 1)];
    }
    lastTerm.offset = 0;
    lastTerm.length = term.length;
    System.arraycopy(term.bytes, term.offset, lastTerm.bytes, 0, term.length);
  }

addPosition

為當(dāng)前處理的term新增一個位置相關(guān)的信息。

  public void addPosition(int position, int startOffset, int endOffset, BytesRef payload)
      throws IOException {
    assert curField.flags != 0;
    curField.addPosition(
        position, startOffset, endOffset - startOffset, payload == null ? 0 : payload.length);
    if (curField.hasPayloads && payload != null) {
      payloadBytes.writeBytes(payload.bytes, payload.offset, payload.length);
    }
  }

finishField

結(jié)束一個field的處理,就是簡單把當(dāng)前的curFiled清空,等待處理下一個field。

  public void finishField() throws IOException {
    curField = null;
  }

finishDocument

可以看到,在結(jié)束一個doc的處理時,會判斷是否滿足一個chunk的構(gòu)建條件,如果滿足的話則進行構(gòu)建。

  public void finishDocument() throws IOException {
    payloadBytes.copyTo(termSuffixes);
    payloadBytes.reset();
    ++numDocs;
    if (triggerFlush()) { // 是否滿足一個chunk
      flush(false); // 構(gòu)建一個chunk
    }
    curDoc = null;
  }

triggerFlush

判斷當(dāng)前是否滿足一個chunk的構(gòu)建條件,二者滿足其一即可:

  • termSuffixes的大小大于等于chunkSize
  • 當(dāng)前待處理的doc總數(shù)大于等于maxDocsPerChunk
  private boolean triggerFlush() {
    return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
  }

構(gòu)建chunk

flush

flush觸發(fā)構(gòu)建chunk邏輯,里面主要是調(diào)度邏輯,按類別構(gòu)建所需的詞向量信息。

  private void flush(boolean force) throws IOException {
    // 當(dāng)前要構(gòu)建的chunk中有幾個doc  
    final int chunkDocs = pendingDocs.size();
    numChunks++;
    if (force) { // 如果是強制構(gòu)建chunk,可能是不滿足chunk條件的,這種chunk被定義為dirtyChunk
      numDirtyChunks++;
      numDirtyDocs += pendingDocs.size();
    }
    // 構(gòu)建chunk的索引信息
    indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer());
    final int docBase = numDocs - chunkDocs;
    // chunk的起始docID  
    vectorsStream.writeVInt(docBase);
    final int dirtyBit = force ? 1 : 0;
    vectorsStream.writeVInt((chunkDocs &lt;&lt; 1) | dirtyBit);
    // 記錄每個doc的field數(shù)量
    final int totalFields = flushNumFields(chunkDocs);
    if (totalFields &gt; 0) {
      // 記錄當(dāng)前chunk中所有Field的編號
      final int[] fieldNums = flushFieldNums();
      // 記錄所有doc的所有field的編號
      flushFields(totalFields, fieldNums);
      // 記錄所有doc的所有field的flag
      flushFlags(totalFields, fieldNums);
      // 記錄所有doc的所有field的term數(shù)量
      flushNumTerms(totalFields);
      // 記錄所有term的長度信息
      flushTermLengths();
      // 記錄所有term的頻率
      flushTermFreqs();
      // 記錄所有term的position信息
      flushPositions();
      // 記錄所有term的offset信息
      flushOffsets(fieldNums);
      // 記錄所有position的payload信息
      flushPayloadLengths();
      // 記錄所有的suffix
      byte[] content = termSuffixes.toArrayCopy();
      compressor.compress(content, 0, content.length, vectorsStream);
    }
    // 重置相關(guān)變量,等待處理下一個chunk
    pendingDocs.clear();
    curDoc = null;
    curField = null;
    termSuffixes.reset();
  }

flushNumFields

記錄所有doc的字段總數(shù),分為兩種情況:

  private int flushNumFields(int chunkDocs) throws IOException {
    if (chunkDocs == 1) { // 如果chunk中只有一個doc,則就直接寫這個doc的字段總數(shù)
      final int numFields = pendingDocs.getFirst().numFields;
      vectorsStream.writeVInt(numFields);
      return numFields;
    } else { // 否則,使用PackedInts壓縮存儲所有doc的字段數(shù)信息
      writer.reset(vectorsStream);
      int totalFields = 0;
      for (DocData dd : pendingDocs) {
        writer.add(dd.numFields);
        totalFields += dd.numFields;
      }
      writer.finish();
      return totalFields;
    }
  }
  • 如果只有一個doc,則單獨記錄這個doc的字段數(shù)
  • 否則,使用PackedInts壓縮存儲所有的doc的字段數(shù)

flushFieldNums

  private int[] flushFieldNums() throws IOException {
    // chunk中所有term的編號按序存儲  
    SortedSet<Integer> fieldNums = new TreeSet<>();
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        fieldNums.add(fd.fieldNum);
      }
    }
    final int numDistinctFields = fieldNums.size();
    final int bitsRequired = PackedInts.bitsRequired(fieldNums.last());
    // bitsRequired最大就是32,所以低5位就夠了
    final int token = (Math.min(numDistinctFields - 1, 0x07) << 5) | bitsRequired;
    vectorsStream.writeByte((byte) token);
    if (numDistinctFields - 1 >= 0x07) {
      vectorsStream.writeVInt(numDistinctFields - 1 - 0x07);
    }
    final PackedInts.Writer writer =
        PackedInts.getWriterNoHeader(
            vectorsStream, PackedInts.Format.PACKED, fieldNums.size(), bitsRequired, 1);
    for (Integer fieldNum : fieldNums) {
      writer.add(fieldNum);
    }
    writer.finish();
    // Integer轉(zhuǎn)int
    int[] fns = new int[fieldNums.size()];
    int i = 0;
    for (Integer key : fieldNums) {
      fns[i++] = key;
    }
    return fns;
  }

flushFields

存儲doc中所有的field的編號。

  private void flushFields(int totalFields, int[] fieldNums) throws IOException {
    scratchBuffer.reset();
    // 使用  DirectWriter 壓縮存儲
    final DirectWriter writer =
        DirectWriter.getInstance(
            scratchBuffer, totalFields, DirectWriter.bitsRequired(fieldNums.length - 1));
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        final int fieldNumIndex = Arrays.binarySearch(fieldNums, fd.fieldNum);
        assert fieldNumIndex >= 0;
        writer.add(fieldNumIndex);
      }
    }
    writer.finish();
    vectorsStream.writeVLong(scratchBuffer.size());
    scratchBuffer.copyTo(vectorsStream);
  }

flushFlags

存儲doc中所有field的flag,分為兩種情況:

  private void flushFlags(int totalFields, int[] fieldNums) throws IOException {
    // 所有doc中相同的field是否都是一樣的flag
    boolean nonChangingFlags = true;
    // 如果所有相同的field的flag都一樣,則最后只存儲這個數(shù)組  
    int[] fieldFlags = new int[fieldNums.length];
    Arrays.fill(fieldFlags, -1);
    outer:
    for (DocData dd : pendingDocs) { // 遍歷所有的doc
      for (FieldData fd : dd.fields) { // 遍歷所有的field
        final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
        assert fieldNumOff >= 0;
        if (fieldFlags[fieldNumOff] == -1) {
          fieldFlags[fieldNumOff] = fd.flags;
        } else if (fieldFlags[fieldNumOff] != fd.flags) { // 有一個field不一樣
          nonChangingFlags = false;
          break outer;
        }
      }
    }
    if (nonChangingFlags) { // 如果所有doc相同的field的flag都一樣,
      // 寫0標(biāo)記這種情況
      vectorsStream.writeVInt(0);
      scratchBuffer.reset();
      final DirectWriter writer =
          DirectWriter.getInstance(scratchBuffer, fieldFlags.length, FLAGS_BITS);
      for (int flags : fieldFlags) { // 每個field只寫一個flag
        assert flags >= 0;
        writer.add(flags);
      }
      writer.finish();
      vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
      scratchBuffer.copyTo(vectorsStream);
    } else { // 需要記錄所有doc中的所有field的flag
      // 寫1標(biāo)記這種情況
      vectorsStream.writeVInt(1);
      scratchBuffer.reset();
      final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, FLAGS_BITS);
      for (DocData dd : pendingDocs) { // 遍歷doc
        for (FieldData fd : dd.fields) { // 遍歷field
          writer.add(fd.flags); // 記錄field的flag
        }
      }
      writer.finish();
      vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
      scratchBuffer.copyTo(vectorsStream);
    }
  }
  • 如果所有doc中相關(guān)的field的flag都一樣,則每個field的flag單獨存儲一份就可以
  • 否則,需要存儲所有doc中所有field的flag

flushNumTerms

存儲所有field的term數(shù)量,會先統(tǒng)計最大的term數(shù)量,用來獲取最大的term數(shù)據(jù)值需要幾個bit存儲。

  private void flushNumTerms(int totalFields) throws IOException {
    int maxNumTerms = 0;
    // 獲取最大的term數(shù)量的值  
    for (DocData dd : pendingDocs) { 
      for (FieldData fd : dd.fields) {
        maxNumTerms |= fd.numTerms;
      }
    }
    final int bitsRequired = DirectWriter.bitsRequired(maxNumTerms);
    vectorsStream.writeVInt(bitsRequired);
    scratchBuffer.reset();
    // 使用DirectWriter壓縮存儲  
    final DirectWriter writer = DirectWriter.getInstance(scratchBuffer, totalFields, bitsRequired);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        writer.add(fd.numTerms);
      }
    }
    writer.finish();
    vectorsStream.writeVInt(Math.toIntExact(scratchBuffer.size()));
    scratchBuffer.copyTo(vectorsStream);
  }

flushTermLengths

分別存儲term的prefixLength和suffixLength。

  private void flushTermLengths() throws IOException {
    // 存儲prefixLength  
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        for (int i = 0; i < fd.numTerms; ++i) {
          writer.add(fd.prefixLengths[i]);
        }
      }
    }
    writer.finish();
    // 存儲suffixLength  
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        for (int i = 0; i < fd.numTerms; ++i) {
          writer.add(fd.suffixLengths[i]);
        }
      }
    }
    writer.finish();
  }

flushTermFreqs

存儲term的頻率,這里有個小小的優(yōu)化,為了提高壓縮率。

  private void flushTermFreqs() throws IOException {
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        for (int i = 0; i < fd.numTerms; ++i) {
          // 已經(jīng)確定了freq肯定是大于等于1,減1是為了提高writer的壓縮率,讀取的時候加1就行了。
          writer.add(fd.freqs[i] - 1);
        }
      }
    }
    writer.finish();
  }

flushPositions

差值存儲所有的position。

  private void flushPositions() throws IOException {
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        if (fd.hasPositions) {
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) {
            int previousPosition = 0;
            for (int j = 0; j < fd.freqs[i]; ++j) {
              final int position = positionsBuf[fd.posStart + pos++];
              writer.add(position - previousPosition);
              previousPosition = position;
            }
          }
          assert pos == fd.totalPositions;
        }
      }
    }
    writer.finish();
  }

flushOffsets

offset的存儲做了一個優(yōu)化設(shè)計,原因是term出現(xiàn)的不同的offset跨度可能會比較大,如果把原始的offset用PackedInts進行存儲,可能壓縮率不會很高。因此,在正式存儲offset之前,先計算平均的term長度,根據(jù)term出現(xiàn)的前后兩個offset的position,可以估計兩個position的距離,用真實的前后兩個offset的距離減去這個估計的距離,就能使得offset的差值向0趨近,可以提高PackedInts的壓縮率。

  private void flushOffsets(int[] fieldNums) throws IOException {
    // 至少一個字段開啟了offset  
    boolean hasOffsets = false;
    // term在所有字段中出現(xiàn)的最后一個postition之和  
    long[] sumPos = new long[fieldNums.length];
    // term在所有字段中出現(xiàn)的最后一個startOffset之和  
    long[] sumOffsets = new long[fieldNums.length];
    for (DocData dd : pendingDocs) { // 遍歷所有的doc
      for (FieldData fd : dd.fields) { // 遍歷doc中的所有field
        hasOffsets |= fd.hasOffsets;
        if (fd.hasOffsets && fd.hasPositions) { // 如果字段開啟了offset和position
          // 查找在term數(shù)組中的下標(biāo)  
          final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) {
            sumPos[fieldNumOff] += positionsBuf[fd.posStart + fd.freqs[i] - 1 + pos];
            sumOffsets[fieldNumOff] += startOffsetsBuf[fd.offStart + fd.freqs[i] - 1 + pos];
            pos += fd.freqs[i];
          }
          assert pos == fd.totalPositions;
        }
      }
    }
    if (!hasOffsets) {
      return;
    }
    final float[] charsPerTerm = new float[fieldNums.length];
    // 用  sumOffsets[i] / sumPos[i] 估計第i個term的長度
    for (int i = 0; i < fieldNums.length; ++i) {
      charsPerTerm[i] =
          (sumPos[i] <= 0 || sumOffsets[i] <= 0) ? 0 : (float) ((double) sumOffsets[i] / sumPos[i]);
    }
    // tvd中存儲charsPerTerm
    for (int i = 0; i < fieldNums.length; ++i) {
      vectorsStream.writeInt(Float.floatToRawIntBits(charsPerTerm[i]));
    }
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) { // 遍歷所有的doc
      for (FieldData fd : dd.fields) { // 遍歷doc中所有的field
        if ((fd.flags & OFFSETS) != 0) { // 如果開啟了offset
          final int fieldNumOff = Arrays.binarySearch(fieldNums, fd.fieldNum);
          final float cpt = charsPerTerm[fieldNumOff];
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) { // 遍歷field中所有的term
            int previousPos = 0; // 差值使用
            int previousOff = 0; // 差值使用
            for (int j = 0; j < fd.freqs[i]; ++j) { // 遍歷term出現(xiàn)的所有位置
              final int position = fd.hasPositions ? positionsBuf[fd.posStart + pos] : 0;
              final int startOffset = startOffsetsBuf[fd.offStart + pos];
              // (int) (cpt * (position - previousPos)):當(dāng)前potition和前一個position之間的長度
              // startOffset - previousOff再減去(int) (cpt * (position - previousPos))就把值降到最小
              writer.add(startOffset - previousOff - (int) (cpt * (position - previousPos)));
              previousPos = position;
              previousOff = startOffset;
              ++pos;
            }
          }
        }
      }
    }
    writer.finish();
    // lengths
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) { // 遍歷所有的doc
      for (FieldData fd : dd.fields) { // 遍歷所有的Field
        if ((fd.flags & OFFSETS) != 0) { // 如果開啟了offset
          int pos = 0;
          for (int i = 0; i < fd.numTerms; ++i) {
            for (int j = 0; j < fd.freqs[i]; ++j) {
              writer.add(
                  // 減去前綴長度和后綴長度也是為了把值變小,減少存儲空間
                  lengthsBuf[fd.offStart + pos++] - fd.prefixLengths[i] - fd.suffixLengths[i]);
            }
          }
          assert pos == fd.totalPositions;
        }
      }
    }
    writer.finish();
  }

flushPayloadLengths

存儲所有的payload的長度信息。

  private void flushPayloadLengths() throws IOException {
    writer.reset(vectorsStream);
    for (DocData dd : pendingDocs) {
      for (FieldData fd : dd.fields) {
        if (fd.hasPayloads) {
          for (int i = 0; i &lt; fd.totalPositions; ++i) {
            writer.add(payloadLengthsBuf[fd.payStart + i]);
          }
        }
      }
    }
    writer.finish();
  }

finish

結(jié)束詞向量索引文件的構(gòu)建,把待處理doc列表中剩下的doc生成一個chunk。

  public void finish(int numDocs) throws IOException {
    if (!pendingDocs.isEmpty()) { // 如果還有待處理的doc,則強制生成一個chunk
      flush(true);
    }
    if (numDocs != this.numDocs) {
      throw new RuntimeException(
          "Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
    }
    // 生成tvx索引文件  
    indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
    metaStream.writeVLong(numChunks);
    metaStream.writeVLong(numDirtyChunks);
    metaStream.writeVLong(numDirtyDocs);
    CodecUtil.writeFooter(metaStream);
    CodecUtil.writeFooter(vectorsStream);
  }

TermVectorsConsumerPerField

TermVectorsConsumerPerField是TermsHashPerField的子類,TermsHashPerField在之前介紹倒排的時候已經(jīng)非常詳細地介紹過了。

在本文中,我們重點介紹不一樣的地方。在介紹倒排的時候使用的是FreqProxTermsWriterPerField,它存儲了所有的倒排數(shù)據(jù),在所有的文檔都處理完了之后才進行序列化和持久化,TermVectorsConsumerPerField和它最大的區(qū)別是每處理完一個doc,就進行序列化然后重置等待處理下一個doc。

在TermVectorsConsumerPerField的源碼中,如果已經(jīng)看明白之前倒排的邏輯,則大部分地方理解起來都比較容易,這里我們只看一個文檔處理完之后進行序列化的邏輯,實際上在TermVectorsConsumerPerField中只負責(zé)調(diào)度Lucene90CompressingTermVectorsWriter進行操作:

  void finishDocument() throws IOException {
    // 如果沒有開啟詞向量構(gòu)建  
    if (doVectors == false) {
      return;
    }
    doVectors = false;
    // 當(dāng)前field的term總數(shù)
    final int numPostings = getNumTerms();
    // 用來存儲當(dāng)前序列化的term
    final BytesRef flushTerm = termsWriter.flushTerm;
    TermVectorsPostingsArray postings = termVectorsPostingsArray;
    // 序列化和持久化的核心類,實際上使用的實現(xiàn)類:Lucene90CompressingTermVectorsWriter 
    final TermVectorsWriter tv = termsWriter.writer;
    // 對term進行排序
    sortTerms();
    // 獲取排序后的termID列表
    final int[] termIDs = getSortedTermIDs();
    // 開始處理一個Field
    tv.startField(fieldInfo, numPostings, doVectorPositions, doVectorOffsets, hasPayloads);
    // 用來從bytePool中讀取position信息
    final ByteSliceReader posReader = doVectorPositions ? termsWriter.vectorSliceReaderPos : null;
    // 用來從bytePool中讀取offset信息  
    final ByteSliceReader offReader = doVectorOffsets ? termsWriter.vectorSliceReaderOff : null;
    // 遍歷所有的term
    for (int j = 0; j < numPostings; j++) {
      final int termID = termIDs[j];
      final int freq = postings.freqs[termID];
      // 當(dāng)前處理的term存入flushTerm
      termBytePool.setBytesRef(flushTerm, postings.textStarts[termID]);
      // 準(zhǔn)備序列化term的詞向量信息  
      tv.startTerm(flushTerm, freq);
      if (doVectorPositions || doVectorOffsets) {
        if (posReader != null) {
          initReader(posReader, termID, 0);
        }
        if (offReader != null) {
          initReader(offReader, termID, 1);
        }
        // 序列化所有的position和offset信息
        tv.addProx(freq, posReader, offReader);
      }
      // 結(jié)束term的處理  
      tv.finishTerm();
    }
    // 結(jié)束Field的處理  
    tv.finishField();
    reset();
    fieldInfo.setStoreTermVectors();
  }

索引文件格式

tvm

詞向量索引文件的元信息,用來讀取使用。

tvm.png

字段詳解

Header

文件頭部信息,主要是包括:

  • 文件頭魔數(shù)(同一lucene版本所有文件相同)
  • 該文件使用的codec名稱:Lucene90TermVectorsIndexMeta
  • codec版本
  • segment id(也是Segment_N文件中的N)
  • segment后綴名(一般為空)

PackedItsVersion

在詞向量的索引文件中有很多數(shù)據(jù)是使用PackedIts壓縮存儲,該字段記錄PackedInts的版本。、

ChunkSize

用來判斷是否滿足一個chunk的一種條件,如果chunk的大小超過了ChunkSize的限制,則可以構(gòu)建一個chunk

NumChunks

chunk總數(shù)

NumDirtyChunks

dirtyChunk總數(shù)

NumDirtyDocs

dirtyChunk中的doc總數(shù)

NumDocs

doc總數(shù)

BlockShift

DirectMonotonicWriter需要的參數(shù),DirectMonotonicWriter壓縮存儲會生成多個block,BlockShift決定了block的大小。

TotalChunks + 1

chunk總數(shù) + 1,在生成tvx索引文件中ChunkStartDocIDs和ChunkTVDOffsets兩個字段時,使用DirectMonotonicWriter寫入的值的總數(shù)。

tvxDocStartFP

tvx索引文件中ChunkStartDocIDs的起始位置

DocBlockMeta

tvx索引文件中ChunkStartDocIDs使用DirectMonotonicWriter編碼存儲,會生成多個block,這些block的元信息。

tvxOffsetStartFP

tvx中ChunkTVDOffsets的起始位置

OffsetBlockMeta

tvx索引文件中ChunkTVDOffsets使用DirectMonotonicWriter編碼存儲,會生成多個block,這些block的元信息。

SPEndPoint

tvx文件的結(jié)束位置,后面是tvx的footer信息。

MaxPointer

tvd文件的結(jié)束位置,后面tvd的footer信息。

Footer

文件尾,主要包括

  • 文件尾魔數(shù)(同一個lucene版本所有文件一樣)
  • 0
  • 校驗碼

tvd 字段詳解

tvd索引文件主要是存儲倒排信息中freq,position,offset,payload。

Header

文件頭部信息,主要是包括:

  • 文件頭魔數(shù)(同一lucene版本所有文件相同)
  • 該文件使用的codec名稱:Lucene90TermVectorsData
  • codec版本
  • segment id(也是Segment_N文件中的N)
  • segment后綴名(一般為空)

chunk

在詞向量的構(gòu)建過程中,

  • DocBase:Chunk中Doc的起始編號,Chunk中所有doc的真實編號需要加上這個DocBase
  • ChunkDocsCode:是ChunkDocs和isDirty的int組合體
    • ChunkDocs:chunk中的doc總數(shù)
    • isDirty:chunk中是否存在dirtyChunk
  • NumField:chunk中文檔的字段個數(shù)。如果chunk中只有一個doc,則存儲的就是這個doc的字段個數(shù)。否則使用packedInts壓縮存儲所有的doc的字段個數(shù)信息。
  • AllUniqueFieldNums:按序存儲chunk中所有去重后的field的編號,用PackedInts壓縮存儲。
  • FieldNums:使用DirectWriter壓縮存儲所有doc的字段編號在AllUniqueFieldNums列表中的下標(biāo)。
  • FieldFlagsCode:int類型的IsChangeFlag和DirectWriter壓縮存儲的FiledFlags
    • IsChangeFlag:是否有字段在不同的doc中的flag是不一樣的。0表示所有的doc中的相同F(xiàn)ield的flag都一樣,1則不是。
    • FieldFlags:如果IsChangeFlag==0,則存儲的是AllUniqueFieldNums中每個字段的flag,否則存儲的就是所有doc中所有字段的flag。
  • FieldNumTerms:使用DirectWriter壓縮存儲所有的doc中的所有field的term的總數(shù)
  • PrefixLengths:使用PackedInts存儲所有term和同字段中前一個term的最長公共前綴長度
  • SuffixLengths:使用PackedInts存儲所有term和同字段中前一個term的扣除最長公共前綴剩下的后綴的長度
  • TermFreqs:使用PackedInts存儲所有doc的所有字段中term出現(xiàn)的頻率
  • Positions:使用PackedInts存儲所有doc中所有Field中term出現(xiàn)的差值position
  • StartOffsets:使用PackedInts存儲所有doc中所有Field中term出現(xiàn)的StartOffset,具體做了一些處理(看前面的源碼分析),占用空間會更小。
  • Lengths:使用PackedInts存儲所有doc中所有Field中term出現(xiàn)的EndOffset - StartOffset,具體做了一些處理(看前面的源碼分析),占用空間會更小。
  • PayloadLengths:使用PackedInts存儲所有的doc中所有字段中的term的payload長度
  • TermSuffixes:使用LZ4壓縮存儲所有term的suffix

Footer

文件尾,主要包括

  • 文件尾魔數(shù)(同一個lucene版本所有文件一樣)
  • 0
  • 校驗碼

tvx 字段詳解

tvx索引文件主要存儲的是tvd索引文件中的一些索引信息,tvd中每個chunk的起始docID以及存儲的起始位置。

Header

文件頭部信息,主要是包括:

  • 文件頭魔數(shù)(同一lucene版本所有文件相同)
  • 該文件使用的codec名稱:Lucene90TermVectorsIndexIdx
  • codec版本
  • segment id(也是Segment_N文件中的N)
  • segment后綴名(一般為空)

ChunkStartDocIDs

所有chunk的起始docID,使用DirectMonotonicWriter編碼存儲,會生成多個block。

ChunkTVDOffsetsBlock

所有chunk在tvd索引文件中的起始位置,使用DirectMonotonicWriter編碼存儲,會生成多個block。

Footer

文件尾,主要包括

  • 文件尾魔數(shù)(同一個lucene版本所有文件一樣)
  • 0
  • 校驗碼

以上就是Lucene詞向量索引文件構(gòu)建源碼解析的詳細內(nèi)容,更多關(guān)于Lucene詞向量索引文件構(gòu)建的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論