
📄 DocumentsWriter.java

📁 Source code of the Lucene 2.0 full-text search engine. Enjoy!
💻 JAVA
📖 Page 1 of 4
  NumberFormat nf = NumberFormat.getInstance();

  // TODO FI: this is not flexible -- we can't hardwire
  // extensions in here:
  private long segmentSize(String segmentName) throws IOException {
    // Used only when infoStream != null
    assert infoStream != null;

    long size = directory.fileLength(segmentName + ".tii") +
      directory.fileLength(segmentName + ".tis") +
      directory.fileLength(segmentName + ".frq") +
      directory.fileLength(segmentName + ".prx");

    final String normFileName = segmentName + ".nrm";
    if (directory.fileExists(normFileName))
      size += directory.fileLength(normFileName);

    return size;
  }

  // Coarse estimates used to measure RAM usage of buffered deletes
  final static int OBJECT_HEADER_BYTES = 8;
  final static int POINTER_NUM_BYTE = 4;
  final static int INT_NUM_BYTE = 4;
  final static int CHAR_NUM_BYTE = 2;

  /* Initial chunk size of the shared byte[] blocks used to
     store postings data */
  final static int BYTE_BLOCK_SHIFT = 15;
  final static int BYTE_BLOCK_SIZE = (int) (1 << BYTE_BLOCK_SHIFT);
  final static int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
  final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;

  private class ByteBlockAllocator extends ByteBlockPool.Allocator {

    ArrayList freeByteBlocks = new ArrayList();

    /* Allocate another byte[] from the shared pool */
    byte[] getByteBlock(boolean trackAllocations) {
      synchronized(DocumentsWriter.this) {
        final int size = freeByteBlocks.size();
        final byte[] b;
        if (0 == size) {
          // Always record a block allocated, even if
          // trackAllocations is false.  This is necessary
          // because this block will be shared between
          // things that don't track allocations (term
          // vectors) and things that do (freq/prox
          // postings).
          numBytesAlloc += BYTE_BLOCK_SIZE;
          b = new byte[BYTE_BLOCK_SIZE];
        } else
          b = (byte[]) freeByteBlocks.remove(size-1);
        if (trackAllocations)
          numBytesUsed += BYTE_BLOCK_SIZE;
        assert numBytesUsed <= numBytesAlloc;
        return b;
      }
    }

    /* Return byte[]s to the pool */
    void recycleByteBlocks(byte[][] blocks, int start, int end) {
      synchronized(DocumentsWriter.this) {
        for(int i=start;i<end;i++)
          freeByteBlocks.add(blocks[i]);
      }
    }
  }

  /* Initial chunk size of the shared int[] blocks used to
     store postings data */
  final static int INT_BLOCK_SHIFT = 13;
  final static int INT_BLOCK_SIZE = (int) (1 << INT_BLOCK_SHIFT);
  final static int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;

  private ArrayList freeIntBlocks = new ArrayList();

  /* Allocate another int[] from the shared pool */
  synchronized int[] getIntBlock(boolean trackAllocations) {
    final int size = freeIntBlocks.size();
    final int[] b;
    if (0 == size) {
      // Always record a block allocated, even if
      // trackAllocations is false.  This is necessary
      // because this block will be shared between
      // things that don't track allocations (term
      // vectors) and things that do (freq/prox
      // postings).
      numBytesAlloc += INT_BLOCK_SIZE*INT_NUM_BYTE;
      b = new int[INT_BLOCK_SIZE];
    } else
      b = (int[]) freeIntBlocks.remove(size-1);
    if (trackAllocations)
      numBytesUsed += INT_BLOCK_SIZE*INT_NUM_BYTE;
    assert numBytesUsed <= numBytesAlloc;
    return b;
  }

  synchronized void bytesAllocated(long numBytes) {
    numBytesAlloc += numBytes;
    assert numBytesUsed <= numBytesAlloc;
  }

  synchronized void bytesUsed(long numBytes) {
    numBytesUsed += numBytes;
    assert numBytesUsed <= numBytesAlloc;
  }

  /* Return int[]s to the pool */
  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
    for(int i=start;i<end;i++)
      freeIntBlocks.add(blocks[i]);
  }

  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();

  /* Initial chunk size of the shared char[] blocks used to
     store term text */
  final static int CHAR_BLOCK_SHIFT = 14;
  final static int CHAR_BLOCK_SIZE = (int) (1 << CHAR_BLOCK_SHIFT);
  final static int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;

  final static int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;

  private ArrayList freeCharBlocks = new ArrayList();

  /* Allocate another char[] from the shared pool */
  synchronized char[] getCharBlock() {
    final int size = freeCharBlocks.size();
    final char[] c;
    if (0 == size) {
      numBytesAlloc += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
      c = new char[CHAR_BLOCK_SIZE];
    } else
      c = (char[]) freeCharBlocks.remove(size-1);
    // We always track allocations of char blocks, for now,
    // because nothing that skips allocation tracking
    // (currently only term vectors) uses its own char
    // blocks.
    numBytesUsed += CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
    assert numBytesUsed <= numBytesAlloc;
    return c;
  }

  /* Return char[]s to the pool */
  synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
    for(int i=0;i<numBlocks;i++)
      freeCharBlocks.add(blocks[i]);
  }

  String toMB(long v) {
    return nf.format(v/1024./1024.);
  }

  /* We have three pools of RAM: Postings, byte blocks
   * (holds freq/prox posting data) and char blocks (holds
   * characters in the term).  Different docs require
   * varying amounts of storage from these three classes.
   * For example, docs with many unique single-occurrence
   * short terms will use up the Postings RAM and hardly any
   * of the other two.  Whereas docs with very large terms
   * will use a lot of char blocks RAM and relatively less of
   * the other two.  This method just frees allocations from
   * the pools once we are over-budget, which balances the
   * pools to match the current docs. */
  void balanceRAM() {

    // We flush when we've used our target usage
    final long flushTrigger = (long) ramBufferSize;

    if (numBytesAlloc > freeTrigger) {

      if (infoStream != null)
        message("  RAM: now balance allocations: usedMB=" + toMB(numBytesUsed) +
                " vs trigger=" + toMB(flushTrigger) +
                " allocMB=" + toMB(numBytesAlloc) +
                " vs trigger=" + toMB(freeTrigger) +
                " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
                " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));

      final long startBytesAlloc = numBytesAlloc;

      int iter = 0;

      // We free equally from each pool in 32 KB
      // chunks until we are below our threshold
      // (freeLevel)

      boolean any = true;

      while(numBytesAlloc > freeLevel) {

        synchronized(this) {

          if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) {
            // Nothing else to free -- must flush now.
            bufferIsFull = numBytesUsed > flushTrigger;
            if (infoStream != null) {
              if (numBytesUsed > flushTrigger)
                message("    nothing to free; now set bufferIsFull");
              else
                message("    nothing to free");
            }
            assert numBytesUsed <= numBytesAlloc;
            break;
          }

          if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
            numBytesAlloc -= BYTE_BLOCK_SIZE;
          }

          if ((1 == iter % 4) && freeCharBlocks.size() > 0) {
            freeCharBlocks.remove(freeCharBlocks.size()-1);
            numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
          }

          if ((2 == iter % 4) && freeIntBlocks.size() > 0) {
            freeIntBlocks.remove(freeIntBlocks.size()-1);
            numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
          }
        }

        if ((3 == iter % 4) && any)
          // Ask consumer to free any recycled state
          any = consumer.freeRAM();

        iter++;
      }

      if (infoStream != null)
        message("    after free: freedMB=" + nf.format((startBytesAlloc-numBytesAlloc)/1024./1024.) +
                " usedMB=" + nf.format(numBytesUsed/1024./1024.) +
                " allocMB=" + nf.format(numBytesAlloc/1024./1024.));

    } else {

      // If we have not crossed the 100% mark, but have
      // crossed the 95% mark of RAM we are actually
      // using, go ahead and flush.  This prevents
      // over-allocating and then freeing, with every
      // flush.
      synchronized(this) {

        if (numBytesUsed > flushTrigger) {
          if (infoStream != null)
            message("  RAM: now flush @ usedMB=" + nf.format(numBytesUsed/1024./1024.) +
                    " allocMB=" + nf.format(numBytesAlloc/1024./1024.) +
                    " triggerMB=" + nf.format(flushTrigger/1024./1024.));

          bufferIsFull = true;
        }
      }
    }
  }

  final WaitQueue waitQueue = new WaitQueue();

  private class WaitQueue {
    DocWriter[] waiting;
    int nextWriteDocID;
    int nextWriteLoc;
    int numWaiting;
    long waitingBytes;

    public WaitQueue() {
      waiting = new DocWriter[10];
    }

    synchronized void reset() {
      // NOTE: nextWriteLoc doesn't need to be reset
      assert numWaiting == 0;
      assert waitingBytes == 0;
      nextWriteDocID = 0;
    }

    synchronized boolean doResume() {
      return waitingBytes <= waitQueueResumeBytes;
    }

    synchronized boolean doPause() {
      return waitingBytes > waitQueuePauseBytes;
    }

    synchronized void abort() {
      int count = 0;
      for(int i=0;i<waiting.length;i++) {
        final DocWriter doc = waiting[i];
        if (doc != null) {
          doc.abort();
          waiting[i] = null;
          count++;
        }
      }
      waitingBytes = 0;
      assert count == numWaiting;
      numWaiting = 0;
    }

    private void writeDocument(DocWriter doc) throws IOException {
      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
      boolean success = false;
      try {
        doc.finish();
        nextWriteDocID++;
        numDocsInStore++;
        nextWriteLoc++;
        assert nextWriteLoc <= waiting.length;
        if (nextWriteLoc == waiting.length)
          nextWriteLoc = 0;
        success = true;
      } finally {
        if (!success)
          setAborting();
      }
    }

    synchronized public boolean add(DocWriter doc) throws IOException {

      assert doc.docID >= nextWriteDocID;

      if (doc.docID == nextWriteDocID) {
        writeDocument(doc);
        while(true) {
          doc = waiting[nextWriteLoc];
          if (doc != null) {
            numWaiting--;
            waiting[nextWriteLoc] = null;
            waitingBytes -= doc.sizeInBytes();
            writeDocument(doc);
          } else
            break;
        }
      } else {

        // I finished before documents that were added
        // before me.  This can easily happen when I am a
        // small doc and the docs before me were large, or,
        // just due to luck in the thread scheduling.  Just
        // add myself to the queue and when that large doc
        // finishes, it will flush me:
        int gap = doc.docID - nextWriteDocID;
        if (gap >= waiting.length) {
          // Grow queue
          DocWriter[] newArray = new DocWriter[ArrayUtil.getNextSize(gap)];
          assert nextWriteLoc >= 0;
          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
          nextWriteLoc = 0;
          waiting = newArray;
          gap = doc.docID - nextWriteDocID;
        }

        int loc = nextWriteLoc + gap;
        if (loc >= waiting.length)
          loc -= waiting.length;

        // We should only wrap one time
        assert loc < waiting.length;

        // Nobody should be in my spot!
        assert waiting[loc] == null;
        waiting[loc] = doc;
        numWaiting++;
        waitingBytes += doc.sizeInBytes();
      }

      return doPause();
    }
  }
}
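
Note on the block pools above: because each pool's block size is a power of two, a logical position that spans many pooled arrays can be split into a block index and an in-block offset with a single shift and mask (BYTE_BLOCK_SHIFT / BYTE_BLOCK_MASK) instead of a division and a modulo. The standalone sketch below only illustrates that arithmetic; the class name and the sample position are made up, and only the constant values mirror the code above. It is not part of DocumentsWriter.

// Illustrative only: demonstrates the shift/mask addressing used by the
// pooled byte[] blocks above. Not part of Lucene's DocumentsWriter.
public class BlockAddressDemo {

  // Same values as BYTE_BLOCK_SHIFT / BYTE_BLOCK_SIZE / BYTE_BLOCK_MASK above.
  static final int BLOCK_SHIFT = 15;
  static final int BLOCK_SIZE = 1 << BLOCK_SHIFT;   // 32768 bytes per block
  static final int BLOCK_MASK = BLOCK_SIZE - 1;

  public static void main(String[] args) {
    long globalPos = 100000;                              // a logical position in the pool
    int blockIndex = (int) (globalPos >> BLOCK_SHIFT);    // which byte[] block
    int offset = (int) (globalPos & BLOCK_MASK);          // position inside that block

    // 100000 = 3 * 32768 + 1696, so this prints "block=3 offset=1696"
    System.out.println("block=" + blockIndex + " offset=" + offset);
  }
}

The same pattern applies to the int[] and char[] pools: INT_BLOCK_SHIFT (13) and CHAR_BLOCK_SHIFT (14) give blocks of 8192 ints and 16384 chars respectively.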
