⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datanodeindexhandler.java

📁 分布式全文搜索工具包 可以支持集群 主要使用java開發 比較方便使用
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   */  public int removeDocuments(String indexName, Term term) throws IOException {    //      FIXME - shouldn't this make a new version of an index?        // not sure what the return value is, lucene returns int on a reader    // but void on a writer    IndexVersion iv = indexes.getPrimaryIndex(indexName).getIndexVersion();    leases.getLease(iv);    int results;    IndexReader reader = null;    Directory directory = null;    Hits hits = null;    IndexSearcher searcher = null;    try {      File dir = indexes.getKnownIndexDirectory(iv);      reader = getIndexReader(dir);      directory = reader.directory();      searcher = new IndexSearcher(directory);      hits = searcher.search(new TermQuery(term));      results = hits.length();      LOG.info("removeDocuments: found " + results + " matching documents");    } finally {      close(reader, directory, null, searcher);    }    // only create a new version of index if some documents will be deleted    if (results > 0) {      IndexWriter writer = openNewIndexVersion(indexName);      Directory file = writer.getDirectory();      close(null, null, writer, null);      try {        reader = IndexReader.open(file);        results = reader.deleteDocuments(term);      } finally {        close(reader, null, null, null);      }    }    return results;  }  /**   * If needed, open a new version of an index for committing changes.   *    * @param indexName the index   * @return a Lucene IndexWriter   * @throws IOException   */  IndexWriter openNewIndexVersion(String indexName) throws IOException {    IndexWriter writer = null;    IndexVersion iv = indexes.getPrimaryIndex(indexName).getIndexVersion();    IndexLocation index = indexes.getPrimaryIndex(indexName);    if (!index.getState().equals(IndexState.UNCOMMITTED)) {      LOG.debug("Creating a new index");      // there is no version of this index already open with uncommitted changes      IndexVersion newVersion = iv.nextVersion();      IndexLocation location = new IndexLocation(dataconf.getAddress(),          newVersion, IndexState.UNCOMMITTED);      if (leases.getLease(newVersion)) {        File directory = indexes.getIndexDirectory(newVersion);        LOG.debug("Destination directory is " + newVersion);        writer = getIndexWriter(directory, true);        Directory targetDirectory = null;        try {          // there are no uncommitted changes - make a new version          targetDirectory = writer.getDirectory();          copyIndex(iv, targetDirectory);          indexes.add(location);          LOG.debug("Copied from " + iv + " to " + targetDirectory);          iv = newVersion;        } finally {          close(null, targetDirectory, writer, null);        }      }    } else {      LOG.debug("Index already exists");    }    if (leases.getLease(iv)) {      return getIndexWriter(indexes.getIndexDirectory(iv),        false);    }     throw new IOException("Could not open index " + indexName);  }    IndexVersion getNewIndexVersion(String indexName) {    IndexVersion iv = indexes.getPrimaryIndex(indexName).getIndexVersion();    IndexLocation index = indexes.getPrimaryIndex(indexName);    if (!index.getState().equals(IndexState.UNCOMMITTED)) {      LOG.debug("Creating a new index");      // there is no version of this index already open with uncommitted changes      iv = iv.nextVersion();    } else {      LOG.debug("Index already exists");    }    return iv;  }    /**   * Get the directory of an index.   *    * @param index the index.   * @return the directory   */  File getIndexDirectory(IndexVersion index) {    return indexes.getIndexDirectory(index);  }  /**   * Copy an index to a target directory.   *    * @param indexVersion the index   * @param targetDirectory the target directory   * @param newIndex    * @return the files to be copied   * @throws IOException   */  Set<String> copyIndex(IndexVersion indexVersion, Directory targetDirectory,      File newIndex) throws IOException {    Set<String> localFiles = new HashSet<String>();    IndexReader reader = null;    copyIndex(indexVersion, targetDirectory);    try {      reader = getIndexReader(newIndex);      Directory directory = reader.directory();      for (String localFile : directory.list()) {        localFiles.add(localFile);      }    } finally {      close(reader, null, null, null);    }    return localFiles;  }  /**   * Copy an index from a location to a target directory.   *    * @param indexVersion the index   * @param targetDirectory the target directory   * @throws IOException   */  private void copyIndex(IndexVersion indexVersion, Directory targetDirectory)      throws IOException {    File source = indexes.getIndexDirectory(indexVersion);    LOG.info("Source is " + source.getAbsolutePath());    Directory sourceDirectory = null;    IndexReader sourceReader = null;    try {      sourceReader = getIndexReader(source);      LOG.debug("Copying index - there are " + sourceReader.numDocs()          + " documents in source index");      sourceDirectory = sourceReader.directory();      Directory.copy(sourceDirectory, targetDirectory, false);      LOG.debug("Copied from " + indexVersion + " to " + targetDirectory);      String[] files = targetDirectory.list();      for (String s : files) {        LOG.debug("Copied " + s);      }    } finally {      close(sourceReader, sourceDirectory, null, null);    }  }  /**   * Copy a remote index to this machine. If a older local copy exists, take   * advantage of it to minimize network traffic.   *    * @param indexToCopy the index to copy   * @throws IOException   */  public void copyRemoteIndex(IndexLocation indexToCopy) throws IOException {    if (indexes.hasIndex(indexToCopy.getIndexVersion())) {      // node already has index!      LOG.info("Data node is already replicating " + indexToCopy.toString()          + "\n");    } else {      DataNodeToDataNodeProtocol datanode = (DataNodeToDataNodeProtocol) RPC          .waitForProxy(DataNodeToDataNodeProtocol.class,              DataNodeToDataNodeProtocol.VERSION_ID, indexToCopy.getAddress(),              conf);      Directory targetDirectory = null;      IndexWriter targetWriter = null;      IndexVersion indexVersionToCopy = indexToCopy.getIndexVersion();      String sourceId = indexVersionToCopy.getName();      IndexVersion primaryVersion = null;      if (indexes.hasIndex(sourceId)) {        primaryVersion = indexes.getPrimaryIndex(sourceId).getIndexVersion();      }      try {        IndexLocation location = new IndexLocation(dataconf.getAddress(),            indexVersionToCopy, IndexState.REPLICATING);        indexes.add(location);        File newIndex = indexes.getIndexDirectory(indexVersionToCopy);        targetWriter = getIndexWriter(newIndex, true);        targetDirectory = targetWriter.getDirectory();        String[] remoteFiles = datanode.getFileSet(indexVersionToCopy);        // if there is a local version, copy that first as it is quicker        // then get the remote diffs        Set<String> localFiles = new HashSet<String>();        if (primaryVersion != null) {          localFiles = copyIndex(primaryVersion, targetDirectory, newIndex);        }        // copy all the files that are not in the local index        for (String remoteFile : remoteFiles) {          if (!localFiles.contains(remoteFile)              || remoteFile.startsWith("segments")              || remoteFile.startsWith("_")) {            byte[] file = datanode.getFileContent(indexVersionToCopy,                remoteFile);            copyRemoteFile(remoteFile, file, targetDirectory);          }        }        indexes            .setIndexState(location, IndexState.REPLICATING, IndexState.LIVE);      } finally {        close(null, targetDirectory, targetWriter, null);      }    }  }  /**   * Copy a remote file to a target directory.   *    * @param remoteFile the name of the remote file   * @param file the contents of the file   * @param targetDirectory the target directory   * @throws IOException   */  private static void copyRemoteFile(String remoteFile, byte[] file,      Directory targetDirectory) throws IOException {    IndexOutput os = null;    LOG.info("Copying " + remoteFile + " to " + targetDirectory);    try {      // create file in dest directory      os = targetDirectory.createOutput(remoteFile);      // and copy to dest directory      long len = file.length;      long readCount = 0;      while (readCount < len) {        int toRead = readCount + Constants.BUFFER_SIZE > len ? (int) (len - readCount)            : Constants.BUFFER_SIZE;        os.writeBytes(file, toRead);        readCount += toRead;      }    } finally {      if (os != null)        os.close();    }  }  /**   * Commit an index.   *    * @param index the index to be committed   * @return the committed index   * @throws IOException   */  public IndexVersion commitVersion(String index) throws IOException {    IndexVersion iv = indexes.getPrimaryIndex(index).getIndexVersion();    IndexLocation location = indexes.getPrimaryIndex(index);    indexes.setIndexState(location, IndexState.UNCOMMITTED, IndexState.LIVE);    leases.relinquishLease(iv);    return iv;  }  /**   * Get all the indexes stored on this datanode.   *    * @return   */  public IndexLocation[] getIndexes() {    return indexes.getIndexes();  }  /**   * Does this index exist?   *    * @param directory the directory storing the index   * @return does the index exist?   */  private boolean indexExists(File directory) {    return !useRamIndex ? IndexReader.indexExists(directory) : ramIndexes        .containsKey(directory);  }  /**   * Get the IndexWriter for this index.   *    * @param directory the directory storing the index   * @param create create the index or use the existing index   * @return the IndexWriter   * @throws IOException   */  IndexWriter getIndexWriter(File directory, boolean create) throws IOException {    IndexWriter writer = null;    if (!useRamIndex) {      writer = new IndexWriter(directory, analyzer, create);    } else {      if (create) {      ramIndexes.put(directory, new RAMDirectory());      }      RAMDirectory rd = ramIndexes.get(directory);      LOG.debug("Directory is " + directory + " RamDirectory is " + rd);      writer = new IndexWriter(rd, analyzer, create);    }    LOG.debug("IndexWriter is " + writer.toString());    return writer;  }  /**   * Get the IndexReader for this index.   *    * @param directory the directory storing the index   * @return the IndexReader   * @throws IOException   */  IndexReader getIndexReader(File directory) throws IOException {    return !useRamIndex ? IndexReader.open(directory) : IndexReader        .open(ramIndexes.get(directory));  }  /**   * Close index readers, directories, writers or searchers.   *    * @param reader the Lucene IndexReader   * @param directory the Lucene Directory   * @param writer the Lucene IndexWriter   * @param searcher the Lucene IndexSearcher   * @throws IOException   */  private void close(IndexReader reader, Directory directory,      IndexWriter writer, IndexSearcher searcher) throws IOException {    if (directory != null && !useRamIndex)      directory.close();    if (reader != null)      reader.close();    if (writer != null)      writer.close();    if (searcher != null)      searcher.close();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -