📄 SequenceFile.java (excerpt: SequenceFile.Sorter sort/merge passes)
// (This excerpt picks up inside SequenceFile.Sorter.SortPass: the buffers
// rawKeys, rawValues, keyOffsets, keyLengths, pointers, pointersCopy and the
// memoryLimit/recordLimit thresholds are fields declared earlier in the class.)

private ArrayList segmentLengths = new ArrayList();

private Reader in = null;
private FSDataOutputStream out = null;
private FSDataOutputStream indexOut = null;
private Path outName;

public int run(boolean deleteInput) throws IOException {
  int segments = 0;
  int currentFile = 0;
  boolean atEof = (currentFile >= inFiles.length);
  boolean isCompressed = false;
  boolean isBlockCompressed = false;
  CompressionCodec codec = null;
  segmentLengths.clear();
  if (atEof) {
    return 0;
  }

  // Initialize
  in = new Reader(fs, inFiles[currentFile], conf);
  isCompressed = in.isCompressed();
  isBlockCompressed = in.isBlockCompressed();
  codec = in.getCompressionCodec();

  for (int i = 0; i < rawValues.length; ++i) {
    rawValues[i] = null;
  }

  while (!atEof) {
    int count = 0;
    int bytesProcessed = 0;
    rawKeys.reset();
    while (!atEof && bytesProcessed < memoryLimit && count < recordLimit) {
      // Read a record into buffer
      // Note: Attempt to re-use 'rawValue' as far as possible
      int keyOffset = rawKeys.getLength();
      ValueBytes rawValue =
        (count == keyOffsets.length || rawValues[count] == null)
        ? in.createValueBytes()
        : rawValues[count];
      int recordLength = in.nextRaw(rawKeys, rawValue);
      if (recordLength == -1) {
        // Current input exhausted: advance to the next input file, if any
        in.close();
        if (deleteInput) {
          fs.delete(inFiles[currentFile]);
        }
        currentFile += 1;
        atEof = currentFile >= inFiles.length;
        if (!atEof) {
          in = new Reader(fs, inFiles[currentFile], conf);
        } else {
          in = null;
        }
        continue;
      }

      //int length = buffer.getLength() - start;
      int keyLength = rawKeys.getLength() - keyOffset;

      if (count == keyOffsets.length) {
        grow();
      }

      keyOffsets[count] = keyOffset;          // update pointers
      pointers[count] = count;
      keyLengths[count] = keyLength;
      rawValues[count] = rawValue;

      bytesProcessed += recordLength;
      count++;
    }

    // buffer is full -- sort & flush it
    LOG.debug("flushing segment " + segments);
    rawBuffer = rawKeys.getData();
    sort(count);
    flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec,
          segments == 0 && atEof);
    segments++;
  }

  return segments;
}

public void close() throws IOException {
  if (in != null) {
    in.close();
  }
  if (out != null) {
    out.close();
  }
  if (indexOut != null) {
    indexOut.close();
  }
}

private void grow() {
  int newLength = keyOffsets.length * 3 / 2;
  keyOffsets = grow(keyOffsets, newLength);
  pointers = grow(pointers, newLength);
  pointersCopy = new int[newLength];
  keyLengths = grow(keyLengths, newLength);
  rawValues = grow(rawValues, newLength);
}

private int[] grow(int[] old, int newLength) {
  int[] result = new int[newLength];
  System.arraycopy(old, 0, result, 0, old.length);
  return result;
}

private ValueBytes[] grow(ValueBytes[] old, int newLength) {
  ValueBytes[] result = new ValueBytes[newLength];
  System.arraycopy(old, 0, result, 0, old.length);
  for (int i = old.length; i < newLength; ++i) {
    result[i] = null;
  }
  return result;
}

private void flush(int count, int bytesProcessed,
                   boolean isCompressed, boolean isBlockCompressed,
                   CompressionCodec codec, boolean done) throws IOException {
  if (out == null) {
    outName = done ? outFile : outFile.suffix(".0");
    out = fs.create(outName);
    if (!done) {
      indexOut = fs.create(outName.suffix(".index"));
    }
  }

  long segmentStart = out.getPos();
  Writer writer = createWriter(out, keyClass, valClass,
                               isCompressed, isBlockCompressed, codec);
  if (!done) {
    writer.sync = null;                       // disable sync on temp files
  }

  for (int i = 0; i < count; i++) {           // write in sorted order
    int p = pointers[i];
    writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
  }

  if (writer instanceof SequenceFile.BlockCompressWriter) {
    SequenceFile.BlockCompressWriter bcWriter =
      (SequenceFile.BlockCompressWriter) writer;
    bcWriter.writeBlock();
  }
  writer.out.flush();

  if (!done) {
    // Save the segment length
    WritableUtils.writeVLong(indexOut, segmentStart);
    WritableUtils.writeVLong(indexOut, (writer.out.getPos() - segmentStart));
    indexOut.flush();
  }
}

private void sort(int count) {
  System.arraycopy(pointers, 0, pointersCopy, 0, count);
  mergeSort(pointersCopy, pointers, 0, count);
}

private int compare(int i, int j) {
  return comparator.compare(rawBuffer, keyOffsets[i], keyLengths[i],
                            rawBuffer, keyOffsets[j], keyLengths[j]);
}

// Stable two-buffer merge sort over the pointers array; only the small
// pointer indices move, the raw key/value bytes stay in place.
private void mergeSort(int src[], int dest[], int low, int high) {
  int length = high - low;

  // Insertion sort on smallest arrays
  if (length < 7) {
    for (int i = low; i < high; i++) {
      for (int j = i; j > low && compare(dest[j-1], dest[j]) > 0; j--) {
        swap(dest, j, j-1);
      }
    }
    return;
  }

  // Recursively sort halves of dest into src
  int mid = (low + high) >> 1;
  mergeSort(dest, src, low, mid);
  mergeSort(dest, src, mid, high);

  // If list is already sorted, just copy from src to dest.  This is an
  // optimization that results in faster sorts for nearly ordered lists.
  if (compare(src[mid-1], src[mid]) <= 0) {
    System.arraycopy(src, low, dest, low, length);
    return;
  }

  // Merge sorted halves (now in src) into dest
  for (int i = low, p = low, q = mid; i < high; i++) {
    if (q >= high || p < mid && compare(src[p], src[q]) <= 0) {
      dest[i] = src[p++];
    } else {
      dest[i] = src[q++];
    }
  }
}

private void swap(int x[], int a, int b) {
  int t = x[a];
  x[a] = x[b];
  x[b] = t;
}
} // SequenceFile.Sorter.SortPass

private int mergePass(int pass, boolean last) throws IOException {
  LOG.debug("running merge pass=" + pass);
  MergePass mergePass = new MergePass(pass, last);
  try {                                       // make a merge pass
    return mergePass.run();                   // run it
  } finally {
    mergePass.close();                        // close it
  }
}

private class MergePass {
  private boolean last;

  private MergeQueue queue;
  private FSDataInputStream in = null;
  private Path inName;
  private FSDataInputStream indexIn = null;

  public MergePass(int pass, boolean last) throws IOException {
    this.last = last;

    this.queue =
      new MergeQueue(factor, last ? outFile : outFile.suffix("." + pass), last);

    this.inName = outFile.suffix("." + (pass - 1));
    this.in = fs.open(inName);
    this.indexIn = fs.open(inName.suffix(".index"));
  }

  public void close() throws IOException {
    in.close();                               // close and delete input
    fs.delete(inName);

    queue.close();                            // close queue
  }

  public int run() throws IOException {
    int segments = 0;
    long end = fs.getLength(inName);

    while (in.getPos() < end) {
      LOG.debug("merging segment " + segments);
      long segmentStart = queue.out.getPos();
      while (in.getPos() < end && queue.size() < factor) {
        long segmentOffset = WritableUtils.readVLong(indexIn);
        long segmentLength = WritableUtils.readVLong(indexIn);
        Reader reader = new Reader(fs, inName, memory / (factor + 1),
                                   segmentOffset, segmentLength, conf);
        reader.sync = null;                   // disable sync on temp files

        MergeStream ms = new MergeStream(reader); // add segment to queue
        if (ms.next()) {
          queue.put(ms);
        }
        in.seek(reader.end);
      }

      queue.merge();                          // do a merge

      if (!last) {
        WritableUtils.writeVLong(queue.indexOut, segmentStart);
        WritableUtils.writeVLong(queue.indexOut,
                                 (queue.out.getPos() - segmentStart));
      }
      segments++;
    }

    return segments;
  }
} // SequenceFile.Sorter.MergePass

/** Merge the provided files. */
public void merge(Path[] inFiles, Path outFile) throws IOException {
  this.inFiles = inFiles;
  this.outFile = outFile;
  this.factor = inFiles.length;

  if (fs.exists(outFile)) {
    throw new IOException("already exists: " + outFile);
  }

  MergeFiles mergeFiles = new MergeFiles();
  try {                                       // make a merge pass
    mergeFiles.run();                         // run it
  } finally {
    mergeFiles.close();                       // close it
  }
}

private class MergeFiles {
  private MergeQueue queue;

  public MergeFiles() throws IOException {
    this.queue = new MergeQueue(factor, outFile, true);
  }

  public void close() throws IOException {
    queue.close();
  }

  public void run() throws IOException {
    LOG.debug("merging files=" + inFiles.length);
    for (int i = 0; i < inFiles.length; i++) {
      Path inFile = inFiles[i];
      MergeStream ms =
        new MergeStream(new Reader(fs, inFile, memory / (factor + 1), conf));
      if (ms.next()) {
        queue.put(ms);
      }
    }
    queue.merge();
  }
} // SequenceFile.Sorter.MergeFiles

private class MergeStream {
  private Reader in;

  private DataOutputBuffer rawKey = null;
  private ValueBytes rawValue = null;

  public MergeStream(Reader reader) throws IOException {
    if (reader.keyClass != keyClass) {
      throw new IOException("wrong key class: " + reader.getKeyClass() +
                            " is not " + keyClass);
    }
    if (reader.valClass != valClass) {
      throw new IOException("wrong value class: " + reader.getValueClass() +
                            " is not " + valClass);
    }
    this.in = reader;
    rawKey = new DataOutputBuffer();
    rawValue = in.createValueBytes();
  }

  public boolean next() throws IOException {
    rawKey.reset();
    int recordLength = in.nextRaw(rawKey, rawValue);
    return (recordLength >= 0);
  }
} // SequenceFile.Sorter.MergeStream

private class MergeQueue extends PriorityQueue {
  private Path outName;
  private FSDataOutputStream out;
  private FSDataOutputStream indexOut;
  private boolean done;
  private boolean compress;
  private boolean blockCompress;
  private CompressionCodec codec = null;

  public void put(MergeStream stream) throws IOException {
    if (size() == 0) {
      compress = stream.in.isCompressed();
      blockCompress = stream.in.isBlockCompressed();
      codec = stream.in.getCompressionCodec();
    } else if (compress != stream.in.isCompressed() ||
               blockCompress != stream.in.isBlockCompressed()) {
      throw new IOException("All merged files must be compressed or not.");
    }
    super.put(stream);
  }

  public MergeQueue(int size, Path outName, boolean done) throws IOException {
    initialize(size);
    this.outName = outName;
    this.out = fs.create(this.outName, true, memory / (factor + 1));
    if (!done) {
      this.indexOut = fs.create(outName.suffix(".index"), true,
                                memory / (factor + 1));
    }
    this.done = done;
  }

  // Order streams by their current raw key, smallest first
  protected boolean lessThan(Object a, Object b) {
    MergeStream msa = (MergeStream) a;
    MergeStream msb = (MergeStream) b;
    return comparator.compare(msa.rawKey.getData(), 0, msa.rawKey.getLength(),
                              msb.rawKey.getData(), 0,
                              msb.rawKey.getLength()) < 0;
  }
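
For orientation, here is a minimal driver sketch showing how the merge path
above is reached from user code. The merge(Path[], Path) entry point is the
one defined in this excerpt; the Sorter constructor signature, the imports,
the key/value types, and the file names are assumptions based on the
Hadoop 0.x-era API this code appears to come from, so treat it as a sketch
rather than a verified example.

// Hypothetical driver -- not part of SequenceFile.java. Assumes the
// old-style constructor SequenceFile.Sorter(FileSystem, Class, Class,
// Configuration); paths and key/value types are made up for illustration.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileMergeDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Inputs must agree on key/value classes and compression settings;
    // MergeQueue.put() above throws an IOException otherwise.
    Path[] inputs = {
      new Path("input/part-0.seq"),
      new Path("input/part-1.seq")
    };
    Path output = new Path("output/merged.seq"); // merge() fails if it exists

    SequenceFile.Sorter sorter =
      new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);
    sorter.merge(inputs, output);
  }
}

Note that merge(Path[], Path) sets factor to the number of inputs, so all
inputs are merged in a single pass; the multi-pass mergePass(...) path with
its ".index" side files appears to serve the sort path when the sorted data
spans multiple flushed segments.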