⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
    /** Create the named file with write-progress reporter. */    /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Configuration,Path,Class,Class,Progressable)}. */    public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,            Progressable progress)      throws IOException {      this(fs, name, keyClass, valClass, false, progress);    }        /** Create the named file.     * @param compress if true, values are compressed.     */    /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Configuration,Path,Class,Class)}. */    public Writer(FileSystem fs, Path name,                  Class keyClass, Class valClass, boolean compress)      throws IOException {      init(name, fs.create(name), keyClass, valClass, compress, null);       initializeFileHeader();      writeFileHeader();      finalizeFileHeader();    }        /** Create the named file with write-progress reporter.     * @param compress if true, values are compressed.     */    /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Configuration,Path,Class,Class,Progressable)}. */    public Writer(FileSystem fs, Path name,                  Class keyClass, Class valClass, boolean compress,                  Progressable progress)      throws IOException {      init(name, fs.create(name, progress), keyClass, valClass,           compress, null);            initializeFileHeader();      writeFileHeader();      finalizeFileHeader();    }        /** Create the named file. */    public Writer(FileSystem fs, Configuration conf, Path name,         Class keyClass, Class valClass)      throws IOException {      this(fs, name, keyClass, valClass, false);    }        /** Create the named file with write-progress reporter. 
*/    public Writer(FileSystem fs, Configuration conf, Path name,         Class keyClass, Class valClass, Progressable progress)      throws IOException {      this(fs, name, keyClass, valClass, false, progress);    }    /** Write to an arbitrary stream using a specified buffer size. */    private Writer(FSDataOutputStream out, Class keyClass, Class valClass)    throws IOException {      init(null, out, keyClass, valClass, false, null);            initializeFileHeader();      writeFileHeader();      finalizeFileHeader();    }    /** Write the initial part of file header. */    void initializeFileHeader()     throws IOException{      out.write(VERSION);    }    /** Write the final part of file header. */    void finalizeFileHeader()     throws IOException{      out.write(sync);                       // write the sync bytes      out.flush();                           // flush header    }        boolean isCompressed() { return compress; }    boolean isBlockCompressed() { return false; }        /** Write and flush the file header. */    void writeFileHeader()     throws IOException {      Text.writeString(out, keyClass.getName());      Text.writeString(out, valClass.getName());            out.writeBoolean(this.isCompressed());      out.writeBoolean(this.isBlockCompressed());            if(this.isCompressed()) {        Text.writeString(out, (codec.getClass()).getName());      }    }    /** Initialize. 
*/    void init(Path name, FSDataOutputStream out,                      Class keyClass, Class valClass,                      boolean compress, CompressionCodec codec)     throws IOException {      this.target = name;      this.out = out;      this.keyClass = keyClass;      this.valClass = valClass;      this.compress = compress;      this.codec = codec;      if(this.codec != null) {        this.deflateFilter = this.codec.createOutputStream(buffer);        this.deflateOut =           new DataOutputStream(new BufferedOutputStream(deflateFilter));      }    }        /** Returns the class of keys in this file. */    public Class getKeyClass() { return keyClass; }    /** Returns the class of values in this file. */    public Class getValueClass() { return valClass; }    /** Returns the compression codec of data in this file. */    public CompressionCodec getCompressionCodec() { return codec; }    /** Close the file. */    public synchronized void close() throws IOException {      if (out != null) {        out.close();        out = null;      }    }    synchronized void checkAndWriteSync() throws IOException {      if (sync != null &&          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync        lastSyncPos = out.getPos();               // update lastSyncPos        //LOG.info("sync@"+lastSyncPos);        out.writeInt(SYNC_ESCAPE);                // escape it        out.write(sync);                          // write sync      }    }    /** Append a key/value pair. 
*/    public synchronized void append(Writable key, Writable val)      throws IOException {      if (key.getClass() != keyClass)        throw new IOException("wrong key class: "+key.getClass().getName()            +" is not "+keyClass);      if (val.getClass() != valClass)        throw new IOException("wrong value class: "+val.getClass().getName()            +" is not "+valClass);      buffer.reset();      // Append the 'key'      key.write(buffer);      int keyLength = buffer.getLength();      if (keyLength == 0)        throw new IOException("zero length keys not allowed: " + key);      // Append the 'value'      if (compress) {        deflateFilter.resetState();        val.write(deflateOut);        deflateOut.flush();        deflateFilter.finish();      } else {        val.write(buffer);      }      // Write the record out      checkAndWriteSync();                                // sync      out.writeInt(buffer.getLength());                   // total record length      out.writeInt(keyLength);                            // key portion length      out.write(buffer.getData(), 0, buffer.getLength()); // data    }    /**      * Append a key/value pair.      * @deprecated Call {@link #appendRaw(byte[], int, int, SequenceFile.ValueBytes)}.      
*/    public synchronized void append(byte[] data, int start, int length,                                    int keyLength) throws IOException {      if (keyLength == 0)        throw new IOException("zero length keys not allowed");      checkAndWriteSync();                        // sync      out.writeInt(length);                       // total record length      out.writeInt(keyLength);                    // key portion length      out.write(data, start, length);             // data    }        public synchronized void appendRaw(        byte[] keyData, int keyOffset, int keyLength, ValueBytes val)     throws IOException {      if (keyLength == 0)        throw new IOException("zero length keys not allowed: " + keyLength);      UncompressedBytes value = (UncompressedBytes)val;      int valLength = value.getSize();      checkAndWriteSync();            out.writeInt(keyLength+valLength);          // total record length      out.writeInt(keyLength);                    // key portion length      out.write(keyData, keyOffset, keyLength);   // key      val.writeUncompressedBytes(out);            // value    }    /** Returns the current length of the output file.     *     * <p>This always returns a synchronized position.  In other words, {@link     * immediately after calling {@link Reader#seek(long)} with a position     * returned by this method, Reader#next(Writable) may be called.  However     * the key may be earlier in the file than key last written when this     * method was called (e.g., with block-compression, it may be the first key     * in the block that was being written when this method was called).     */    public synchronized long getLength() throws IOException {      return out.getPos();    }  } // class Writer  /** Write key/compressed-value pairs to a sequence-format file. */  static class RecordCompressWriter extends Writer {        /** Create the named file. 
*/    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,         Class keyClass, Class valClass, CompressionCodec codec)     throws IOException {      super.init(name, fs.create(name), keyClass, valClass, true, codec);            initializeFileHeader();      writeFileHeader();      finalizeFileHeader();    }        /** Create the named file with write-progress reporter. */    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,         Class keyClass, Class valClass, CompressionCodec codec,        Progressable progress)    throws IOException {      super.init(name, fs.create(name, progress),           keyClass, valClass, true, codec);            initializeFileHeader();      writeFileHeader();      finalizeFileHeader();    }        /** Write to an arbitrary stream using a specified buffer size. */    private RecordCompressWriter(FSDataOutputStream out,                   Class keyClass, Class valClass, CompressionCodec codec)      throws IOException {      super.init(null, out, keyClass, valClass, true, codec);            initializeFileHeader();      writeFileHeader();      finalizeFileHeader();          }    boolean isCompressed() { return true; }    boolean isBlockCompressed() { return false; }    /** Append a key/value pair. 
*/    public synchronized void append(Writable key, Writable val)      throws IOException {      if (key.getClass() != keyClass)        throw new IOException("wrong key class: "+key.getClass().getName()            +" is not "+keyClass);      if (val.getClass() != valClass)        throw new IOException("wrong value class: "+val.getClass().getName()            +" is not "+valClass);      buffer.reset();      // Append the 'key'      key.write(buffer);      int keyLength = buffer.getLength();      if (keyLength == 0)        throw new IOException("zero length keys not allowed: " + key);      // Compress 'value' and append it      deflateFilter.resetState();      val.write(deflateOut);      deflateOut.flush();      deflateFilter.finish();      // Write the record out      checkAndWriteSync();                                // sync      out.writeInt(buffer.getLength());                   // total record length      out.writeInt(keyLength);                            // key portion length      out.write(buffer.getData(), 0, buffer.getLength()); // data    }    /** Append a key/value pair. */    public synchronized void appendRaw(        byte[] keyData, int keyOffset, int keyLength,        ValueBytes val        ) throws IOException {      if (keyLength == 0)        throw new IOException("zero length keys not allowed");      CompressedBytes value = (CompressedBytes)val;      int valLength = value.getSize();            checkAndWriteSync();                        // sync      out.writeInt(keyLength+valLength);          // total record length      out.writeInt(keyLength);                    // key portion length      out.write(keyData, keyOffset, keyLength);   // 'key' data      val.writeCompressedBytes(out);              // 'value' data    }      } // RecordCompressionWriter  /** Write compressed key/value blocks to a sequence-format file. 
*/
  static class BlockCompressWriter extends Writer {

    // NOTE(review): the fields below are only declared in this part of the
    // file; the code that fills and drains them is outside this view.
    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    // Set through init(int) by every constructor.
    private int compressionBlockSize;

    /**
     * Create the named file.  The block size is read from the
     * "io.seqfile.compress.blocksize" setting, falling back to 1000000.
     */
    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
                               Class keyClass, Class valClass,
                               CompressionCodec codec)
        throws IOException {
      FSDataOutputStream stream = fs.create(name);
      super.init(name, stream, keyClass, valClass, true, codec);

      int blockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000);
      init(blockSize);

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }

    /**
     * Create the named file with write-progress reporter.  The block size is
     * read from the "io.seqfile.compress.blocksize" setting, falling back to
     * 1000000.
     */
    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
                               Class keyClass, Class valClass,
                               CompressionCodec codec,
                               Progressable progress)
        throws IOException {
      FSDataOutputStream stream = fs.create(name, progress);
      super.init(name, stream, keyClass, valClass, true, codec);

      int blockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000);
      init(blockSize);

      initializeFileHeader();
      writeFileHeader();
      finalizeFileHeader();
    }

    /** Write to an already-open stream, with a block size of 1000000. 
*/    private BlockCompressWriter(FSDataOutputStream out,                   Class keyClass, Class valClass, CompressionCodec codec)      throws IOException {      super.init(null, out, keyClass, valClass, true, codec);      init(1000000);            initializeFileHeader();      writeFileHeader();      finalizeFileHeader();    }    boolean isCompressed() { return true; }    boolean isBlockCompressed() { return true; }    /** Initialize */    void init(int compressionBlockSize) {      this.compressionBlockSize = compressionBlockSize;    }        /** Workhorse to check and write out compressed data/lengths */    private synchronized     void writeBuffer(DataOutputBuffer uncompressedDataBuffer)     throws IOException {      deflateFilter.resetState();      buffer.reset();      deflateOut.write(uncompressedDataBuffer.getData(), 0,           uncompressedDataBuffer.getLength());      deflateOut.flush();      deflateFilter.finish();            WritableUtils.writeVInt(out, buffer.getLength());      out.write(buffer.getData(), 0, buffer.getLength());    }        /** Compress and flush contents to dfs */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -