📄 datanodeindexhandler.java
字号:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.contrib.dlucene.data;import java.io.File;import java.io.IOException;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.contrib.dlucene.Constants;import org.apache.hadoop.contrib.dlucene.DataNode;import org.apache.hadoop.contrib.dlucene.DataNodeConfiguration;import org.apache.hadoop.contrib.dlucene.DataNodeToDataNodeProtocol;import org.apache.hadoop.contrib.dlucene.DataNodeToNameNodeProtocol;import org.apache.hadoop.contrib.dlucene.IndexLocation;import org.apache.hadoop.contrib.dlucene.IndexState;import org.apache.hadoop.contrib.dlucene.IndexVersion;import org.apache.hadoop.contrib.dlucene.Lease;import org.apache.hadoop.contrib.dlucene.Utils;import org.apache.hadoop.contrib.dlucene.writable.SearchResults;import org.apache.hadoop.ipc.RPC;import org.apache.lucene.analysis.Analyzer;import org.apache.lucene.document.Document;import org.apache.lucene.index.IndexReader;import org.apache.lucene.index.IndexWriter;import org.apache.lucene.index.Term;import org.apache.lucene.search.Hits;import 
org.apache.lucene.search.IndexSearcher;import org.apache.lucene.search.Query;import org.apache.lucene.search.Sort;import org.apache.lucene.search.TermQuery;import org.apache.lucene.store.Directory;import org.apache.lucene.store.IndexInput;import org.apache.lucene.store.IndexOutput;import org.apache.lucene.store.RAMDirectory;

/**
 * This class manages Lucene indexes on a {@link DataNode}.
 */
public class DataNodeIndexHandler {
  /** The indexes registered on this datanode. */
  private DataNodeIndexes indexes = null;

  /**
   * Logger for this class. Note that the logger name is a fixed string and
   * is not derived from this class's own name.
   */
  private static final Log LOG = LogFactory
      .getLog("org.apache.hadoop.dlucene.data.DataNodeLuceneHandler");

  /** The Lucene analyzer to use. */
  private Analyzer analyzer = null;

  /** The Hadoop configuration. */
  private Configuration conf = null;

  /** The DataNode configuration. */
  private DataNodeConfiguration dataconf = null;

  /**
   * Structure holding the RAM indexes, keyed by {@link File}. The field is
   * static, so it is shared by every instance of this class; the constructor
   * creates the map on first use when RAM indexes are requested.
   */
  private static Map<File, RAMDirectory> ramIndexes = null;

  /** Use the RAM index ({@code true}) or the filesystem ({@code false})? */
  private boolean useRamIndex ;

  /** The lease manager. */
  private DataNodeLeaseManager leases = null;

  /**
   * Constructor.
   *
   * @param dataNodeConfiguration the datanode configuration
   * @param configuration the Hadoop configuration
   * @param analyzer Lucene analyzer
   * @param useRamIndex use RAM based Lucene index?
* @throws IOException */ public DataNodeIndexHandler(DataNodeConfiguration dataNodeConfiguration, Configuration configuration, Analyzer analyzer, boolean useRamIndex, DataNodeToNameNodeProtocol namenode) throws IOException { this.conf = configuration; this.analyzer = analyzer; this.dataconf = dataNodeConfiguration; if (ramIndexes == null && useRamIndex) { ramIndexes = new HashMap<File, RAMDirectory>(); } this.useRamIndex = useRamIndex; indexes = new DataNodeIndexes(dataNodeConfiguration); leases = new DataNodeLeaseManager(namenode, dataconf.getAddress()); // the directory structure is we have root/id/version String[] files = dataNodeConfiguration.getRootDir().list(); if (files != null && !useRamIndex) { for (String indexName : files) { File index = new File(dataNodeConfiguration.getRootDir(), indexName); Utils.checkDirectoryIsReadableWritable(index); for (String version : index.list()) { File versionDirectory = new File(index, version); if (indexExists(versionDirectory)) { Utils.checkDirectoryIsReadableWritable(versionDirectory); if (version.startsWith(Constants.VERSION_PREFIX)) { int versionNumber = Integer.parseInt(version .substring(Constants.VERSION_PREFIX.length())); IndexLocation location = new IndexLocation(dataNodeConfiguration .getAddress(), new IndexVersion(indexName, versionNumber), IndexState.LIVE); indexes.add(location); } else { throw new IOException("Index directory " + version + " does not begin with version number"); } } } } } } public Lease[] getLeases() { return leases.getLeases(); } /** * Get all the files used in a specific index. 
* * @param indexVersion the index * @return all the files used in the index * @throws IOException */ public String[] getFileSet(IndexVersion indexVersion) throws IOException { IndexReader reader = null; Directory directory = null; String[] results = null; try { File dir = indexes.getKnownIndexDirectory(indexVersion); reader = getIndexReader(dir); directory = reader.directory(); results = directory.list(); } finally { close(reader, directory, null, null); } return results; } /** * Get a particular file from a specific index. * * @param indexVersion the index * @param file the file * @return the data in the file * @throws IOException */ public byte[] getFileContent(IndexVersion indexVersion, String file) throws IOException { IndexReader reader = null; Directory directory = null; IndexInput is = null; // FIXME Is there a problem here, because the max size of buf is an int ? byte[] buf = null; try { File dir = indexes.getKnownIndexDirectory(indexVersion); reader = getIndexReader(dir); directory = reader.directory(); try { // read current file is = directory.openInput(file); int len = (int) is.length(); // FIXME - ugly cast here buf = new byte[len]; is.readBytes(buf, 0, len); } finally { if (is != null) is.close(); } } finally { close(reader, directory, null, null); } return buf; } /** * Add a document to an index. 
* * @param indexName The index * @param document The document * @throws IOException */ public void addDocument(String indexName, Document document) throws IOException { if (indexes.hasIndex(indexName)) { IndexWriter writer = null; try { writer = openNewIndexVersion(indexName); LOG.debug("Before adding document " + indexName + " contains " + writer.docCount() + " documents."); writer.addDocument(document); LOG.debug("After adding document " + indexName + " contains " + writer.docCount() + " documents."); } finally { close(null, null, writer, null); } } else { throw new IOException("Datanode does not have index " + indexName); } } /** * Create an index on the datanode. * * @param indexName the name of the index * @return the new index * @throws IOException */ public IndexVersion createIndex(String indexName) throws IOException { IndexWriter targetWriter = null; IndexVersion sourceIndexVersion = null; if (indexes.hasIndex(indexName)) { throw new IOException("Index " + indexName + " already exists"); } try { sourceIndexVersion = new IndexVersion(indexName); IndexLocation location = new IndexLocation(dataconf.getAddress(), sourceIndexVersion, IndexState.LIVE); indexes.add(location); File newIndex = indexes.getIndexDirectory(sourceIndexVersion); targetWriter = getIndexWriter(newIndex, true); LOG.info("Created index " + location); } finally { close(null, null, targetWriter, null); } return sourceIndexVersion; } /** * Add an existing remote index to an index on this datanode. 
* * @param indexName destination * @param source source remote index * @throws IOException */ public void addIndex(String indexName, IndexLocation source) throws IOException { if (indexes.hasIndex(indexName)) { // first replicate the index locally copyRemoteIndex(source); IndexReader reader = null; IndexWriter writer = null; try { writer = openNewIndexVersion(indexName); File dir = indexes.getKnownIndexDirectory(source.getIndexVersion()); reader = getIndexReader(dir); IndexReader[] readers = new IndexReader[1]; readers[0] = reader; writer.addIndexes(readers); } finally { close(reader, null, writer, null); } } else { throw new IOException("DataNode does not have index " + indexName); } } /** * Perform a query on an index. * * @param indexVersion the index * @param query the query * @param sort how to sort the results * @param n the number of results * @return the query results * @throws IOException */ public SearchResults search(IndexVersion indexVersion, Query query, Sort sort, int n) throws IOException { IndexReader reader = null; Directory directory = null; SearchResults hits = null; IndexSearcher searcher = null; try { File dir = indexes.getKnownIndexDirectory(indexVersion); reader = getIndexReader(dir); directory = reader.directory(); searcher = new IndexSearcher(directory); // FIXME - for now we throw away n, not sure how to use it // and still return hits with Lucene API Hits h1 = searcher.search(query, sort); LOG.info("Found " + h1.length() + " search results"); hits = new SearchResults(h1); } finally { close(reader, directory, null, searcher); } return hits; } public int size(String index) throws IOException { return size(indexes.getPrimaryIndex(index).getIndexVersion()); } public int size(IndexVersion indexVersion) throws IOException { IndexReader reader = null; Directory directory = null; int num = 0; try { File dir = indexes.getKnownIndexDirectory(indexVersion); reader = getIndexReader(dir); num = reader.numDocs(); } finally { close(reader, directory, 
null, null); } return num; } // FIXME I changed this from returning int[] to return int // because that was closer to the Lucene API /** * Remove documents from this index. * * @param indexName the index * @param term A Lucene Term used to identify the documents to remove * @return the number of documents remove * @throws IOException
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -