SegmentMerger.java
        if (matchingSegmentReader != null) {
          matchingVectorsReader = matchingSegmentReader.termVectorsReaderOrig;

          // If the TV* files are an older format then they
          // cannot read raw docs:
          if (matchingVectorsReader != null && !matchingVectorsReader.canReadRawDocs()) {
            matchingVectorsReader = null;
            hasMatchingReader = false;
          } else
            hasMatchingReader = matchingVectorsReader != null;
        } else {
          hasMatchingReader = false;
          matchingVectorsReader = null;
        }

        IndexReader reader = (IndexReader) readers.get(r);
        final boolean hasDeletions = reader.hasDeletions();
        int maxDoc = reader.maxDoc();
        for (int docNum = 0; docNum < maxDoc;) {
          // skip deleted docs
          if (!hasDeletions || !reader.isDeleted(docNum)) {
            if (hasMatchingReader) {
              // We can optimize this case (doing a bulk
              // byte copy) since the field numbers are
              // identical
              int start = docNum;
              int numDocs = 0;
              do {
                docNum++;
                numDocs++;
                if (docNum >= maxDoc)
                  break;
                if (hasDeletions && matchingSegmentReader.isDeleted(docNum)) {
                  docNum++;
                  break;
                }
              } while (numDocs < MAX_RAW_MERGE_DOCS);

              matchingVectorsReader.rawDocs(rawDocLengths, rawDocLengths2, start, numDocs);
              termVectorsWriter.addRawDocuments(matchingVectorsReader, rawDocLengths, rawDocLengths2, numDocs);
              if (checkAbort != null)
                checkAbort.work(300 * numDocs);
            } else {
              // NOTE: it's very important to first assign
              // to vectors then pass it to
              // termVectorsWriter.addAllDocVectors; see
              // LUCENE-1282
              TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
              termVectorsWriter.addAllDocVectors(vectors);
              docNum++;
              if (checkAbort != null)
                checkAbort.work(300);
            }
          } else
            docNum++;
        }
      }
    } finally {
      termVectorsWriter.close();
    }

    final long tvxSize = directory.fileLength(segment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);

    if (4 + mergedDocs * 16 != tvxSize)
      // This is most likely a bug in Sun JRE 1.6.0_04/_05;
      // we detect that the bug has struck, here, and
      // throw an exception to prevent the corruption from
      // entering the index. See LUCENE-1282 for details.
      throw new RuntimeException("mergeVectors produced an invalid result: mergedDocs is " + mergedDocs
          + " but tvx size is " + tvxSize + "; now aborting this merge to prevent index corruption");
  }
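  // Note: the tvx-size guard above is plain arithmetic on the layout implied
  // by the formula 4 + mergedDocs * 16 -- a 4-byte header followed by 16 bytes
  // (presumably two 8-byte file pointers) per merged document. Merging 1,000
  // docs, for instance, should leave a .tvx of exactly 16,004 bytes; any other
  // length trips the LUCENE-1282 check and aborts the merge before the
  // corruption can reach the index.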
throw new RuntimeException("mergeVectors produced an invalid result: mergedDocs is " + mergedDocs + " but tvx size is " + tvxSize + "; now aborting this merge to prevent index corruption"); } private IndexOutput freqOutput = null; private IndexOutput proxOutput = null; private TermInfosWriter termInfosWriter = null; private int skipInterval; private int maxSkipLevels; private SegmentMergeQueue queue = null; private DefaultSkipListWriter skipListWriter = null; private final void mergeTerms() throws CorruptIndexException, IOException { try { freqOutput = directory.createOutput(segment + ".frq"); if (hasProx()) proxOutput = directory.createOutput(segment + ".prx"); termInfosWriter = new TermInfosWriter(directory, segment, fieldInfos, termIndexInterval); skipInterval = termInfosWriter.skipInterval; maxSkipLevels = termInfosWriter.maxSkipLevels; skipListWriter = new DefaultSkipListWriter(skipInterval, maxSkipLevels, mergedDocs, freqOutput, proxOutput); queue = new SegmentMergeQueue(readers.size()); mergeTermInfos(); } finally { if (freqOutput != null) freqOutput.close(); if (proxOutput != null) proxOutput.close(); if (termInfosWriter != null) termInfosWriter.close(); if (queue != null) queue.close(); } } private final void mergeTermInfos() throws CorruptIndexException, IOException { int base = 0; final int readerCount = readers.size(); for (int i = 0; i < readerCount; i++) { IndexReader reader = (IndexReader) readers.get(i); TermEnum termEnum = reader.terms(); SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader); int[] docMap = smi.getDocMap(); if (docMap != null) { if (docMaps == null) { docMaps = new int[readerCount][]; delCounts = new int[readerCount]; } docMaps[i] = docMap; delCounts[i] = smi.reader.maxDoc() - smi.reader.numDocs(); } base += reader.numDocs(); if (smi.next()) queue.put(smi); // initialize queue else smi.close(); } SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()]; while (queue.size() > 0) { int matchSize = 0; // pop matching terms match[matchSize++] = (SegmentMergeInfo) queue.pop(); Term term = match[0].term; SegmentMergeInfo top = (SegmentMergeInfo) queue.top(); while (top != null && term.compareTo(top.term) == 0) { match[matchSize++] = (SegmentMergeInfo) queue.pop(); top = (SegmentMergeInfo) queue.top(); } final int df = mergeTermInfo(match, matchSize); // add new TermInfo if (checkAbort != null) checkAbort.work(df/3.0); while (matchSize > 0) { SegmentMergeInfo smi = match[--matchSize]; if (smi.next()) queue.put(smi); // restore queue else smi.close(); // done with a segment } } } private final TermInfo termInfo = new TermInfo(); // minimize consing /** Merge one term found in one or more segments. The array <code>smis</code> * contains segments that are positioned at the same term. <code>N</code> * is the number of cells in the array actually occupied. 
  private final TermInfo termInfo = new TermInfo(); // minimize consing

  /** Merge one term found in one or more segments. The array <code>smis</code>
   *  contains segments that are positioned at the same term. <code>N</code>
   *  is the number of cells in the array actually occupied.
   *
   *  @param smis array of segments
   *  @param n number of cells in the array actually occupied
   *  @throws CorruptIndexException if the index is corrupt
   *  @throws IOException if there is a low-level IO error
   */
  private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    final long freqPointer = freqOutput.getFilePointer();
    final long proxPointer;
    if (proxOutput != null)
      proxPointer = proxOutput.getFilePointer();
    else
      proxPointer = 0;

    int df;
    if (fieldInfos.fieldInfo(smis[0].term.field).omitTf) {
      // append posting data
      df = appendPostingsNoTf(smis, n);
    } else {
      df = appendPostings(smis, n);
    }

    long skipPointer = skipListWriter.writeSkip(freqOutput);

    if (df > 0) {
      // add an entry to the dictionary with pointers to prox and freq files
      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
      termInfosWriter.add(smis[0].term, termInfo);
    }

    return df;
  }

  private byte[] payloadBuffer;
  private int[][] docMaps;
  int[][] getDocMaps() {
    return docMaps;
  }
  private int[] delCounts;
  int[] getDelCounts() {
    return delCounts;
  }

  /** Process postings from multiple segments all positioned on the
   *  same term. Writes out merged entries into freqOutput and
   *  the proxOutput streams.
   *
   *  @param smis array of segments
   *  @param n number of cells in the array actually occupied
   *  @return number of documents across all segments where this term was found
   *  @throws CorruptIndexException if the index is corrupt
   *  @throws IOException if there is a low-level IO error
   */
  private final int appendPostings(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    int lastDoc = 0;
    int df = 0;                                   // number of docs w/ term
    skipListWriter.resetSkip();
    boolean storePayloads = fieldInfos.fieldInfo(smis[0].term.field).storePayloads;
    int lastPayloadLength = -1;                   // ensures that we write the first length

    for (int i = 0; i < n; i++) {
      SegmentMergeInfo smi = smis[i];
      TermPositions postings = smi.getPositions();
      assert postings != null;
      int base = smi.base;
      int[] docMap = smi.getDocMap();
      postings.seek(smi.termEnum);

      while (postings.next()) {
        int doc = postings.doc();
        if (docMap != null)
          doc = docMap[doc];                      // map around deletions
        doc += base;                              // convert to merged space

        if (doc < 0 || (df > 0 && doc <= lastDoc))
          throw new CorruptIndexException("docs out of order (" + doc + " <= " + lastDoc + " )");

        df++;

        if ((df % skipInterval) == 0) {
          skipListWriter.setSkipData(lastDoc, storePayloads, lastPayloadLength);
          skipListWriter.bufferSkip(df);
        }

        int docCode = (doc - lastDoc) << 1;       // use low bit to flag freq=1
        lastDoc = doc;

        int freq = postings.freq();
        if (freq == 1) {
          freqOutput.writeVInt(docCode | 1);      // write doc & freq=1
        } else {
          freqOutput.writeVInt(docCode);          // write doc
          freqOutput.writeVInt(freq);             // write frequency in doc
        }

        /** See {@link DocumentWriter#writePostings(Posting[], String)} for
         *  documentation about the encoding of positions and payloads
         */
        int lastPosition = 0;                     // write position deltas
        for (int j = 0; j < freq; j++) {
          int position = postings.nextPosition();
          int delta = position - lastPosition;
          if (storePayloads) {
            int payloadLength = postings.getPayloadLength();
            if (payloadLength == lastPayloadLength) {
              proxOutput.writeVInt(delta * 2);
            } else {
              proxOutput.writeVInt(delta * 2 + 1);
              proxOutput.writeVInt(payloadLength);
              lastPayloadLength = payloadLength;
            }
            if (payloadLength > 0) {
              if (payloadBuffer == null || payloadBuffer.length < payloadLength) {
                payloadBuffer = new byte[payloadLength];
              }
              postings.getPayload(payloadBuffer, 0);
              proxOutput.writeBytes(payloadBuffer, 0, payloadLength);
            }
          } else {
            proxOutput.writeVInt(delta);
          }
          lastPosition = position;
        }
      }
    }
    return df;
  }
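  // Note on the .prx encoding written above: positions are delta-coded per
  // document. With payloads enabled the delta is doubled and the low bit
  // signals a payload-length change -- an unchanged length writes
  // VInt(delta * 2), a new length writes VInt(delta * 2 + 1) followed by
  // VInt(payloadLength), and the payload bytes themselves follow whenever
  // payloadLength > 0. For example, positions 5 and 9 carrying 3-byte
  // payloads produce VInt(11), VInt(3), 3 payload bytes, then VInt(8) and
  // 3 more payload bytes.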
  /** Process postings from multiple segments without tf, all positioned on the
   *  same term. Writes out merged entries only into freqOutput, proxOut is not written.
   *
   *  @param smis array of segments
   *  @param n number of cells in the array actually occupied
   *  @return number of documents across all segments where this term was found
   *  @throws CorruptIndexException if the index is corrupt
   *  @throws IOException if there is a low-level IO error
   */
  private final int appendPostingsNoTf(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    int lastDoc = 0;
    int df = 0;                                   // number of docs w/ term
    skipListWriter.resetSkip();
    int lastPayloadLength = -1;                   // ensures that we write the first length

    for (int i = 0; i < n; i++) {
      SegmentMergeInfo smi = smis[i];
      TermPositions postings = smi.getPositions();
      assert postings != null;
      int base = smi.base;
      int[] docMap = smi.getDocMap();
      postings.seek(smi.termEnum);

      while (postings.next()) {
        int doc = postings.doc();
        if (docMap != null)
          doc = docMap[doc];                      // map around deletions
        doc += base;                              // convert to merged space

        if (doc < 0 || (df > 0 && doc <= lastDoc))
          throw new CorruptIndexException("docs out of order (" + doc + " <= " + lastDoc + " )");

        df++;

        if ((df % skipInterval) == 0) {
          skipListWriter.setSkipData(lastDoc, false, lastPayloadLength);
          skipListWriter.bufferSkip(df);
        }

        int docCode = (doc - lastDoc);
        lastDoc = doc;
        freqOutput.writeVInt(docCode);            // write doc & freq=1
      }
    }
    return df;
  }

  private void mergeNorms() throws IOException {
    byte[] normBuffer = null;
    IndexOutput output = null;
    try {
      for (int i = 0; i < fieldInfos.size(); i++) {
        FieldInfo fi = fieldInfos.fieldInfo(i);
        if (fi.isIndexed && !fi.omitNorms) {
          if (output == null) {
            output = directory.createOutput(segment + "." + IndexFileNames.NORMS_EXTENSION);
            output.writeBytes(NORMS_HEADER, NORMS_HEADER.length);
          }
          for (int j = 0; j < readers.size(); j++) {
            IndexReader reader = (IndexReader) readers.get(j);
            int maxDoc = reader.maxDoc();
            if (normBuffer == null || normBuffer.length < maxDoc) {
              // the buffer is too small for the current segment
              normBuffer = new byte[maxDoc];
            }
            reader.norms(fi.name, normBuffer, 0);
            if (!reader.hasDeletions()) {
              // optimized case for segments without deleted docs
              output.writeBytes(normBuffer, maxDoc);
            } else {
              // this segment has deleted docs, so we have to
              // check for every doc if it is deleted or not
              for (int k = 0; k < maxDoc; k++) {
                if (!reader.isDeleted(k)) {
                  output.writeByte(normBuffer[k]);
                }
              }
            }
            if (checkAbort != null)
              checkAbort.work(maxDoc);
          }
        }
      }
    } finally {
      if (output != null) {
        output.close();
      }
    }
  }

  final static class CheckAbort {
    private double workCount;
    private MergePolicy.OneMerge merge;
    private Directory dir;

    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
      this.merge = merge;
      this.dir = dir;
    }

    /**
     * Records the fact that roughly units amount of work
     * have been done since this method was last called.
     * When adding time-consuming code into SegmentMerger,
     * you should test different values for units to ensure
     * that the time in between calls to merge.checkAborted
     * is up to ~ 1 second.
     */
    public void work(double units) throws MergePolicy.MergeAbortedException {
      workCount += units;
      if (workCount >= 10000.0) {
        merge.checkAborted(dir);
        workCount = 0;
      }
    }
  }
}
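The .frq layer above leans on one small trick: each document id is written as a delta from the previous one, shifted left so the low bit can flag the common freq == 1 case and save a VInt. The sketch below is a minimal, self-contained illustration of that encoding under stated assumptions; it is not Lucene code, and FreqEncodingSketch with its writeVInt helper are hypothetical stand-ins for IndexOutput.writeVInt, which uses the same 7-bits-per-byte variable-length scheme.

// Hypothetical illustration only -- not part of SegmentMerger.
import java.io.ByteArrayOutputStream;

public class FreqEncodingSketch {

  // Mirrors the VInt layout: 7 data bits per byte, high bit set on every byte but the last.
  static void writeVInt(ByteArrayOutputStream out, int i) {
    while ((i & ~0x7F) != 0) {
      out.write((i & 0x7F) | 0x80);
      i >>>= 7;
    }
    out.write(i);
  }

  public static void main(String[] args) {
    // Postings for one term in merged doc-id space: {doc, freq} pairs.
    int[][] postings = { {5, 1}, {9, 3}, {42, 1} };
    ByteArrayOutputStream frq = new ByteArrayOutputStream();

    int lastDoc = 0;
    for (int[] p : postings) {
      int doc = p[0], freq = p[1];
      int docCode = (doc - lastDoc) << 1;   // delta-encode; low bit flags freq == 1
      lastDoc = doc;
      if (freq == 1) {
        writeVInt(frq, docCode | 1);        // doc 5 -> VInt(11), doc 42 -> VInt(67)
      } else {
        writeVInt(frq, docCode);            // doc 9 -> VInt(8) ...
        writeVInt(frq, freq);               // ... followed by VInt(3)
      }
    }

    // Four VInts, each under 128, so this postings list takes 4 bytes.
    System.out.println("bytes written: " + frq.size());
  }
}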