📄 documentswriter.java
字号:
// Formatter used by the infoStream diagnostics below (see toMB).
NumberFormat nf = NumberFormat.getInstance();

// TODO FI: this is not flexible -- we can't hardwire
// extensions in here:
/**
 * Returns the combined length of the files of one segment, as reported by
 * {@code directory}: the ".tii", ".tis", ".frq" and ".prx" files, plus the
 * ".nrm" file when it exists.
 *
 * @param segmentName segment name the file extensions are appended to
 * @return sum of the lengths of the files listed above
 * @throws IOException declared for the {@code directory} calls
 */
private long segmentSize(String segmentName) throws IOException {
  // Used only when infoStream != null
  assert infoStream != null;

  long size = directory.fileLength(segmentName + ".tii") +
    directory.fileLength(segmentName + ".tis") +
    directory.fileLength(segmentName + ".frq") +
    directory.fileLength(segmentName + ".prx");

  final String normFileName = segmentName + ".nrm";
  if (directory.fileExists(normFileName)) {
    size += directory.fileLength(normFileName);
  }

  return size;
}

// Coarse estimates used to measure RAM usage of buffered deletes
final static int OBJECT_HEADER_BYTES = 8;
final static int POINTER_NUM_BYTE = 4;
final static int INT_NUM_BYTE = 4;
final static int CHAR_NUM_BYTE = 2;

/* Initial chunks size of the shared byte[] blocks used to
   store postings data */
final static int BYTE_BLOCK_SHIFT = 15;
final static int BYTE_BLOCK_SIZE = (int) (1 << BYTE_BLOCK_SHIFT);
final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;

/**
 * Hands out and takes back BYTE_BLOCK_SIZE byte[] blocks, keeping returned
 * blocks on a free list. Both methods synchronize on the enclosing
 * DocumentsWriter instance and update its numBytesAlloc / numBytesUsed
 * counters.
 */
private class ByteBlockAllocator extends ByteBlockPool.Allocator {

  // Recycled byte[] blocks available for reuse (raw list of byte[]).
  ArrayList freeByteBlocks = new ArrayList();

  /* Allocate another byte[] from the shared pool */
  byte[] getByteBlock(boolean trackAllocations) {
    synchronized(DocumentsWriter.this) {
      final int size = freeByteBlocks.size();
      final byte[] b;
      if (0 == size) {
        // Always record a block allocated, even if
        // trackAllocations is false.  This is necessary
        // because this block will be shared between
        // things that don't track allocations (term
        // vectors) and things that do (freq/prox
        // postings).
        numBytesAlloc += BYTE_BLOCK_SIZE;
        b = new byte[BYTE_BLOCK_SIZE];
      } else {
        // Reuse the most recently recycled block.
        b = (byte[]) freeByteBlocks.remove(size-1);
      }
      if (trackAllocations) {
        numBytesUsed += BYTE_BLOCK_SIZE;
      }
      assert numBytesUsed <= numBytesAlloc;
      return b;
    }
  }

  /* Return byte[]'s to the pool: blocks[start] (inclusive)
     through blocks[end] (exclusive) */
  void recycleByteBlocks(byte[][] blocks, int start, int end) {
    synchronized(DocumentsWriter.this) {
      for(int i=start;i<end;i++) {
        freeByteBlocks.add(blocks[i]);
      }
    }
  }
}

/* Initial chunks size of the shared int[] blocks used to
   store postings data */
final static int INT_BLOCK_SHIFT = 13;
final static int INT_BLOCK_SIZE = (int) (1 << INT_BLOCK_SHIFT);
final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;

// Recycled int[] blocks available for reuse (raw list of int[]).
private ArrayList freeIntBlocks = new ArrayList();

/* Allocate another int[] from the shared pool */
synchronized int[] getIntBlock(boolean trackAllocations) {
  final int size = freeIntBlocks.size();
  final int[] b;
  if (0 == size) {
    // Always record a block allocated, even if
    // trackAllocations is false.  This is necessary
    // because this block will be shared between
    // things that don't track allocations (term
    // vectors) and things that do (freq/prox
    // postings).
    numBytesAlloc += INT_BLOCK_SIZE*INT_NUM_BYTE;
    b = new int[INT_BLOCK_SIZE];
  } else {
    // Reuse the most recently recycled block.
    b = (int[]) freeIntBlocks.remove(size-1);
  }
  if (trackAllocations) {
    numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
  }
  assert numBytesUsed <= numBytesAlloc;
  return b;
}

/* Adds numBytes to the allocated-bytes counter */
synchronized void bytesAllocated(long numBytes) {
  numBytesAlloc += numBytes;
  assert numBytesUsed <= numBytesAlloc;
}

/* Adds numBytes to the used-bytes counter */
synchronized void bytesUsed(long numBytes) {
  numBytesUsed += numBytes;
  assert numBytesUsed <= numBytesAlloc;
}

/* Return int[]s to the pool: blocks[start] (inclusive)
   through blocks[end] (exclusive) */
synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
  for(int i=start;i<end;i++) {
    freeIntBlocks.add(blocks[i]);
  }
}

ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();

/* Initial chunk size of the shared char[] blocks used to
   store term text */
final static int CHAR_BLOCK_SHIFT = 14;
final static int CHAR_BLOCK_SIZE = (int) (1 << CHAR_BLOCK_SHIFT);
final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;

final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;

// Recycled char[] blocks available for reuse (raw list of char[]).
private ArrayList freeCharBlocks = new ArrayList();

/* Allocate another char[] from the shared pool */
synchronized char[] getCharBlock() {
  final int size = freeCharBlocks.size();
  final char[] c;
  if (0 == size) {
    numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
    c = new char[CHAR_BLOCK_SIZE];
  } else {
    // Reuse the most recently recycled block.
    c = (char[]) freeCharBlocks.remove(size-1);
  }
  // We always track allocations of char blocks, for now,
  // because nothing that skips allocation tracking
  // (currently only term vectors) uses its own char
  // blocks.
  numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
  assert numBytesUsed <= numBytesAlloc;
  return c;
}

/* Return char[]s to the pool: the first numBlocks entries
   of blocks */
synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
  for(int i=0;i<numBlocks;i++) {
    freeCharBlocks.add(blocks[i]);
  }
}

/* Formats a byte count as megabytes (v / 1024 / 1024) using nf */
String toMB(long v) {
  return nf.format(v/1024./1024.);
}

/* We have three pools of RAM: Postings, byte blocks
 * (holds freq/prox posting data) and char blocks (holds
 * characters in the term).  Different docs require
 * varying amount of storage from these three classes.
 * For example, docs with many unique single-occurrence
 * short terms will use up the Postings RAM and hardly any
 * of the other two.  Whereas docs with very large terms
 * will use a lot of char blocks RAM and relatively less of
 * the other two.  This method just frees allocations from
 * the pools once we are over-budget, which balances the
 * pools to match the current docs.
 *
 * Concretely: when numBytesAlloc exceeds freeTrigger, one
 * free block is dropped per iteration, rotating over the
 * byte, char and int free lists and (every fourth
 * iteration) asking the consumer to free RAM, until
 * numBytesAlloc is no longer above freeLevel or nothing
 * is left to free.  Otherwise bufferIsFull is set once
 * numBytesUsed exceeds ramBufferSize. */
void balanceRAM() {

  // We flush when we've used our target usage
  final long flushTrigger = (long) ramBufferSize;

  if (numBytesAlloc > freeTrigger) {
    if (infoStream != null) {
      // Widen to long before multiplying so the reported free
      // sizes cannot overflow int when many blocks are pooled.
      message(" RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
              " vs trigger=" + toMB(flushTrigger) +
              " allocMB=" + toMB(numBytesAlloc) +
              " vs trigger=" + toMB(freeTrigger) +
              " byteBlockFree=" + toMB((long) byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
              " charBlockFree=" + toMB((long) freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
    }

    final long startBytesAlloc = numBytesAlloc;

    int iter = 0;

    // We free equally from each pool in 32 KB
    // chunks until we are below our threshold
    // (freeLevel)

    // Stays true until the consumer reports it had nothing
    // more to free.
    boolean any = true;

    while(numBytesAlloc > freeLevel) {

      synchronized(this) {
        if (0 == byteBlockAllocator.freeByteBlocks.size() &&
            0 == freeCharBlocks.size() &&
            0 == freeIntBlocks.size() &&
            !any) {
          // Nothing else to free -- must flush now.
          bufferIsFull = numBytesUsed > flushTrigger;
          if (infoStream != null) {
            if (numBytesUsed > flushTrigger) {
              message(" nothing to free; now set bufferIsFull");
            } else {
              message(" nothing to free");
            }
          }
          assert numBytesUsed <= numBytesAlloc;
          break;
        }

        // iter % 4 selects which pool gives up a block this round.
        if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
          byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
          numBytesAlloc -= BYTE_BLOCK_SIZE;
        }

        if ((1 == iter % 4) && freeCharBlocks.size() > 0) {
          freeCharBlocks.remove(freeCharBlocks.size()-1);
          numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
        }

        if ((2 == iter % 4) && freeIntBlocks.size() > 0) {
          freeIntBlocks.remove(freeIntBlocks.size()-1);
          numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
        }
      }

      if ((3 == iter % 4) && any) {
        // Ask consumer to free any recycled state
        any = consumer.freeRAM();
      }

      iter++;
    }

    if (infoStream != null) {
      message(" after free: freedMB=" + toMB(startBytesAlloc-numBytesAlloc) +
              " usedMB=" + toMB(numBytesUsed) +
              " allocMB=" + toMB(numBytesAlloc));
    }

  } else {
    // We have not allocated past freeTrigger, but if the RAM
    // we are actually using has passed the flush trigger
    // (ramBufferSize), go ahead and flush.  This prevents
    // over-allocating and then freeing, with every
    // flush.
    synchronized(this) {

      if (numBytesUsed > flushTrigger) {
        if (infoStream != null) {
          message(" RAM: now flush @ usedMB=" + toMB(numBytesUsed) +
                  " allocMB=" + toMB(numBytesAlloc) +
                  " triggerMB=" + toMB(flushTrigger));
        }

        bufferIsFull = true;
      }
    }
  }
}

final WaitQueue waitQueue = new WaitQueue();

/**
 * Circular buffer of DocWriter instances that finished out of docID order.
 * A doc whose docID equals nextWriteDocID is written immediately, followed
 * by any consecutive docs already parked in the buffer; any other doc is
 * parked at an offset of (docID - nextWriteDocID) slots from nextWriteLoc.
 */
private class WaitQueue {
  // Circular buffer; slot nextWriteLoc corresponds to nextWriteDocID.
  DocWriter[] waiting;
  // docID of the next document to be written.
  int nextWriteDocID;
  // Index into waiting that corresponds to nextWriteDocID.
  int nextWriteLoc;
  // Number of non-null entries in waiting.
  int numWaiting;
  // Sum of sizeInBytes() over the parked documents.
  long waitingBytes;

  public WaitQueue() {
    waiting = new DocWriter[10];
  }

  /* Restarts docID numbering at 0; the queue must be empty */
  synchronized void reset() {
    // NOTE: nextWriteLoc doesn't need to be reset
    assert numWaiting == 0;
    assert waitingBytes == 0;
    nextWriteDocID = 0;
  }

  /* True when waitingBytes is at or below waitQueueResumeBytes */
  synchronized boolean doResume() {
    return waitingBytes <= waitQueueResumeBytes;
  }

  /* True when waitingBytes is above waitQueuePauseBytes */
  synchronized boolean doPause() {
    return waitingBytes > waitQueuePauseBytes;
  }

  /* Aborts every parked document and empties the queue */
  synchronized void abort() {
    int count = 0;
    for(int i=0;i<waiting.length;i++) {
      final DocWriter doc = waiting[i];
      if (doc != null) {
        doc.abort();
        waiting[i] = null;
        count++;
      }
    }
    waitingBytes = 0;
    assert count == numWaiting;
    numWaiting = 0;
  }

  /* Finishes doc and advances nextWriteDocID, numDocsInStore
     and nextWriteLoc (wrapping at the end of the buffer).
     If anything in the try block throws, setAborting() is
     called before the exception propagates. */
  private void writeDocument(DocWriter doc) throws IOException {
    assert doc == skipDocWriter || nextWriteDocID == doc.docID;
    boolean success = false;
    try {
      doc.finish();
      nextWriteDocID++;
      numDocsInStore++;
      nextWriteLoc++;
      assert nextWriteLoc <= waiting.length;
      if (nextWriteLoc == waiting.length) {
        nextWriteLoc = 0;
      }
      success = true;
    } finally {
      if (!success) {
        setAborting();
      }
    }
  }

  /* Writes doc now if it is next in docID order (then drains
     any consecutive parked docs); otherwise parks it.
     Returns doPause(). */
  synchronized public boolean add(DocWriter doc) throws IOException {

    assert doc.docID >= nextWriteDocID;

    if (doc.docID == nextWriteDocID) {
      writeDocument(doc);
      // writeDocument advanced nextWriteLoc; keep writing while
      // the next slot holds a parked doc.
      while(true) {
        doc = waiting[nextWriteLoc];
        if (doc != null) {
          numWaiting--;
          waiting[nextWriteLoc] = null;
          waitingBytes -= doc.sizeInBytes();
          writeDocument(doc);
        } else {
          break;
        }
      }
    } else {

      // I finished before documents that were added
      // before me.  This can easily happen when I am a
      // small doc and the docs before me were large, or,
      // just due to luck in the thread scheduling.  Just
      // add myself to the queue and when that large doc
      // finishes, it will flush me:
      int gap = doc.docID - nextWriteDocID;
      if (gap >= waiting.length) {
        // Grow queue
        DocWriter[] newArray = new DocWriter[ArrayUtil.getNextSize(gap)];
        assert nextWriteLoc >= 0;
        // Unwrap the circular buffer so that the slot for
        // nextWriteDocID lands at index 0 of the new array.
        System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
        System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
        nextWriteLoc = 0;
        waiting = newArray;
        gap = doc.docID - nextWriteDocID;
      }

      int loc = nextWriteLoc + gap;
      if (loc >= waiting.length) {
        loc -= waiting.length;
      }

      // We should only wrap one time
      assert loc < waiting.length;

      // Nobody should be in my spot!
      assert waiting[loc] == null;
      waiting[loc] = doc;
      numWaiting++;
      waitingBytes += doc.sizeInBytes();
    }

    return doPause();
  }
}
} // closes the enclosing class, whose header lies outside this chunk
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -