DocumentsWriter.java
package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.ArrayUtil;

import java.io.IOException;
import java.io.PrintStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.text.NumberFormat;

/**
 * This class accepts multiple added documents and directly
 * writes a single segment file.  It does this more
 * efficiently than creating a single segment per document
 * (with DocumentWriter) and doing standard merges on those
 * segments.
 *
 * Each added document is passed to the {@link DocConsumer},
 * which in turn processes the document and interacts with
 * other consumers in the indexing chain.  Certain
 * consumers, like {@link StoredFieldsWriter} and {@link
 * TermVectorsTermsWriter}, digest a document and
 * immediately write bytes to the "doc store" files (ie,
 * they do not consume RAM per document, except while they
 * are processing the document).
 *
 * Other consumers, eg {@link FreqProxTermsWriter} and
 * {@link NormsWriter}, buffer bytes in RAM and flush only
 * when a new segment is produced.
 *
 * Once we have used our allowed RAM buffer, or the number
 * of added docs is large enough (in the case we are
 * flushing by doc count instead of RAM usage), we create a
 * real segment and flush it to the Directory.
 *
 * Threads:
 *
 * Multiple threads are allowed into addDocument at once.
 * There is an initial synchronized call to getThreadState
 * which allocates a ThreadState for this thread.  The same
 * thread will get the same ThreadState over time (thread
 * affinity) so that if there are consistent patterns (for
 * example each thread is indexing a different content
 * source) then we make better use of RAM.  Then
 * processDocument is called on that ThreadState without
 * synchronization (most of the "heavy lifting" is in this
 * call).  Finally the synchronized "finishDocument" is
 * called to flush changes to the directory.
 *
 * When flush is called by IndexWriter, or, we flush
 * internally when autoCommit=false, we forcefully idle all
 * threads and flush only once they are all idle.  This
 * means you can call flush with a given thread even while
 * other threads are actively adding/deleting documents.
 *
 * Exceptions:
 *
 * Because this class directly updates in-memory posting
 * lists, and flushes stored fields and term vectors
 * directly to files in the directory, there are certain
 * limited times when an exception can corrupt this state.
 * For example, a disk full while flushing stored fields
 * leaves this file in a corrupt state.  Or, an OOM
 * exception while appending to the in-memory posting lists
 * can corrupt that posting list.  We call such exceptions
 * "aborting exceptions".  In these cases we must call
 * abort() to discard all docs added since the last flush.
 *
 * All other exceptions ("non-aborting exceptions") can
 * still partially update the index structures.  These
 * updates are consistent, but, they represent only a part
 * of the document seen up until the exception was hit.
 * When this happens, we immediately mark the document as
 * deleted so that the document is always atomically ("all
 * or none") added to the index.
 */

final class DocumentsWriter {

  IndexWriter writer;
  Directory directory;

  String segment;                    // Current segment we are working on
  private String docStoreSegment;    // Current doc-store segment we are writing
  private int docStoreOffset;        // Current starting doc-store offset of current segment

  private int nextDocID;             // Next docID to be added
  private int numDocsInRAM;          // # docs buffered in RAM
  int numDocsInStore;                // # docs written to doc stores

  // Max # ThreadState instances; if there are more threads
  // than this they share ThreadStates
  private final static int MAX_THREAD_STATE = 5;
  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
  private final HashMap threadBindings = new HashMap();

  private int pauseThreads;          // Non-zero when we need all threads to
                                     // pause (eg to flush)
  boolean flushPending;              // True when a thread has decided to flush
  boolean bufferIsFull;              // True when it's time to write segment
  private boolean aborting;          // True if an abort is pending

  private DocFieldProcessor docFieldProcessor;

  PrintStream infoStream;
  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
  Similarity similarity;

  List newFiles;
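  // Rough sketch of the per-document flow described in the class
  // comment above (illustrative only; the helper names are taken from
  // that comment, and exact signatures may differ in this version):
  //
  //   1. synchronized getThreadState(...)  -- bind the calling thread to a
  //      DocumentsWriterThreadState (thread affinity)
  //   2. processDocument() on that state   -- unsynchronized "heavy lifting":
  //      invert fields, write stored fields / term vectors
  //   3. synchronized finishDocument(...)  -- commit the document's buffered
  //      state in docID order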
  static class DocState {
    DocumentsWriter docWriter;
    Analyzer analyzer;
    int maxFieldLength;
    PrintStream infoStream;
    Similarity similarity;
    int docID;
    Document doc;
    String maxTermPrefix;

    // Only called by asserts
    public boolean testPoint(String name) {
      return docWriter.writer.testPoint(name);
    }
  }

  static class FlushState {
    DocumentsWriter docWriter;
    Directory directory;
    String segmentName;
    String docStoreSegmentName;
    int numDocsInRAM;
    int numDocsInStore;
    Collection flushedFiles;

    public String segmentFileName(String ext) {
      return segmentName + "." + ext;
    }
  }

  /** Consumer returns this on each doc.  This holds any
   *  state that must be flushed synchronized "in docID
   *  order".  We gather these and flush them in order. */
  abstract static class DocWriter {
    DocWriter next;
    int docID;

    abstract void finish() throws IOException;
    abstract void abort();
    abstract long sizeInBytes();

    void setNext(DocWriter next) {
      this.next = next;
    }
  };

  final DocConsumer consumer;

  // Deletes done after the last flush; these are discarded
  // on abort
  private BufferedDeletes deletesInRAM = new BufferedDeletes();

  // Deletes done before the last flush; these are still
  // kept on abort
  private BufferedDeletes deletesFlushed = new BufferedDeletes();

  // The max number of delete terms that can be buffered before
  // they must be flushed to disk.
  private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;
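  // Worked example for the RAM accounting fields declared just below
  // (illustrative comment, not part of the original source; assumes the
  // usual 16.0 MB value of IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB):
  //
  //   ramBufferSize        = 16 MB (16,777,216 bytes)
  //   waitQueuePauseBytes  = 10% of the buffer (~1.7 MB): pause the wait queue
  //   waitQueueResumeBytes =  5% of the buffer (~0.8 MB): resume the wait queue
  //   freeTrigger          = 105% of the buffer: start freeing RAM
  //   freeLevel            =  95% of the buffer: stop freeing RAM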
  // How much RAM we can use before flushing.  This is 0 if
  // we are flushing by doc count instead.
  private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);
  private long waitQueuePauseBytes = (long) (ramBufferSize*0.1);
  private long waitQueueResumeBytes = (long) (ramBufferSize*0.05);

  // If we've allocated 5% over our RAM budget, we then
  // free down to 95%
  private long freeTrigger = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*1.05);
  private long freeLevel = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024*0.95);

  // Flush @ this number of docs.  If ramBufferSize is
  // non-zero we will flush by RAM usage instead.
  private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;

  private int flushedDocCount;       // How many docs already flushed to index

  synchronized void updateFlushedDocCount(int n) {
    flushedDocCount += n;
  }
  synchronized int getFlushedDocCount() {
    return flushedDocCount;
  }
  synchronized void setFlushedDocCount(int n) {
    flushedDocCount = n;
  }

  private boolean closed;

  DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
    this.directory = directory;
    this.writer = writer;
    this.similarity = writer.getSimilarity();
    flushedDocCount = writer.maxDoc();

    /*
      This is the current indexing chain:

      DocConsumer / DocConsumerPerThread
        --> code: DocFieldProcessor / DocFieldProcessorPerThread
          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
    */

    // TODO FI: this should be something the user can pass in
    // Build up indexing chain:
    final TermsHashConsumer termVectorsWriter = new TermVectorsTermsWriter(this);
    final TermsHashConsumer freqProxWriter = new FreqProxTermsWriter();

    final InvertedDocConsumer termsHash = new TermsHash(this, true, freqProxWriter,
                                                        new TermsHash(this, false, termVectorsWriter, null));
    final NormsWriter normsWriter = new NormsWriter();
    final DocInverter docInverter = new DocInverter(termsHash, normsWriter);
    final StoredFieldsWriter fieldsWriter = new StoredFieldsWriter(this);
    final DocFieldConsumers docFieldConsumers = new DocFieldConsumers(docInverter, fieldsWriter);
    consumer = docFieldProcessor = new DocFieldProcessor(this, docFieldConsumers);
  }

  /** Returns true if any of the fields in the current
   *  buffered docs have omitTf==false */
  boolean hasProx() {
    return docFieldProcessor.fieldInfos.hasProx();
  }
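  // How the buffering knobs below are normally driven (illustrative
  // usage sketch, not part of the original source; assumes the public
  // IndexWriter API of this Lucene line):
  //
  //   IndexWriter w = new IndexWriter(dir, analyzer, true);
  //   w.setRAMBufferSizeMB(32.0);    // flush when ~32 MB of state is buffered, or
  //   w.setMaxBufferedDocs(1000);    // flush every 1000 buffered docs
  //
  // IndexWriter forwards those settings to the setRAMBufferSizeMB /
  // setMaxBufferedDocs methods below.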
  /** If non-null, various details of indexing are printed
   *  here. */
  synchronized void setInfoStream(PrintStream infoStream) {
    this.infoStream = infoStream;
    for(int i=0;i<threadStates.length;i++)
      threadStates[i].docState.infoStream = infoStream;
  }

  synchronized void setMaxFieldLength(int maxFieldLength) {
    this.maxFieldLength = maxFieldLength;
    for(int i=0;i<threadStates.length;i++)
      threadStates[i].docState.maxFieldLength = maxFieldLength;
  }

  synchronized void setSimilarity(Similarity similarity) {
    this.similarity = similarity;
    for(int i=0;i<threadStates.length;i++)
      threadStates[i].docState.similarity = similarity;
  }

  /** Set how much RAM we can use before flushing. */
  synchronized void setRAMBufferSizeMB(double mb) {
    if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {
      ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
      waitQueuePauseBytes = 4*1024*1024;
      waitQueueResumeBytes = 2*1024*1024;
    } else {
      ramBufferSize = (long) (mb*1024*1024);
      waitQueuePauseBytes = (long) (ramBufferSize*0.1);
      waitQueueResumeBytes = (long) (ramBufferSize*0.05);
      freeTrigger = (long) (1.05 * ramBufferSize);
      freeLevel = (long) (0.95 * ramBufferSize);
    }
  }

  synchronized double getRAMBufferSizeMB() {
    if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {
      return ramBufferSize;
    } else {
      return ramBufferSize/1024./1024.;
    }
  }

  /** Set max buffered docs, which means we will flush by
   *  doc count instead of by RAM usage. */
  void setMaxBufferedDocs(int count) {
    maxBufferedDocs = count;
  }

  int getMaxBufferedDocs() {
    return maxBufferedDocs;
  }

  /** Get current segment name we are writing. */
  String getSegment() {
    return segment;
  }

  /** Returns how many docs are currently buffered in RAM. */
  int getNumDocsInRAM() {
    return numDocsInRAM;
  }

  /** Returns the current doc store segment we are writing
   *  to.  This will be the same as segment when autoCommit
   *  is true. */
  synchronized String getDocStoreSegment() {
    return docStoreSegment;
  }

  /** Returns the doc offset into the shared doc store for
   *  the current buffered docs. */
  int getDocStoreOffset() {
    return docStoreOffset;
  }

  /** Closes the current open doc stores and returns the doc
   *  store segment name.  This returns null if there are
   *  no buffered documents. */
  synchronized String closeDocStore() throws IOException {

    assert allThreadsIdle();

    if (infoStream != null)
      message("closeDocStore: " + openFiles.size() + " files to flush to segment " +
              docStoreSegment + " numDocs=" + numDocsInStore);

    boolean success = false;

    try {
      initFlushState(true);
      closedFiles.clear();

      consumer.closeDocStore(flushState);
      assert 0 == openFiles.size();