// IndexWriter.java (excerpt)
   * collection can be broken into sub-collections. Each sub-collection can be
   * indexed in parallel, on a different thread, process or machine. The
   * complete index can then be created by merging sub-collection indexes
   * with this method.
   *
   * <p>After this completes, the index is optimized.</p>
   *
   * <p>This method is transactional in how Exceptions are
   * handled: it does not commit a new segments_N file until
   * all indexes are added. This means if an Exception
   * occurs (for example disk full), then either no indexes
   * will have been added or they all will have been.</p>
   *
   * <p>If an Exception is hit, it's still possible that all
   * indexes were successfully added. This happens when the
   * Exception is hit when trying to build a CFS file. In
   * this case, one segment in the index will be in non-CFS
   * format, even when using compound file format.</p>
   *
   * <p>Also note that on an Exception, the index may still
   * have been partially or fully optimized even though none
   * of the input indexes were added.</p>
   *
   * <p>Note that this requires temporary free space in the
   * Directory up to 2X the sum of all input indexes
   * (including the starting index). If readers/searchers
   * are open against the starting index, then the temporary
   * free space required will be higher by the size of the
   * starting index (see {@link #optimize()} for details).</p>
   *
   * <p>Once this completes, the final size of the index
   * will be less than the sum of all input index sizes
   * (including the starting index).
   * It could be quite a bit smaller (if there were many
   * pending deletes) or just slightly smaller.</p>
   *
   * <p>See <a target="_top"
   * href="http://issues.apache.org/jira/browse/LUCENE-702">LUCENE-702</a>
   * for details.</p>
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  public synchronized void addIndexes(Directory[] dirs)
      throws CorruptIndexException, IOException {

    ensureOpen();
    optimize();                                   // start with zero or 1 seg

    int start = segmentInfos.size();

    boolean success = false;

    startTransaction();

    try {
      for (int i = 0; i < dirs.length; i++) {
        SegmentInfos sis = new SegmentInfos();    // read infos from dir
        sis.read(dirs[i]);
        for (int j = 0; j < sis.size(); j++) {
          segmentInfos.addElement(sis.info(j));   // add each info
        }
      }

      // merge newly added segments in log(n) passes
      while (segmentInfos.size() > start + mergeFactor) {
        for (int base = start; base < segmentInfos.size(); base++) {
          int end = Math.min(segmentInfos.size(), base + mergeFactor);
          if (end - base > 1) {
            mergeSegments(segmentInfos, base, end);
          }
        }
      }

      success = true;
    } finally {
      if (success) {
        commitTransaction();
      } else {
        rollbackTransaction();
      }
    }

    optimize();                                   // final cleanup
  }

  /**
   * Merges all segments from an array of indexes into this index.
   * <p>
   * This is similar to addIndexes(Directory[]). However, no optimize()
   * is called either at the beginning or at the end. Instead, merges
   * are carried out as necessary.
   * <p>
   * This requires this index not be among those to be added, and the
   * upper bound* of those segment doc counts not exceed maxMergeDocs.
   *
   * <p>See {@link #addIndexes(Directory[])} for
   * details on transactional semantics, temporary free
   * space required in the Directory, and non-CFS segments
   * on an Exception.</p>
   *
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  public synchronized void addIndexesNoOptimize(Directory[] dirs)
      throws CorruptIndexException, IOException {
    // Adding indexes can be viewed as adding a sequence of segments S to
    // a sequence of segments T. Segments in T follow the invariants, but
    // segments in S may not since they could come from multiple indexes.
    // Here is the merge algorithm for addIndexesNoOptimize():
    //
    // 1 Flush ram segments.
    // 2 Consider a combined sequence with segments from T followed
    //   by segments from S (same as current addIndexes(Directory[])).
    // 3 Assume the highest level for segments in S is h. Call
    //   maybeMergeSegments(), but instead of starting w/ lowerBound = -1
    //   and upperBound = maxBufferedDocs, start w/ lowerBound = -1 and
    //   upperBound = upperBound of level h. After this, the invariants
    //   are guaranteed except for the last < M segments whose levels <= h.
    // 4 If the invariants hold for the last < M segments whose levels <= h:
    //   if some of those < M segments are from S (not merged in step 3),
    //   properly copy them over*; otherwise done.
    //   Otherwise, simply merge those segments. If the merge results in
    //   a segment of level <= h, done. Otherwise, it's of level h+1 and call
    //   maybeMergeSegments() starting w/ upperBound = upperBound of level h+1.
    //
    // * Ideally, we want to simply copy a segment. However, directory does
    //   not support copy yet. In addition, the source may or may not use
    //   compound files, and so may the target. So we use mergeSegments() to
    //   copy a segment, which may cause the doc count to change because
    //   deleted docs are garbage collected.
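Step 3's "upper bound of level h" is simply minMergeDocs scaled by mergeFactor until it covers the largest incoming segment, which is what the per-segment loop in the method body below computes. A standalone sketch of that arithmetic (the class and method names here are hypothetical, not part of Lucene):

```java
// Computes the doc-count upper bound of the level h that covers a segment
// of docCount docs, mirroring the startUpperBound loop in
// addIndexesNoOptimize(). LevelBoundDemo/levelUpperBound are hypothetical
// names used only for this sketch.
public class LevelBoundDemo {

    static int levelUpperBound(int docCount, int minMergeDocs, int mergeFactor) {
        int upperBound = minMergeDocs;
        while (upperBound < docCount) {
            upperBound *= mergeFactor;   // climb one level
        }
        return upperBound;
    }

    public static void main(String[] args) {
        // with minMergeDocs = 10 and mergeFactor = 10, a 2500-doc segment
        // sits on the level whose upper bound is 10000
        System.out.println(levelUpperBound(2500, 10, 10));
    }
}
```

The real method additionally throws IllegalArgumentException when the bound would exceed maxMergeDocs; the sketch omits that check to isolate the level arithmetic.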
    // 1 flush ram segments
    ensureOpen();
    flushRamSegments();

    // 2 copy segment infos and find the highest level from dirs
    int startUpperBound = minMergeDocs;

    boolean success = false;

    startTransaction();

    try {
      for (int i = 0; i < dirs.length; i++) {
        if (directory == dirs[i]) {
          // cannot add this index: segments may be deleted in merge before added
          throw new IllegalArgumentException("Cannot add this index to itself");
        }

        SegmentInfos sis = new SegmentInfos();    // read infos from dir
        sis.read(dirs[i]);
        for (int j = 0; j < sis.size(); j++) {
          SegmentInfo info = sis.info(j);
          segmentInfos.addElement(info);          // add each info

          while (startUpperBound < info.docCount) {
            startUpperBound *= mergeFactor;       // find the highest level from dirs
            if (startUpperBound > maxMergeDocs) {
              // upper bound cannot exceed maxMergeDocs
              throw new IllegalArgumentException("Upper bound cannot exceed maxMergeDocs");
            }
          }
        }
      }

      // 3 maybe merge segments starting from the highest level from dirs
      maybeMergeSegments(startUpperBound);

      // get the tail segments whose levels <= h
      int segmentCount = segmentInfos.size();
      int numTailSegments = 0;
      while (numTailSegments < segmentCount
          && startUpperBound >= segmentInfos.info(segmentCount - 1 - numTailSegments).docCount) {
        numTailSegments++;
      }
      if (numTailSegments == 0) {
        success = true;
        return;
      }

      // 4 make sure invariants hold for the tail segments whose levels <= h
      if (checkNonDecreasingLevels(segmentCount - numTailSegments)) {
        // identify the segments from S to be copied (not merged in 3)
        int numSegmentsToCopy = 0;
        while (numSegmentsToCopy < segmentCount
            && directory != segmentInfos.info(segmentCount - 1 - numSegmentsToCopy).dir) {
          numSegmentsToCopy++;
        }
        if (numSegmentsToCopy == 0) {
          success = true;
          return;
        }

        // copy those segments from S
        for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) {
          mergeSegments(segmentInfos, i, i + 1);
        }
        if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) {
          success = true;
          return;
        }
      }

      // invariants do not hold,
      // simply merge those segments
      mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount);

      // maybe merge segments again if necessary
      if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) {
        maybeMergeSegments(startUpperBound * mergeFactor);
      }

      success = true;
    } finally {
      if (success) {
        commitTransaction();
      } else {
        rollbackTransaction();
      }
    }
  }

  /** Merges the provided indexes into this index.
   * <p>After this completes, the index is optimized.</p>
   * <p>The provided IndexReaders are not closed.</p>
   * <p>See {@link #addIndexes(Directory[])} for
   * details on transactional semantics, temporary free
   * space required in the Directory, and non-CFS segments
   * on an Exception.</p>
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  public synchronized void addIndexes(IndexReader[] readers)
      throws CorruptIndexException, IOException {

    ensureOpen();
    optimize();                                   // start with zero or 1 seg

    final String mergedName = newSegmentName();
    SegmentMerger merger = new SegmentMerger(this, mergedName);

    SegmentInfo info;

    IndexReader sReader = null;
    try {
      if (segmentInfos.size() == 1) {             // add existing index, if any
        sReader = SegmentReader.get(segmentInfos.info(0));
        merger.add(sReader);
      }

      for (int i = 0; i < readers.length; i++)    // add new indexes
        merger.add(readers[i]);

      boolean success = false;

      startTransaction();

      try {
        int docCount = merger.merge();            // merge 'em

        if (sReader != null) {
          sReader.close();
          sReader = null;
        }

        segmentInfos.setSize(0);                  // pop old infos & add new
        info = new SegmentInfo(mergedName, docCount, directory, false, true);
        segmentInfos.addElement(info);

        success = true;
      } finally {
        if (!success) {
          rollbackTransaction();
        } else {
          commitTransaction();
        }
      }
    } finally {
      if (sReader != null) {
        sReader.close();
      }
    }

    if (useCompoundFile) {
      boolean success = false;

      startTransaction();

      try {
        merger.createCompoundFile(mergedName + ".cfs");
        info.setUseCompoundFile(true);
        success = true;     // without this, the finally clause would always roll back
      } finally {
        if (!success) {
          rollbackTransaction();
        } else {
          commitTransaction();
        }
      }
    }
  }

  // Overview of merge policy:
  //
  // A flush is triggered either by close() or by the number of ram segments
  // reaching maxBufferedDocs. After a disk segment is created by the flush,
  // further merges may be triggered.
  //
  // LowerBound and upperBound set the limits on the doc count of a segment
  // which may be merged. Initially, lowerBound is set to 0 and upperBound
  // to maxBufferedDocs. Starting from the rightmost* segment whose doc count
  // > lowerBound and <= upperBound, count the number of consecutive segments
  // whose doc count <= upperBound.
  //
  // Case 1: number of worthy segments < mergeFactor, no merge, done.
  // Case 2: number of worthy segments == mergeFactor, merge these segments.
  //         If the doc count of the merged segment <= upperBound, done.
  //         Otherwise, set lowerBound to upperBound, and multiply upperBound
  //         by mergeFactor, go through the process again.
  // Case 3: number of worthy segments > mergeFactor (in the case mergeFactor
  //         M changes), merge the leftmost* M segments. If the doc count of
  //         the merged segment <= upperBound, consider the merged segment for
  //         further merges on this same level. Merge the now leftmost* M
  //         segments, and so on, until number of worthy segments < mergeFactor.
  //         If the doc count of all the merged segments <= upperBound, done.
  //         Otherwise, set lowerBound to upperBound, and multiply upperBound
  //         by mergeFactor, go through the process again.
  // Note that case 2 can be considered as a special case of case 3.
  //
  // This merge policy guarantees two invariants if M does not change and
  // segment doc count is not reaching maxMergeDocs:
  // B for maxBufferedDocs, f(n) defined as ceil(log_M(ceil(n/B)))
  // 1: If i (left*) and i+1 (right*) are two consecutive segments of doc
  //    counts x and y, then f(x) >= f(y).
  // 2: The number of committed segments on the same level (f(n)) <= M.

  // This is called after pending
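The level function f(n) in the invariants above can be evaluated in plain integer arithmetic rather than with logarithms. A sketch under the comment's own definitions (B = maxBufferedDocs, M = mergeFactor); the class name is hypothetical and not part of Lucene:

```java
// f(n) = ceil(log_M(ceil(n/B))): the merge level of a segment of n docs.
// Computed without floating point: round n up to whole flush-sized units,
// then count how many times M must be multiplied to cover that many units.
// LevelFunctionDemo is a hypothetical name used only for this sketch.
public class LevelFunctionDemo {

    static int f(int n, int B, int M) {
        int units = (n + B - 1) / B;   // ceil(n/B)
        int level = 0;
        long cap = 1;
        while (cap < units) {
            cap *= M;                  // one more merge generation
            level++;
        }
        return level;
    }

    public static void main(String[] args) {
        // B = 10, M = 10: a freshly flushed segment (<= 10 docs) is level 0,
        // a merge of ten such segments (<= 100 docs) is level 1, and so on
        System.out.println(f(10, 10, 10));    // level 0
        System.out.println(f(100, 10, 10));   // level 1
        System.out.println(f(101, 10, 10));   // level 2
    }
}
```

Invariant 1 then says segment levels are non-increasing from left (oldest) to right (newest), which is exactly what checkNonDecreasingLevels() verifies when scanning from the right.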