📄 datanodeindexhandler.java
字号:
*/ public int removeDocuments(String indexName, Term term) throws IOException { // FIXME - shouldn't this make a new version of an index? // not sure what the return value is, lucene returns int on a reader // but void on a writer IndexVersion iv = indexes.getPrimaryIndex(indexName).getIndexVersion(); leases.getLease(iv); int results; IndexReader reader = null; Directory directory = null; Hits hits = null; IndexSearcher searcher = null; try { File dir = indexes.getKnownIndexDirectory(iv); reader = getIndexReader(dir); directory = reader.directory(); searcher = new IndexSearcher(directory); hits = searcher.search(new TermQuery(term)); results = hits.length(); LOG.info("removeDocuments: found " + results + " matching documents"); } finally { close(reader, directory, null, searcher); } // only create a new version of index if some documents will be deleted if (results > 0) { IndexWriter writer = openNewIndexVersion(indexName); Directory file = writer.getDirectory(); close(null, null, writer, null); try { reader = IndexReader.open(file); results = reader.deleteDocuments(term); } finally { close(reader, null, null, null); } } return results; } /** * If needed, open a new version of an index for committing changes. * * @param indexName the index * @return a Lucene IndexWriter * @throws IOException */ IndexWriter openNewIndexVersion(String indexName) throws IOException { IndexWriter writer = null; IndexVersion iv = indexes.getPrimaryIndex(indexName).getIndexVersion(); IndexLocation index = indexes.getPrimaryIndex(indexName); if (!index.getState().equals(IndexState.UNCOMMITTED)) { LOG.debug("Creating a new index"); // there is no version of this index already open with uncommitted changes IndexVersion newVersion = iv.nextVersion(); IndexLocation location = new IndexLocation(dataconf.getAddress(), newVersion, IndexState.UNCOMMITTED); if (leases.getLease(newVersion)) { File directory = indexes.getIndexDirectory(newVersion); LOG.debug("Destination directory is " + newVersion); writer = getIndexWriter(directory, true); Directory targetDirectory = null; try { // there are no uncommitted changes - make a new version targetDirectory = writer.getDirectory(); copyIndex(iv, targetDirectory); indexes.add(location); LOG.debug("Copied from " + iv + " to " + targetDirectory); iv = newVersion; } finally { close(null, targetDirectory, writer, null); } } } else { LOG.debug("Index already exists"); } if (leases.getLease(iv)) { return getIndexWriter(indexes.getIndexDirectory(iv), false); } throw new IOException("Could not open index " + indexName); } IndexVersion getNewIndexVersion(String indexName) { IndexVersion iv = indexes.getPrimaryIndex(indexName).getIndexVersion(); IndexLocation index = indexes.getPrimaryIndex(indexName); if (!index.getState().equals(IndexState.UNCOMMITTED)) { LOG.debug("Creating a new index"); // there is no version of this index already open with uncommitted changes iv = iv.nextVersion(); } else { LOG.debug("Index already exists"); } return iv; } /** * Get the directory of an index. * * @param index the index. * @return the directory */ File getIndexDirectory(IndexVersion index) { return indexes.getIndexDirectory(index); } /** * Copy an index to a target directory. * * @param indexVersion the index * @param targetDirectory the target directory * @param newIndex * @return the files to be copied * @throws IOException */ Set<String> copyIndex(IndexVersion indexVersion, Directory targetDirectory, File newIndex) throws IOException { Set<String> localFiles = new HashSet<String>(); IndexReader reader = null; copyIndex(indexVersion, targetDirectory); try { reader = getIndexReader(newIndex); Directory directory = reader.directory(); for (String localFile : directory.list()) { localFiles.add(localFile); } } finally { close(reader, null, null, null); } return localFiles; } /** * Copy an index from a location to a target directory. * * @param indexVersion the index * @param targetDirectory the target directory * @throws IOException */ private void copyIndex(IndexVersion indexVersion, Directory targetDirectory) throws IOException { File source = indexes.getIndexDirectory(indexVersion); LOG.info("Source is " + source.getAbsolutePath()); Directory sourceDirectory = null; IndexReader sourceReader = null; try { sourceReader = getIndexReader(source); LOG.debug("Copying index - there are " + sourceReader.numDocs() + " documents in source index"); sourceDirectory = sourceReader.directory(); Directory.copy(sourceDirectory, targetDirectory, false); LOG.debug("Copied from " + indexVersion + " to " + targetDirectory); String[] files = targetDirectory.list(); for (String s : files) { LOG.debug("Copied " + s); } } finally { close(sourceReader, sourceDirectory, null, null); } } /** * Copy a remote index to this machine. If a older local copy exists, take * advantage of it to minimize network traffic. * * @param indexToCopy the index to copy * @throws IOException */ public void copyRemoteIndex(IndexLocation indexToCopy) throws IOException { if (indexes.hasIndex(indexToCopy.getIndexVersion())) { // node already has index! LOG.info("Data node is already replicating " + indexToCopy.toString() + "\n"); } else { DataNodeToDataNodeProtocol datanode = (DataNodeToDataNodeProtocol) RPC .waitForProxy(DataNodeToDataNodeProtocol.class, DataNodeToDataNodeProtocol.VERSION_ID, indexToCopy.getAddress(), conf); Directory targetDirectory = null; IndexWriter targetWriter = null; IndexVersion indexVersionToCopy = indexToCopy.getIndexVersion(); String sourceId = indexVersionToCopy.getName(); IndexVersion primaryVersion = null; if (indexes.hasIndex(sourceId)) { primaryVersion = indexes.getPrimaryIndex(sourceId).getIndexVersion(); } try { IndexLocation location = new IndexLocation(dataconf.getAddress(), indexVersionToCopy, IndexState.REPLICATING); indexes.add(location); File newIndex = indexes.getIndexDirectory(indexVersionToCopy); targetWriter = getIndexWriter(newIndex, true); targetDirectory = targetWriter.getDirectory(); String[] remoteFiles = datanode.getFileSet(indexVersionToCopy); // if there is a local version, copy that first as it is quicker // then get the remote diffs Set<String> localFiles = new HashSet<String>(); if (primaryVersion != null) { localFiles = copyIndex(primaryVersion, targetDirectory, newIndex); } // copy all the files that are not in the local index for (String remoteFile : remoteFiles) { if (!localFiles.contains(remoteFile) || remoteFile.startsWith("segments") || remoteFile.startsWith("_")) { byte[] file = datanode.getFileContent(indexVersionToCopy, remoteFile); copyRemoteFile(remoteFile, file, targetDirectory); } } indexes .setIndexState(location, IndexState.REPLICATING, IndexState.LIVE); } finally { close(null, targetDirectory, targetWriter, null); } } } /** * Copy a remote file to a target directory. * * @param remoteFile the name of the remote file * @param file the contents of the file * @param targetDirectory the target directory * @throws IOException */ private static void copyRemoteFile(String remoteFile, byte[] file, Directory targetDirectory) throws IOException { IndexOutput os = null; LOG.info("Copying " + remoteFile + " to " + targetDirectory); try { // create file in dest directory os = targetDirectory.createOutput(remoteFile); // and copy to dest directory long len = file.length; long readCount = 0; while (readCount < len) { int toRead = readCount + Constants.BUFFER_SIZE > len ? (int) (len - readCount) : Constants.BUFFER_SIZE; os.writeBytes(file, toRead); readCount += toRead; } } finally { if (os != null) os.close(); } } /** * Commit an index. * * @param index the index to be committed * @return the committed index * @throws IOException */ public IndexVersion commitVersion(String index) throws IOException { IndexVersion iv = indexes.getPrimaryIndex(index).getIndexVersion(); IndexLocation location = indexes.getPrimaryIndex(index); indexes.setIndexState(location, IndexState.UNCOMMITTED, IndexState.LIVE); leases.relinquishLease(iv); return iv; } /** * Get all the indexes stored on this datanode. * * @return */ public IndexLocation[] getIndexes() { return indexes.getIndexes(); } /** * Does this index exist? * * @param directory the directory storing the index * @return does the index exist? */ private boolean indexExists(File directory) { return !useRamIndex ? IndexReader.indexExists(directory) : ramIndexes .containsKey(directory); } /** * Get the IndexWriter for this index. * * @param directory the directory storing the index * @param create create the index or use the existing index * @return the IndexWriter * @throws IOException */ IndexWriter getIndexWriter(File directory, boolean create) throws IOException { IndexWriter writer = null; if (!useRamIndex) { writer = new IndexWriter(directory, analyzer, create); } else { if (create) { ramIndexes.put(directory, new RAMDirectory()); } RAMDirectory rd = ramIndexes.get(directory); LOG.debug("Directory is " + directory + " RamDirectory is " + rd); writer = new IndexWriter(rd, analyzer, create); } LOG.debug("IndexWriter is " + writer.toString()); return writer; } /** * Get the IndexReader for this index. * * @param directory the directory storing the index * @return the IndexReader * @throws IOException */ IndexReader getIndexReader(File directory) throws IOException { return !useRamIndex ? IndexReader.open(directory) : IndexReader .open(ramIndexes.get(directory)); } /** * Close index readers, directories, writers or searchers. * * @param reader the Lucene IndexReader * @param directory the Lucene Directory * @param writer the Lucene IndexWriter * @param searcher the Lucene IndexSearcher * @throws IOException */ private void close(IndexReader reader, Directory directory, IndexWriter writer, IndexSearcher searcher) throws IOException { if (directory != null && !useRamIndex) directory.close(); if (reader != null) reader.close(); if (writer != null) writer.close(); if (searcher != null) searcher.close(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -