📄 indexwriter.java
字号:
/** Merges all segments from an array of indexes into this index.
 * <p>
 * This is similar to addIndexes(Directory[]). However, no optimize()
 * is called either at the beginning or at the end. Instead, merges
 * are carried out as necessary.
 * <p>
 * This requires this index not be among those to be added, and the
 * upper bound* of those segment doc counts not exceed maxMergeDocs.
 *
 * <p>See {@link #addIndexes(Directory[])} for
 * details on transactional semantics, temporary free
 * space required in the Directory, and non-CFS segments
 * on an Exception.</p>
 *
 * @param dirs the directories whose segments are added to this index
 * @throws IllegalArgumentException if a directory is this index itself,
 *         or if a segment's level would exceed maxMergeDocs
 * @throws IOException if there is a low-level IO error
 */
public synchronized void addIndexesNoOptimize(Directory[] dirs)
    throws IOException {
  // Adding indexes can be viewed as adding a sequence of segments S to
  // a sequence of segments T. Segments in T follow the invariants but
  // segments in S may not since they could come from multiple indexes.
  // Here is the merge algorithm for addIndexesNoOptimize():
  //
  // 1 Flush ram segments.
  // 2 Consider a combined sequence with segments from T followed
  //   by segments from S (same as current addIndexes(Directory[])).
  // 3 Assume the highest level for segments in S is h. Call
  //   maybeMergeSegments(), but instead of starting w/ lowerBound = -1
  //   and upperBound = maxBufferedDocs, start w/ lowerBound = -1 and
  //   upperBound = upperBound of level h. After this, the invariants
  //   are guaranteed except for the last < M segments whose levels <= h.
  // 4 If the invariants hold for the last < M segments whose levels <= h,
  //   if some of those < M segments are from S (not merged in step 3),
  //   properly copy them over*, otherwise done.
  //   Otherwise, simply merge those segments. If the merge results in
  //   a segment of level <= h, done. Otherwise, it's of level h+1 and call
  //   maybeMergeSegments() starting w/ upperBound = upperBound of level h+1.
  //
  // * Ideally, we want to simply copy a segment. However, directory does
  // not support copy yet. In addition, source may use compound file or not
  // and target may use compound file or not. So we use mergeSegments() to
  // copy a segment, which may cause doc count to change because deleted
  // docs are garbage collected.

  // 1 flush ram segments
  flushRamSegments();

  // 2 copy segment infos and find the highest level from dirs
  int start = segmentInfos.size();
  int startUpperBound = minMergeDocs;

  boolean success = false;

  startTransaction();

  try {
    try {
      for (int i = 0; i < dirs.length; i++) {
        if (directory == dirs[i]) {
          // cannot add this index: segments may be deleted in merge before added
          throw new IllegalArgumentException("Cannot add this index to itself");
        }

        SegmentInfos sis = new SegmentInfos(); // read infos from dir
        sis.read(dirs[i]);
        for (int j = 0; j < sis.size(); j++) {
          SegmentInfo info = sis.info(j);
          segmentInfos.addElement(info); // add each info

          // grow startUpperBound until it covers the largest incoming segment,
          // i.e. until it equals the upper bound of the highest level h in S
          while (startUpperBound < info.docCount) {
            startUpperBound *= mergeFactor; // find the highest level from dirs
            if (startUpperBound > maxMergeDocs) {
              // upper bound cannot exceed maxMergeDocs
              throw new IllegalArgumentException("Upper bound cannot exceed maxMergeDocs");
            }
          }
        }
      }
    } catch (IllegalArgumentException e) {
      // undo the partial copy of segment infos before rethrowing
      for (int i = segmentInfos.size() - 1; i >= start; i--) {
        segmentInfos.remove(i);
      }
      throw e;
    }

    // 3 maybe merge segments starting from the highest level from dirs
    maybeMergeSegments(startUpperBound);

    // get the tail segments whose levels <= h
    int segmentCount = segmentInfos.size();
    int numTailSegments = 0;
    while (numTailSegments < segmentCount
        && startUpperBound >= segmentInfos.info(segmentCount - 1 - numTailSegments).docCount) {
      numTailSegments++;
    }
    if (numTailSegments == 0) {
      success = true;
      return;
    }

    // 4 make sure invariants hold for the tail segments whose levels <= h
    if (checkNonDecreasingLevels(segmentCount - numTailSegments)) {
      // identify the segments from S to be copied (not merged in 3)
      int numSegmentsToCopy = 0;
      while (numSegmentsToCopy < segmentCount
          && directory != segmentInfos.info(segmentCount - 1 - numSegmentsToCopy).dir) {
        numSegmentsToCopy++;
      }
      if (numSegmentsToCopy == 0) {
        success = true;
        return;
      }

      // copy those segments from S (a one-segment "merge" copies it into
      // this directory; see the note above about why copy uses mergeSegments)
      for (int i = segmentCount - numSegmentsToCopy; i < segmentCount; i++) {
        mergeSegments(segmentInfos, i, i + 1);
      }
      if (checkNonDecreasingLevels(segmentCount - numSegmentsToCopy)) {
        success = true;
        return;
      }
    }

    // invariants do not hold, simply merge those segments
    mergeSegments(segmentInfos, segmentCount - numTailSegments, segmentCount);

    // maybe merge segments again if necessary
    if (segmentInfos.info(segmentInfos.size() - 1).docCount > startUpperBound) {
      maybeMergeSegments(startUpperBound * mergeFactor);
    }

    success = true;
  } finally {
    // transactional semantics: commit only if every step above succeeded
    if (success) {
      commitTransaction();
    } else {
      rollbackTransaction();
    }
  }
}

/** Merges the provided indexes into this index.
 * <p>After this completes, the index is optimized. </p>
 * <p>The provided IndexReaders are not closed.</p>
 * <p>See {@link #addIndexes(Directory[])} for
 * details on transactional semantics, temporary free
 * space required in the Directory, and non-CFS segments
 * on an Exception.</p>
 *
 * @param readers the readers whose indexes are merged into this one
 * @throws IOException if there is a low-level IO error
 */
public synchronized void addIndexes(IndexReader[] readers)
    throws IOException {

  optimize(); // start with zero or 1 seg

  final String mergedName = newSegmentName();
  SegmentMerger merger = new SegmentMerger(this, mergedName);

  final Vector segmentsToDelete = new Vector();
  IndexReader sReader = null;
  if (segmentInfos.size() == 1) { // add existing index, if any
    sReader = SegmentReader.get(segmentInfos.info(0));
    merger.add(sReader);
    segmentsToDelete.addElement(sReader); // queue segment for deletion
  }

  for (int i = 0; i < readers.length; i++) // add new indexes
    merger.add(readers[i]);

  SegmentInfo info;

  String segmentsInfosFileName = segmentInfos.getCurrentSegmentFileName();

  boolean success = false;

  startTransaction();

  try {
    int docCount = merger.merge(); // merge 'em

    segmentInfos.setSize(0); // pop old infos & add new
    info = new SegmentInfo(mergedName, docCount, directory, false, true);
    segmentInfos.addElement(info);
    commitPending = true;

    if (sReader != null)
      sReader.close();

    success = true;
  } finally {
    if (!success) {
      rollbackTransaction();
    } else {
      commitTransaction();
    }
  }

  // only delete the old files after the new segments_N is committed
  deleter.deleteFile(segmentsInfosFileName); // delete old segments_N file
  deleter.deleteSegments(segmentsToDelete); // delete now-unused segments

  if (useCompoundFile) {
    // second transaction: convert the merged segment to the compound format
    success = false;

    segmentsInfosFileName = segmentInfos.getCurrentSegmentFileName();
    Vector filesToDelete;

    startTransaction();

    try {
      filesToDelete = merger.createCompoundFile(mergedName + ".cfs");
      info.setUseCompoundFile(true);
      commitPending = true;
      success = true;
    } finally {
      if (!success) {
        rollbackTransaction();
      } else {
        commitTransaction();
      }
    }

    deleter.deleteFile(segmentsInfosFileName); // delete old segments_N file
    deleter.deleteFiles(filesToDelete); // delete now unused files of segment
  }
}

// Overview of merge policy:
//
// A flush is triggered either by close() or by the number of ram segments
// reaching maxBufferedDocs. After a disk segment is created by the flush,
// further merges may be triggered.
//
// LowerBound and upperBound set the limits on the doc count of a segment
// which may be merged. Initially, lowerBound is set to 0 and upperBound
// to maxBufferedDocs. Starting from the rightmost* segment whose doc count
// > lowerBound and <= upperBound, count the number of consecutive segments
// whose doc count <= upperBound.
//
// Case 1: number of worthy segments < mergeFactor, no merge, done.
// Case 2: number of worthy segments == mergeFactor, merge these segments.
//         If the doc count of the merged segment <= upperBound, done.
//         Otherwise, set lowerBound to upperBound, and multiply upperBound
//         by mergeFactor, go through the process again.
// Case 3: number of worthy segments > mergeFactor (in the case mergeFactor
//         M changes), merge the leftmost* M segments. If the doc count of
//         the merged segment <= upperBound, consider the merged segment for
//         further merges on this same level. Merge the now leftmost* M
//         segments, and so on, until number of worthy segments < mergeFactor.
// If the doc count of all the merged segments <= upperBound, done. // Otherwise, set lowerBound to upperBound, and multiply upperBound // by mergeFactor, go through the process again. // Note that case 2 can be considerd as a special case of case 3. // // This merge policy guarantees two invariants if M does not change and // segment doc count is not reaching maxMergeDocs: // B for maxBufferedDocs, f(n) defined as ceil(log_M(ceil(n/B))) // 1: If i (left*) and i+1 (right*) are two consecutive segments of doc // counts x and y, then f(x) >= f(y). // 2: The number of committed segments on the same level (f(n)) <= M. // This is called after pending added and deleted // documents have been flushed to the Directory but before // the change is committed (new segments_N file written). void doAfterFlush() throws IOException { } protected final void maybeFlushRamSegments() throws IOException { // A flush is triggered if enough new documents are buffered or // if enough delete terms are buffered if (ramSegmentInfos.size() >= minMergeDocs || numBufferedDeleteTerms >= maxBufferedDeleteTerms) { flushRamSegments(); } } /** Expert: Flushes all RAM-resident segments (buffered documents), then may merge segments. */ private final synchronized void flushRamSegments() throws IOException { if (ramSegmentInfos.size() > 0 || bufferedDeleteTerms.size() > 0) { mergeSegments(ramSegmentInfos, 0, ramSegmentInfos.size()); maybeMergeSegments(minMergeDocs); } } /** * Flush all in-memory buffered updates (adds and deletes) * to the Directory. * @throws IOException */ public final synchronized void flush() throws IOException { flushRamSegments(); } /** Expert: Return the total size of all index files currently cached in memory. * Useful for size management with flushRamDocs() */ public final long ramSizeInBytes() { return ramDirectory.sizeInBytes(); } /** Expert: Return the number of documents whose segments are currently cached in memory. 
 * Useful when calling flushRamSegments()
 */
public final synchronized int numRamDocs() {
  return ramSegmentInfos.size();
}

/** Incremental segment merger.
 * Walks the levels upward from startUpperBound, merging runs of
 * mergeFactor merge-worthy segments per level until no level has
 * enough worthy segments or upperBound reaches maxMergeDocs.
 * (NOTE(review): body continues beyond this excerpt — truncated below.)
 */
private final void maybeMergeSegments(int startUpperBound) throws IOException {
  long lowerBound = -1;
  long upperBound = startUpperBound;

  while (upperBound < maxMergeDocs) {
    int minSegment = segmentInfos.size();
    int maxSegment = -1;

    // find merge-worthy segments
    while (--minSegment >= 0) {
      SegmentInfo si = segmentInfos.info(minSegment);
      if (maxSegment == -1 && si.docCount > lowerBound && si.docCount <= upperBound) {
        // start from the rightmost* segment whose doc count is in bounds
        maxSegment = minSegment;
      } else if (si.docCount > upperBound) {
        // until the segment whose doc count exceeds upperBound
        break;
      }
    }

    minSegment++;
    maxSegment++;
    int numSegments = maxSegment - minSegment;

    if (numSegments < mergeFactor) {
      break;
    } else {
      boolean exceedsUpperLimit = false;

      // number of merge-worthy segments may exceed mergeFactor when
      // mergeFactor and/or maxBufferedDocs change(s)
      while (numSegments >= mergeFactor) {
        // merge the leftmost* mergeFactor segments
        int docCount = mergeSegments(segmentInfos, minSegment, minSegment + mergeFactor);
        numSegments -= mergeFactor;

        if (docCount > upperBound) {
          // continue to merge the rest of the worthy segments on this level
          minSegment++;
          exceedsUpperLimit = true;
        } else {
          // if the merged segment does not exceed upperBound, consider
          // this segment for further merges on this same level
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -