⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 documentswriter.java

📁 lucene-2.4.0 是一个全文收索的工具包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
package org.apache.lucene.index;/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License.  You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */import org.apache.lucene.analysis.Analyzer;import org.apache.lucene.document.Document;import org.apache.lucene.search.Similarity;import org.apache.lucene.search.Query;import org.apache.lucene.search.IndexSearcher;import org.apache.lucene.search.Scorer;import org.apache.lucene.search.Weight;import org.apache.lucene.store.Directory;import org.apache.lucene.store.AlreadyClosedException;import org.apache.lucene.util.ArrayUtil;import java.io.IOException;import java.io.PrintStream;import java.util.Collection;import java.util.Iterator;import java.util.List;import java.util.HashMap;import java.util.HashSet;import java.util.ArrayList;import java.util.Map.Entry;import java.text.NumberFormat;/** * This class accepts multiple added documents and directly * writes a single segment file.  It does this more * efficiently than creating a single segment per document * (with DocumentWriter) and doing standard merges on those * segments. * * Each added document is passed to the {@link DocConsumer}, * which in turn processes the document and interacts with * other consumers in the indexing chain.  Certain * consumers, like {@link StoredFieldsWriter} and {@link * TermVectorsTermsWriter}, digest a document and * immediately write bytes to the "doc store" files (ie, * they do not consume RAM per document, except while they * are processing the document). * * Other consumers, eg {@link FreqProxTermsWriter} and * {@link NormsWriter}, buffer bytes in RAM and flush only * when a new segment is produced. * Once we have used our allowed RAM buffer, or the number * of added docs is large enough (in the case we are * flushing by doc count instead of RAM usage), we create a * real segment and flush it to the Directory. * * Threads: * * Multiple threads are allowed into addDocument at once. * There is an initial synchronized call to getThreadState * which allocates a ThreadState for this thread.  The same * thread will get the same ThreadState over time (thread * affinity) so that if there are consistent patterns (for * example each thread is indexing a different content * source) then we make better use of RAM.  Then * processDocument is called on that ThreadState without * synchronization (most of the "heavy lifting" is in this * call).  Finally the synchronized "finishDocument" is * called to flush changes to the directory. * * When flush is called by IndexWriter, or, we flush * internally when autoCommit=false, we forcefully idle all * threads and flush only once they are all idle.  This * means you can call flush with a given thread even while * other threads are actively adding/deleting documents. * * * Exceptions: * * Because this class directly updates in-memory posting * lists, and flushes stored fields and term vectors * directly to files in the directory, there are certain * limited times when an exception can corrupt this state. * For example, a disk full while flushing stored fields * leaves this file in a corrupt state.  Or, an OOM * exception while appending to the in-memory posting lists * can corrupt that posting list.  We call such exceptions * "aborting exceptions".  In these cases we must call * abort() to discard all docs added since the last flush. * * All other exceptions ("non-aborting exceptions") can * still partially update the index structures.  These * updates are consistent, but, they represent only a part * of the document seen up until the exception was hit. * When this happens, we immediately mark the document as * deleted so that the document is always atomically ("all * or none") added to the index. */final class DocumentsWriter {  IndexWriter writer;  Directory directory;  String segment;                         // Current segment we are working on  private String docStoreSegment;         // Current doc-store segment we are writing  private int docStoreOffset;                     // Current starting doc-store offset of current segment  private int nextDocID;                          // Next docID to be added  private int numDocsInRAM;                       // # docs buffered in RAM  int numDocsInStore;                     // # docs written to doc stores  // Max # ThreadState instances; if there are more threads  // than this they share ThreadStates  private final static int MAX_THREAD_STATE = 5;  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];  private final HashMap threadBindings = new HashMap();  private int pauseThreads;               // Non-zero when we need all threads to                                          // pause (eg to flush)  boolean flushPending;                   // True when a thread has decided to flush  boolean bufferIsFull;                   // True when it's time to write segment  private boolean aborting;               // True if an abort is pending  private DocFieldProcessor docFieldProcessor;  PrintStream infoStream;  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;  Similarity similarity;  List newFiles;  static class DocState {    DocumentsWriter docWriter;    Analyzer analyzer;    int maxFieldLength;    PrintStream infoStream;    Similarity similarity;    int docID;    Document doc;    String maxTermPrefix;    // Only called by asserts    public boolean testPoint(String name) {      return docWriter.writer.testPoint(name);    }  }  static class FlushState {    DocumentsWriter docWriter;    Directory directory;    String segmentName;    String docStoreSegmentName;    int numDocsInRAM;    int numDocsInStore;    Collection flushedFiles;    public String segmentFileName(String ext) {      return segmentName + "." + ext;    }  }  /** Consumer returns this on each doc.  This holds any   *  state that must be flushed synchronized "in docID   *  order".  We gather these and flush them in order. */  abstract static class DocWriter {    DocWriter next;    int docID;    abstract void finish() throws IOException;    abstract void abort();    abstract long sizeInBytes();    void setNext(DocWriter next) {      this.next = next;    }  };  final DocConsumer consumer;  // Deletes done after the last flush; these are discarded  // on abort  private BufferedDeletes deletesInRAM = new BufferedDeletes();  // Deletes done before the last flush; these are still  // kept on abort  private BufferedDeletes deletesFlushed = new BufferedDeletes();  // The max number of delete terms that can be buffered before  // they must be flushed to disk.  private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;  // How much RAM we can use before flushing.  This is 0 if  // we are flushing by doc count instead.  private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);  private long waitQueuePauseBytes = (long) (ramBufferSize*0.1);  private long waitQueueResumeBytes = (long) (ramBufferSize*0.05);  // If we've allocated 5% over our RAM budget, we then  // free down to 95%  private long freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*1.05);  private long freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95);  // Flush @ this number of docs.  If ramBufferSize is  // non-zero we will flush by RAM usage instead.  private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;  private int flushedDocCount;                      // How many docs already flushed to index  synchronized void updateFlushedDocCount(int n) {    flushedDocCount += n;  }  synchronized int getFlushedDocCount() {    return flushedDocCount;  }  synchronized void setFlushedDocCount(int n) {    flushedDocCount = n;  }  private boolean closed;  DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {    this.directory = directory;    this.writer = writer;    this.similarity = writer.getSimilarity();    flushedDocCount = writer.maxDoc();    /*      This is the current indexing chain:      DocConsumer / DocConsumerPerThread        --> code: DocFieldProcessor / DocFieldProcessorPerThread          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField              --> code: DocInverter / DocInverterPerThread / DocInverterPerField                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField    */    // TODO FI: this should be something the user can pass in    // Build up indexing chain:    final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(this);    final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();    final InvertedDocConsumer  termsHash = new TermsHash(this, true, freqProxWriter,                                                         new TermsHash(this, false, termVectorsWriter, null));    final NormsWriter normsWriter = new NormsWriter();    final DocInverter docInverter = new DocInverter(termsHash, normsWriter);    final StoredFieldsWriter fieldsWriter = new StoredFieldsWriter(this);    final DocFieldConsumers docFieldConsumers = new DocFieldConsumers(docInverter, fieldsWriter);    consumer = docFieldProcessor = new DocFieldProcessor(this, docFieldConsumers);  }  /** Returns true if any of the fields in the current   *  buffered docs have omitTf==false */  boolean hasProx() {    return docFieldProcessor.fieldInfos.hasProx();  }  /** If non-null, various details of indexing are printed   *  here. */  synchronized void setInfoStream(PrintStream infoStream) {    this.infoStream = infoStream;    for(int i=0;i<threadStates.length;i++)      threadStates[i].docState.infoStream = infoStream;  }  synchronized void setMaxFieldLength(int maxFieldLength) {    this.maxFieldLength = maxFieldLength;    for(int i=0;i<threadStates.length;i++)      threadStates[i].docState.maxFieldLength = maxFieldLength;  }  synchronized void setSimilarity(Similarity similarity) {    this.similarity = similarity;    for(int i=0;i<threadStates.length;i++)      threadStates[i].docState.similarity = similarity;  }  /** Set how much RAM we can use before flushing. */  synchronized void setRAMBufferSizeMB(double mb) {    if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {      ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;      waitQueuePauseBytes = 4*1024*1024;      waitQueueResumeBytes = 2*1024*1024;    } else {      ramBufferSize = (long) (mb*1024*1024);      waitQueuePauseBytes = (long) (ramBufferSize*0.1);      waitQueueResumeBytes = (long) (ramBufferSize*0.05);      freeTrigger = (long) (1.05 * ramBufferSize);      freeLevel = (long) (0.95 * ramBufferSize);    }  }  synchronized double getRAMBufferSizeMB() {    if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {      return ramBufferSize;    } else {      return ramBufferSize/1024./1024.;    }  }  /** Set max buffered docs, which means we will flush by   *  doc count instead of by RAM usage. */  void setMaxBufferedDocs(int count) {    maxBufferedDocs = count;  }  int getMaxBufferedDocs() {    return maxBufferedDocs;  }  /** Get current segment name we are writing. */  String getSegment() {    return segment;  }  /** Returns how many docs are currently buffered in RAM. */  int getNumDocsInRAM() {    return numDocsInRAM;  }  /** Returns the current doc store segment we are writing   *  to.  This will be the same as segment when autoCommit   *  * is true. */  synchronized String getDocStoreSegment() {    return docStoreSegment;  }  /** Returns the doc offset into the shared doc store for   *  the current buffered docs. */  int getDocStoreOffset() {    return docStoreOffset;  }  /** Closes the current open doc stores an returns the doc   *  store segment name.  This returns null if there are *   *  no buffered documents. */  synchronized String closeDocStore() throws IOException {        assert allThreadsIdle();    if (infoStream != null)      message("closeDocStore: " + openFiles.size() + " files to flush to segment " + docStoreSegment + " numDocs=" + numDocsInStore);        boolean success = false;    try {      initFlushState(true);      closedFiles.clear();      consumer.closeDocStore(flushState);      assert 0 == openFiles.size();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -