⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.io;import java.io.*;import java.util.*;import java.net.InetAddress;import java.rmi.server.UID;import java.security.MessageDigest;import org.apache.lucene.util.PriorityQueue;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionInputStream;import org.apache.hadoop.io.compress.CompressionOutputStream;import org.apache.hadoop.io.compress.DefaultCodec;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.Progressable;import org.apache.hadoop.util.ReflectionUtils;/** Support for flat files of binary key/value pairs. 
*/public class SequenceFile {  public static final Log LOG =    LogFactory.getLog("org.apache.hadoop.io.SequenceFile");  private SequenceFile() {}                         // no public ctor  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;  private static byte[] VERSION = new byte[] {    (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION  };  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash   private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash  /** The number of bytes between sync points.*/  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;   /** The type of compression.   * @see SequenceFile#Writer   */  public static enum CompressionType {    /** Do not compress records. */    NONE,     /** Compress values only, each separately. */    RECORD,    /** Compress sequences of records together in blocks. */    BLOCK  }  /**   * Get the compression type for the reduce outputs   * @param job the job config to look in   * @return the kind of compression to use   */  static public CompressionType getCompressionType(Configuration job) {    String name = job.get("io.seqfile.compression.type");    return name == null ? CompressionType.RECORD :                           CompressionType.valueOf(name);  }    /**   * Set the compression type for sequence files.   * @param job the configuration to modify   * @param val the new compression type (none, block, record)   */  static public void setCompressionType(Configuration job,                                         CompressionType val) {    job.set("io.seqfile.compression.type", val.toString());  }    /**   * Construct the preferred type of SequenceFile Writer.   * @param fs The configured filesystem.    * @param conf The configuration.   * @param name The name of the file.    * @param keyClass The 'key' type.   
* @param valClass The 'value' type.   * @param compressionType The compression type.   * @return Returns the handle to the constructed SequenceFile Writer.   * @throws IOException   */  public static Writer   createWriter(FileSystem fs, Configuration conf, Path name,       Class keyClass, Class valClass, CompressionType compressionType)   throws IOException {    Writer writer = null;        if (compressionType == CompressionType.NONE) {      writer = new Writer(fs, conf, name, keyClass, valClass);    } else if (compressionType == CompressionType.RECORD) {      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,           new DefaultCodec());    } else if (compressionType == CompressionType.BLOCK){      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,           new DefaultCodec());    }        return writer;  }    /**   * Construct the preferred type of SequenceFile Writer.   * @param fs The configured filesystem.    * @param conf The configuration.   * @param name The name of the file.    * @param keyClass The 'key' type.   * @param valClass The 'value' type.   * @param compressionType The compression type.   * @param progress The Progressable object to track progress.   * @return Returns the handle to the constructed SequenceFile Writer.   
* @throws IOException   */  public static Writer  createWriter(FileSystem fs, Configuration conf, Path name,       Class keyClass, Class valClass, CompressionType compressionType,      Progressable progress) throws IOException {    Writer writer = null;        if (compressionType == CompressionType.NONE) {      writer = new Writer(fs, conf, name, keyClass, valClass, progress);     } else if (compressionType == CompressionType.RECORD) {      writer = new RecordCompressWriter(fs, conf, name,           keyClass, valClass, new DefaultCodec(), progress);    } else if (compressionType == CompressionType.BLOCK){      writer = new BlockCompressWriter(fs, conf, name,           keyClass, valClass, new DefaultCodec(), progress);    }        return writer;  }  /**   * Construct the preferred type of SequenceFile Writer.   * @param fs The configured filesystem.    * @param conf The configuration.   * @param name The name of the file.    * @param keyClass The 'key' type.   * @param valClass The 'value' type.   * @param compressionType The compression type.   * @param codec The compression codec.   * @return Returns the handle to the constructed SequenceFile Writer.   * @throws IOException   */  public static Writer   createWriter(FileSystem fs, Configuration conf, Path name,       Class keyClass, Class valClass,       CompressionType compressionType, CompressionCodec codec)   throws IOException {    Writer writer = null;        if (compressionType == CompressionType.NONE) {      writer = new Writer(fs, conf, name, keyClass, valClass);     } else if (compressionType == CompressionType.RECORD) {      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,           codec);    } else if (compressionType == CompressionType.BLOCK){      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,           codec);    }        return writer;  }    /**   * Construct the preferred type of SequenceFile Writer.   * @param fs The configured filesystem.    
* @param conf The configuration.   * @param name The name of the file.    * @param keyClass The 'key' type.   * @param valClass The 'value' type.   * @param compressionType The compression type.   * @param codec The compression codec.   * @param progress The Progressable object to track progress.   * @return Returns the handle to the constructed SequenceFile Writer.   * @throws IOException   */  public static Writer  createWriter(FileSystem fs, Configuration conf, Path name,       Class keyClass, Class valClass,       CompressionType compressionType, CompressionCodec codec,      Progressable progress) throws IOException {    Writer writer = null;        if (compressionType == CompressionType.NONE) {      writer = new Writer(fs, conf, name, keyClass, valClass, progress);    } else if (compressionType == CompressionType.RECORD) {      writer = new RecordCompressWriter(fs, conf, name,           keyClass, valClass, codec, progress);    } else if (compressionType == CompressionType.BLOCK){      writer = new BlockCompressWriter(fs, conf, name,           keyClass, valClass, codec, progress);    }        return writer;  }  /**   * Construct the preferred type of 'raw' SequenceFile Writer.   * @param out The stream on top which the writer is to be constructed.   * @param keyClass The 'key' type.   * @param valClass The 'value' type.   * @param compress Compress data?   * @param blockCompress Compress blocks?   * @return Returns the handle to the constructed SequenceFile Writer.   
   * @throws IOException
   */
  private static Writer
  createWriter(FSDataOutputStream out,
      Class keyClass, Class valClass, boolean compress, boolean blockCompress,
      CompressionCodec codec)
  throws IOException {
    Writer writer = null;
    // Dispatch on the (compress, blockCompress) pair: uncompressed,
    // per-record compression, or block compression.
    if (!compress) {
      writer = new Writer(out, keyClass, valClass);
    } else if (compress && !blockCompress) {
      writer = new RecordCompressWriter(out, keyClass, valClass, codec);
    } else {
      writer = new BlockCompressWriter(out, keyClass, valClass, codec);
    }
    return writer;
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
    throws IOException;

    /** Write compressed bytes to outStream. 
     * Note: that it will NOT compress the bytes if they are not compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
    throws IllegalArgumentException, IOException;
  }

  /** ValueBytes holder for a value that is stored uncompressed.
   *  The buffer is (re)filled via {@code reset} and replayed verbatim by
   *  {@code writeUncompressedBytes}; asking it for compressed output is an error. */
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;   // number of valid bytes in 'data'
    private byte[] data;    // the raw value bytes

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    /** Fill the buffer with exactly {@code length} bytes from {@code in}. */
    private void reset(DataInputStream in, int length) throws IOException {
      data = new byte[length];
      // dataSize stays -1 while the read is in flight, so a failed readFully
      // leaves an obviously invalid size rather than a stale one.
      dataSize = -1;
      in.readFully(data);
      dataSize = data.length;
    }

    /** Number of bytes currently held (the raw value length). */
    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
    throws IOException {
      outStream.write(data, 0, dataSize);
    }

    public void writeCompressedBytes(DataOutputStream outStream) 
    throws IllegalArgumentException, IOException {
      throw 
      new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }
  } // UncompressedBytes

  /** ValueBytes holder for a value that is stored compressed.
   *  {@code writeCompressedBytes} replays the stored bytes as-is;
   *  {@code writeUncompressedBytes} decompresses them through {@code codec}. */
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;   // number of valid (compressed) bytes in 'data'
    private byte[] data;    // the compressed value bytes as read from the file
    DataInputBuffer rawData = null;                      // created lazily, reused across values
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;    // created lazily, reused across values

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    /** Fill the buffer with exactly {@code length} compressed bytes from {@code in}. */
    private void reset(DataInputStream in, int length) throws IOException {
      data = new byte[length];
      // -1 marks the size invalid until readFully completes (see UncompressedBytes.reset).
      dataSize = -1;
      in.readFully(data);
      dataSize = data.length;
    }

    /** Number of bytes currently held (the stored, i.e. compressed, length). */
    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
    throws IOException {
      // Build the decompression pipeline on first use; afterwards only reset
      // its state so the codec's stream objects are reused between values.
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      // Pump the decompressed bytes to the caller in 8KB chunks.
      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) 
             != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    public void writeCompressedBytes(DataOutputStream outStream) 
    throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }
  } // CompressedBytes

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer {
    FSDataOutputStream out;                            // the underlying output stream
    DataOutputBuffer buffer = new DataOutputBuffer();  // scratch buffer for serializing records
    Path target = null;
    Class keyClass;                                    // class all keys must be instances of
    Class valClass;                                    // class all values must be instances of
    private boolean compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {                                       // use hash of uid + host
        // MD5 is used here only to generate a unique marker, not for security.
        MessageDigest digester = MessageDigest.getInstance("MD5");
        digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Path,Class,Class)}. */
    public Writer(FileSystem fs, String name, Class keyClass, Class valClass)
      throws IOException {
      this(fs, new Path(name), keyClass, valClass, false);
    }

    /** Implicit constructor: needed for the period of transition!*/
    Writer()
    {}

    /** Create the named file. */
    /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Configuration,Path,Class,Class)}. */
    public Writer(FileSystem fs, Path name, Class keyClass, Class valClass)
      throws IOException {
      this(fs, name, keyClass, valClass, false);
    }
    // NOTE(review): the Writer class continues beyond this excerpt (page 1 of 5);
    // the remaining constructors and methods are not visible here.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -