📄 sequencefile.java
字号:
((Configurable) val).setConf(this.conf); } // Position stream to 'current' value seekToCurrentValue(); if (!blockCompressed) { val.readFields(valIn); if (valIn.read() > 0) { LOG.info("available bytes: " + valIn.available()); throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) + " bytes, should read " + (valBuffer.getLength()-keyLength)); } } else { // Get the value int valLength = WritableUtils.readVInt(valLenIn); val.readFields(valIn); // Read another compressed 'value' --noBufferedValues; // Sanity check if (valLength < 0) { LOG.debug(val + " is a zero-length value"); } } } /** Read the next key in the file into <code>key</code>, skipping its * value. True if another entry exists, and false at end of file. */ public synchronized boolean next(Writable key) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (!blockCompressed) { outBuf.reset(); keyLength = next(outBuf); if (keyLength < 0) return false; valBuffer.reset(outBuf.getData(), outBuf.getLength()); key.readFields(valBuffer); valBuffer.mark(0); if (valBuffer.getPosition() != keyLength) throw new IOException(key + " read " + valBuffer.getPosition() + " bytes, should read " + keyLength); } else { //Reset syncSeen syncSeen = false; if (noBufferedKeys == 0) { try { readBlock(); } catch (EOFException eof) { return false; } } int keyLength = WritableUtils.readVInt(keyLenIn); // Sanity check if (keyLength < 0) { return false; } //Read another compressed 'key' key.readFields(keyIn); --noBufferedKeys; } return true; } /** Read the next key/value pair in the file into <code>key</code> and * <code>val</code>. 
Returns true if such a pair exists and false when at * end of file */ public synchronized boolean next(Writable key, Writable val) throws IOException { if (val.getClass() != valClass) throw new IOException("wrong value class: "+val+" is not "+valClass); boolean more = next(key); if (more) { getCurrentValue(val); } return more; } private synchronized int checkAndReadSync(int length) throws IOException { if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process a sync entry //LOG.info("sync@"+in.getPos()); in.readFully(syncCheck); // read syncCheck if (!Arrays.equals(sync, syncCheck)) // check it throw new IOException("File is corrupt!"); syncSeen = true; length = in.readInt(); // re-read length } else { syncSeen = false; } return length; } /** Read the next key/value pair in the file into <code>buffer</code>. * Returns the length of the key read, or -1 if at end of file. The length * of the value may be computed by calling buffer.getLength() before and * after calls to this method. */ /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ public synchronized int next(DataOutputBuffer buffer) throws IOException { // Unsupported for block-compressed sequence files if (blockCompressed) { throw new IOException("Unsupported call for block-compressed" + " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); } if (in.getPos() >= end) return -1; try { int length = checkAndReadSync(in.readInt()); int keyLength = in.readInt(); buffer.write(in, length); return keyLength; } catch (ChecksumException e) { // checksum failure handleChecksumException(e); return next(buffer); } } public ValueBytes createValueBytes() { ValueBytes val = null; if (!decompress || blockCompressed) { val = new UncompressedBytes(); } else { val = new CompressedBytes(codec); } return val; } /** * Read 'raw' records. 
* @param key - The buffer into which the key is read * @param val - The 'raw' value * @return Returns the total record length * @throws IOException */ public int nextRaw(DataOutputBuffer key, ValueBytes val) throws IOException { if (!blockCompressed) { if (in.getPos() >= end) return -1; int length = checkAndReadSync(in.readInt()); int keyLength = in.readInt(); int valLength = length - keyLength; key.write(in, keyLength); if (decompress) { CompressedBytes value = (CompressedBytes)val; value.reset(in, valLength); } else { UncompressedBytes value = (UncompressedBytes)val; value.reset(in, valLength); } return length; } else { //Reset syncSeen syncSeen = false; // Read 'key' if (noBufferedKeys == 0) { if (in.getPos() >= end) return -1; try { readBlock(); } catch (EOFException eof) { return -1; } } int keyLength = WritableUtils.readVInt(keyLenIn); if (keyLength < 0) { throw new IOException("zero length key found!"); } key.write(keyIn, keyLength); --noBufferedKeys; // Read raw 'value' seekToCurrentValue(); int valLength = WritableUtils.readVInt(valLenIn); UncompressedBytes rawValue = (UncompressedBytes)val; rawValue.reset(valIn, valLength); --noBufferedValues; return (keyLength+valLength); } } private void handleChecksumException(ChecksumException e) throws IOException { if (this.conf.getBoolean("io.skip.checksum.errors", false)) { LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); } else { throw e; } } /** Set the current byte position in the input file. * * <p>The position passed must be a position returned by {@link * Writer#getLength()} when writing this file. To seek to an arbitrary * position, use {@link Reader#sync(long)}. 
*/ public synchronized void seek(long position) throws IOException { in.seek(position); if (blockCompressed) { // trigger block read noBufferedKeys = 0; valuesDecompressed = true; } } /** Seek to the next sync mark past a given position.*/ public synchronized void sync(long position) throws IOException { if (position+SYNC_SIZE >= end) { seek(end); return; } try { seek(position+4); // skip escape in.readFully(syncCheck); int syncLen = sync.length; for (int i = 0; in.getPos() < end; i++) { int j = 0; for (; j < syncLen; j++) { if (sync[j] != syncCheck[(i+j)%syncLen]) break; } if (j == syncLen) { in.seek(in.getPos() - SYNC_SIZE); // position before sync return; } syncCheck[i%syncLen] = in.readByte(); } } catch (ChecksumException e) { // checksum failure handleChecksumException(e); } } /** Returns true iff the previous call to next passed a sync mark.*/ public boolean syncSeen() { return syncSeen; } /** Return the current byte position in the input file. */ public synchronized long getPosition() throws IOException { return in.getPos(); } /** Returns the name of the file. */ public String toString() { return file.toString(); } } /** Sorts key/value pairs in a sequence-format file. * * <p>For best performance, applications should make sure that the {@link * Writable#readFields(DataInput)} implementation of their keys is * very efficient. In particular, it should avoid allocating memory. */ public static class Sorter { private WritableComparator comparator; private Path[] inFiles; // when merging or sorting private Path outFile; private int memory; // bytes private int factor; // merged per pass private FileSystem fs = null; private Class keyClass; private Class valClass; private Configuration conf; /** Sort and merge files containing the named classes. */ public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf) { this(fs, new WritableComparator(keyClass), valClass, conf); } /** Sort and merge using an arbitrary {@link WritableComparator}. 
*/ public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, Configuration conf) { this.fs = fs; this.comparator = comparator; this.keyClass = comparator.getKeyClass(); this.valClass = valClass; this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; this.factor = conf.getInt("io.sort.factor", 100); this.conf = conf; } /** Set the number of streams to merge at once.*/ public void setFactor(int factor) { this.factor = factor; } /** Get the number of streams to merge at once.*/ public int getFactor() { return factor; } /** Set the total amount of buffer memory, in bytes.*/ public void setMemory(int memory) { this.memory = memory; } /** Get the total amount of buffer memory, in bytes.*/ public int getMemory() { return memory; } /** * Perform a file sort from a set of input files into an output file. * @param inFiles the files to be sorted * @param outFile the sorted output file * @param deleteInput should the input files be deleted as they are read? */ public void sort(Path[] inFiles, Path outFile, boolean deleteInput) throws IOException { if (fs.exists(outFile)) { throw new IOException("already exists: " + outFile); } this.inFiles = inFiles; this.outFile = outFile; int segments = sortPass(deleteInput); int pass = 1; while (segments > 1) { segments = mergePass(pass, segments <= factor); pass++; } // Clean up intermediate files for (int i=0; i < pass; ++i) { fs.delete(new Path(outFile.toString() + "." + i)); fs.delete(new Path(outFile.toString() + "." + i + ".index")); } } /** * The backwards compatible interface to sort. 
* @param inFile the input file to sort
     * @param outFile the sorted output file
     */
    public void sort(Path inFile, Path outFile) throws IOException {
      // Single input file; the input is not deleted.
      sort(new Path[]{inFile}, outFile, false);
    }

    /**
     * Run one sort pass over the input files.
     *
     * @param deleteInput passed through to <code>SortPass.run</code>
     * @return whatever <code>SortPass.run</code> returns (the caller treats
     *         it as the number of segments produced)
     * @throws IOException on failure while running or closing the pass
     */
    private int sortPass(boolean deleteInput) throws IOException {
      LOG.debug("running sort pass");
      SortPass sortPass = new SortPass();         // make the SortPass
      try {
        return sortPass.run(deleteInput);         // run it
      } finally {
        sortPass.close();                         // close it
      }
    }

    private class SortPass {
      private int memoryLimit = memory/4;
      private int recordLimit = 1000000;

      private DataOutputBuffer rawKeys = new DataOutputBuffer();
      private byte[] rawBuffer;

      private int[] keyOffsets = new int[1024];
      private int[] pointers = new int[keyOffsets.length];
      private int[] pointersCopy = new int[keyOffsets.length];
      private int[] keyLengths = new int[keyOffsets.length];
      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -