📄 sequencefile.java
字号:
/**
 * Create the named file with write-progress reporter.
 *
 * <p>Delegates to the six-argument constructor with {@code compress} set to
 * {@code false}.
 *
 * @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class,Progressable)}.
 */
public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,
    Progressable progress)
  throws IOException {
  this(fs, name, keyClass, valClass, false, progress);
}

/**
 * Create the named file.
 *
 * <p>Creates the file with {@code fs.create(name)}, initializes this writer
 * with a {@code null} codec and then writes the complete file header.
 * NOTE(review): a null codec is passed even when {@code compress} is true;
 * confirm that the compressed path is actually usable from this constructor.
 *
 * @param compress if true, values are compressed.
 * @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class)}.
 */
public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,
    boolean compress)
  throws IOException {
  init(name, fs.create(name), keyClass, valClass, compress, null);

  initializeFileHeader();
  writeFileHeader();
  finalizeFileHeader();
}

/**
 * Create the named file with write-progress reporter.
 *
 * <p>Same as the five-argument form, except that the file is created with
 * {@code fs.create(name, progress)}.
 * NOTE(review): a null codec is passed even when {@code compress} is true;
 * confirm that the compressed path is actually usable from this constructor.
 *
 * @param compress if true, values are compressed.
 * @deprecated Call {@link #Writer(FileSystem,Configuration,Path,Class,Class,Progressable)}.
 */
public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,
    boolean compress, Progressable progress)
  throws IOException {
  init(name, fs.create(name, progress), keyClass, valClass, compress, null);

  initializeFileHeader();
  writeFileHeader();
  finalizeFileHeader();
}

/**
 * Create the named file.
 *
 * <p>The {@code conf} argument is not read by this constructor; the call is
 * forwarded to the five-argument form with {@code compress} set to
 * {@code false}.
 */
public Writer(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass)
  throws IOException {
  this(fs, name, keyClass, valClass, false);
}

/**
 * Create the named file with write-progress reporter.
 *
 * <p>The {@code conf} argument is not read by this constructor; the call is
 * forwarded to the six-argument form with {@code compress} set to
 * {@code false}.
 */
public Writer(FileSystem fs, Configuration conf, Path name,
    Class keyClass, Class valClass, Progressable progress)
  throws IOException {
  this(fs, name, keyClass, valClass, false, progress);
}

/**
 * Write to an arbitrary stream that the caller has already opened.
*/ private Writer(FSDataOutputStream out, Class keyClass, Class valClass) throws IOException { init(null, out, keyClass, valClass, false, null); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } /** Write the initial part of file header. */ void initializeFileHeader() throws IOException{ out.write(VERSION); } /** Write the final part of file header. */ void finalizeFileHeader() throws IOException{ out.write(sync); // write the sync bytes out.flush(); // flush header } boolean isCompressed() { return compress; } boolean isBlockCompressed() { return false; } /** Write and flush the file header. */ void writeFileHeader() throws IOException { Text.writeString(out, keyClass.getName()); Text.writeString(out, valClass.getName()); out.writeBoolean(this.isCompressed()); out.writeBoolean(this.isBlockCompressed()); if(this.isCompressed()) { Text.writeString(out, (codec.getClass()).getName()); } } /** Initialize. */ void init(Path name, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec) throws IOException { this.target = name; this.out = out; this.keyClass = keyClass; this.valClass = valClass; this.compress = compress; this.codec = codec; if(this.codec != null) { this.deflateFilter = this.codec.createOutputStream(buffer); this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter)); } } /** Returns the class of keys in this file. */ public Class getKeyClass() { return keyClass; } /** Returns the class of values in this file. */ public Class getValueClass() { return valClass; } /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } /** Close the file. 
*/ public synchronized void close() throws IOException { if (out != null) { out.close(); out = null; } } synchronized void checkAndWriteSync() throws IOException { if (sync != null && out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync lastSyncPos = out.getPos(); // update lastSyncPos //LOG.info("sync@"+lastSyncPos); out.writeInt(SYNC_ESCAPE); // escape it out.write(sync); // write sync } } /** Append a key/value pair. */ public synchronized void append(Writable key, Writable val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' key.write(buffer); int keyLength = buffer.getLength(); if (keyLength == 0) throw new IOException("zero length keys not allowed: " + key); // Append the 'value' if (compress) { deflateFilter.resetState(); val.write(deflateOut); deflateOut.flush(); deflateFilter.finish(); } else { val.write(buffer); } // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length out.writeInt(keyLength); // key portion length out.write(buffer.getData(), 0, buffer.getLength()); // data } /** * Append a key/value pair. * @deprecated Call {@link #appendRaw(byte[], int, int, SequenceFile.ValueBytes)}. 
*/ public synchronized void append(byte[] data, int start, int length, int keyLength) throws IOException { if (keyLength == 0) throw new IOException("zero length keys not allowed"); checkAndWriteSync(); // sync out.writeInt(length); // total record length out.writeInt(keyLength); // key portion length out.write(data, start, length); // data } public synchronized void appendRaw( byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { if (keyLength == 0) throw new IOException("zero length keys not allowed: " + keyLength); UncompressedBytes value = (UncompressedBytes)val; int valLength = value.getSize(); checkAndWriteSync(); out.writeInt(keyLength+valLength); // total record length out.writeInt(keyLength); // key portion length out.write(keyData, keyOffset, keyLength); // key val.writeUncompressedBytes(out); // value } /** Returns the current length of the output file. * * <p>This always returns a synchronized position. In other words, {@link * immediately after calling {@link Reader#seek(long)} with a position * returned by this method, Reader#next(Writable) may be called. However * the key may be earlier in the file than key last written when this * method was called (e.g., with block-compression, it may be the first key * in the block that was being written when this method was called). */ public synchronized long getLength() throws IOException { return out.getPos(); } } // class Writer /** Write key/compressed-value pairs to a sequence-format file. */ static class RecordCompressWriter extends Writer { /** Create the named file. */ public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException { super.init(name, fs.create(name), keyClass, valClass, true, codec); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } /** Create the named file with write-progress reporter. 
*/ public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress) throws IOException { super.init(name, fs.create(name, progress), keyClass, valClass, true, codec); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } /** Write to an arbitrary stream using a specified buffer size. */ private RecordCompressWriter(FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec) throws IOException { super.init(null, out, keyClass, valClass, true, codec); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } boolean isCompressed() { return true; } boolean isBlockCompressed() { return false; } /** Append a key/value pair. */ public synchronized void append(Writable key, Writable val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' key.write(buffer); int keyLength = buffer.getLength(); if (keyLength == 0) throw new IOException("zero length keys not allowed: " + key); // Compress 'value' and append it deflateFilter.resetState(); val.write(deflateOut); deflateOut.flush(); deflateFilter.finish(); // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length out.writeInt(keyLength); // key portion length out.write(buffer.getData(), 0, buffer.getLength()); // data } /** Append a key/value pair. 
*/ public synchronized void appendRaw( byte[] keyData, int keyOffset, int keyLength, ValueBytes val ) throws IOException { if (keyLength == 0) throw new IOException("zero length keys not allowed"); CompressedBytes value = (CompressedBytes)val; int valLength = value.getSize(); checkAndWriteSync(); // sync out.writeInt(keyLength+valLength); // total record length out.writeInt(keyLength); // key portion length out.write(keyData, keyOffset, keyLength); // 'key' data val.writeCompressedBytes(out); // 'value' data } } // RecordCompressionWriter /** Write compressed key/value blocks to a sequence-format file. */ static class BlockCompressWriter extends Writer { private int noBufferedRecords = 0; private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); private DataOutputBuffer keyBuffer = new DataOutputBuffer(); private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); private DataOutputBuffer valBuffer = new DataOutputBuffer(); private int compressionBlockSize; /** Create the named file. */ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException { super.init(name, fs.create(name), keyClass, valClass, true, codec); init(conf.getInt("io.seqfile.compress.blocksize", 1000000)); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } /** Create the named file with write-progress reporter. */ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress) throws IOException { super.init(name, fs.create(name, progress), keyClass, valClass, true, codec); init(conf.getInt("io.seqfile.compress.blocksize", 1000000)); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } /** Write to an arbitrary stream using a specified buffer size. 
*/ private BlockCompressWriter(FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec) throws IOException { super.init(null, out, keyClass, valClass, true, codec); init(1000000); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); } boolean isCompressed() { return true; } boolean isBlockCompressed() { return true; } /** Initialize */ void init(int compressionBlockSize) { this.compressionBlockSize = compressionBlockSize; } /** Workhorse to check and write out compressed data/lengths */ private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException { deflateFilter.resetState(); buffer.reset(); deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength()); deflateOut.flush(); deflateFilter.finish(); WritableUtils.writeVInt(out, buffer.getLength()); out.write(buffer.getData(), 0, buffer.getLength()); } /** Compress and flush contents to dfs */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -