📄 mapfile.java
字号:
throw new IOException("key out of order: "+k+" after "+lastKey); // reject an unsorted index (the guarding check is above this view)
          lastKey = k;                            // the next index key is order-checked against this one

          // After each kept entry, the following INDEX_SKIP index entries are
          // skipped, so only a sample of the index is held in memory.
          if (skip > 0) {
            skip--;
            continue;                             // skip this entry
          } else {
            skip = INDEX_SKIP;                    // reset skip
          }

          if (count == keys.length) {             // time to grow arrays
            // Grow both parallel arrays by ~1.5x and copy over the entries kept so far.
            // NOTE(review): a keys.length of 0 or 1 would not grow here and the store
            // below would overflow; the initial capacity is set outside this view.
            int newLength = (keys.length*3)/2;
            WritableComparable[] newKeys = new WritableComparable[newLength];
            long[] newPositions = new long[newLength];
            System.arraycopy(keys, 0, newKeys, 0, count);
            System.arraycopy(positions, 0, newPositions, 0, count);
            keys = newKeys;
            positions = newPositions;
          }

          // Keep the sampled key together with its data-file position
          // (these positions are later passed to data.seek, e.g. in finalKey).
          keys[count] = k;
          positions[count] = position.get();
          count++;
        }
      } catch (EOFException e) {
        // A truncated index is tolerated: the entries loaded so far are kept.
        SequenceFile.LOG.warn("Unexpected EOF reading " + index +
                              " at entry #" + count + ". Ignoring.");
      } finally {
        // Mark the index stream as closed so close() does not close it again.
        indexClosed = true;
        index.close();
      }
    }

    /** Re-positions the reader before its first key.
     *
     * <p>Only the data file is repositioned (to {@code firstPosition}); the
     * in-memory index is not touched.
     *
     * @throws IOException if seeking the data file fails
     */
    public synchronized void reset() throws IOException {
      data.seek(firstPosition);
    }

    /** Reads the final key from the file.
     *
     * <p>Loads the index, jumps to the last in-memory index entry (or to the
     * start of the data when the index holds no entries) and reads keys until
     * the data file is exhausted, so {@code key} is left holding the last key
     * read. The reader's original position is restored afterwards, even if
     * reading fails.
     *
     * @param key key to read into
     * @throws IOException if reading the index or data file fails
     */
    public synchronized void finalKey(WritableComparable key)
      throws IOException {

      long originalPosition = data.getPosition(); // save position
      try {
        readIndex();                              // make sure index is valid
        if (count > 0) {
          data.seek(positions[count-1]);          // skip to last indexed entry
        } else {
          reset();                                // start at the beginning
        }
        while (data.next(key)) {}                 // scan to eof
      } finally {
        data.seek(originalPosition);              // restore position
      }
    }

    /** Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.  Returns true iff the named key exists
     * in this map.
*/ public synchronized boolean seek(WritableComparable key) throws IOException { readIndex(); // make sure index is read if (seekIndex != -1 // seeked before && seekIndex+1 < count && comparator.compare(key,keys[seekIndex+1])<0 // before next indexed && comparator.compare(key, nextKey) >= 0) { // but after last seeked // do nothing } else { seekIndex = binarySearch(key); if (seekIndex < 0) // decode insertion point seekIndex = -seekIndex-2; if (seekIndex == -1) // belongs before first entry seekPosition = firstPosition; // use beginning of file else seekPosition = positions[seekIndex]; // else use index } data.seek(seekPosition); if (nextKey == null) nextKey = comparator.newKey(); while (data.next(nextKey)) { int c = comparator.compare(key, nextKey); if (c <= 0) { // at or beyond desired return c == 0; } } return false; } private int binarySearch(WritableComparable key) { int low = 0; int high = count-1; while (low <= high) { int mid = (low + high) >> 1; WritableComparable midVal = keys[mid]; int cmp = comparator.compare(midVal, key); if (cmp < 0) low = mid + 1; else if (cmp > 0) high = mid - 1; else return mid; // key found } return -(low + 1); // key not found. } /** Read the next key/value pair in the map into <code>key</code> and * <code>val</code>. Returns true if such a pair exists and false when at * the end of the map */ public synchronized boolean next(WritableComparable key, Writable val) throws IOException { return data.next(key, val); } /** Return the value for the named key, or null if none exists. */ public synchronized Writable get(WritableComparable key, Writable val) throws IOException { if (seek(key)) { data.getCurrentValue(val); return val; } else return null; } /** Close the map. */ public synchronized void close() throws IOException { if (! indexClosed) { index.close(); } data.close(); } } /** Renames an existing map directory. 
*/ public static void rename(FileSystem fs, String oldName, String newName) throws IOException { Path oldDir = new Path(oldName); Path newDir = new Path(newName); if (!fs.rename(oldDir, newDir)) { throw new IOException("Could not rename " + oldDir + " to " + newDir); } } /** Deletes the named map file. */ public static void delete(FileSystem fs, String name) throws IOException { Path dir = new Path(name); Path data = new Path(dir, DATA_FILE_NAME); Path index = new Path(dir, INDEX_FILE_NAME); fs.delete(data); fs.delete(index); fs.delete(dir); } /** * This method attempts to fix a corrupt MapFile by re-creating its index. * @param fs filesystem * @param dir directory containing the MapFile data and index * @param keyClass key class (has to be a subclass of Writable) * @param valueClass value class (has to be a subclass of Writable) * @param dryrun do not perform any changes, just report what needs to be done * @return number of valid entries in this MapFile, or -1 if no fixing was needed * @throws Exception */ public static long fix(FileSystem fs, Path dir, Class keyClass, Class valueClass, boolean dryrun, Configuration conf) throws Exception { String dr = (dryrun ? "[DRY RUN ] " : ""); Path data = new Path(dir, DATA_FILE_NAME); Path index = new Path(dir, INDEX_FILE_NAME); int indexInterval = 128; if (!fs.exists(data)) { // there's nothing we can do to fix this! 
throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this."); } if (fs.exists(index)) { // no fixing needed return -1; } SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf); if (!dataReader.getKeyClass().equals(keyClass)) { throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() + ", got " + dataReader.getKeyClass().getName()); } if (!dataReader.getValueClass().equals(valueClass)) { throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() + ", got " + dataReader.getValueClass().getName()); } long cnt = 0L; Writable key = (Writable)keyClass.getConstructor(new Class[0]).newInstance(new Object[0]); Writable value = (Writable)valueClass.getConstructor(new Class[0]).newInstance(new Object[0]); SequenceFile.Writer indexWriter = null; if (!dryrun) indexWriter = new SequenceFile.Writer(fs, index, keyClass, LongWritable.class); try { long pos = 0L; LongWritable position = new LongWritable(); while(dataReader.next(key, value)) { cnt++; if (cnt % indexInterval == 0) { position.set(pos); if (!dryrun) indexWriter.append(key, position); } pos = dataReader.getPosition(); } } catch(Throwable t) { // truncated data file. swallow it. 
} dataReader.close(); if (!dryrun) indexWriter.close(); return cnt; } public static void main(String[] args) throws Exception { String usage = "Usage: MapFile inFile outFile"; if (args.length != 2) { System.err.println(usage); System.exit(-1); } String in = args[0]; String out = args[1]; Configuration conf = new Configuration(); int ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096); FileSystem fs = new LocalFileSystem(conf); MapFile.Reader reader = new MapFile.Reader(fs, in, conf); MapFile.Writer writer = new MapFile.Writer(fs, out, reader.getKeyClass(), reader.getValueClass()); WritableComparable key = (WritableComparable)reader.getKeyClass().newInstance(); Writable value = (Writable)reader.getValueClass().newInstance(); while (reader.next(key, value)) // copy all entries writer.append(key, value); writer.close(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -