📄 sequencefile.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.io;import java.io.*;import java.util.*;import java.net.InetAddress;import java.rmi.server.UID;import java.security.MessageDigest;import org.apache.lucene.util.PriorityQueue;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionInputStream;import org.apache.hadoop.io.compress.CompressionOutputStream;import org.apache.hadoop.io.compress.DefaultCodec;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.Progressable;import org.apache.hadoop.util.ReflectionUtils;/** Support for flat files of binary key/value pairs. */public class SequenceFile { public static final Log LOG = LogFactory.getLog("org.apache.hadoop.io.SequenceFile"); private SequenceFile() {} // no public ctor private static final byte BLOCK_COMPRESS_VERSION = (byte)4; private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; private static byte[] VERSION = new byte[] { (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION }; private static final int SYNC_ESCAPE = -1; // "length" of sync entries private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash /** The number of bytes between sync points.*/ public static final int SYNC_INTERVAL = 100*SYNC_SIZE; /** The type of compression. * @see SequenceFile#Writer */ public static enum CompressionType { /** Do not compress records. */ NONE, /** Compress values only, each separately. */ RECORD, /** Compress sequences of records together in blocks. */ BLOCK } /** * Get the compression type for the reduce outputs * @param job the job config to look in * @return the kind of compression to use */ static public CompressionType getCompressionType(Configuration job) { String name = job.get("io.seqfile.compression.type"); return name == null ? CompressionType.RECORD : CompressionType.valueOf(name); } /** * Set the compression type for sequence files. * @param job the configuration to modify * @param val the new compression type (none, block, record) */ static public void setCompressionType(Configuration job, CompressionType val) { job.set("io.seqfile.compression.type", val.toString()); } /** * Construct the preferred type of SequenceFile Writer. * @param fs The configured filesystem. * @param conf The configuration. * @param name The name of the file. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compressionType The compression type. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType) throws IOException { Writer writer = null; if (compressionType == CompressionType.NONE) { writer = new Writer(fs, conf, name, keyClass, valClass); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, new DefaultCodec()); } else if (compressionType == CompressionType.BLOCK){ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, new DefaultCodec()); } return writer; } /** * Construct the preferred type of SequenceFile Writer. * @param fs The configured filesystem. * @param conf The configuration. * @param name The name of the file. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compressionType The compression type. * @param progress The Progressable object to track progress. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, Progressable progress) throws IOException { Writer writer = null; if (compressionType == CompressionType.NONE) { writer = new Writer(fs, conf, name, keyClass, valClass, progress); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, new DefaultCodec(), progress); } else if (compressionType == CompressionType.BLOCK){ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, new DefaultCodec(), progress); } return writer; } /** * Construct the preferred type of SequenceFile Writer. * @param fs The configured filesystem. * @param conf The configuration. * @param name The name of the file. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compressionType The compression type. * @param codec The compression codec. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, CompressionCodec codec) throws IOException { Writer writer = null; if (compressionType == CompressionType.NONE) { writer = new Writer(fs, conf, name, keyClass, valClass); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, codec); } else if (compressionType == CompressionType.BLOCK){ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, codec); } return writer; } /** * Construct the preferred type of SequenceFile Writer. * @param fs The configured filesystem. * @param conf The configuration. * @param name The name of the file. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compressionType The compression type. * @param codec The compression codec. * @param progress The Progressable object to track progress. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, CompressionCodec codec, Progressable progress) throws IOException { Writer writer = null; if (compressionType == CompressionType.NONE) { writer = new Writer(fs, conf, name, keyClass, valClass, progress); } else if (compressionType == CompressionType.RECORD) { writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, codec, progress); } else if (compressionType == CompressionType.BLOCK){ writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, codec, progress); } return writer; } /** * Construct the preferred type of 'raw' SequenceFile Writer. * @param out The stream on top which the writer is to be constructed. * @param keyClass The 'key' type. * @param valClass The 'value' type. * @param compress Compress data? * @param blockCompress Compress blocks? * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException */ private static Writer createWriter(FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, boolean blockCompress, CompressionCodec codec) throws IOException { Writer writer = null; if (!compress) { writer = new Writer(out, keyClass, valClass); } else if (compress && !blockCompress) { writer = new RecordCompressWriter(out, keyClass, valClass, codec); } else { writer = new BlockCompressWriter(out, keyClass, valClass, codec); } return writer; } /** The interface to 'raw' values of SequenceFiles. */ public static interface ValueBytes { /** Writes the uncompressed bytes to the outStream. * @param outStream : Stream to write uncompressed bytes into. * @throws IOException */ public void writeUncompressedBytes(DataOutputStream outStream) throws IOException; /** Write compressed bytes to outStream. * Note: that it will NOT compress the bytes if they are not compressed. * @param outStream : Stream to write compressed bytes into. */ public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException; } private static class UncompressedBytes implements ValueBytes { private int dataSize; private byte[] data; private UncompressedBytes() { data = null; dataSize = 0; } private void reset(DataInputStream in, int length) throws IOException { data = new byte[length]; dataSize = -1; in.readFully(data); dataSize = data.length; } public int getSize() { return dataSize; } public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { outStream.write(data, 0, dataSize); } public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { throw new IllegalArgumentException("UncompressedBytes cannot be compressed!"); } } // UncompressedBytes private static class CompressedBytes implements ValueBytes { private int dataSize; private byte[] data; DataInputBuffer rawData = null; CompressionCodec codec = null; CompressionInputStream decompressedStream = null; private CompressedBytes(CompressionCodec codec) { data = null; dataSize = 0; this.codec = codec; } private void reset(DataInputStream in, int length) throws IOException { data = new byte[length]; dataSize = -1; in.readFully(data); dataSize = data.length; } public int getSize() { return dataSize; } public void writeUncompressedBytes(DataOutputStream outStream) throws IOException { if (decompressedStream == null) { rawData = new DataInputBuffer(); decompressedStream = codec.createInputStream(rawData); } else { decompressedStream.resetState(); } rawData.reset(data, 0, dataSize); byte[] buffer = new byte[8192]; int bytesRead = 0; while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) { outStream.write(buffer, 0, bytesRead); } } public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException { outStream.write(data, 0, dataSize); } } // CompressedBytes /** Write key/value pairs to a sequence-format file. */ public static class Writer { FSDataOutputStream out; DataOutputBuffer buffer = new DataOutputBuffer(); Path target = null; Class keyClass; Class valClass; private boolean compress; CompressionCodec codec = null; CompressionOutputStream deflateFilter = null; DataOutputStream deflateOut = null; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record // starts and ends by scanning for this value. long lastSyncPos; // position of last sync byte[] sync; // 16 random bytes { try { // use hash of uid + host MessageDigest digester = MessageDigest.getInstance("MD5"); digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes()); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); } } /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Path,Class,Class)}. */ public Writer(FileSystem fs, String name, Class keyClass, Class valClass) throws IOException { this(fs, new Path(name), keyClass, valClass, false); } /** Implicit constructor: needed for the period of transition!*/ Writer() {} /** Create the named file. */ /** @deprecated Call {@link #SequenceFile.Writer(FileSystem,Configuration,Path,Class,Class)}. */ public Writer(FileSystem fs, Path name, Class keyClass, Class valClass) throws IOException { this(fs, name, keyClass, valClass, false); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -