⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    private synchronized void writeBlock() throws IOException {      if (noBufferedRecords > 0) {        // Write 'sync' marker        if (sync != null) {          out.writeInt(SYNC_ESCAPE);          out.write(sync);        }                // No. of records        WritableUtils.writeVInt(out, noBufferedRecords);                // Write 'keys' and lengths        writeBuffer(keyLenBuffer);        writeBuffer(keyBuffer);                // Write 'values' and lengths        writeBuffer(valLenBuffer);        writeBuffer(valBuffer);                // Flush the file-stream        out.flush();                // Reset internal states        keyLenBuffer.reset();        keyBuffer.reset();        valLenBuffer.reset();        valBuffer.reset();        noBufferedRecords = 0;      }          }        /** Close the file. */    public synchronized void close() throws IOException {      if (out != null) {        writeBlock();        out.close();        out = null;      }    }    /** Append a key/value pair. */    public synchronized void append(Writable key, Writable val)      throws IOException {      if (key.getClass() != keyClass)        throw new IOException("wrong key class: "+key+" is not "+keyClass);      if (val.getClass() != valClass)        throw new IOException("wrong value class: "+val+" is not "+valClass);      // Save key/value into respective buffers       int oldKeyLength = keyBuffer.getLength();      key.write(keyBuffer);      int keyLength = keyBuffer.getLength() - oldKeyLength;      if (keyLength == 0)        throw new IOException("zero length keys not allowed: " + key);      WritableUtils.writeVInt(keyLenBuffer, keyLength);      int oldValLength = valBuffer.getLength();      val.write(valBuffer);      int valLength = valBuffer.getLength() - oldValLength;      WritableUtils.writeVInt(valLenBuffer, valLength);            // Added another key/value pair      ++noBufferedRecords;            // Compress and flush?      
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();      if (currentBlockSize >= compressionBlockSize) {        writeBlock();      }    }        /** Append a key/value pair. */    public synchronized void appendRaw(        byte[] keyData, int keyOffset, int keyLength,        ValueBytes val        ) throws IOException {            if (keyLength == 0)        throw new IOException("zero length keys not allowed");      UncompressedBytes value = (UncompressedBytes)val;      int valLength = value.getSize();            // Save key/value data in relevant buffers      WritableUtils.writeVInt(keyLenBuffer, keyLength);      keyBuffer.write(keyData, keyOffset, keyLength);      WritableUtils.writeVInt(valLenBuffer, valLength);      val.writeUncompressedBytes(valBuffer);      // Added another key/value pair      ++noBufferedRecords;      // Compress and flush?      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();       if (currentBlockSize >= compressionBlockSize) {        writeBlock();      }    }    } // BlockCompressionWriter    /** Reads key/value pairs from a sequence-format file. 
*/
  public static class Reader {
    // File being read and the stream opened on it by the constructors.
    private Path file;
    private FSDataInputStream in;
    // NOTE(review): no use of outBuf is visible in this part of the file.
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    // Header fields, filled in by init().
    private byte version;                 // fourth byte of the version block
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null; // set by init() only when 'decompress' is true
    
    // Sync marker read from the header (files with version > 1), and the
    // scratch array each block's marker is read into for comparison.
    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;             // set to true by readBlock()

    // File length, or start + length when a sub-range was requested.
    private long end;
    // NOTE(review): keyLength is not referenced in the visible code.
    private int keyLength;

    private boolean decompress;           // header flag: values are compressed
    private boolean blockCompressed;      // header flag: records are block-compressed
    
    private Configuration conf;

    // Block-compressed reading state.
    private int noBufferedRecords = 0;    // record count of the current block
    // Taken from "io.seqfile.lazydecompress" (default true) in init().
    private boolean lazyDecompress = true;
    // False between reading a block's keys and loading its value buffers.
    private boolean valuesDecompressed = true;
    
    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;
    
    // For each of the four block sections: the raw buffer, the codec stream
    // layered over it, and the DataInputStream used to read from that.
    // The key-length, key and value-length triples are created in init()
    // only for block-compressed files.
    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;

    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;

    // The value buffer is always created; valIn reads through valInFilter
    // when values are compressed and straight from valBuffer otherwise.
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;

    /**
     * Open the file named by a path string.
     *
     * @deprecated Call {@link #Reader(FileSystem,Path,Configuration)}.
     */
    public Reader(FileSystem fs, String file, Configuration conf)
      throws IOException {
      this(fs, new Path(file), conf);
    }

    /** Open the named file. 
*/    public Reader(FileSystem fs, Path file, Configuration conf)      throws IOException {      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf);    }    private Reader(FileSystem fs, Path name, int bufferSize,                   Configuration conf) throws IOException {      this.file = name;      this.in = fs.open(file, bufferSize);      this.end = fs.getLength(file);      this.conf = conf;      init();    }        private Reader(FileSystem fs, Path file, int bufferSize, long start,                   long length, Configuration conf) throws IOException {      this.file = file;      this.in = fs.open(file, bufferSize);      this.conf = conf;      seek(start);      this.end = in.getPos() + length;      init();    }        private void init() throws IOException {      byte[] versionBlock = new byte[VERSION.length];      in.readFully(versionBlock);      if ((versionBlock[0] != VERSION[0]) ||          (versionBlock[1] != VERSION[1]) ||          (versionBlock[2] != VERSION[2]))        throw new IOException(file + " not a SequenceFile");      // Set 'version'      version = versionBlock[3];      if (version > VERSION[3])        throw new VersionMismatchException(VERSION[3], version);      if (version < BLOCK_COMPRESS_VERSION) {        UTF8 className = new UTF8();                className.readFields(in);                   // read key class name        this.keyClass = WritableName.getClass(className.toString(), conf);                className.readFields(in);                   // read val class name        this.valClass = WritableName.getClass(className.toString(), conf);      } else {        this.keyClass = WritableName.getClass(Text.readString(in), conf);        this.valClass = WritableName.getClass(Text.readString(in), conf);      }      if (version > 2) {                          // if version > 2        this.decompress = in.readBoolean();       // is compressed?      
} else {        decompress = false;      }      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4        this.blockCompressed = in.readBoolean();  // is block-compressed?      } else {        blockCompressed = false;      }            // if version >= 5      // setup the compression codec      if (decompress) {        if (version >= CUSTOM_COMPRESS_VERSION) {          String codecClassname = Text.readString(in);          try {            Class codecClass = conf.getClassByName(codecClassname);            this.codec = (CompressionCodec)                 ReflectionUtils.newInstance(codecClass, conf);          } catch (ClassNotFoundException cnfe) {            throw new IllegalArgumentException("Unknown codec: " +                                                codecClassname, cnfe);          }        } else {          codec = new DefaultCodec();        }      }            if (version > 1) {                          // if version > 1        in.readFully(sync);                       // read sync bytes      }            // Initialize      valBuffer = new DataInputBuffer();      if (decompress) {        valInFilter = this.codec.createInputStream(valBuffer);        valIn = new DataInputStream(valInFilter);      } else {        valIn = valBuffer;      }            if (blockCompressed) {        keyLenBuffer = new DataInputBuffer();        keyBuffer = new DataInputBuffer();        valLenBuffer = new DataInputBuffer();                keyLenInFilter = this.codec.createInputStream(keyLenBuffer);        keyLenIn = new DataInputStream(keyLenInFilter);        keyInFilter = this.codec.createInputStream(keyBuffer);        keyIn = new DataInputStream(keyInFilter);        valLenInFilter = this.codec.createInputStream(valLenBuffer);        valLenIn = new DataInputStream(valLenInFilter);      }            lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true);    }        /** Close the file. 
*/    public synchronized void close() throws IOException {      in.close();    }    /** Returns the class of keys in this file. */    public Class getKeyClass() { return keyClass; }    /** Returns the class of values in this file. */    public Class getValueClass() { return valClass; }    /** Returns true if values are compressed. */    public boolean isCompressed() { return decompress; }        /** Returns true if records are block-compressed. */    public boolean isBlockCompressed() { return blockCompressed; }        /** Returns the compression codec of data in this file. */    public CompressionCodec getCompressionCodec() { return codec; }    /** Read a compressed buffer */    private synchronized void readBuffer(DataInputBuffer buffer,         CompressionInputStream filter) throws IOException {      // Read data into a temporary buffer      DataOutputBuffer dataBuffer = new DataOutputBuffer();      int dataBufferLength = WritableUtils.readVInt(in);      dataBuffer.write(in, dataBufferLength);            // Set up 'buffer' connected to the input-stream      buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());      // Reset the codec      filter.resetState();    }        /** Read the next 'compressed' block */    private synchronized void readBlock() throws IOException {      // Check if we need to throw away a whole block of       // 'values' due to 'lazy decompression'       if (lazyDecompress && !valuesDecompressed) {        in.seek(WritableUtils.readVInt(in)+in.getPos());        in.seek(WritableUtils.readVInt(in)+in.getPos());      }            // Reset internal states      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;      valuesDecompressed = false;      //Process sync      if (sync != null) {        in.readInt();        in.readFully(syncCheck);                // read syncCheck        if (!Arrays.equals(sync, syncCheck))    // check it          throw new IOException("File is corrupt!");      }      syncSeen = true;      // Read 
number of records in this block      noBufferedRecords = WritableUtils.readVInt(in);            // Read key lengths and keys      readBuffer(keyLenBuffer, keyLenInFilter);      readBuffer(keyBuffer, keyInFilter);      noBufferedKeys = noBufferedRecords;            // Read value lengths and values      if (!lazyDecompress) {        readBuffer(valLenBuffer, valLenInFilter);        readBuffer(valBuffer, valInFilter);        noBufferedValues = noBufferedRecords;        valuesDecompressed = true;      }    }    /**      * Position valLenIn/valIn to the 'value'      * corresponding to the 'current' key      */    private synchronized void seekToCurrentValue() throws IOException {      if (!blockCompressed) {        if (decompress) {          valInFilter.resetState();        }        valBuffer.reset();      } else {        // Check if this is the first value in the 'block' to be read        if (lazyDecompress && !valuesDecompressed) {          // Read the value lengths and values          readBuffer(valLenBuffer, valLenInFilter);          readBuffer(valBuffer, valInFilter);          noBufferedValues = noBufferedRecords;          valuesDecompressed = true;        }                // Calculate the no. of bytes to skip        // Note: 'current' key has already been read!        int skipValBytes = 0;        int currentKey = noBufferedKeys + 1;                  for (int i=noBufferedValues; i > currentKey; --i) {          skipValBytes += WritableUtils.readVInt(valLenIn);          --noBufferedValues;        }                // Skip to the 'val' corresponding to 'current' key        if (skipValBytes > 0) {          if (valIn.skipBytes(skipValBytes) != skipValBytes) {            throw new IOException("Failed to seek to " + currentKey +                 "(th) value!");          }        }      }    }    /**     * Get the 'value' corresponding to the last read 'key'.     * @param val : The 'value' to be read.     
* @throws IOException     */    public synchronized void getCurrentValue(Writable val)     throws IOException {      if (val instanceof Configurable) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -