📄 documentswriter.java
字号:
String s = docStoreSegment; docStoreSegment = null; docStoreOffset = 0; numDocsInStore = 0; success = true; return s; } finally { if (!success) { abort(); } } } private Collection abortedFiles; // List of files that were written before last abort() private FlushState flushState; Collection abortedFiles() { return abortedFiles; } void message(String message) { writer.message("DW: " + message); } final List openFiles = new ArrayList(); final List closedFiles = new ArrayList(); /* Returns Collection of files in use by this instance, * including any flushed segments. */ synchronized List openFiles() { return (List) ((ArrayList) openFiles).clone(); } synchronized List closedFiles() { return (List) ((ArrayList) closedFiles).clone(); } synchronized void addOpenFile(String name) { assert !openFiles.contains(name); openFiles.add(name); } synchronized void removeOpenFile(String name) { assert openFiles.contains(name); openFiles.remove(name); closedFiles.add(name); } synchronized void setAborting() { aborting = true; } /** Called if we hit an exception at a bad time (when * updating the index files) and must discard all * currently buffered docs. This resets our state, * discarding any docs added since last flush. */ synchronized void abort() throws IOException { try { message("docWriter: now abort"); // Forcefully remove waiting ThreadStates from line waitQueue.abort(); // Wait for all other threads to finish with // DocumentsWriter: pauseAllThreads(); try { assert 0 == waitQueue.numWaiting; waitQueue.waitingBytes = 0; try { abortedFiles = openFiles(); } catch (Throwable t) { abortedFiles = null; } deletesInRAM.clear(); openFiles.clear(); for(int i=0;i<threadStates.length;i++) try { threadStates[i].consumer.abort(); } catch (Throwable t) { } try { consumer.abort(); } catch (Throwable t) { } docStoreSegment = null; numDocsInStore = 0; docStoreOffset = 0; // Reset all postings data doAfterFlush(); } finally { resumeAllThreads(); } } finally { aborting = false; notifyAll(); } } /** Reset after a flush */ private void doAfterFlush() throws IOException { // All ThreadStates should be idle when we are called assert allThreadsIdle(); threadBindings.clear(); waitQueue.reset(); segment = null; numDocsInRAM = 0; nextDocID = 0; bufferIsFull = false; flushPending = false; for(int i=0;i<threadStates.length;i++) threadStates[i].doAfterFlush(); numBytesUsed = 0; } // Returns true if an abort is in progress synchronized boolean pauseAllThreads() { pauseThreads++; while(!allThreadsIdle()) { try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } return aborting; } synchronized void resumeAllThreads() { pauseThreads--; assert pauseThreads >= 0; if (0 == pauseThreads) notifyAll(); } private synchronized boolean allThreadsIdle() { for(int i=0;i<threadStates.length;i++) if (!threadStates[i].isIdle) return false; return true; } synchronized private void initFlushState(boolean onlyDocStore) { initSegmentName(onlyDocStore); if (flushState == null) { flushState = new FlushState(); flushState.directory = directory; flushState.docWriter = this; } flushState.docStoreSegmentName = docStoreSegment; flushState.segmentName = segment; flushState.numDocsInRAM = numDocsInRAM; flushState.numDocsInStore = numDocsInStore; flushState.flushedFiles = new HashSet(); } /** Flush all pending docs to a new segment */ synchronized int flush(boolean closeDocStore) throws IOException { assert allThreadsIdle(); assert numDocsInRAM > 0; assert nextDocID == numDocsInRAM; assert waitQueue.numWaiting == 0; assert waitQueue.waitingBytes == 0; initFlushState(false); docStoreOffset = numDocsInStore; if (infoStream != null) message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM); boolean success = false; try { if (closeDocStore) { assert flushState.docStoreSegmentName != null; assert flushState.docStoreSegmentName.equals(flushState.segmentName); closeDocStore(); flushState.numDocsInStore = 0; } Collection threads = new HashSet(); for(int i=0;i<threadStates.length;i++) threads.add(threadStates[i].consumer); consumer.flush(threads, flushState); if (infoStream != null) { final long newSegmentSize = segmentSize(flushState.segmentName); String message = " oldRAMSize=" + numBytesUsed + " newFlushedSize=" + newSegmentSize + " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) + " new/old=" + nf.format(100.0*newSegmentSize/numBytesUsed) + "%"; message(message); } flushedDocCount += flushState.numDocsInRAM; doAfterFlush(); success = true; } finally { if (!success) { abort(); } } assert waitQueue.waitingBytes == 0; return flushState.numDocsInRAM; } /** Build compound file for the segment we just flushed */ void createCompoundFile(String segment) throws IOException { CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION); Iterator it = flushState.flushedFiles.iterator(); while(it.hasNext()) cfsWriter.addFile((String) it.next()); // Perform the merge cfsWriter.close(); } /** Set flushPending if it is not already set and returns * whether it was set. This is used by IndexWriter to * trigger a single flush even when multiple threads are * trying to do so. */ synchronized boolean setFlushPending() { if (flushPending) return false; else { flushPending = true; return true; } } synchronized void clearFlushPending() { flushPending = false; } synchronized void pushDeletes() { deletesFlushed.update(deletesInRAM); } synchronized void close() { closed = true; notifyAll(); } synchronized void initSegmentName(boolean onlyDocStore) { if (segment == null && (!onlyDocStore || docStoreSegment == null)) { segment = writer.newSegmentName(); assert numDocsInRAM == 0; } if (docStoreSegment == null) { docStoreSegment = segment; assert numDocsInStore == 0; } } /** Returns a free (idle) ThreadState that may be used for * indexing this one document. This call also pauses if a * flush is pending. If delTerm is non-null then we * buffer this deleted term after the thread state has * been acquired. */ synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException { // First, find a thread state. If this thread already // has affinity to a specific ThreadState, use that one // again. DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread()); if (state == null) { // First time this thread has called us since last // flush. Find the least loaded thread state: DocumentsWriterThreadState minThreadState = null; for(int i=0;i<threadStates.length;i++) { DocumentsWriterThreadState ts = threadStates[i]; if (minThreadState == null || ts.numThreads < minThreadState.numThreads) minThreadState = ts; } if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= MAX_THREAD_STATE)) { state = minThreadState; state.numThreads++; } else { // Just create a new "private" thread state DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length]; if (threadStates.length > 0) System.arraycopy(threadStates, 0, newArray, 0, threadStates.length); state = newArray[threadStates.length] = new DocumentsWriterThreadState(this); threadStates = newArray; } threadBindings.put(Thread.currentThread(), state); } // Next, wait until my thread state is idle (in case // it's shared with other threads) and for threads to // not be paused nor a flush pending: waitReady(state); // Allocate segment name if this is the first doc since // last flush: initSegmentName(false); state.isIdle = false; boolean success = false; try { state.docState.docID = nextDocID; assert writer.testPoint("DocumentsWriter.ThreadState.init start"); if (delTerm != null) { addDeleteTerm(delTerm, state.docState.docID); state.doFlushAfter = timeToFlushDeletes(); } assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm"); nextDocID++; numDocsInRAM++; // We must at this point commit to flushing to ensure we // always get N docs when we flush by doc count, even if // > 1 thread is adding documents: if (!flushPending && maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH && numDocsInRAM >= maxBufferedDocs) { flushPending = true; state.doFlushAfter = true; } success = true; } finally { if (!success) { // Forcefully idle this ThreadState: state.isIdle = true; notifyAll(); if (state.doFlushAfter) { state.doFlushAfter = false; flushPending = false; } } } return state; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -