⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 documentswriter.java

📁 lucene-2.4.0 是一个全文搜索的工具包
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
      String s = docStoreSegment;
      docStoreSegment = null;
      docStoreOffset = 0;
      numDocsInStore = 0;
      success = true;
      return s;
    } finally {
      if (!success) {
        abort();
      }
    }
  }

  private Collection abortedFiles;               // List of files that were written before last abort()

  private FlushState flushState;

  /** Returns the file list captured by the most recent
   *  abort(); abort() stores null here if taking the
   *  snapshot failed. */
  Collection abortedFiles() {
    return abortedFiles;
  }

  /** Forwards a message to the writer, prefixed with "DW: ". */
  void message(String message) {
    writer.message("DW: " + message);
  }

  final List openFiles = new ArrayList();
  final List closedFiles = new ArrayList();

  /* Returns Collection of files in use by this instance,
   * including any flushed segments.  The result is a clone
   * of the internal list, so callers may modify it freely. */
  synchronized List openFiles() {
    return (List) ((ArrayList) openFiles).clone();
  }

  /** Returns a clone of the list of file names that were
   *  moved out of openFiles by removeOpenFile. */
  synchronized List closedFiles() {
    return (List) ((ArrayList) closedFiles).clone();
  }

  /** Records name as an open file; asserts it is not
   *  already recorded. */
  synchronized void addOpenFile(String name) {
    assert !openFiles.contains(name);
    openFiles.add(name);
  }

  /** Moves name from the open-file list to the closed-file
   *  list; asserts it was recorded as open. */
  synchronized void removeOpenFile(String name) {
    assert openFiles.contains(name);
    openFiles.remove(name);
    closedFiles.add(name);
  }

  /** Sets the aborting flag; abort() clears it again when
   *  it finishes. */
  synchronized void setAborting() {
    aborting = true;
  }

  /** Called if we hit an exception at a bad time (when
   *  updating the index files) and must discard all
   *  currently buffered docs.  This resets our state,
   *  discarding any docs added since last flush. */
  synchronized void abort() throws IOException {

    try {
      message("docWriter: now abort");

      // Forcefully remove waiting ThreadStates from line
      waitQueue.abort();

      // Wait for all other threads to finish with
      // DocumentsWriter:
      pauseAllThreads();

      try {

        assert 0 == waitQueue.numWaiting;

        waitQueue.waitingBytes = 0;

        // Snapshot the open files before the list is
        // cleared below; best effort only.
        try {
          abortedFiles = openFiles();
        } catch (Throwable t) {
          abortedFiles = null;
        }

        deletesInRAM.clear();

        openFiles.clear();

        // Failures while aborting a consumer are deliberately
        // ignored so that the remaining consumers are still
        // aborted and the state below is still reset.
        for(int i=0;i<threadStates.length;i++)
          try {
            threadStates[i].consumer.abort();
          } catch (Throwable t) {
          }

        try {
          consumer.abort();
        } catch (Throwable t) {
        }

        docStoreSegment = null;
        numDocsInStore = 0;
        docStoreOffset = 0;

        // Reset all postings data
        doAfterFlush();

      } finally {
        resumeAllThreads();
      }
    } finally {
      // Always clear the flag and wake any waiters, even if
      // the abort itself failed.
      aborting = false;
      notifyAll();
    }
  }

  /** Reset after a flush */
  private void doAfterFlush() throws IOException {
    // All ThreadStates should be idle when we are called
    assert allThreadsIdle();
    threadBindings.clear();
    waitQueue.reset();
    segment = null;
    numDocsInRAM = 0;
    nextDocID = 0;
    bufferIsFull = false;
    flushPending = false;
    for(int i=0;i<threadStates.length;i++)
      threadStates[i].doAfterFlush();
    numBytesUsed = 0;
  }

  /** Increments the pause count and blocks until every
   *  ThreadState is idle.  Each call must be paired with a
   *  call to resumeAllThreads().
   *
   *  The wait is not cut short by an interrupt: if the
   *  calling thread is interrupted while waiting, it keeps
   *  waiting and its interrupt status is restored just
   *  before this method returns.
   *
   *  @return true if an abort is in progress */
  synchronized boolean pauseAllThreads() {
    pauseThreads++;
    // Remember the interrupt rather than re-asserting it
    // inside the loop: with the interrupt status set, every
    // subsequent wait() would throw immediately and the loop
    // would spin instead of blocking.
    boolean interrupted = false;
    try {
      while(!allThreadsIdle()) {
        try {
          wait();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted)
        Thread.currentThread().interrupt();
    }

    return aborting;
  }

  /** Decrements the pause count and, once it reaches zero,
   *  wakes all threads waiting on this instance. */
  synchronized void resumeAllThreads() {
    pauseThreads--;
    assert pauseThreads >= 0;
    if (0 == pauseThreads)
      notifyAll();
  }

  /** Returns true only if every entry of threadStates has
   *  isIdle set. */
  private synchronized boolean allThreadsIdle() {
    for(int i=0;i<threadStates.length;i++)
      if (!threadStates[i].isIdle)
        return false;
    return true;
  }

  /** Ensures segment names are assigned (see
   *  initSegmentName), lazily creates the shared FlushState,
   *  and copies the current segment names and doc counts
   *  into it together with a fresh, empty flushedFiles set. */
  synchronized private void initFlushState(boolean onlyDocStore) {
    initSegmentName(onlyDocStore);

    if (flushState == null) {
      flushState = new FlushState();
      flushState.directory = directory;
      flushState.docWriter = this;
    }

    flushState.docStoreSegmentName = docStoreSegment;
    flushState.segmentName = segment;
    flushState.numDocsInRAM = numDocsInRAM;
    flushState.numDocsInStore = numDocsInStore;
    flushState.flushedFiles = new HashSet();
  }

  /** Flush all pending docs to a new segment.  If anything
   *  fails before the flush completes, abort() is called.
   *
   *  @param closeDocStore if true, closeDocStore() is
   *         invoked before the consumer is flushed
   *  @return the number of buffered docs recorded in the
   *          flush state when the flush began */
  synchronized int flush(boolean closeDocStore) throws IOException {

    assert allThreadsIdle();

    assert numDocsInRAM > 0;

    assert nextDocID == numDocsInRAM;
    assert waitQueue.numWaiting == 0;
    assert waitQueue.waitingBytes == 0;

    initFlushState(false);

    docStoreOffset = numDocsInStore;

    if (infoStream != null)
      message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
    
    boolean success = false;

    try {

      if (closeDocStore) {
        assert flushState.docStoreSegmentName != null;
        assert flushState.docStoreSegmentName.equals(flushState.segmentName);
        closeDocStore();
        flushState.numDocsInStore = 0;
      }

      // Hand the per-thread consumers to the top-level
      // consumer so it can flush them together.
      Collection threads = new HashSet();
      for(int i=0;i<threadStates.length;i++)
        threads.add(threadStates[i].consumer);
      consumer.flush(threads, flushState);

      if (infoStream != null) {
        final long newSegmentSize = segmentSize(flushState.segmentName);
        String message = "  oldRAMSize=" + numBytesUsed +
          " newFlushedSize=" + newSegmentSize +
          " docs/MB=" + nf.format(numDocsInRAM/(newSegmentSize/1024./1024.)) +
          " new/old=" + nf.format(100.0*newSegmentSize/numBytesUsed) + "%";
        message(message);
      }

      flushedDocCount += flushState.numDocsInRAM;

      doAfterFlush();

      success = true;

    } finally {
      if (!success) {
        abort();
      }
    }

    assert waitQueue.waitingBytes == 0;

    // doAfterFlush() zeroed numDocsInRAM, so report the count
    // that was saved in the flush state.
    return flushState.numDocsInRAM;
  }

  /** Build compound file for the segment we just flushed.
   *  Every file name recorded in flushState.flushedFiles is
   *  added to the compound file. */
  void createCompoundFile(String segment) throws IOException {
    
    CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION);
    Iterator it = flushState.flushedFiles.iterator();
    while(it.hasNext())
      cfsWriter.addFile((String) it.next());
      
    // Perform the merge
    cfsWriter.close();
  }

  /** Set flushPending if it is not already set and returns
   *  whether it was set. This is used by IndexWriter to
   *  trigger a single flush even when multiple threads are
   *  trying to do so. */
  synchronized boolean setFlushPending() {
    if (flushPending)
      return false;
    else {
      flushPending = true;
      return true;
    }
  }

  /** Clears the flushPending flag. */
  synchronized void clearFlushPending() {
    flushPending = false;
  }

  /** Passes the deletes buffered in RAM to
   *  deletesFlushed.update(). */
  synchronized void pushDeletes() {
    deletesFlushed.update(deletesInRAM);
  }

  /** Marks this instance closed and wakes all threads
   *  waiting on it. */
  synchronized void close() {
    closed = true;
    notifyAll();
  }

  /** Assigns segment names where they are still missing.  A
   *  new segment name is requested from the writer when
   *  none is set, unless onlyDocStore is true and a doc
   *  store segment already exists.  If no doc store segment
   *  is set, it takes the value of segment. */
  synchronized void initSegmentName(boolean onlyDocStore) {
    if (segment == null && (!onlyDocStore || docStoreSegment == null)) {
      segment = writer.newSegmentName();
      assert numDocsInRAM == 0;
    }
    if (docStoreSegment == null) {
      docStoreSegment = segment;
      assert numDocsInStore == 0;
    }
  }

  /** Returns a free (idle) ThreadState that may be used for
   * indexing this one document.  This call also pauses if a
   * flush is pending.  If delTerm is non-null then we
   * buffer this deleted term after the thread state has
   * been acquired. */
  synchronized DocumentsWriterThreadState getThreadState(Document doc, Term delTerm) throws IOException {

    // First, find a thread state.  If this thread already
    // has affinity to a specific ThreadState, use that one
    // again.
    DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread());
    if (state == null) {

      // First time this thread has called us since last
      // flush.  Find the least loaded thread state:
      DocumentsWriterThreadState minThreadState = null;
      for(int i=0;i<threadStates.length;i++) {
        DocumentsWriterThreadState ts = threadStates[i];
        if (minThreadState == null || ts.numThreads < minThreadState.numThreads)
          minThreadState = ts;
      }
      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= MAX_THREAD_STATE)) {
        state = minThreadState;
        state.numThreads++;
      } else {
        // Just create a new "private" thread state
        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
        if (threadStates.length > 0)
          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
        threadStates = newArray;
      }
      threadBindings.put(Thread.currentThread(), state);
    }

    // Next, wait until my thread state is idle (in case
    // it's shared with other threads) and for threads to
    // not be paused nor a flush pending:
    waitReady(state);

    // Allocate segment name if this is the first doc since
    // last flush:
    initSegmentName(false);

    state.isIdle = false;

    boolean success = false;
    try {
      state.docState.docID = nextDocID;

      assert writer.testPoint("DocumentsWriter.ThreadState.init start");

      if (delTerm != null) {
        addDeleteTerm(delTerm, state.docState.docID);
        state.doFlushAfter = timeToFlushDeletes();
      }

      assert writer.testPoint("DocumentsWriter.ThreadState.init after delTerm");

      nextDocID++;
      numDocsInRAM++;

      // We must at this point commit to flushing to ensure we
      // always get N docs when we flush by doc count, even if
      // > 1 thread is adding documents:
      if (!flushPending &&
          maxBufferedDocs != IndexWriter.DISABLE_AUTO_FLUSH
          && numDocsInRAM >= maxBufferedDocs) {
        flushPending = true;
        state.doFlushAfter = true;
      }

      success = true;
    } finally {
      if (!success) {
        // Forcefully idle this ThreadState:
        state.isIdle = true;
        notifyAll();
        // Undo the flush commitment made above so the
        // pending flag is not left set by this failed call.
        if (state.doFlushAfter) {
          state.doFlushAfter = false;
          flushPending = false;
        }
      }
    }

    return state;
  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -