SegmentMerger.java
                  checkAbort.work(300*numDocs);
              } else {
                fieldsWriter.addDocument(reader.document(j, fieldSelectorMerge));
                j++;
                docCount++;
                if (checkAbort != null)
                  checkAbort.work(300);
              }
            } else
              j++;
          }
        }
      } finally {
        fieldsWriter.close();
      }

      assert docCount*8 == directory.fileLength(segment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION) :
        "after mergeFields: fdx size mismatch: " + docCount + " docs vs " +
        directory.fileLength(segment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION) +
        " length in bytes of " + segment + "." + IndexFileNames.FIELDS_INDEX_EXTENSION;

    } else
      // If we are skipping the doc stores, that means there
      // are no deletions in any of these segments, so we
      // just sum numDocs() of each segment to get total docCount
      for (int i = 0; i < readers.size(); i++)
        docCount += ((IndexReader) readers.elementAt(i)).numDocs();

    return docCount;
  }

  /**
   * Merge the TermVectors from each of the segments into the new one.
   * @throws IOException
   */
  private final void mergeVectors() throws IOException {
    TermVectorsWriter termVectorsWriter =
      new TermVectorsWriter(directory, segment, fieldInfos);

    try {
      for (int r = 0; r < readers.size(); r++) {
        IndexReader reader = (IndexReader) readers.elementAt(r);
        int maxDoc = reader.maxDoc();
        for (int docNum = 0; docNum < maxDoc; docNum++) {
          // skip deleted docs
          if (reader.isDeleted(docNum))
            continue;
          termVectorsWriter.addAllDocVectors(reader.getTermFreqVectors(docNum));
          if (checkAbort != null)
            checkAbort.work(300);
        }
      }
    } finally {
      termVectorsWriter.close();
    }

    assert 4+mergedDocs*8 == directory.fileLength(segment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION) :
      "after mergeVectors: tvx size mismatch: " + mergedDocs + " docs vs " +
      directory.fileLength(segment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION) +
      " length in bytes of " + segment + "." + IndexFileNames.VECTORS_INDEX_EXTENSION;
  }

  private IndexOutput freqOutput = null;
  private IndexOutput proxOutput = null;
  private TermInfosWriter termInfosWriter = null;
  private int skipInterval;
  private int maxSkipLevels;
  private SegmentMergeQueue queue = null;
  private DefaultSkipListWriter skipListWriter = null;

  private final void mergeTerms() throws CorruptIndexException, IOException {
    try {
      freqOutput = directory.createOutput(segment + ".frq");
      proxOutput = directory.createOutput(segment + ".prx");
      termInfosWriter =
        new TermInfosWriter(directory, segment, fieldInfos, termIndexInterval);
      skipInterval = termInfosWriter.skipInterval;
      maxSkipLevels = termInfosWriter.maxSkipLevels;
      skipListWriter = new DefaultSkipListWriter(skipInterval, maxSkipLevels,
                                                 mergedDocs, freqOutput, proxOutput);
      queue = new SegmentMergeQueue(readers.size());

      mergeTermInfos();

    } finally {
      if (freqOutput != null) freqOutput.close();
      if (proxOutput != null) proxOutput.close();
      if (termInfosWriter != null) termInfosWriter.close();
      if (queue != null) queue.close();
    }
  }

  private final void mergeTermInfos() throws CorruptIndexException, IOException {
    int base = 0;
    for (int i = 0; i < readers.size(); i++) {
      IndexReader reader = (IndexReader) readers.elementAt(i);
      TermEnum termEnum = reader.terms();
      SegmentMergeInfo smi = new SegmentMergeInfo(base, termEnum, reader);
      base += reader.numDocs();
      if (smi.next())
        queue.put(smi);                           // initialize queue
      else
        smi.close();
    }

    SegmentMergeInfo[] match = new SegmentMergeInfo[readers.size()];

    while (queue.size() > 0) {
      int matchSize = 0;                          // pop matching terms
      match[matchSize++] = (SegmentMergeInfo) queue.pop();
      Term term = match[0].term;
      SegmentMergeInfo top = (SegmentMergeInfo) queue.top();

      while (top != null && term.compareTo(top.term) == 0) {
        match[matchSize++] = (SegmentMergeInfo) queue.pop();
        top = (SegmentMergeInfo) queue.top();
      }

      final int df = mergeTermInfo(match, matchSize);   // add new TermInfo

      if (checkAbort != null)
        checkAbort.work(df/3.0);

      while (matchSize > 0) {
        SegmentMergeInfo smi = match[--matchSize];
        if (smi.next())
          queue.put(smi);                         // restore queue
        else
          smi.close();                            // done with a segment
      }
    }
  }

  private final TermInfo termInfo = new TermInfo(); // minimize consing

  /** Merge one term found in one or more segments. The array <code>smis</code>
   *  contains segments that are positioned at the same term. <code>N</code>
   *  is the number of cells in the array actually occupied.
   *
   * @param smis array of segments
   * @param n number of cells in the array actually occupied
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  private final int mergeTermInfo(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    long freqPointer = freqOutput.getFilePointer();
    long proxPointer = proxOutput.getFilePointer();

    int df = appendPostings(smis, n);             // append posting data

    long skipPointer = skipListWriter.writeSkip(freqOutput);

    if (df > 0) {
      // add an entry to the dictionary with pointers to prox and freq files
      termInfo.set(df, freqPointer, proxPointer, (int) (skipPointer - freqPointer));
      termInfosWriter.add(smis[0].term, termInfo);
    }

    return df;
  }

  private byte[] payloadBuffer = null;

  /** Process postings from multiple segments all positioned on the
   *  same term. Writes out merged entries into freqOutput and
   *  the proxOutput streams.
   *
   * @param smis array of segments
   * @param n number of cells in the array actually occupied
   * @return number of documents across all segments where this term was found
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error
   */
  private final int appendPostings(SegmentMergeInfo[] smis, int n)
      throws CorruptIndexException, IOException {
    int lastDoc = 0;
    int df = 0;                                   // number of docs w/ term
    skipListWriter.resetSkip();
    boolean storePayloads = fieldInfos.fieldInfo(smis[0].term.field).storePayloads;
    int lastPayloadLength = -1;                   // ensures that we write the first length

    for (int i = 0; i < n; i++) {
      SegmentMergeInfo smi = smis[i];
      TermPositions postings = smi.getPositions();
      assert postings != null;
      int base = smi.base;
      int[] docMap = smi.getDocMap();
      postings.seek(smi.termEnum);

      while (postings.next()) {
        int doc = postings.doc();
        if (docMap != null)
          doc = docMap[doc];                      // map around deletions
        doc += base;                              // convert to merged space

        if (doc < 0 || (df > 0 && doc <= lastDoc))
          throw new CorruptIndexException("docs out of order (" + doc +
              " <= " + lastDoc + " )");

        df++;

        if ((df % skipInterval) == 0) {
          skipListWriter.setSkipData(lastDoc, storePayloads, lastPayloadLength);
          skipListWriter.bufferSkip(df);
        }

        int docCode = (doc - lastDoc) << 1;       // use low bit to flag freq=1
        lastDoc = doc;

        int freq = postings.freq();
        if (freq == 1) {
          freqOutput.writeVInt(docCode | 1);      // write doc & freq=1
        } else {
          freqOutput.writeVInt(docCode);          // write doc
          freqOutput.writeVInt(freq);             // write frequency in doc
        }

        /** See {@link DocumentWriter#writePostings(Posting[], String)} for
         *  documentation about the encoding of positions and payloads
         */
        int lastPosition = 0;                     // write position deltas
        for (int j = 0; j < freq; j++) {
          int position = postings.nextPosition();
          int delta = position - lastPosition;
          if (storePayloads) {
            int payloadLength = postings.getPayloadLength();
            if (payloadLength == lastPayloadLength) {
              proxOutput.writeVInt(delta * 2);
            } else {
              proxOutput.writeVInt(delta * 2 + 1);
              proxOutput.writeVInt(payloadLength);
              lastPayloadLength = payloadLength;
            }
            if (payloadLength > 0) {
              if (payloadBuffer == null || payloadBuffer.length < payloadLength) {
                payloadBuffer = new byte[payloadLength];
              }
              postings.getPayload(payloadBuffer, 0);
              proxOutput.writeBytes(payloadBuffer, 0, payloadLength);
            }
          } else {
            proxOutput.writeVInt(delta);
          }
          lastPosition = position;
        }
      }
    }
    return df;
  }

  private void mergeNorms() throws IOException {
    byte[] normBuffer = null;
    IndexOutput output = null;
    try {
      for (int i = 0; i < fieldInfos.size(); i++) {
        FieldInfo fi = fieldInfos.fieldInfo(i);
        if (fi.isIndexed && !fi.omitNorms) {
          if (output == null) {
            output = directory.createOutput(segment + "." + IndexFileNames.NORMS_EXTENSION);
            output.writeBytes(NORMS_HEADER, NORMS_HEADER.length);
          }
          for (int j = 0; j < readers.size(); j++) {
            IndexReader reader = (IndexReader) readers.elementAt(j);
            int maxDoc = reader.maxDoc();
            if (normBuffer == null || normBuffer.length < maxDoc) {
              // the buffer is too small for the current segment
              normBuffer = new byte[maxDoc];
            }
            reader.norms(fi.name, normBuffer, 0);
            if (!reader.hasDeletions()) {
              // optimized case for segments without deleted docs
              output.writeBytes(normBuffer, maxDoc);
            } else {
              // this segment has deleted docs, so we have to
              // check for every doc if it is deleted or not
              for (int k = 0; k < maxDoc; k++) {
                if (!reader.isDeleted(k)) {
                  output.writeByte(normBuffer[k]);
                }
              }
            }
            if (checkAbort != null)
              checkAbort.work(maxDoc);
          }
        }
      }
    } finally {
      if (output != null) {
        output.close();
      }
    }
  }

  final static class CheckAbort {
    private double workCount;
    private MergePolicy.OneMerge merge;
    private Directory dir;

    public CheckAbort(MergePolicy.OneMerge merge, Directory dir) {
      this.merge = merge;
      this.dir = dir;
    }

    /**
     * Records the fact that roughly units amount of work
     * have been done since this method was last called.
     * When adding time-consuming code into SegmentMerger,
     * you should test different values for units to ensure
     * that the time in between calls to merge.checkAborted
     * is up to ~ 1 second.
     */
    public void work(double units) throws MergePolicy.MergeAbortedException {
      workCount += units;
      if (workCount >= 10000.0) {
        merge.checkAborted(dir);
        workCount = 0;
      }
    }
  }
}
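
Note on the postings format written above: appendPostings() stores each document in the .frq stream as a delta from the previous document, shifted left by one bit, with the low bit flagging the common freq == 1 case so the frequency byte can be omitted. The standalone sketch below is only an illustration of that encoding and is not part of this file: the class and method names are invented, and it assumes the usual Lucene VInt layout of 7 data bits per byte with the high bit as a continuation flag.

import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical illustration class; not part of Lucene.
public class FreqEncodingSketch {

  // Variable-length int: low 7 bits per byte, high bit set on every byte
  // except the last (assumed to match IndexOutput.writeVInt's layout).
  static void writeVInt(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Mirrors the .frq encoding in appendPostings(): the doc delta is shifted
  // left by one and the low bit flags freq == 1; otherwise the frequency
  // follows as its own VInt.
  static void writeDocAndFreq(ByteArrayOutputStream out, int docDelta, int freq) {
    int docCode = docDelta << 1;                  // use low bit to flag freq=1
    if (freq == 1) {
      writeVInt(out, docCode | 1);                // doc delta with implicit freq=1
    } else {
      writeVInt(out, docCode);                    // doc delta
      writeVInt(out, freq);                       // explicit frequency
    }
  }

  public static void main(String[] args) {
    // Hypothetical postings for one term: (doc, freq) pairs in merged doc order.
    int[][] postings = { {3, 1}, {7, 2}, {150, 1} };
    ByteArrayOutputStream frq = new ByteArrayOutputStream();
    int lastDoc = 0;
    for (int[] p : postings) {
      writeDocAndFreq(frq, p[0] - lastDoc, p[1]); // deltas, as in appendPostings
      lastDoc = p[0];
    }
    // Prints [7, 8, 2, -97, 2]: 7 = (3<<1)|1, then 8 and 2 for delta 4 / freq 2,
    // then 287 = (143<<1)|1 spread over two VInt bytes.
    System.out.println(Arrays.toString(frq.toByteArray()));
  }
}

The .prx stream above plays the same low-bit trick with position deltas when payloads are stored: delta * 2 means the payload length is unchanged from the previous position, while delta * 2 + 1 signals that a new payload length follows as another VInt before the payload bytes themselves.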