📄 mapfile.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.io;import java.io.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.io.SequenceFile.CompressionType;/** A file-based map from keys to values. * * <p>A map is a directory containing two files, the <code>data</code> file, * containing all keys and values in the map, and a smaller <code>index</code> * file, containing a fraction of the keys. The fraction is determined by * {@link Writer#getIndexInterval()}. * * <p>The index file is read entirely into memory. Thus key implementations * should try to keep themselves small. * * <p>Map files are created by adding entries in-order. To maintain a large * database, perform updates by copying the previous version of a database and * merging in a sorted change list, to create a new version of the database in * a new file. Sorting large change lists can be done with {@link * SequenceFile.Sorter}. */public class MapFile { /** The name of the index file. */ public static final String INDEX_FILE_NAME = "index"; /** The name of the data file. */ public static final String DATA_FILE_NAME = "data"; protected MapFile() {} // no public ctor /** Writes a new map. */ public static class Writer { private SequenceFile.Writer data; private SequenceFile.Writer index; private int indexInterval = 128; private long size; private LongWritable position = new LongWritable(); // the following fields are used only for checking key order private WritableComparator comparator; private DataInputBuffer inBuf = new DataInputBuffer(); private DataOutputBuffer outBuf = new DataOutputBuffer(); private WritableComparable lastKey; /** Create the named map for keys of the named class. */ public Writer(FileSystem fs, String dirName, Class keyClass, Class valClass) throws IOException { this(fs, dirName, WritableComparator.get(keyClass), valClass, false); } /** Create the named map for keys of the named class. * @deprecated specify a {@link CompressionType} instead */ public Writer(FileSystem fs, String dirName, Class keyClass, Class valClass, boolean compress) throws IOException { this(fs, dirName, WritableComparator.get(keyClass), valClass, compress); } /** Create the named map for keys of the named class. */ public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress) throws IOException { this(conf,fs,dirName,WritableComparator.get(keyClass),valClass,compress); } /** Create the named map using the named key comparator. */ public Writer(FileSystem fs, String dirName, WritableComparator comparator, Class valClass) throws IOException { this(fs, dirName, comparator, valClass, false); } /** Create the named map using the named key comparator. * @deprecated specify a {@link CompressionType} instead */ public Writer(FileSystem fs, String dirName, WritableComparator comparator, Class valClass, boolean compress) throws IOException { this(new Configuration(), fs, dirName, comparator, valClass, compress ? CompressionType.RECORD : CompressionType.NONE); } /** Create the named map using the named key comparator. */ public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, SequenceFile.CompressionType compress) throws IOException { this.comparator = comparator; this.lastKey = comparator.newKey(); Path dir = new Path(dirName); fs.mkdirs(dir); Path dataFile = new Path(dir, DATA_FILE_NAME); Path indexFile = new Path(dir, INDEX_FILE_NAME); Class keyClass = comparator.getKeyClass(); this.data = SequenceFile.createWriter (fs,conf,dataFile,keyClass,valClass,compress); this.index = SequenceFile.createWriter (fs,conf,indexFile,keyClass,LongWritable.class,CompressionType.BLOCK); } /** The number of entries that are added before an index entry is added.*/ public int getIndexInterval() { return indexInterval; } /** Sets the index interval. * @see #getIndexInterval() */ public void setIndexInterval(int interval) { indexInterval = interval; } /** Close the map. */ public synchronized void close() throws IOException { data.close(); index.close(); } /** Append a key/value pair to the map. The key must be greater or equal * to the previous key added to the map. */ public synchronized void append(WritableComparable key, Writable val) throws IOException { checkKey(key); if (size % indexInterval == 0) { // add an index entry position.set(data.getLength()); // point to current eof index.append(key, position); } data.append(key, val); // append key/value to data size++; } private void checkKey(WritableComparable key) throws IOException { // check that keys are well-ordered if (size != 0 && comparator.compare(lastKey, key) > 0) throw new IOException("key out of order: "+key+" after "+lastKey); // update lastKey with a copy of key by writing and reading outBuf.reset(); key.write(outBuf); // write new key inBuf.reset(outBuf.getData(), outBuf.getLength()); lastKey.readFields(inBuf); // read into lastKey } } /** Provide access to an existing map. */ public static class Reader { /** Number of index entries to skip between each entry. Zero by default. * Setting this to values larger than zero can facilitate opening large map * files using less memory. */ private int INDEX_SKIP = 0; private WritableComparator comparator; private WritableComparable nextKey; private long seekPosition = -1; private int seekIndex = -1; private long firstPosition; private WritableComparable getKey; // the data, on disk private SequenceFile.Reader data; private SequenceFile.Reader index; // whether the index Reader was closed private boolean indexClosed = false; // the index, in memory private int count = -1; private WritableComparable[] keys; private long[] positions; /** Returns the class of keys in this file. */ public Class getKeyClass() { return data.getKeyClass(); } /** Returns the class of values in this file. */ public Class getValueClass() { return data.getValueClass(); } /** Construct a map reader for the named map.*/ public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException { this(fs, dirName, null, conf); INDEX_SKIP = conf.getInt("io.map.index.skip", 0); } /** Construct a map reader for the named map using the named comparator.*/ public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf) throws IOException { Path dir = new Path(dirName); Path dataFile = new Path(dir, DATA_FILE_NAME); Path indexFile = new Path(dir, INDEX_FILE_NAME); // open the data this.data = new SequenceFile.Reader(fs, dataFile, conf); this.firstPosition = data.getPosition(); if (comparator == null) this.comparator = WritableComparator.get(data.getKeyClass()); else this.comparator = comparator; this.getKey = this.comparator.newKey(); // open the index this.index = new SequenceFile.Reader(fs, indexFile, conf); } private void readIndex() throws IOException { // read the index entirely into memory if (this.keys != null) return; this.count = 0; this.keys = new WritableComparable[1024]; this.positions = new long[1024]; try { int skip = INDEX_SKIP; LongWritable position = new LongWritable(); WritableComparable lastKey = null; while (true) { WritableComparable k = comparator.newKey(); if (!index.next(k, position)) break; // check order to make sure comparator is compatible if (lastKey != null && comparator.compare(lastKey, k) > 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -