📄 sequencefile.java
字号:
}

  /**
   * Sorts key/value pairs in a sequence-format file.
   *
   * <p>For best performance, applications should make sure that the {@link
   * Writable#readFields(DataInput)} implementation of their keys is
   * very efficient.  In particular, it should avoid allocating memory.
   */
  public static class Sorter {

    private WritableComparator comparator;

    private String inFile;                        // when sorting
    private String[] inFiles;                     // when merging

    private String outFile;

    private int memory;                           // bytes
    private int factor;                           // merged per pass

    private FileSystem fs = null;

    private Class keyClass;
    private Class valClass;

    private Configuration conf;

    /** Sort and merge files containing the named classes. */
    public Sorter(FileSystem fs, Class keyClass, Class valClass,
                  Configuration conf) {
      this(fs, new WritableComparator(keyClass), valClass, conf);
    }

    /**
     * Sort and merge using an arbitrary {@link WritableComparator}.
     *
     * <p>Buffer memory comes from {@code io.sort.mb} (megabytes, default 100)
     * and the merge factor from {@code io.sort.factor} (default 100).
     */
    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass,
                  Configuration conf) {
      this.fs = fs;
      this.comparator = comparator;
      this.keyClass = comparator.getKeyClass();
      this.valClass = valClass;
      // Multiply in long arithmetic: io.sort.mb >= 2048 would overflow an int
      // and produce a negative buffer size. Clamp to the largest int instead.
      long sortBytes = (long) conf.getInt("io.sort.mb", 100) * 1024 * 1024;
      this.memory = (int) Math.min(sortBytes, Integer.MAX_VALUE);
      this.factor = conf.getInt("io.sort.factor", 100);
      this.conf = conf;
    }

    /** Set the number of streams to merge at once.*/
    public void setFactor(int factor) { this.factor = factor; }

    /** Get the number of streams to merge at once.*/
    public int getFactor() { return factor; }

    /** Set the total amount of buffer memory, in bytes.*/
    public void setMemory(int memory) { this.memory = memory; }

    /** Get the total amount of buffer memory, in bytes.*/
    public int getMemory() { return memory; }

    /**
     * Perform a file sort.
     *
     * @param inFile  the sequence file to sort
     * @param outFile the file to write sorted output to; must not exist yet
     * @throws IOException if {@code outFile} already exists or on I/O failure
     */
    public void sort(String inFile, String outFile) throws IOException {
      if (fs.exists(new File(outFile))) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFile = inFile;
      this.outFile = outFile;

      // The sort pass writes one or more sorted segments; merge passes then
      // reduce the segment count until a single sorted output remains.
      int segments = sortPass();
      int pass = 1;
      while (segments > 1) {
        // A pass that can take every remaining segment writes outFile itself.
        segments = mergePass(pass, segments <= factor);
        pass++;
      }
    }

    /** Runs the in-memory sort pass and returns the number of segments written. */
    private int sortPass() throws IOException {
      LOG.fine("running sort pass");
      SortPass sortPass = new SortPass(this.conf);
      try {
        return sortPass.run();
      } finally {
        sortPass.close();
      }
    }

    /**
     * Reads {@code inFile} into a buffer of up to about {@code memory/4} bytes
     * at a time, sorts each buffer's entries by key and writes them out as a
     * segment. When everything fits in one buffer the result goes straight to
     * {@code outFile}. Otherwise every segment is appended to
     * {@code outFile + ".0"}, each preceded by its byte length and entry count.
     */
    private class SortPass {
      private int limit = memory / 4;
      private DataOutputBuffer buffer = new DataOutputBuffer();
      private byte[] rawBuffer;

      // Per-entry bookkeeping, indexed by the entry's position in the buffer.
      private int[] starts = new int[1024];                 // offset in buffer
      private int[] pointers = new int[starts.length];      // sorted order
      private int[] pointersCopy = new int[starts.length];  // mergeSort scratch
      private int[] keyLengths = new int[starts.length];
      private int[] lengths = new int[starts.length];       // bytes in buffer

      private Reader in;
      private FSDataOutputStream out;
      private String outName;

      public SortPass(Configuration conf) throws IOException {
        in = new Reader(fs, inFile, conf);
      }

      /** Sorts and flushes buffer-sized chunks until EOF; returns the segment count. */
      public int run() throws IOException {
        int segments = 0;
        boolean atEof = false;
        while (!atEof) {
          int count = 0;
          buffer.reset();
          while (!atEof && buffer.getLength() < limit) {
            int start = buffer.getLength();       // read an entry into buffer
            int keyLength = in.next(buffer);
            int length = buffer.getLength() - start;

            if (keyLength == -1) {
              atEof = true;
              break;
            }

            if (count == starts.length) {
              grow();
            }

            starts[count] = start;                // update pointers
            pointers[count] = count;
            lengths[count] = length;
            keyLengths[count] = keyLength;

            count++;
          }

          // buffer is full (or input exhausted) -- sort & flush it
          LOG.finer("flushing segment " + segments);
          rawBuffer = buffer.getData();
          sort(count);
          // If the whole input fit in the first buffer, write the final file.
          flush(count, segments == 0 && atEof);
          segments++;
        }
        return segments;
      }

      /** Closes the input reader and, if one was created, the output stream. */
      public void close() throws IOException {
        try {
          in.close();
        } finally {
          if (out != null) {
            out.close();
          }
        }
      }

      /** Enlarges every per-entry array by half. */
      private void grow() {
        int newLength = starts.length * 3 / 2;
        starts = grow(starts, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];        // scratch: contents irrelevant
        keyLengths = grow(keyLengths, newLength);
        lengths = grow(lengths, newLength);
      }

      /** Returns a copy of {@code old} padded with zeros to {@code newLength}. */
      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }

      /**
       * Writes the first {@code count} buffered entries in sorted order.
       *
       * @param done true if this is the only segment, so it goes directly to
       *             {@code outFile} without a segment header
       */
      private void flush(int count, boolean done) throws IOException {
        if (out == null) {
          outName = done ? outFile : outFile + ".0";
          out = fs.create(new File(outName));
        }

        if (!done) {                              // an intermediate file
          long length = buffer.getLength();       // compute its size
          length += 8L * count;                   // allow for length/keyLength
          out.writeLong(length);                  // write size
          out.writeLong(count);                   // write count
        }

        Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {         // write in sorted order
          int p = pointers[i];
          writer.append(rawBuffer, starts[p], lengths[p], keyLengths[p]);
        }
      }

      /** Sorts {@code pointers[0..count)} by the keys of the entries they index. */
      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort(pointersCopy, pointers, 0, count);
      }

      /** Compares the keys of buffered entries {@code i} and {@code j}. */
      private int compare(int i, int j) {
        return comparator.compare(rawBuffer, starts[i], keyLengths[i],
                                  rawBuffer, starts[j], keyLengths[j]);
      }

      /**
       * Stable merge sort of {@code dest[low..high)}, using {@code src} as
       * scratch. On entry both arrays must hold the same values in that range.
       */
      private void mergeSort(int[] src, int[] dest, int low, int high) {
        int length = high - low;

        // Insertion sort on smallest arrays
        if (length < 7) {
          for (int i = low; i < high; i++) {
            for (int j = i; j > low && compare(dest[j - 1], dest[j]) > 0; j--) {
              swap(dest, j, j - 1);
            }
          }
          return;
        }

        // Recursively sort halves of dest into src
        int mid = (low + high) >>> 1;
        mergeSort(dest, src, low, mid);
        mergeSort(dest, src, mid, high);

        // If list is already sorted, just copy from src to dest.  This is an
        // optimization that results in faster sorts for nearly ordered lists.
        if (compare(src[mid - 1], src[mid]) <= 0) {
          System.arraycopy(src, low, dest, low, length);
          return;
        }

        // Merge sorted halves (now in src) into dest
        for (int i = low, p = low, q = mid; i < high; i++) {
          if (q >= high || p < mid && compare(src[p], src[q]) <= 0) {
            dest[i] = src[p++];
          } else {
            dest[i] = src[q++];
          }
        }
      }

      private void swap(int[] x, int a, int b) {
        int t = x[a];
        x[a] = x[b];
        x[b] = t;
      }
    }

    /** Runs one merge pass and returns the number of segments it produced. */
    private int mergePass(int pass, boolean last) throws IOException {
      LOG.fine("running merge pass=" + pass);
      MergePass mergePass = new MergePass(pass, last);
      try {
        return mergePass.run();
      } finally {
        mergePass.close();
      }
    }

    /**
     * Wraps {@code reader} in a {@link MergeStream}, positions it on its first
     * entry and adds it to {@code queue}, which then owns (and later closes)
     * the reader. If the input has no entries, or validation, reading or
     * queueing fails, the reader is closed here so it cannot leak.
     */
    private void enqueue(MergeQueue queue, Reader reader) throws IOException {
      boolean queued = false;
      try {
        MergeStream ms = new MergeStream(reader);
        if (ms.next()) {
          queue.add(ms);
          queued = true;
        }
      } finally {
        if (!queued) {
          reader.close();
        }
      }
    }

    /**
     * Merges groups of up to {@code factor} segments from
     * {@code outFile + "." + (pass-1)} into {@code outFile + "." + pass}, or into
     * {@code outFile} itself on the last pass. The input file is deleted when
     * the pass is closed.
     */
    private class MergePass {
      private int pass;
      private boolean last;

      private MergeQueue queue;
      private FSDataInputStream in;
      private String inName;

      public MergePass(int pass, boolean last) throws IOException {
        this.pass = pass;
        this.last = last;

        this.queue =
          new MergeQueue(factor, last ? outFile : outFile + "." + pass, last);

        this.inName = outFile + "." + (pass - 1);
        this.in = fs.open(new File(inName));
      }

      /** Closes and deletes the input, then closes the queue (even on failure). */
      public void close() throws IOException {
        try {
          in.close();                             // close and delete input
          fs.delete(new File(inName));
        } finally {
          queue.close();                          // close queue
        }
      }

      /** Merges all segment groups; returns the number of segments written. */
      public int run() throws IOException {
        int segments = 0;
        long end = fs.getLength(new File(inName));

        while (in.getPos() < end) {
          LOG.finer("merging segment " + segments);
          long totalLength = 0;
          long totalCount = 0;
          while (in.getPos() < end && queue.size() < factor) {
            long length = in.readLong();
            long count = in.readLong();

            totalLength += length;
            totalCount += count;

            Reader reader = new Reader(fs, inName, memory / (factor + 1),
                                       in.getPos(), length);
            reader.sync = null;                   // disable sync on temp files

            enqueue(queue, reader);               // add segment to queue
            in.seek(reader.end);                  // skip to next segment header
          }

          if (!last) {                            // intermediate file
            queue.out.writeLong(totalLength);     // write size
            queue.out.writeLong(totalCount);      // write count
          }

          queue.merge();                          // do a merge

          segments++;
        }

        return segments;
      }
    }

    /**
     * Merge the provided files.
     *
     * @param inFiles sequence files to merge, each assumed already sorted by key
     * @param outFile the merged output; must not exist yet
     * @throws IOException if {@code outFile} already exists, the inputs' key or
     *         value classes differ from this sorter's, the inputs disagree on
     *         compression, or on I/O failure
     */
    public void merge(String[] inFiles, String outFile) throws IOException {
      // Check the configured FileSystem, as sort() does; java.io.File would
      // consult the local disk instead.
      if (fs.exists(new File(outFile))) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFiles = inFiles;
      this.outFile = outFile;
      // All inputs are merged in a single pass. NOTE: this also replaces the
      // configured merge factor for later calls on this Sorter.
      this.factor = inFiles.length;

      MergeFiles mergeFiles = new MergeFiles();
      try {
        mergeFiles.run();
      } finally {
        mergeFiles.close();
      }
    }

    /** Merges all of {@code inFiles} into {@code outFile} in one pass. */
    private class MergeFiles {
      private MergeQueue queue;

      public MergeFiles() throws IOException {
        this.queue = new MergeQueue(factor, outFile, true);
      }

      public void close() throws IOException {
        queue.close();
      }

      public void run() throws IOException {
        LOG.finer("merging files=" + inFiles.length);
        for (int i = 0; i < inFiles.length; i++) {
          // enqueue() uses MergeQueue.add, which enforces consistent
          // compression and records it so the output Writer matches the inputs.
          enqueue(queue, new Reader(fs, inFiles[i], memory / (factor + 1)));
        }
        queue.merge();
      }
    }

    /** A reader plus the raw bytes and key length of its current entry. */
    private class MergeStream {
      private Reader in;

      private DataOutputBuffer buffer = new DataOutputBuffer();
      private int keyLength;

      /** @throws IOException if the reader's key or value class does not match */
      public MergeStream(Reader reader) throws IOException {
        if (reader.keyClass != keyClass)
          throw new IOException("wrong key class: " + reader.getKeyClass() +
                                " is not " + keyClass);
        if (reader.valClass != valClass)
          throw new IOException("wrong value class: "+reader.getValueClass()+
                                " is not " + valClass);
        this.in = reader;
      }

      /** Reads the next entry into {@code buffer}; returns false at end of input. */
      public boolean next() throws IOException {
        buffer.reset();
        keyLength = in.next(buffer);
        return keyLength >= 0;
      }
    }

    /**
     * Priority queue of {@link MergeStream}s ordered by current key, writing
     * merged entries to a single output stream.
     */
    private class MergeQueue extends PriorityQueue {
      private FSDataOutputStream out;
      private boolean done;
      private boolean compress;

      public MergeQueue(int size, String outName, boolean done)
        throws IOException {
        initialize(size);
        this.out = fs.create(new File(outName), true, memory / (factor + 1));
        this.done = done;
      }

      /**
       * Adds a stream. The first stream added to an empty queue fixes whether
       * the output is compressed; later streams must agree.
       */
      public void add(MergeStream stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
        } else if (compress != stream.in.isCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        }
        put(stream);
      }

      protected boolean lessThan(Object a, Object b) {
        MergeStream msa = (MergeStream) a;
        MergeStream msb = (MergeStream) b;
        return comparator.compare(msa.buffer.getData(), 0, msa.keyLength,
                                  msb.buffer.getData(), 0, msb.keyLength) < 0;
      }

      /** Drains the queue in key order into {@code out}, closing exhausted inputs. */
      public void merge() throws IOException {
        Writer writer = new Writer(out, keyClass, valClass, compress);
        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        while (size() != 0) {
          MergeStream ms = (MergeStream) top();
          DataOutputBuffer buffer = ms.buffer;    // write top entry
          writer.append(buffer.getData(), 0, buffer.getLength(), ms.keyLength);

          if (ms.next()) {                        // has another entry
            adjustTop();
          } else {
            pop();                                // done with this file
            ms.in.close();
          }
        }
      }

      /** Closes any remaining inputs, then always closes the output. */
      public void close() throws IOException {
        try {
          MergeStream ms;                         // close inputs
          while ((ms = (MergeStream) pop()) != null) {
            ms.in.close();
          }
        } finally {
          out.close();                            // close output
        }
      }
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -