📄 sequencefile.java
字号:
private synchronized void writeBlock() throws IOException { if (noBufferedRecords > 0) { // Write 'sync' marker if (sync != null) { out.writeInt(SYNC_ESCAPE); out.write(sync); } // No. of records WritableUtils.writeVInt(out, noBufferedRecords); // Write 'keys' and lengths writeBuffer(keyLenBuffer); writeBuffer(keyBuffer); // Write 'values' and lengths writeBuffer(valLenBuffer); writeBuffer(valBuffer); // Flush the file-stream out.flush(); // Reset internal states keyLenBuffer.reset(); keyBuffer.reset(); valLenBuffer.reset(); valBuffer.reset(); noBufferedRecords = 0; } } /** Close the file. */ public synchronized void close() throws IOException { if (out != null) { writeBlock(); out.close(); out = null; } } /** Append a key/value pair. */ public synchronized void append(Writable key, Writable val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key+" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val+" is not "+valClass); // Save key/value into respective buffers int oldKeyLength = keyBuffer.getLength(); key.write(keyBuffer); int keyLength = keyBuffer.getLength() - oldKeyLength; if (keyLength == 0) throw new IOException("zero length keys not allowed: " + key); WritableUtils.writeVInt(keyLenBuffer, keyLength); int oldValLength = valBuffer.getLength(); val.write(valBuffer); int valLength = valBuffer.getLength() - oldValLength; WritableUtils.writeVInt(valLenBuffer, valLength); // Added another key/value pair ++noBufferedRecords; // Compress and flush? int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); if (currentBlockSize >= compressionBlockSize) { writeBlock(); } } /** Append a key/value pair. 
*/
public synchronized void appendRaw(byte[] keyData, int keyOffset,
                                   int keyLength, ValueBytes val)
    throws IOException {
  // Reject empty keys before touching any buffer.
  if (keyLength == 0) {
    throw new IOException("zero length keys not allowed");
  }

  // The value must be an UncompressedBytes; any other ValueBytes fails the
  // cast here, before anything has been buffered.
  UncompressedBytes rawValue = (UncompressedBytes) val;
  int valueSize = rawValue.getSize();

  // Buffer the key: its length first, then the raw bytes.
  WritableUtils.writeVInt(keyLenBuffer, keyLength);
  keyBuffer.write(keyData, keyOffset, keyLength);

  // Buffer the value the same way.
  WritableUtils.writeVInt(valLenBuffer, valueSize);
  val.writeUncompressedBytes(valBuffer);

  // One more record is pending in the current block.
  ++noBufferedRecords;

  // Write the block out once enough key + value bytes have accumulated.
  int bufferedBytes = keyBuffer.getLength() + valBuffer.getLength();
  if (bufferedBytes >= compressionBlockSize) {
    writeBlock();
  }
}

} // BlockCompressionWriter

/** Reads key/value pairs from a sequence-format file. */
public static class Reader {
  // Source file and the stream opened on it.
  private Path file;
  private FSDataInputStream in;
  private DataOutputBuffer outBuf = new DataOutputBuffer();

  // Header information, populated by init().
  private byte version;
  private Class keyClass;
  private Class valClass;

  private CompressionCodec codec = null;

  // 'sync' is read from the file header (version > 1); 'syncCheck' holds
  // the marker read back from the stream for comparison.
  private byte[] sync = new byte[SYNC_HASH_SIZE];
  private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
  private boolean syncSeen;

  private long end;
  private int keyLength;

  private boolean decompress;
  private boolean blockCompressed;

  private Configuration conf;

  // Bookkeeping for the block currently held in memory.
  private int noBufferedRecords = 0;
  private boolean lazyDecompress = true;
  private boolean valuesDecompressed = true;

  private int noBufferedKeys = 0;
  private int noBufferedValues = 0;

  // Per-section buffers, codec filters and data streams.
  private DataInputBuffer keyLenBuffer = null;
  private CompressionInputStream keyLenInFilter = null;
  private DataInputStream keyLenIn = null;

  private DataInputBuffer keyBuffer = null;
  private CompressionInputStream keyInFilter = null;
  private DataInputStream keyIn = null;

  private DataInputBuffer valLenBuffer = null;
  private CompressionInputStream valLenInFilter = null;
  private DataInputStream valLenIn = null;

  private DataInputBuffer valBuffer = null;
  private CompressionInputStream valInFilter = null;
private DataInputStream valIn = null; /** @deprecated Call {@link #SequenceFile.Reader(FileSystem,Path,Configuration)}.*/ public Reader(FileSystem fs, String file, Configuration conf) throws IOException { this(fs, new Path(file), conf); } /** Open the named file. */ public Reader(FileSystem fs, Path file, Configuration conf) throws IOException { this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf); } private Reader(FileSystem fs, Path name, int bufferSize, Configuration conf) throws IOException { this.file = name; this.in = fs.open(file, bufferSize); this.end = fs.getLength(file); this.conf = conf; init(); } private Reader(FileSystem fs, Path file, int bufferSize, long start, long length, Configuration conf) throws IOException { this.file = file; this.in = fs.open(file, bufferSize); this.conf = conf; seek(start); this.end = in.getPos() + length; init(); } private void init() throws IOException { byte[] versionBlock = new byte[VERSION.length]; in.readFully(versionBlock); if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1]) || (versionBlock[2] != VERSION[2])) throw new IOException(file + " not a SequenceFile"); // Set 'version' version = versionBlock[3]; if (version > VERSION[3]) throw new VersionMismatchException(VERSION[3], version); if (version < BLOCK_COMPRESS_VERSION) { UTF8 className = new UTF8(); className.readFields(in); // read key class name this.keyClass = WritableName.getClass(className.toString(), conf); className.readFields(in); // read val class name this.valClass = WritableName.getClass(className.toString(), conf); } else { this.keyClass = WritableName.getClass(Text.readString(in), conf); this.valClass = WritableName.getClass(Text.readString(in), conf); } if (version > 2) { // if version > 2 this.decompress = in.readBoolean(); // is compressed? } else { decompress = false; } if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 this.blockCompressed = in.readBoolean(); // is block-compressed? 
} else { blockCompressed = false; } // if version >= 5 // setup the compression codec if (decompress) { if (version >= CUSTOM_COMPRESS_VERSION) { String codecClassname = Text.readString(in); try { Class codecClass = conf.getClassByName(codecClassname); this.codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException("Unknown codec: " + codecClassname, cnfe); } } else { codec = new DefaultCodec(); } } if (version > 1) { // if version > 1 in.readFully(sync); // read sync bytes } // Initialize valBuffer = new DataInputBuffer(); if (decompress) { valInFilter = this.codec.createInputStream(valBuffer); valIn = new DataInputStream(valInFilter); } else { valIn = valBuffer; } if (blockCompressed) { keyLenBuffer = new DataInputBuffer(); keyBuffer = new DataInputBuffer(); valLenBuffer = new DataInputBuffer(); keyLenInFilter = this.codec.createInputStream(keyLenBuffer); keyLenIn = new DataInputStream(keyLenInFilter); keyInFilter = this.codec.createInputStream(keyBuffer); keyIn = new DataInputStream(keyInFilter); valLenInFilter = this.codec.createInputStream(valLenBuffer); valLenIn = new DataInputStream(valLenInFilter); } lazyDecompress = conf.getBoolean("io.seqfile.lazydecompress", true); } /** Close the file. */ public synchronized void close() throws IOException { in.close(); } /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } /** Returns the class of values in this file. */ public Class getValueClass() { return valClass; } /** Returns true if values are compressed. */ public boolean isCompressed() { return decompress; } /** Returns true if records are block-compressed. */ public boolean isBlockCompressed() { return blockCompressed; } /** Returns the compression codec of data in this file. 
*/ public CompressionCodec getCompressionCodec() { return codec; } /** Read a compressed buffer */ private synchronized void readBuffer(DataInputBuffer buffer, CompressionInputStream filter) throws IOException { // Read data into a temporary buffer DataOutputBuffer dataBuffer = new DataOutputBuffer(); int dataBufferLength = WritableUtils.readVInt(in); dataBuffer.write(in, dataBufferLength); // Set up 'buffer' connected to the input-stream buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); // Reset the codec filter.resetState(); } /** Read the next 'compressed' block */ private synchronized void readBlock() throws IOException { // Check if we need to throw away a whole block of // 'values' due to 'lazy decompression' if (lazyDecompress && !valuesDecompressed) { in.seek(WritableUtils.readVInt(in)+in.getPos()); in.seek(WritableUtils.readVInt(in)+in.getPos()); } // Reset internal states noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; valuesDecompressed = false; //Process sync if (sync != null) { in.readInt(); in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); } syncSeen = true; // Read number of records in this block noBufferedRecords = WritableUtils.readVInt(in); // Read key lengths and keys readBuffer(keyLenBuffer, keyLenInFilter); readBuffer(keyBuffer, keyInFilter); noBufferedKeys = noBufferedRecords; // Read value lengths and values if (!lazyDecompress) { readBuffer(valLenBuffer, valLenInFilter); readBuffer(valBuffer, valInFilter); noBufferedValues = noBufferedRecords; valuesDecompressed = true; } } /** * Position valLenIn/valIn to the 'value' * corresponding to the 'current' key */ private synchronized void seekToCurrentValue() throws IOException { if (!blockCompressed) { if (decompress) { valInFilter.resetState(); } valBuffer.reset(); } else { // Check if this is the first value in the 'block' to be read if (lazyDecompress && !valuesDecompressed) { // 
Read the value lengths and values readBuffer(valLenBuffer, valLenInFilter); readBuffer(valBuffer, valInFilter); noBufferedValues = noBufferedRecords; valuesDecompressed = true; } // Calculate the no. of bytes to skip // Note: 'current' key has already been read! int skipValBytes = 0; int currentKey = noBufferedKeys + 1; for (int i=noBufferedValues; i > currentKey; --i) { skipValBytes += WritableUtils.readVInt(valLenIn); --noBufferedValues; } // Skip to the 'val' corresponding to 'current' key if (skipValBytes > 0) { if (valIn.skipBytes(skipValBytes) != skipValBytes) { throw new IOException("Failed to seek to " + currentKey + "(th) value!"); } } } } /** * Get the 'value' corresponding to the last read 'key'. * @param val : The 'value' to be read. * @throws IOException */ public synchronized void getCurrentValue(Writable val) throws IOException { if (val instanceof Configurable) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -