DocumentsWriter.java
package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.store.AlreadyClosedException;

import java.io.IOException;
import java.io.PrintStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.text.NumberFormat;
import java.util.Collections;

/**
 * This class accepts multiple added documents and directly
 * writes a single segment file.  It does this more
 * efficiently than creating a single segment per document
 * (with DocumentWriter) and doing standard merges on those
 * segments.
 *
 * When a document is added, its stored fields (if any) and
 * term vectors (if any) are immediately written to the
 * Directory (ie these do not consume RAM).  The freq/prox
 * postings are accumulated into a Postings hash table keyed
 * by term.  Each entry in this hash table holds a separate
 * byte stream (allocated as incrementally growing slices
 * into large shared byte[] arrays) for freq and prox, that
 * contains the postings data for multiple documents.  If
 * vectors are enabled, each unique term for each document
 * also allocates a PostingVector instance to similarly
 * track the offsets & positions byte stream.
 *
 * Once the Postings hash is full (ie is consuming the
 * allowed RAM) or the number of added docs is large enough
 * (in the case we are flushing by doc count instead of RAM
 * usage), we create a real segment, flush it to disk and
 * reset the Postings hash.
 *
 * In adding a document we first organize all of its fields
 * by field name.  We then process the document field by
 * field, and record the Posting hash per field.  After each
 * field we flush its term vectors.  When it's time to flush
 * the full segment we first sort the fields by name, and
 * then go field by field and sort its postings.
 *
 *
 * Threads:
 *
 * Multiple threads are allowed into addDocument at once.
 * There is an initial synchronized call to getThreadState
 * which allocates a ThreadState for this thread.  The same
 * thread will get the same ThreadState over time (thread
 * affinity) so that if there are consistent patterns (for
 * example each thread is indexing a different content
 * source) then we make better use of RAM.  Then
 * processDocument is called on that ThreadState without
 * synchronization (most of the "heavy lifting" is in this
 * call).
 * Finally the synchronized "finishDocument" is
 * called to flush changes to the directory.
 *
 * Each ThreadState instance has its own Posting hash.  Once
 * we're using too much RAM, we flush all Posting hashes to
 * a segment by merging the docIDs in the posting lists for
 * the same term across multiple thread states (see
 * writeSegment and appendPostings).
 *
 * When flush is called by IndexWriter, or when we flush
 * internally because autoCommit=false, we forcefully idle
 * all threads and flush only once they are all idle.  This
 * means you can call flush with a given thread even while
 * other threads are actively adding/deleting documents.
 *
 *
 * Exceptions:
 *
 * Because this class directly updates in-memory posting
 * lists, and flushes stored fields and term vectors
 * directly to files in the directory, there are certain
 * limited times when an exception can corrupt this state.
 * For example, a disk full while flushing stored fields
 * leaves this file in a corrupt state.  Or, an OOM
 * exception while appending to the in-memory posting lists
 * can corrupt that posting list.  We call such exceptions
 * "aborting exceptions".  In these cases we must call
 * abort() to discard all docs added since the last flush.
 *
 * All other exceptions ("non-aborting exceptions") can
 * still partially update the index structures.  These
 * updates are consistent, but they represent only part of
 * the document seen up until the exception was hit.  When
 * this happens, we immediately mark the document as
 * deleted so that the document is always atomically ("all
 * or none") added to the index.
 */
final class DocumentsWriter {

  private IndexWriter writer;
  private Directory directory;

  private FieldInfos fieldInfos = new FieldInfos(); // All fields we've seen
  private IndexOutput tvx, tvf, tvd;                // To write term vectors
  private FieldsWriter fieldsWriter;                // To write stored fields

  private String segment;                     // Current segment we are working on
  private String docStoreSegment;             // Current doc-store segment we are writing
  private int docStoreOffset;                 // Current starting doc-store offset of current segment

  private int nextDocID;                      // Next docID to be added
  private int numDocsInRAM;                   // # docs buffered in RAM
  private int numDocsInStore;                 // # docs written to doc stores
  private int nextWriteDocID;                 // Next docID to be written

  // Max # ThreadState instances; if there are more threads
  // than this they share ThreadStates
  private final static int MAX_THREAD_STATE = 5;
  private ThreadState[] threadStates = new ThreadState[0];
  private final HashMap threadBindings = new HashMap();
  private int numWaiting;
  private final ThreadState[] waitingThreadStates = new ThreadState[MAX_THREAD_STATE];
  private int pauseThreads;                   // Non-zero when we need all threads to
                                              // pause (eg to flush)
  private boolean flushPending;               // True when a thread has decided to flush
  private boolean bufferIsFull;               // True when it's time to write segment
  private int abortCount;                     // Non-zero while abort is pending or running

  private PrintStream infoStream;

  // This HashMap buffers delete terms in RAM before they
  // are applied.  The key is the delete term; the value is
  // the number of buffered documents the term applies to.
  private HashMap bufferedDeleteTerms = new HashMap();
  private int numBufferedDeleteTerms = 0;

  // Currently used only for deleting a doc on hitting a non-aborting exception
  private List bufferedDeleteDocIDs = new ArrayList();

  // The max number of delete terms that can be buffered before
  // they must be flushed to disk.
  private int maxBufferedDeleteTerms = IndexWriter.DEFAULT_MAX_BUFFERED_DELETE_TERMS;

  // How much RAM we can use before flushing.  This is 0 if
  // we are flushing by doc count instead.
  private long ramBufferSize = (long) (IndexWriter.DEFAULT_RAM_BUFFER_SIZE_MB*1024*1024);

  // Flush @ this number of docs.  If ramBufferSize is
  // non-zero we will flush by RAM usage instead.
  private int maxBufferedDocs = IndexWriter.DEFAULT_MAX_BUFFERED_DOCS;

  private boolean closed;

  // Coarse estimates used to measure RAM usage of buffered deletes
  private static int OBJECT_HEADER_BYTES = 8;
  private static int OBJECT_POINTER_BYTES = 4;    // TODO: should be 8 on 64-bit platform
  private static int BYTES_PER_CHAR = 2;
  private static int BYTES_PER_INT = 4;

  private BufferedNorms[] norms = new BufferedNorms[0];   // Holds norms until we flush

  DocumentsWriter(Directory directory, IndexWriter writer) throws IOException {
    this.directory = directory;
    this.writer = writer;

    postingsFreeList = new Posting[0];
  }

  /** If non-null, various details of indexing are printed
   *  here. */
  void setInfoStream(PrintStream infoStream) {
    this.infoStream = infoStream;
  }

  /** Set how much RAM we can use before flushing. */
  void setRAMBufferSizeMB(double mb) {
    if (mb == IndexWriter.DISABLE_AUTO_FLUSH) {
      ramBufferSize = IndexWriter.DISABLE_AUTO_FLUSH;
    } else {
      ramBufferSize = (long) (mb*1024*1024);
    }
  }

  double getRAMBufferSizeMB() {
    if (ramBufferSize == IndexWriter.DISABLE_AUTO_FLUSH) {
      return ramBufferSize;
    } else {
      return ramBufferSize/1024./1024.;
    }
  }

  /** Set max buffered docs, which means we will flush by
   *  doc count instead of by RAM usage. */
  void setMaxBufferedDocs(int count) {
    maxBufferedDocs = count;
  }

  int getMaxBufferedDocs() {
    return maxBufferedDocs;
  }

  /** Get current segment name we are writing. */
  String getSegment() {
    return segment;
  }

  /** Returns how many docs are currently buffered in RAM. */
  int getNumDocsInRAM() {
    return numDocsInRAM;
  }

  /** Returns the current doc store segment we are writing
   *  to.  This will be the same as segment when autoCommit
   *  is true. */
  String getDocStoreSegment() {
    return docStoreSegment;
  }

  /** Returns the doc offset into the shared doc store for
   *  the current buffered docs. */
  int getDocStoreOffset() {
    return docStoreOffset;
  }

  /** Closes the currently open doc stores and returns the
   *  doc store segment name.  This returns null if there
   *  are no buffered documents. */
  String closeDocStore() throws IOException {

    assert allThreadsIdle();

    List flushedFiles = files();

    if (infoStream != null)
      infoStream.println("\ncloseDocStore: " + flushedFiles.size() + " files to flush to segment " +
                         docStoreSegment + " numDocs=" + numDocsInStore);

    if (flushedFiles.size() > 0) {
      files = null;

      if (tvx != null) {
        // At least one doc in this run had term vectors enabled
        assert docStoreSegment != null;
        tvx.close();
        tvf.close();
        tvd.close();
        tvx = null;
        assert 4+numDocsInStore*8 == directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION):
          "after flush: tvx size mismatch: " + numDocsInStore + " docs vs " +
          directory.fileLength(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION) +
          " length in bytes of " + docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION;
      }

      if (fieldsWriter != null) {
        assert docStoreSegment != null;
        fieldsWriter.close();
        fieldsWriter = null;
        assert numDocsInStore*8 == directory.fileLength(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION):
          "after flush: fdx size mismatch: " + numDocsInStore + " docs vs " +
          directory.fileLength(docStoreSegment + "."
            + IndexFileNames.FIELDS_INDEX_EXTENSION) +
          " length in bytes of " + docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION;
      }

      String s = docStoreSegment;
      docStoreSegment = null;
      docStoreOffset = 0;
      numDocsInStore = 0;
      return s;
    } else {
      return null;
    }
  }

  private List files = null;                 // Cached list of files we've created
  private List abortedFiles = null;          // List of files that were written before last abort()

  List abortedFiles() {
    return abortedFiles;
  }

  /* Returns list of files in use by this instance,
   * including any flushed segments. */
  synchronized List files() {

    if (files != null)
      return files;

    files = new ArrayList();

    // Stored fields:
    if (fieldsWriter != null) {
      assert docStoreSegment != null;
      files.add(docStoreSegment + "." + IndexFileNames.FIELDS_EXTENSION);
      files.add(docStoreSegment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION);
    }

    // Vectors:
    if (tvx != null) {
      assert docStoreSegment != null;
      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);
      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);
      files.add(docStoreSegment + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);
    }

    return files;
  }

  synchronized void setAborting() {
    abortCount++;
  }

  /** Called if we hit an exception when adding docs,
   *  flushing, etc.  This resets our state, discarding any
   *  docs added since last flush.  If ae is non-null, it
   *  contains the root cause exception (which we re-throw
   *  after we are done aborting). */
  synchronized void abort(AbortException ae) throws IOException {

    // Anywhere that throws an AbortException must first
    // mark aborting to make sure while the exception is
    // unwinding the un-synchronized stack, no thread grabs
    // the corrupt ThreadState that hit the aborting
    // exception:
    assert ae == null || abortCount > 0;

    try {

      if (infoStream != null)
        infoStream.println("docWriter: now abort");

      // Forcefully remove waiting ThreadStates from line
      for(int i=0;i<numWaiting;i++)
        waitingThreadStates[i].isIdle = true;
      numWaiting = 0;

      // Wait for all other threads to finish with DocumentsWriter:
      pauseAllThreads();

      assert 0 == numWaiting;

      try {

        bufferedDeleteTerms.clear();
        bufferedDeleteDocIDs.clear();
        numBufferedDeleteTerms = 0;

        try {
          abortedFiles = files();
        } catch (Throwable t) {
          abortedFiles = null;
        }

        docStoreSegment = null;
        numDocsInStore = 0;
        docStoreOffset = 0;
        files = null;

        // Clear vectors & fields from ThreadStates
        for(int i=0;i<threadStates.length;i++) {
          ThreadState state = threadStates[i];
          state.tvfLocal.reset();
          state.fdtLocal.reset();
          if (state.localFieldsWriter != null) {
            try {
              state.localFieldsWriter.close();
            } catch (Throwable t) {
            }
            state.localFieldsWriter = null;
          }
        }

        // Reset vectors writer
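
The flush policy described in the class comment, flushing either by accumulated RAM or by buffered document count, is normally selected through IndexWriter, which forwards the settings to the setRAMBufferSizeMB and setMaxBufferedDocs methods above. The following sketch is not part of DocumentsWriter.java; it is a minimal illustration of choosing between the two triggers, assuming an IndexWriter from the same Lucene line as this file (one exposing setRAMBufferSizeMB, setMaxBufferedDocs, and DISABLE_AUTO_FLUSH, all referenced in the code above). The field name, example text, and the 16 MB / 1000-doc values are arbitrary.

// FlushPolicyExample.java -- illustrative only, not part of Lucene.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.RAMDirectory;

public class FlushPolicyExample {
  public static void main(String[] args) throws Exception {
    // Create an in-memory index; IndexWriter drives DocumentsWriter internally.
    IndexWriter writer = new IndexWriter(new RAMDirectory(), new StandardAnalyzer(), true);

    // Trigger flushes once the buffered postings use roughly 16 MB of RAM...
    writer.setRAMBufferSizeMB(16.0);

    // ...or, alternatively, flush every 1000 buffered documents instead:
    // writer.setMaxBufferedDocs(1000);

    Document doc = new Document();
    doc.add(new Field("body", "some example text", Field.Store.YES, Field.Index.TOKENIZED));
    writer.addDocument(doc);   // buffered in RAM by DocumentsWriter until a flush triggers

    writer.close();            // flushes any remaining buffered documents
  }
}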