⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 Hadoop是一个在由廉价硬件组成的大型集群上运行应用程序的框架。Hadoop为应用程序透明地提供了一组稳定、可靠的接口和数据移动机制。Hadoop实现了Google的MapReduce算法。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
  }

  /** Sorts key/value pairs in a sequence-format file.
   *
   * <p>For best performance, applications should make sure that the {@link
   * Writable#readFields(DataInput)} implementation of their keys is
   * very efficient.  In particular, it should avoid allocating memory.
   *
   * <p>A sort runs one {@link SortPass} that writes sorted segments, followed
   * by as many {@link MergePass}es as are needed to reduce them to a single
   * segment.  Intermediate data is written to files named
   * <code>outFile.N</code>, where N is the pass number.
   */
  public static class Sorter {

    private WritableComparator comparator;

    private String inFile;                        // when sorting
    private String[] inFiles;                     // when merging

    private String outFile;

    private int memory; // bytes
    private int factor; // merged per pass

    private FileSystem fs = null;

    private Class keyClass;
    private Class valClass;

    private Configuration conf;

    /** Sort and merge files containing the named classes. */
    public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {
      this(fs, new WritableComparator(keyClass), valClass, conf);
    }

    /** Sort and merge using an arbitrary {@link WritableComparator}.
     *
     * <p>The buffer memory is read from <code>io.sort.mb</code> (default 100,
     * converted to bytes) and the merge factor from
     * <code>io.sort.factor</code> (default 100).
     */
    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, Configuration conf) {
      this.fs = fs;
      this.comparator = comparator;
      this.keyClass = comparator.getKeyClass();
      this.valClass = valClass;
      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
      this.factor = conf.getInt("io.sort.factor", 100);
      this.conf = conf;
    }

    /** Set the number of streams to merge at once.*/
    public void setFactor(int factor) { this.factor = factor; }

    /** Get the number of streams to merge at once.*/
    public int getFactor() { return factor; }

    /** Set the total amount of buffer memory, in bytes.*/
    public void setMemory(int memory) { this.memory = memory; }

    /** Get the total amount of buffer memory, in bytes.*/
    public int getMemory() { return memory; }

    /** Perform a file sort.
     *
     * @param inFile  name of the file to sort
     * @param outFile name of the sorted file to create
     * @throws IOException if <code>outFile</code> already exists on the
     *         sorter's file system, or if reading or writing fails
     */
    public void sort(String inFile, String outFile) throws IOException {
      if (fs.exists(new File(outFile))) {
        throw new IOException("already exists: " + outFile);
      }

      this.inFile = inFile;
      this.outFile = outFile;

      int segments = sortPass();
      int pass = 1;
      while (segments > 1) {
        // the pass that can take every remaining segment at once is the last
        segments = mergePass(pass, segments <= factor);
        pass++;
      }
    }

    /** Runs a single {@link SortPass} and returns the number of segments it wrote. */
    private int sortPass() throws IOException {
      LOG.fine("running sort pass");
      SortPass sortPass = new SortPass(this.conf);         // make the SortPass
      try {
        return sortPass.run();                    // run it
      } finally {
        sortPass.close();                         // close it
      }
    }

    /** Reads the input in buffer-sized chunks, sorts each chunk and writes it
     * out as one segment. */
    private class SortPass {
      // a chunk is flushed once the buffer holds at least this many bytes
      private int limit = memory/4;
      private DataOutputBuffer buffer = new DataOutputBuffer();
      private byte[] rawBuffer;

      // per-entry bookkeeping, indexed by entry number within the chunk
      private int[] starts = new int[1024];
      private int[] pointers = new int[starts.length];
      private int[] pointersCopy = new int[starts.length];
      private int[] keyLengths = new int[starts.length];
      private int[] lengths = new int[starts.length];

      private Reader in;
      private FSDataOutputStream out;
      private String outName;

      public SortPass(Configuration conf) throws IOException {
        in = new Reader(fs, inFile, conf);
      }

      /** Sorts and flushes chunks until the input is exhausted.
       *
       * @return the number of segments written
       */
      public int run() throws IOException {
        int segments = 0;
        boolean atEof = false;
        while (!atEof) {
          int count = 0;
          buffer.reset();
          while (!atEof && buffer.getLength() < limit) {

            int start = buffer.getLength();       // read an entry into buffer
            int keyLength = in.next(buffer);
            int length = buffer.getLength() - start;

            if (keyLength == -1) {
              atEof = true;
              break;
            }

            if (count == starts.length) {
              grow();
            }

            starts[count] = start;                // update pointers
            pointers[count] = count;
            lengths[count] = length;
            keyLengths[count] = keyLength;

            count++;
          }

          // buffer is full -- sort & flush it
          LOG.finer("flushing segment " + segments);
          rawBuffer = buffer.getData();
          sort(count);
          // if the whole input fit in the first chunk, write the final output
          flush(count, segments==0 && atEof);
          segments++;
        }
        return segments;
      }

      /** Closes the input reader and, if one was opened, the output stream. */
      public void close() throws IOException {
        in.close();

        if (out != null) {
          out.close();
        }
      }

      /** Enlarges all per-entry arrays by half. */
      private void grow() {
        int newLength = starts.length * 3 / 2;
        starts = grow(starts, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];        // scratch only; no copy needed
        keyLengths = grow(keyLengths, newLength);
        lengths = grow(lengths, newLength);
      }

      /** Returns a copy of <code>old</code> extended to <code>newLength</code>. */
      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }

      /** Writes the first <code>count</code> entries in sorted order.
       *
       * @param count number of entries in the current chunk
       * @param done  true if this chunk is the entire input, in which case it
       *              is written to <code>outFile</code>; otherwise it is
       *              written to <code>outFile.0</code> preceded by its size
       *              and entry count
       */
      private void flush(int count, boolean done) throws IOException {
        if (out == null) {
          outName = done ? outFile : outFile+".0";
          out = fs.create(new File(outName));
        }

        if (!done) {                              // an intermediate file

          long length = buffer.getLength();       // compute its size
          length += count * 8L;                   // allow for length/keyLength

          out.writeLong(length);                  // write size
          out.writeLong(count);                   // write count
        }

        Writer writer = new Writer(out, keyClass, valClass, in.isCompressed());
        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {         // write in sorted order
          int p = pointers[i];
          writer.append(rawBuffer, starts[p], lengths[p], keyLengths[p]);
        }
      }

      /** Sorts <code>pointers[0..count)</code> by the keys they refer to. */
      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort(pointersCopy, pointers, 0, count);
      }

      /** Compares the raw keys of entries <code>i</code> and <code>j</code>. */
      private int compare(int i, int j) {
        return comparator.compare(rawBuffer, starts[i], keyLengths[i],
                                  rawBuffer, starts[j], keyLengths[j]);
      }

      /** Merge sort of <code>dest[low..high)</code>, using <code>src</code>
       * (which must start as a copy of <code>dest</code>) as scratch space. */
      private void mergeSort(int src[], int dest[], int low, int high) {
        int length = high - low;

        // Insertion sort on smallest arrays
        if (length < 7) {
          for (int i=low; i<high; i++) {
            for (int j=i; j>low && compare(dest[j-1], dest[j])>0; j--) {
              swap(dest, j, j-1);
            }
          }
          return;
        }

        // Recursively sort halves of dest into src
        int mid = (low + high) >>> 1;             // unsigned shift: no overflow
        mergeSort(dest, src, low, mid);
        mergeSort(dest, src, mid, high);

        // If list is already sorted, just copy from src to dest.  This is an
        // optimization that results in faster sorts for nearly ordered lists.
        if (compare(src[mid-1], src[mid]) <= 0) {
          System.arraycopy(src, low, dest, low, length);
          return;
        }

        // Merge sorted halves (now in src) into dest
        for (int i = low, p = low, q = mid; i < high; i++) {
          if (q>=high || p<mid && compare(src[p], src[q]) <= 0) {
            dest[i] = src[p++];
          } else {
            dest[i] = src[q++];
          }
        }
      }

      /** Exchanges <code>x[a]</code> and <code>x[b]</code>. */
      private void swap(int x[], int a, int b) {
        int t = x[a];
        x[a] = x[b];
        x[b] = t;
      }
    }

    /** Runs a single {@link MergePass} and returns the number of segments it wrote. */
    private int mergePass(int pass, boolean last) throws IOException {
      LOG.fine("running merge pass=" + pass);
      MergePass mergePass = new MergePass(pass, last);
      try {                                       // make a merge pass
        return mergePass.run();                   // run it
      } finally {
        mergePass.close();                        // close it
      }
    }

    /** Merges the segments of <code>outFile.(pass-1)</code>, up to
     * <code>factor</code> at a time, into <code>outFile.pass</code>, or into
     * <code>outFile</code> itself on the last pass. */
    private class MergePass {
      private int pass;
      private boolean last;

      private MergeQueue queue;
      private FSDataInputStream in;
      private String inName;

      public MergePass(int pass, boolean last) throws IOException {
        this.pass = pass;
        this.last = last;

        this.queue =
          new MergeQueue(factor, last ? outFile : outFile+"."+pass, last);

        this.inName = outFile+"."+(pass-1);
        this.in = fs.open(new File(inName));
      }

      /** Closes and deletes the input file, then closes the merge queue.
       * The queue is closed even if closing or deleting the input fails. */
      public void close() throws IOException {
        try {
          in.close();                             // close and delete input
          fs.delete(new File(inName));
        } finally {
          queue.close();                          // close queue
        }
      }

      /** Merges groups of segments until the input file is consumed.
       *
       * @return the number of merged segments written
       */
      public int run() throws IOException {
        int segments = 0;
        long end = fs.getLength(new File(inName));

        while (in.getPos() < end) {
          LOG.finer("merging segment " + segments);
          long totalLength = 0;
          long totalCount = 0;
          while (in.getPos() < end && queue.size() < factor) {
            long length = in.readLong();
            long count = in.readLong();

            totalLength += length;
            totalCount+= count;

            Reader reader = new Reader(fs, inName, memory/(factor+1),
                                       in.getPos(), length);
            reader.sync = null;                   // disable sync on temp files

            MergeStream ms = new MergeStream(reader); // add segment to queue
            if (ms.next()) {
              queue.add(ms);
            } else {
              reader.close();                     // empty segment: release it
            }
            in.seek(reader.end);                  // skip to the next segment
          }

          if (!last) {                            // intermediate file
            queue.out.writeLong(totalLength);     // write size
            queue.out.writeLong(totalCount);      // write count
          }

          queue.merge();                          // do a merge
          segments++;
        }

        return segments;
      }
    }

    /** Merge the provided files.
     *
     * <p>All files are merged in a single pass: the merge factor is set to
     * <code>inFiles.length</code>.
     *
     * @param inFiles names of the sorted files to merge
     * @param outFile name of the merged file to create
     * @throws IOException if <code>outFile</code> already exists on the
     *         sorter's file system, if the inputs disagree on compression,
     *         or if reading or writing fails
     */
    public void merge(String[] inFiles, String outFile) throws IOException {
      this.inFiles = inFiles;
      this.outFile = outFile;
      this.factor = inFiles.length;

      // check the sorter's file system, as sort() does, not the local disk
      if (fs.exists(new File(outFile))) {
        throw new IOException("already exists: " + outFile);
      }

      MergeFiles mergeFiles = new MergeFiles();
      try {                                       // make a merge pass
        mergeFiles.run();                         // run it
      } finally {
        mergeFiles.close();                       // close it
      }
    }

    /** Merges all of <code>inFiles</code> into <code>outFile</code>. */
    private class MergeFiles {
      private MergeQueue queue;

      public MergeFiles() throws IOException {
        this.queue = new MergeQueue(factor, outFile, true);
      }

      public void close() throws IOException {
        queue.close();
      }

      public void run() throws IOException {
        LOG.finer("merging files=" + inFiles.length);
        for (int i = 0; i < inFiles.length; i++) {
          String name = inFiles[i];
          Reader reader = new Reader(fs, name, memory/(factor+1));
          MergeStream ms = new MergeStream(reader);
          if (ms.next()) {
            // add(), not put(): records the inputs' compression setting so
            // the output is written with the matching flag
            queue.add(ms);
          } else {
            reader.close();                       // empty file: release it
          }
        }

        queue.merge();
      }
    }

    /** One input of a merge: a reader plus its current raw entry. */
    private class MergeStream {
      private Reader in;

      private DataOutputBuffer buffer = new DataOutputBuffer();
      private int keyLength;

      /** @throws IOException if the reader's key or value class differs from
       *          the sorter's */
      public MergeStream(Reader reader) throws IOException {
        if (reader.keyClass != keyClass)
          throw new IOException("wrong key class: " + reader.getKeyClass() +
                                " is not " + keyClass);
        if (reader.valClass != valClass)
          throw new IOException("wrong value class: "+reader.getValueClass()+
                                " is not " + valClass);
        this.in = reader;
      }

      /** Reads the next raw entry into the buffer.
       *
       * @return false once the reader reports a negative key length
       */
      public boolean next() throws IOException {
        buffer.reset();
        keyLength = in.next(buffer);
        return keyLength >= 0;
      }
    }

    /** Priority queue of {@link MergeStream}s ordered by current key, plus
     * the output stream the merged entries are written to. */
    private class MergeQueue extends PriorityQueue {
      private FSDataOutputStream out;
      private boolean done;
      private boolean compress;

      /** Adds a stream.  The first stream added to an empty queue fixes the
       * compression setting; later streams must agree with it. */
      public void add(MergeStream stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
        } else if (compress != stream.in.isCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        }
        put(stream);
      }

      /**
       * @param size    maximum number of streams held at once
       * @param outName name of the file to create for the merged output
       * @param done    true if the output is final rather than intermediate
       */
      public MergeQueue(int size, String outName, boolean done)
        throws IOException {
        initialize(size);
        this.out = fs.create(new File(outName), true, memory/(factor+1));
        this.done = done;
      }

      /** Orders streams by comparing the raw key at the start of each buffer. */
      protected boolean lessThan(Object a, Object b) {
        MergeStream msa = (MergeStream)a;
        MergeStream msb = (MergeStream)b;
        return comparator.compare(msa.buffer.getData(), 0, msa.keyLength,
                                  msb.buffer.getData(), 0, msb.keyLength) < 0;
      }

      /** Writes the queued streams' entries in sorted order until the queue
       * is empty, closing each stream's reader as it is exhausted. */
      public void merge() throws IOException {
        Writer writer = new Writer(out, keyClass, valClass, compress);
        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        while (size() != 0) {
          MergeStream ms = (MergeStream)top();
          DataOutputBuffer buffer = ms.buffer;    // write top entry
          writer.append(buffer.getData(), 0, buffer.getLength(), ms.keyLength);

          if (ms.next()) {                        // has another entry
            adjustTop();
          } else {
            pop();                                // done with this file
            ms.in.close();
          }
        }
      }

      /** Closes the readers of any streams still queued, then the output. */
      public void close() throws IOException {
        MergeStream ms;                           // close inputs
        while ((ms = (MergeStream)pop()) != null) {
          ms.in.close();
        }
        out.close();                              // close output
      }
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -