📄 documentswriter.java
字号:
/** Returns true if the caller (IndexWriter) should now * flush. */ boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { return updateDocument(doc, analyzer, null); } boolean updateDocument(Term t, Document doc, Analyzer analyzer) throws CorruptIndexException, IOException { return updateDocument(doc, analyzer, t); } boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm) throws CorruptIndexException, IOException { // This call is synchronized but fast final DocumentsWriterThreadState state = getThreadState(doc, delTerm); final DocState docState = state.docState; docState.doc = doc; docState.analyzer = analyzer; boolean success = false; try { // This call is not synchronized and does all the // work final DocWriter perDoc = state.consumer.processDocument(); // This call is synchronized but fast finishDocument(state, perDoc); success = true; } finally { if (!success) { synchronized(this) { if (aborting) { state.isIdle = true; notifyAll(); abort(); } else { skipDocWriter.docID = docState.docID; boolean success2 = false; try { waitQueue.add(skipDocWriter); success2 = true; } finally { if (!success2) { state.isIdle = true; notifyAll(); abort(); return false; } } state.isIdle = true; notifyAll(); // If this thread state had decided to flush, we // must clear it so another thread can flush if (state.doFlushAfter) { state.doFlushAfter = false; flushPending = false; notifyAll(); } // Immediately mark this document as deleted // since likely it was partially added. 
This // keeps indexing as "all or none" (atomic) when // adding a document: addDeleteDocID(state.docState.docID); } } } } return state.doFlushAfter || timeToFlushDeletes(); } // for testing synchronized int getNumBufferedDeleteTerms() { return deletesInRAM.numTerms; } // for testing synchronized HashMap getBufferedDeleteTerms() { return deletesInRAM.terms; } /** Called whenever a merge has completed and the merged segments had deletions */ synchronized void remapDeletes(SegmentInfos infos, int[][] docMaps, int[] delCounts, MergePolicy.OneMerge merge, int mergeDocCount) { if (docMaps == null) // The merged segments had no deletes so docIDs did not change and we have nothing to do return; MergeDocIDRemapper mapper = new MergeDocIDRemapper(infos, docMaps, delCounts, merge, mergeDocCount); deletesInRAM.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); deletesFlushed.remap(mapper, infos, docMaps, delCounts, merge, mergeDocCount); flushedDocCount -= mapper.docShift; } synchronized private void waitReady(DocumentsWriterThreadState state) { while (!closed && ((state != null && !state.isIdle) || pauseThreads != 0 || flushPending || aborting)) { try { wait(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } if (closed) throw new AlreadyClosedException("this IndexWriter is closed"); } synchronized boolean bufferDeleteTerms(Term[] terms) throws IOException { waitReady(null); for (int i = 0; i < terms.length; i++) addDeleteTerm(terms[i], numDocsInRAM); return timeToFlushDeletes(); } synchronized boolean bufferDeleteTerm(Term term) throws IOException { waitReady(null); addDeleteTerm(term, numDocsInRAM); return timeToFlushDeletes(); } synchronized boolean bufferDeleteQueries(Query[] queries) throws IOException { waitReady(null); for (int i = 0; i < queries.length; i++) addDeleteQuery(queries[i], numDocsInRAM); return timeToFlushDeletes(); } synchronized boolean bufferDeleteQuery(Query query) throws IOException { waitReady(null); 
addDeleteQuery(query, numDocsInRAM); return timeToFlushDeletes(); } synchronized boolean deletesFull() { return maxBufferedDeleteTerms != IndexWriter.DISABLE_AUTO_FLUSH && ((deletesInRAM.numTerms + deletesInRAM.queries.size() + deletesInRAM.docIDs.size()) >= maxBufferedDeleteTerms); } synchronized private boolean timeToFlushDeletes() { return (bufferIsFull || deletesFull()) && setFlushPending(); } void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) { this.maxBufferedDeleteTerms = maxBufferedDeleteTerms; } int getMaxBufferedDeleteTerms() { return maxBufferedDeleteTerms; } synchronized boolean hasDeletes() { return deletesFlushed.any(); } synchronized boolean applyDeletes(SegmentInfos infos) throws IOException { if (!hasDeletes()) return false; if (infoStream != null) message("apply " + deletesFlushed.numTerms + " buffered deleted terms and " + deletesFlushed.docIDs.size() + " deleted docIDs and " + deletesFlushed.queries.size() + " deleted queries on " + + infos.size() + " segments."); final int infosEnd = infos.size(); int docStart = 0; boolean any = false; for (int i = 0; i < infosEnd; i++) { IndexReader reader = SegmentReader.get(infos.info(i), false); boolean success = false; try { any |= applyDeletes(reader, docStart); docStart += reader.maxDoc(); success = true; } finally { if (reader != null) { try { if (success) reader.doCommit(); } finally { reader.doClose(); } } } } deletesFlushed.clear(); return any; } // Apply buffered delete terms, queries and docIDs to the // provided reader private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) throws CorruptIndexException, IOException { final int docEnd = docIDStart + reader.maxDoc(); boolean any = false; // Delete by term Iterator iter = deletesFlushed.terms.entrySet().iterator(); while (iter.hasNext()) { Entry entry = (Entry) iter.next(); Term term = (Term) entry.getKey(); TermDocs docs = reader.termDocs(term); if (docs != null) { int limit = ((BufferedDeletes.Num) 
entry.getValue()).getNum(); try { while (docs.next()) { int docID = docs.doc(); if (docIDStart+docID >= limit) break; reader.deleteDocument(docID); any = true; } } finally { docs.close(); } } } // Delete by docID iter = deletesFlushed.docIDs.iterator(); while(iter.hasNext()) { int docID = ((Integer) iter.next()).intValue(); if (docID >= docIDStart && docID < docEnd) { reader.deleteDocument(docID-docIDStart); any = true; } } // Delete by query IndexSearcher searcher = new IndexSearcher(reader); iter = deletesFlushed.queries.entrySet().iterator(); while(iter.hasNext()) { Entry entry = (Entry) iter.next(); Query query = (Query) entry.getKey(); int limit = ((Integer) entry.getValue()).intValue(); Weight weight = query.weight(searcher); Scorer scorer = weight.scorer(reader); while(scorer.next()) { final int docID = scorer.doc(); if (docIDStart + docID >= limit) break; reader.deleteDocument(docID); any = true; } } searcher.close(); return any; } // Buffer a term in bufferedDeleteTerms, which records the // current number of documents buffered in ram so that the // delete term will be applied to those documents as well // as the disk segments. synchronized private void addDeleteTerm(Term term, int docCount) { BufferedDeletes.Num num = (BufferedDeletes.Num) deletesInRAM.terms.get(term); final int docIDUpto = flushedDocCount + docCount; if (num == null) deletesInRAM.terms.put(term, new BufferedDeletes.Num(docIDUpto)); else num.setNum(docIDUpto); deletesInRAM.numTerms++; } // Buffer a specific docID for deletion. 
// Currently only used when we hit an exception when adding a document
synchronized private void addDeleteDocID(int docID) {
  // Store as an absolute docID (flushed docs + in-RAM offset)
  deletesInRAM.docIDs.add(new Integer(flushedDocCount+docID));
}

// Buffer a delete-by-query; docID records how many docs were buffered when
// the delete arrived, so it only applies to earlier documents.
synchronized private void addDeleteQuery(Query query, int docID) {
  deletesInRAM.queries.put(query, new Integer(flushedDocCount + docID));
}

// True when RAM accounting says we should rebalance (free or flush buffers)
// before accepting more documents.
synchronized boolean doBalanceRAM() {
  return ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH && !bufferIsFull
    && (numBytesUsed >= ramBufferSize || numBytesAlloc >= freeTrigger);
}

/** Does the synchronized work to finish/flush the inverted document. */
private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {

  if (doBalanceRAM())
    // Must call this w/o holding synchronized(this) else
    // we'll hit deadlock:
    balanceRAM();

  synchronized(this) {

    assert docWriter == null || docWriter.docID == perThread.docState.docID;

    if (aborting) {
      // We are currently aborting, and another thread is
      // waiting for me to become idle.  We just forcefully
      // idle this threadState; it will be fully reset by
      // abort()
      if (docWriter != null)
        try {
          docWriter.abort();
        } catch (Throwable t) {
          // best-effort abort: swallow so we still mark the state idle below
        }

      perThread.isIdle = true;
      notifyAll();
      return;
    }

    final boolean doPause;

    if (docWriter != null)
      doPause = waitQueue.add(docWriter);
    else {
      // Document produced no output; enqueue a placeholder so the wait
      // queue still sees this docID in order
      skipDocWriter.docID = perThread.docState.docID;
      doPause = waitQueue.add(skipDocWriter);
    }

    if (doPause)
      waitForWaitQueue();

    if (bufferIsFull && !flushPending) {
      // Claim the flush for this thread state
      flushPending = true;
      perThread.doFlushAfter = true;
    }

    perThread.isIdle = true;
    notifyAll();
  }
}

// Blocks until the wait queue drains enough to resume adding documents.
synchronized void waitForWaitQueue() {
  do {
    try {
      wait();
    } catch (InterruptedException e) {
      // preserve interrupt status for the caller
      Thread.currentThread().interrupt();
    }
  } while (!waitQueue.doResume());
}

// No-op DocWriter used as a placeholder for documents that produced no
// output (or failed partway); keeps docIDs sequential in the wait queue.
private static class SkipDocWriter extends DocWriter {
  void finish() {
  }
  void abort() {
  }
  long sizeInBytes() {
    return 0;
  }
}
final SkipDocWriter skipDocWriter = new SkipDocWriter();

long getRAMUsed() {
  return numBytesUsed;
}

// Bytes allocated (including free blocks) vs. bytes actively in use
long numBytesAlloc;
long numBytesUsed;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -