⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to 'current' value
      seekToCurrentValue();

      if (!blockCompressed) {
        // Not block-compressed: deserialize the value directly from valIn.
        val.readFields(valIn);
        
        // Any further readable byte means readFields() left part of the
        // serialized value unconsumed.
        // NOTE(review): assuming valIn follows the java.io.InputStream read()
        // contract (next byte 0-255, or -1 at end), "> 0" does not catch a
        // leftover 0x00 byte; "!= -1" may have been intended -- confirm.
        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
              + " bytes, should read " +
              (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value: its length is read as a vint from valLenIn and the
        // value itself is deserialized from valIn.
        int valLength = WritableUtils.readVInt(valLenIn);
        val.readFields(valIn);
        
        // One buffered 'value' of the current block has been consumed.
        --noBufferedValues;
        
        // Sanity check
        // NOTE(review): the condition tests for a negative length while the
        // message says "zero-length", and it runs (logging only at debug
        // level) after readFields() has already consumed the value.
        if (valLength < 0) {
          LOG.debug(val + " is a zero-length value");
        }
      }
    }
    
    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  True if another entry exists, and false at end of file.
     *
     * <p>In a block-compressed file this also returns false when a negative
     * key length is read.
     *
     * @param key instance to populate; its class must be exactly the file's
     *            key class
     * @return true if a key was read, false at end of file
     * @throws IOException on a key-class mismatch, when the deserialized key
     *         does not consume exactly the stored key length, or on I/O error
     */
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
            +" is not "+keyClass);

      if (!blockCompressed) {
        // Pull the whole raw record into outBuf; next(outBuf) returns the key
        // length, or a negative value at end of file.
        outBuf.reset();
        
        keyLength = next(outBuf);
        if (keyLength < 0)
          return false;
        
        // Deserialize the key from the start of the record, then mark the
        // position just past it (presumably used by seekToCurrentValue() to
        // locate the value -- confirm there).
        valBuffer.reset(outBuf.getData(), outBuf.getLength());
        
        key.readFields(valBuffer);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
              + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        // Load the next block once all buffered keys are used up; hitting
        // EOF while reading a block means end of file.
        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return false;
          }
        }
        
        // NOTE: this local shadows the keyLength field, so the field is only
        // updated on the non-block-compressed path above.
        int keyLength = WritableUtils.readVInt(keyLenIn);
        
        // Sanity check
        if (keyLength < 0) {
          return false;
        }
        
        //Read another compressed 'key'
        key.readFields(keyIn);
        --noBufferedKeys;
      }

      return true;
    }

    /** Read the next key/value pair in the file into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file
     *
     * @param key instance to populate with the key (see {@link #next(Writable)})
     * @param val instance to populate with the value; its class must be
     *            exactly the file's value class
     * @return true if a pair was read, false at end of file
     * @throws IOException on a key/value class mismatch or a read failure
     */
    public synchronized boolean next(Writable key, Writable val)
      throws IOException {
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      boolean more = next(key);
      
      // The value is only read when a key was found.
      if (more) {
        getCurrentValue(val);
      }

      return more;
    }
    
    /**
     * Interpret a record-length int just read from the stream.
     *
     * <p>If the file version is greater than 1, a sync marker is configured,
     * and <code>length</code> equals SYNC_ESCAPE, this method reads the
     * following sync bytes and verifies them against <code>sync</code>. It
     * then sets syncSeen and reads the real record length from the stream.
     * Otherwise syncSeen is cleared and <code>length</code> is returned
     * unchanged.
     *
     * @param length the int just read from the stream
     * @return the actual record length
     * @throws IOException ("File is corrupt!") if the sync bytes do not match,
     *         or on a read failure
     */
    private synchronized int checkAndReadSync(int length) 
    throws IOException {
      if (version > 1 && sync != null &&
          length == SYNC_ESCAPE) {              // process a sync entry
        //LOG.info("sync@"+in.getPos());
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
        syncSeen = true;
        length = in.readInt();                  // re-read length
      } else {
        syncSeen = false;
      }
      
      return length;
    }
    
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     *
     * <p>Not supported for block-compressed files (an IOException is thrown).
     *
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
*/    public synchronized int next(DataOutputBuffer buffer) throws IOException {      // Unsupported for block-compressed sequence files      if (blockCompressed) {        throw new IOException("Unsupported call for block-compressed" +            " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");      }      if (in.getPos() >= end)        return -1;      try {        int length = checkAndReadSync(in.readInt());        int keyLength = in.readInt();        buffer.write(in, length);        return keyLength;      } catch (ChecksumException e) {             // checksum failure        handleChecksumException(e);        return next(buffer);      }    }        public ValueBytes createValueBytes() {      ValueBytes val = null;      if (!decompress || blockCompressed) {        val = new UncompressedBytes();      } else {        val = new CompressedBytes(codec);      }      return val;    }    /**     * Read 'raw' records.     * @param key - The buffer into which the key is read     * @param val - The 'raw' value     * @return Returns the total record length     * @throws IOException     */    public int nextRaw(DataOutputBuffer key, ValueBytes val)     throws IOException {      if (!blockCompressed) {        if (in.getPos() >= end)           return -1;        int length = checkAndReadSync(in.readInt());        int keyLength = in.readInt();        int valLength = length - keyLength;        key.write(in, keyLength);        if (decompress) {          CompressedBytes value = (CompressedBytes)val;          value.reset(in, valLength);        } else {          UncompressedBytes value = (UncompressedBytes)val;          value.reset(in, valLength);        }                return length;      } else {        //Reset syncSeen        syncSeen = false;                // Read 'key'        if (noBufferedKeys == 0) {          if (in.getPos() >= end)             return -1;          try {             readBlock();          } catch (EOFException eof) {            
return -1;          }        }        int keyLength = WritableUtils.readVInt(keyLenIn);        if (keyLength < 0) {          throw new IOException("zero length key found!");        }        key.write(keyIn, keyLength);        --noBufferedKeys;                // Read raw 'value'        seekToCurrentValue();        int valLength = WritableUtils.readVInt(valLenIn);        UncompressedBytes rawValue = (UncompressedBytes)val;        rawValue.reset(valIn, valLength);        --noBufferedValues;                return (keyLength+valLength);      }          }    private void handleChecksumException(ChecksumException e)      throws IOException {      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));      } else {        throw e;      }    }    /** Set the current byte position in the input file.     *     * <p>The position passed must be a position returned by {@link     * Writer#getLength()} when writing this file.  To seek to an arbitrary     * position, use {@link Reader#sync(long)}.     
*/    public synchronized void seek(long position) throws IOException {      in.seek(position);      if (blockCompressed) {                      // trigger block read        noBufferedKeys = 0;        valuesDecompressed = true;      }    }    /** Seek to the next sync mark past a given position.*/    public synchronized void sync(long position) throws IOException {      if (position+SYNC_SIZE >= end) {        seek(end);        return;      }      try {        seek(position+4);                         // skip escape        in.readFully(syncCheck);        int syncLen = sync.length;        for (int i = 0; in.getPos() < end; i++) {          int j = 0;          for (; j < syncLen; j++) {            if (sync[j] != syncCheck[(i+j)%syncLen])              break;          }          if (j == syncLen) {            in.seek(in.getPos() - SYNC_SIZE);     // position before sync            return;          }          syncCheck[i%syncLen] = in.readByte();        }      } catch (ChecksumException e) {             // checksum failure        handleChecksumException(e);      }    }    /** Returns true iff the previous call to next passed a sync mark.*/    public boolean syncSeen() { return syncSeen; }    /** Return the current byte position in the input file. */    public synchronized long getPosition() throws IOException {      return in.getPos();    }    /** Returns the name of the file. */    public String toString() {      return file.toString();    }  }  /** Sorts key/value pairs in a sequence-format file.   *   * <p>For best performance, applications should make sure that the {@link   * Writable#readFields(DataInput)} implementation of their keys is   * very efficient.  In particular, it should avoid allocating memory.   
*/  public static class Sorter {    private WritableComparator comparator;    private Path[] inFiles;                     // when merging or sorting    private Path outFile;    private int memory; // bytes    private int factor; // merged per pass    private FileSystem fs = null;    private Class keyClass;    private Class valClass;    private Configuration conf;    /** Sort and merge files containing the named classes. */    public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {      this(fs, new WritableComparator(keyClass), valClass, conf);    }    /** Sort and merge using an arbitrary {@link WritableComparator}. */    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass,         Configuration conf) {      this.fs = fs;      this.comparator = comparator;      this.keyClass = comparator.getKeyClass();      this.valClass = valClass;      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;      this.factor = conf.getInt("io.sort.factor", 100);      this.conf = conf;    }    /** Set the number of streams to merge at once.*/    public void setFactor(int factor) { this.factor = factor; }    /** Get the number of streams to merge at once.*/    public int getFactor() { return factor; }    /** Set the total amount of buffer memory, in bytes.*/    public void setMemory(int memory) { this.memory = memory; }    /** Get the total amount of buffer memory, in bytes.*/    public int getMemory() { return memory; }    /**      * Perform a file sort from a set of input files into an output file.     * @param inFiles the files to be sorted     * @param outFile the sorted output file     * @param deleteInput should the input files be deleted as they are read?     
*/
    public void sort(Path[] inFiles, Path outFile,
                     boolean deleteInput) throws IOException {
      // Refuse to overwrite an existing output file.
      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFiles = inFiles;
      this.outFile = outFile;

      // An initial sort pass produces 'segments' segments; merge passes then
      // repeat until at most one remains. The flag is true once the remaining
      // segments fit within a single merge (segments <= factor) -- presumably
      // marking the final pass; confirm in mergePass.
      int segments = sortPass(deleteInput);
      int pass = 1;
      while (segments > 1) {
        segments = mergePass(pass, segments <= factor);
        pass++;
      }
      
      // Clean up intermediate files: "<outFile>.<i>" and "<outFile>.<i>.index"
      // for every pass number i in [0, pass).
      // NOTE(review): results of fs.delete are ignored, so a failed cleanup
      // goes unnoticed.
      for (int i=0; i < pass; ++i) {
        fs.delete(new Path(outFile.toString() + "." + i));
        fs.delete(new Path(outFile.toString() + "." + i + ".index"));
      }
    }

    /**
     * The backwards compatible interface to sort.
     * Sorts a single file and never deletes the input.
     * @param inFile the input file to sort
     * @param outFile the sorted output file
     */
    public void sort(Path inFile, Path outFile) throws IOException {
      sort(new Path[]{inFile}, outFile, false);
    }
    
    /**
     * Run the initial in-memory sort pass, always closing it afterwards.
     * @param deleteInput passed through to SortPass.run
     * @return the value of SortPass.run, used by sort() as the segment count
     */
    private int sortPass(boolean deleteInput) throws IOException {
      LOG.debug("running sort pass");
      SortPass sortPass = new SortPass();         // make the SortPass
      try {
        return sortPass.run(deleteInput);         // run it
      } finally {
        sortPass.close();                         // close it
      }
    }

    private class SortPass {
      // A quarter of the Sorter's buffer memory (which is in bytes).
      private int memoryLimit = memory/4;
      // Presumably the maximum number of records buffered per run -- confirm in run().
      private int recordLimit = 1000000;
      
      private DataOutputBuffer rawKeys = new DataOutputBuffer();
      private byte[] rawBuffer;

      // Arrays sharing an initial capacity of 1024 entries; assumed to be
      // indexed per buffered record and grown elsewhere in SortPass -- confirm.
      private int[] keyOffsets = new int[1024];
      private int[] pointers = new int[keyOffsets.length];
      private int[] pointersCopy = new int[keyOffsets.length];
      private int[] keyLengths = new int[keyOffsets.length];
      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
      

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -