⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mapfile.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            throw new IOException("key out of order: "+k+" after "+lastKey);          lastKey = k;                    if (skip > 0) {            skip--;            continue;                             // skip this entry          } else {            skip = INDEX_SKIP;                    // reset skip          }          if (count == keys.length) {                // time to grow arrays            int newLength = (keys.length*3)/2;            WritableComparable[] newKeys = new WritableComparable[newLength];            long[] newPositions = new long[newLength];            System.arraycopy(keys, 0, newKeys, 0, count);            System.arraycopy(positions, 0, newPositions, 0, count);            keys = newKeys;            positions = newPositions;          }          keys[count] = k;          positions[count] = position.get();          count++;        }      } catch (EOFException e) {        SequenceFile.LOG.warn("Unexpected EOF reading " + index +                                 " at entry #" + count + ".  Ignoring.");      } finally {	indexClosed = true;        index.close();      }    }    /** Re-positions the reader before its first key. */    public synchronized void reset() throws IOException {      data.seek(firstPosition);    }    /** Reads the final key from the file.     *     * @param key key to read into     */    public synchronized void finalKey(WritableComparable key)      throws IOException {      long originalPosition = data.getPosition(); // save position      try {        readIndex();                              // make sure index is valid        if (count > 0) {          data.seek(positions[count-1]);          // skip to last indexed entry        } else {          reset();                                // start at the beginning        }        while (data.next(key)) {}                 // scan to eof      } finally {        data.seek(originalPosition);              // restore position      }    }    /** Positions the reader at the named key, or if none such exists, at the     * first entry after the named key.  Returns true iff the named key exists     * in this map.     */    public synchronized boolean seek(WritableComparable key)      throws IOException {      readIndex();                                // make sure index is read      if (seekIndex != -1                         // seeked before          && seekIndex+1 < count                     && comparator.compare(key,keys[seekIndex+1])<0 // before next indexed          && comparator.compare(key, nextKey)          >= 0) {                                 // but after last seeked        // do nothing      } else {        seekIndex = binarySearch(key);        if (seekIndex < 0)                        // decode insertion point          seekIndex = -seekIndex-2;        if (seekIndex == -1)                      // belongs before first entry          seekPosition = firstPosition;           // use beginning of file        else          seekPosition = positions[seekIndex];    // else use index      }      data.seek(seekPosition);            if (nextKey == null)        nextKey = comparator.newKey();      while (data.next(nextKey)) {        int c = comparator.compare(key, nextKey);        if (c <= 0) {                             // at or beyond desired          return c == 0;        }      }      return false;    }    private int binarySearch(WritableComparable key) {      int low = 0;      int high = count-1;      while (low <= high) {        int mid = (low + high) >> 1;        WritableComparable midVal = keys[mid];        int cmp = comparator.compare(midVal, key);        if (cmp < 0)          low = mid + 1;        else if (cmp > 0)          high = mid - 1;        else          return mid;                             // key found      }      return -(low + 1);                          // key not found.    }    /** Read the next key/value pair in the map into <code>key</code> and     * <code>val</code>.  Returns true if such a pair exists and false when at     * the end of the map */    public synchronized boolean next(WritableComparable key, Writable val)      throws IOException {      return data.next(key, val);    }    /** Return the value for the named key, or null if none exists. */    public synchronized Writable get(WritableComparable key, Writable val)      throws IOException {      if (seek(key)) {        data.getCurrentValue(val);        return val;      } else        return null;    }    /** Close the map. */    public synchronized void close() throws IOException {      if (! indexClosed) {	index.close();      }      data.close();    }  }  /** Renames an existing map directory. */  public static void rename(FileSystem fs, String oldName, String newName)    throws IOException {    Path oldDir = new Path(oldName);    Path newDir = new Path(newName);    if (!fs.rename(oldDir, newDir)) {      throw new IOException("Could not rename " + oldDir + " to " + newDir);    }  }  /** Deletes the named map file. */  public static void delete(FileSystem fs, String name) throws IOException {    Path dir = new Path(name);    Path data = new Path(dir, DATA_FILE_NAME);    Path index = new Path(dir, INDEX_FILE_NAME);    fs.delete(data);    fs.delete(index);    fs.delete(dir);  }  /**   * This method attempts to fix a corrupt MapFile by re-creating its index.   * @param fs filesystem   * @param dir directory containing the MapFile data and index   * @param keyClass key class (has to be a subclass of Writable)   * @param valueClass value class (has to be a subclass of Writable)   * @param dryrun do not perform any changes, just report what needs to be done   * @return number of valid entries in this MapFile, or -1 if no fixing was needed   * @throws Exception   */  public static long fix(FileSystem fs, Path dir,          Class keyClass, Class valueClass, boolean dryrun, Configuration conf) throws Exception {    String dr = (dryrun ? "[DRY RUN ] " : "");    Path data = new Path(dir, DATA_FILE_NAME);    Path index = new Path(dir, INDEX_FILE_NAME);    int indexInterval = 128;    if (!fs.exists(data)) {      // there's nothing we can do to fix this!      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");    }    if (fs.exists(index)) {      // no fixing needed      return -1;    }    SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);    if (!dataReader.getKeyClass().equals(keyClass)) {      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +              ", got " + dataReader.getKeyClass().getName());    }    if (!dataReader.getValueClass().equals(valueClass)) {      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +              ", got " + dataReader.getValueClass().getName());    }    long cnt = 0L;    Writable key = (Writable)keyClass.getConstructor(new Class[0]).newInstance(new Object[0]);    Writable value = (Writable)valueClass.getConstructor(new Class[0]).newInstance(new Object[0]);    SequenceFile.Writer indexWriter = null;    if (!dryrun) indexWriter = new SequenceFile.Writer(fs, index, keyClass, LongWritable.class);    try {      long pos = 0L;      LongWritable position = new LongWritable();      while(dataReader.next(key, value)) {        cnt++;        if (cnt % indexInterval == 0) {          position.set(pos);          if (!dryrun) indexWriter.append(key, position);        }        pos = dataReader.getPosition();      }    } catch(Throwable t) {      // truncated data file. swallow it.    }    dataReader.close();    if (!dryrun) indexWriter.close();    return cnt;  }  public static void main(String[] args) throws Exception {    String usage = "Usage: MapFile inFile outFile";          if (args.length != 2) {      System.err.println(usage);      System.exit(-1);    }          String in = args[0];    String out = args[1];    Configuration conf = new Configuration();    int ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);    FileSystem fs = new LocalFileSystem(conf);    MapFile.Reader reader = new MapFile.Reader(fs, in, conf);    MapFile.Writer writer =      new MapFile.Writer(fs, out, reader.getKeyClass(), reader.getValueClass());    WritableComparable key =      (WritableComparable)reader.getKeyClass().newInstance();    Writable value = (Writable)reader.getValueClass().newInstance();    while (reader.next(key, value))               // copy all entries      writer.append(key, value);    writer.close();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -