
📄 documentswriter.java

📁 Lucene, a Java open-source search engine framework
💻 JAVA
📖 Page 1 of 5
package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.store.AlreadyClosedException;

import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.text.NumberFormat;
import java.util.Collections;

/**
 * This class accepts multiple added documents and directly
 * writes a single segment file.  It does this more
 * efficiently than creating a single segment per document
 * (with DocumentWriter) and doing standard merges on those
 * segments.
 *
 * When a document is added, its stored fields (if any) and
 * term vectors (if any) are immediately written to the
 * Directory (ie these do not consume RAM).
 * The freq/prox postings are accumulated into a Postings
 * hash table keyed by term.  Each entry in this hash table
 * holds a separate byte stream (allocated as incrementally
 * growing slices into large shared byte[] arrays) for freq
 * and prox, that contains the postings data for multiple
 * documents.  If vectors are enabled, each unique term for
 * each document also allocates a PostingVector instance to
 * similarly track the offsets & positions byte stream.
 *
 * Once the Postings hash is full (ie is consuming the
 * allowed RAM) or the number of added docs is large enough
 * (in the case we are flushing by doc count instead of RAM
 * usage), we create a real segment and flush it to disk and
 * reset the Postings hash.
 *
 * In adding a document we first organize all of its fields
 * by field name.  We then process field by field, and
 * record the Posting hash per-field.  After each field we
 * flush its term vectors.  When it's time to flush the full
 * segment we first sort the fields by name, and then go
 * field by field and sort its postings.
 *
 *
 * Threads:
 *
 * Multiple threads are allowed into addDocument at once.
 * There is an initial synchronized call to getThreadState
 * which allocates a ThreadState for this thread.  The same
 * thread will get the same ThreadState over time (thread
 * affinity) so that if there are consistent patterns (for
 * example each thread is indexing a different content
 * source) then we make better use of RAM.  Then
 * processDocument is called on that ThreadState without
 * synchronization (most of the "heavy lifting" is in this
 * call).  Finally the synchronized "finishDocument" is
 * called to flush changes to the directory.
 *
 * Each ThreadState instance has its own Posting hash. Once
 * we're using too much RAM, we flush all Posting hashes to
 * a segment by merging the docIDs in the posting lists for
 * the same term across multiple thread states (see
 * writeSegment and appendPostings).
 *
 * When flush is called by IndexWriter, or, we flush
 * internally when autoCommit=false, we forcefully idle all
 * threads and flush only once they are all idle.  This
 * means you can call flush with a given thread even while
 * other threads are actively adding/deleting documents.
 *
 *
 * Exceptions:
 *
 * Because this class directly updates in-memory posting
 * lists, and flushes stored fields and term vectors
 * directly to files in the directory, there are certain
 * limited times when an exception can corrupt this state.
 * For example, a disk full while flushing stored fields
 * leaves this file in a corrupt state.  Or, an OOM
 * exception while appending to the in-memory posting lists
 * can corrupt that posting list.  We call such exceptions
 * "aborting exceptions".  In these cases we must call
 * abort() to discard all docs added since the last flush.
 *
 * All other exceptions ("non-aborting exceptions") can
 * still partially update the index structures.  These
 * updates are consistent, but, they represent only a part
 * of the document seen up until the exception was hit.
 * When this happens, we immediately mark the document as
 * deleted so that the document is always atomically ("all
 * or none") added to the index.
 */

final class DocumentsWriter {

  private IndexWriter writer;
  private Directory directory;

  private FieldInfos fieldInfos = new FieldInfos(); // All fields we've seen
  private IndexOutput tvx, tvf, tvd;              // To write term vectors
  private FieldsWriter fieldsWriter;              // To write stored fields

  private String segment;                         // Current segment we are working on
  private String docStoreSegment;                 // Current doc-store segment we are writing
  private int docStoreOffset;                     // Current starting doc-store offset of current segment

  private int nextDocID;                          // Next docID to be added
  private int numDocsInRAM;                       // # docs buffered in RAM
  private int numDocsInStore;                     // # docs written to doc stores
  private int nextWriteDocID;                     // Next docID to be written

  // Max # ThreadState instances; if there are more threads
  // than this they share ThreadStates
  private final static int MAX_THREAD_STATE = 5;
  private ThreadState[] threadStates = new ThreadState[0];
  private final HashMap threadBindings = new HashMap();
  private int numWaiting;
  private final ThreadState[] waitingThreadStates = new ThreadState[MAX_THREAD_STATE];
  private int pauseThreads;                       // Non-zero when we need all threads to
                                                  // pause (eg to flush)
  private boolean flushPending;                   // True when a thread has decided to flush
  private boolean bufferIsFull;                   // True when it's time to write segment
  private int abortCount;                         // Non-zero while abort is pending or running

  private PrintStream infoStream;

  // This Hashmap buffers delete terms in ram before they
  // are applied.  The key is delete term; the value is
  // number of buffered documents the term applies to.
  private HashMap bufferedDeleteTerms = new HashMap();
  private int numBufferedDeleteTerms = 0;

  // Currently used only for deleting a doc on hitting a non-aborting exception
  private List bufferedDeleteDocIDs = new ArrayList();

  // The max number of delete terms that can be buffered before
  // they must be flushed to disk.
  private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;

  // How much RAM we can use before flushing.  This is 0 if
  // we are flushing by doc count instead.
  private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);

  // Flush @ this number of docs.  If ramBufferSize is
  // non-zero we will flush by RAM usage instead.
  private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;

  private boolean closed;

  // Coarse estimates used to measure RAM usage of buffered deletes
  private static int OBJECT_HEADER_BYTES = 8;
  private static int OBJECT_POINTER_BYTES = 4;    // TODO: should be 8 on 64-bit platform
  private static int BYTES_PER_CHAR = 2;
  private static int BYTES_PER_INT = 4;

  private BufferedNorms[] norms = new BufferedNorms[0];   // Holds norms until we flush

  DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
    this.directory = directory;
    this.writer = writer;

    postingsFreeList = new Posting[0];
  }

  /** If non-null, various details of indexing are printed
   *  here. */
  void setInfoStream(PrintStream infoStream) {
    this.infoStream = infoStream;
  }

  /** Set how much RAM we can use before flushing.
   */
  void setRAMBufferSizeMB(double mb) {
    if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {
      ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
    } else {
      ramBufferSize = (long) (mb*1024*1024);
    }
  }

  double getRAMBufferSizeMB() {
    if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {
      return ramBufferSize;
    } else {
      return ramBufferSize/1024./1024.;
    }
  }

  /** Set max buffered docs, which means we will flush by
   *  doc count instead of by RAM usage. */
  void setMaxBufferedDocs(int count) {
    maxBufferedDocs = count;
  }

  int getMaxBufferedDocs() {
    return maxBufferedDocs;
  }

  /** Get current segment name we are writing. */
  String getSegment() {
    return segment;
  }

  /** Returns how many docs are currently buffered in RAM. */
  int getNumDocsInRAM() {
    return numDocsInRAM;
  }

  /** Returns the current doc store segment we are writing
   *  to.  This will be the same as segment when autoCommit
   *  is true. */
  String getDocStoreSegment() {
    return docStoreSegment;
  }

  /** Returns the doc offset into the shared doc store for
   *  the current buffered docs. */
  int getDocStoreOffset() {
    return docStoreOffset;
  }

  /** Closes the current open doc stores and returns the doc
   *  store segment name.  This returns null if there are
   *  no buffered documents. */
  String closeDocStore() throws IOException {

    assert allThreadsIdle();

    List flushedFiles = files();

    if (infoStream != null)
      infoStream.println("\ncloseDocStore: " + flushedFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);

    if (flushedFiles.size() > 0) {
      files = null;

      if (tvx != null) {
        // At least one doc in this run had term vectors enabled
        assert docStoreSegment != null;
        tvx.close();
        tvf.close();
        tvd.close();
        tvx = null;
        assert 4+numDocsInStore*8 == directory.fileLength(docStoreSegment + "."
            + IndexFileNames.VECTORS_INDEX_EXTENSION):
          "after flush: tvx size mismatch: " + numDocsInStore + " docs vs " + directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION) + " length in bytes of " + docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION;
      }

      if (fieldsWriter != null) {
        assert docStoreSegment != null;
        fieldsWriter.close();
        fieldsWriter = null;
        assert numDocsInStore*8 == directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION):
          "after flush: fdx size mismatch: " + numDocsInStore + " docs vs " + directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION) + " length in bytes of " + docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION;
      }

      String s = docStoreSegment;
      docStoreSegment = null;
      docStoreOffset = 0;
      numDocsInStore = 0;
      return s;
    } else {
      return null;
    }
  }

  private List files = null;                      // Cached list of files we've created
  private List abortedFiles = null;               // List of files that were written before last abort()

  List abortedFiles() {
    return abortedFiles;
  }

  /* Returns list of files in use by this instance,
   * including any flushed segments. */
  synchronized List files() {

    if (files != null)
      return files;

    files = new ArrayList();

    // Stored fields:
    if (fieldsWriter != null) {
      assert docStoreSegment != null;
      files.add(docStoreSegment + "." + IndexFileNames.FIELDS_EXTENSION);
      files.add(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
    }

    // Vectors:
    if (tvx != null) {
      assert docStoreSegment != null;
      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
      files.add(docStoreSegment + "."
          + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
    }

    return files;
  }

  synchronized void setAborting() {
    abortCount++;
  }

  /** Called if we hit an exception when adding docs,
   *  flushing, etc.  This resets our state, discarding any
   *  docs added since last flush.  If ae is non-null, it
   *  contains the root cause exception (which we re-throw
   *  after we are done aborting). */
  synchronized void abort(AbortException ae) throws IOException {

    // Anywhere that throws an AbortException must first
    // mark aborting to make sure while the exception is
    // unwinding the un-synchronized stack, no thread grabs
    // the corrupt ThreadState that hit the aborting
    // exception:
    assert ae == null || abortCount>0;

    try {

      if (infoStream != null)
        infoStream.println("docWriter: now abort");

      // Forcefully remove waiting ThreadStates from line
      for(int i=0;i<numWaiting;i++)
        waitingThreadStates[i].isIdle = true;
      numWaiting = 0;

      // Wait for all other threads to finish with DocumentsWriter:
      pauseAllThreads();

      assert 0 == numWaiting;

      try {

        bufferedDeleteTerms.clear();
        bufferedDeleteDocIDs.clear();
        numBufferedDeleteTerms = 0;

        try {
          abortedFiles = files();
        } catch (Throwable t) {
          abortedFiles = null;
        }

        docStoreSegment = null;
        numDocsInStore = 0;
        docStoreOffset = 0;
        files = null;

        // Clear vectors & fields from ThreadStates
        for(int i=0;i<threadStates.length;i++) {
          ThreadState state = threadStates[i];
          state.tvfLocal.reset();
          state.fdtLocal.reset();
          if (state.localFieldsWriter != null) {
            try {
              state.localFieldsWriter.close();
            } catch (Throwable t) {
            }
            state.localFieldsWriter = null;
          }
        }

        // Reset vectors writer
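
The class comment in the listing describes thread affinity: a thread gets the same ThreadState every time it enters, and once MAX_THREAD_STATE states exist, further threads share them. The following is a minimal standalone sketch of that binding idea. It is not the listing's getThreadState, which does not appear on this page. The class name ThreadAffinitySketch, its State type, the stateCount helper, and the "share the least-loaded state" rule are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of Lucene.
class ThreadAffinitySketch {

  // Same cap as the MAX_THREAD_STATE constant in the listing.
  static final int MAX_THREAD_STATE = 5;

  // Stand-in for ThreadState: only tracks how many threads share it.
  static final class State {
    final int id;
    int numThreads;
    State(int id) { this.id = id; }
  }

  private final List<State> states = new ArrayList<>();
  private final Map<Thread, State> bindings = new HashMap<>();

  // Synchronized, like the initial getThreadState call the class comment
  // describes; the heavy per-document work would happen outside this lock.
  synchronized State getState(Thread t) {
    State s = bindings.get(t);
    if (s == null) {
      if (states.size() < MAX_THREAD_STATE) {
        // Below the cap: give this thread a fresh state.
        s = new State(states.size());
        states.add(s);
      } else {
        // At the cap: share the state with the fewest bound threads
        // (assumed policy for this sketch).
        s = states.get(0);
        for (State candidate : states) {
          if (candidate.numThreads < s.numThreads) {
            s = candidate;
          }
        }
      }
      s.numThreads++;
      bindings.put(t, s);   // affinity: later calls return the same state
    }
    return s;
  }

  synchronized int stateCount() {
    return states.size();
  }

  public static void main(String[] args) {
    ThreadAffinitySketch sketch = new ThreadAffinitySketch();
    Thread[] threads = new Thread[7];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> {});
      System.out.println("thread " + i + " -> state " + sketch.getState(threads[i]).id);
    }
    System.out.println("states allocated: " + sketch.stateCount());
  }
}
```

Keeping the binding in a map keyed by thread means a thread that indexes one content source keeps reusing the same buffers, which is the RAM-efficiency argument the class comment makes.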
