⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sequencefile.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      // ---- SortPass state: one in-memory sort pass over the input files ----
      private ArrayList segmentLengths = new ArrayList();   // lengths of flushed segments
      private Reader in = null;                   // reader for the current input file
      private FSDataOutputStream out = null;      // sorted-segment data output
      private FSDataOutputStream indexOut = null; // per-segment (start, length) index
      private Path outName;                       // path of the segment file being written

      /**
       * Reads records from the input files, sorts them in memory in batches
       * (bounded by memoryLimit bytes and recordLimit records), and flushes
       * each sorted batch out as a segment.
       *
       * @param deleteInput whether each input file is deleted once fully read
       * @return number of sorted segments written (0 if there was no input)
       * @throws IOException on read or write failure
       */
      public int run(boolean deleteInput) throws IOException {
        int segments = 0;
        int currentFile = 0;
        boolean atEof = (currentFile >= inFiles.length);
        boolean isCompressed = false;
        boolean isBlockCompressed = false;
        CompressionCodec codec = null;
        segmentLengths.clear();
        if (atEof) {
          return 0;                               // no input files at all
        }

        // Initialize
        // Compression settings for the output are taken from the first input file.
        in = new Reader(fs, inFiles[currentFile], conf);
        isCompressed = in.isCompressed();
        isBlockCompressed = in.isBlockCompressed();
        codec = in.getCompressionCodec();

        // Drop any value buffers left over from a previous run.
        for (int i=0; i < rawValues.length; ++i) {
          rawValues[i] = null;
        }

        while (!atEof) {
          int count = 0;                          // records in this batch
          int bytesProcessed = 0;                 // raw bytes in this batch
          rawKeys.reset();
          // Fill the in-memory buffer up to the memory/record limits.
          while (!atEof &&
              bytesProcessed < memoryLimit && count < recordLimit) {
            // Read a record into buffer
            // Note: Attempt to re-use 'rawValue' as far as possible
            int keyOffset = rawKeys.getLength();
            ValueBytes rawValue =
              (count == keyOffsets.length || rawValues[count] == null) ?
in.createValueBytes() :
                rawValues[count];
            int recordLength = in.nextRaw(rawKeys, rawValue);
            if (recordLength == -1) {
              // Current file is exhausted: close it (optionally delete it)
              // and advance to the next input file, if any.
              in.close();
              if (deleteInput) {
                fs.delete(inFiles[currentFile]);
              }
              currentFile += 1;
              atEof = currentFile >= inFiles.length;
              if (!atEof) {
                in = new Reader(fs, inFiles[currentFile], conf);
              } else {
                in = null;
              }
              continue;
            }

            //int length = buffer.getLength() - start;
            int keyLength = rawKeys.getLength() - keyOffset;

            if (count == keyOffsets.length)
              grow();                             // all parallel arrays grow together
            keyOffsets[count] = keyOffset;        // update pointers
            pointers[count] = count;
            keyLengths[count] = keyLength;
            rawValues[count] = rawValue;
            bytesProcessed += recordLength;
            count++;
          }

          // buffer is full -- sort & flush it
          LOG.debug("flushing segment " + segments);
          rawBuffer = rawKeys.getData();
          sort(count);
          // When the very first batch also consumed all input, the single
          // segment is written directly to the final output (done == true).
          flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec,
                segments==0 && atEof);
          segments++;
        }
        return segments;
      }

      /** Closes the input reader and both output streams, if open. */
      public void close() throws IOException {
        if (in != null) {
          in.close();
        }
        if (out != null) {
          out.close();
        }
        if (indexOut != null) {
          indexOut.close();
        }
      }

      /** Grows every per-record parallel array by a factor of 1.5. */
      private void grow() {
        int newLength = keyOffsets.length * 3 / 2;
        keyOffsets = grow(keyOffsets, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];
        keyLengths = grow(keyLengths, newLength);
        rawValues = grow(rawValues, newLength);
      }

      /** Returns a copy of {@code old} resized to {@code newLength}. */
      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }

      /** Returns a copy of {@code old} resized to {@code newLength}; new slots are null. */
      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
        ValueBytes[] result = new ValueBytes[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        for (int i=old.length; i < newLength; ++i) {
          result[i] = null;
        }
        return result;
      }

      /**
       * Writes the current sorted batch out as one segment.
       *
       * @param count             number of records in the batch
       * @param bytesProcessed    raw bytes in the batch
       * @param isCompressed      record compression matching the source data
       * @param isBlockCompressed block compression matching the source data
       * @param codec             compression codec of the source data
       * @param done              true when this single segment is the entire
       *                          output; it then goes straight to outFile and
       *                          no index entry is written
       */
      private void flush(int count, int bytesProcessed, boolean isCompressed,
          boolean isBlockCompressed, CompressionCodec codec, boolean done)
        throws IOException {
        if (out == null) {
          // Lazily create the segment file (and its index, for temp output).
          outName = done ? outFile : outFile.suffix(".0");
          out = fs.create(outName);
          if (!done) {
            indexOut = fs.create(outName.suffix(".index"));
          }
        }

        long segmentStart = out.getPos();
        Writer writer = createWriter(out, keyClass, valClass,
            isCompressed, isBlockCompressed, codec);

        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {         // write in sorted order
          int p = pointers[i];
          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
        }

        if (writer instanceof SequenceFile.BlockCompressWriter) {
          // Force out any records still buffered in the current block.
          SequenceFile.BlockCompressWriter bcWriter =
            (SequenceFile.BlockCompressWriter) writer;
          bcWriter.writeBlock();
        }
        writer.out.flush();

        if (!done) {
          // Save the segment length
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (writer.out.getPos()-segmentStart));
          indexOut.flush();
        }
      }

      /** Sorts the first {@code count} entries of {@code pointers} by raw key. */
      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort(pointersCopy, pointers, 0, count);
      }

      /** Compares the raw keys of buffered records i and j via the raw comparator. */
      private int compare(int i, int j) {
        return comparator.compare(rawBuffer, keyOffsets[i],
keyLengths[i],
                                  rawBuffer, keyOffsets[j], keyLengths[j]);
      }

      /**
       * Merge-sorts the pointer indices in [low, high) of {@code dest}, using
       * {@code src} as scratch space (both must start with identical contents,
       * see sort()).  Small ranges fall back to insertion sort.
       */
      private void mergeSort(int src[], int dest[], int low, int high) {
        int length = high - low;

        // Insertion sort on smallest arrays
        if (length < 7) {
          for (int i=low; i<high; i++)
            for (int j=i; j>low && compare(dest[j-1], dest[j])>0; j--)
              swap(dest, j, j-1);
          return;
        }

        // Recursively sort halves of dest into src
        // NOTE(review): (low + high) >> 1 can overflow once the buffer holds
        // more than ~2^30 records; (low + high) >>> 1 is the safe midpoint.
        int mid = (low + high) >> 1;
        mergeSort(dest, src, low, mid);
        mergeSort(dest, src, mid, high);

        // If list is already sorted, just copy from src to dest.  This is an
        // optimization that results in faster sorts for nearly ordered lists.
        if (compare(src[mid-1], src[mid]) <= 0) {
          System.arraycopy(src, low, dest, low, length);
          return;
        }

        // Merge sorted halves (now in src) into dest
        for (int i = low, p = low, q = mid; i < high; i++) {
          if (q>=high || p<mid && compare(src[p], src[q]) <= 0)
            dest[i] = src[p++];
          else
            dest[i] = src[q++];
        }
      }

      /** Swaps elements a and b of x. */
      private void swap(int x[], int a, int b) {
        int t = x[a];
        x[a] = x[b];
        x[b] = t;
      }
    } // SequenceFile.Sorter.SortPass

    /**
     * Runs one merge pass over the segment file written by the previous pass.
     *
     * @param pass pass number; input is outFile.(pass-1)
     * @param last whether this pass writes the final output file
     * @return the number of merged segments produced
     */
    private int mergePass(int pass, boolean last) throws IOException {
      LOG.debug("running merge pass=" + pass);
      MergePass mergePass = new MergePass(pass, last);
      try {                                       // make a merge pass
        return mergePass.run();                  // run it
      } finally {
        mergePass.close();                       // close it
      }
    }

    /** One pass that merges segments up to {@code factor} at a time. */
    private class MergePass {
      private boolean last;                       // final pass writes outFile itself
      private MergeQueue queue;
      private FSDataInputStream in = null;        // previous pass's segment data
      private Path inName;
      private FSDataInputStream indexIn = null;   // previous pass's segment index

      public MergePass(int pass, boolean last) throws IOException {
        this.last = last;
this.queue =
          new MergeQueue(factor, last?outFile:outFile.suffix("."+pass), last);
        // Input is the segment file produced by the previous pass.
        this.inName = outFile.suffix("."+(pass-1));
        this.in = fs.open(inName);
        this.indexIn = fs.open(inName.suffix(".index"));
      }

      public void close() throws IOException {
        in.close();                               // close and delete input
        fs.delete(inName);
        queue.close();                            // close queue
      }

      /**
       * Merges the input segments {@code factor} at a time, writing one
       * output segment per merge; returns the number of segments written.
       */
      public int run() throws IOException {
        int segments = 0;
        long end = fs.getLength(inName);

        while (in.getPos() < end) {
          LOG.debug("merging segment " + segments);
          long segmentStart = queue.out.getPos();
          // Load up to 'factor' segments into the priority queue, using the
          // index file for each segment's (offset, length).
          while (in.getPos() < end && queue.size() < factor) {
            long segmentOffset = WritableUtils.readVLong(indexIn);
            long segmentLength = WritableUtils.readVLong(indexIn);
            Reader reader = new Reader(fs, inName, memory/(factor+1),
                                       segmentOffset, segmentLength, conf);
            reader.sync = null;                   // disable sync on temp files

            MergeStream ms = new MergeStream(reader); // add segment to queue
            if (ms.next()) {
              queue.put(ms);
            }
            in.seek(reader.end);
          }

          queue.merge();                          // do a merge

          if (!last) {
            // Record (start, length) of the merged segment for the next pass.
            WritableUtils.writeVLong(queue.indexOut, segmentStart);
            WritableUtils.writeVLong(queue.indexOut,
                (queue.out.getPos() - segmentStart));
          }
          segments++;
        }
        return segments;
      }
    } // SequenceFile.Sorter.MergePass

    /** Merge the provided files.*/
    public void merge(Path[] inFiles, Path outFile) throws IOException {
      this.inFiles = inFiles;
      this.outFile = outFile;
      this.factor = inFiles.length;               // merge them all in one pass

      if (fs.exists(outFile)) {
        throw new IOException("already exists: " + outFile);
      }

      MergeFiles
mergeFiles = new MergeFiles();
      try {                                       // make a merge pass
        mergeFiles.run();                         // run it
      } finally {
        mergeFiles.close();                       // close it
      }
    }

    /** Single merge of the caller-supplied input files into outFile. */
    private class MergeFiles {
      private MergeQueue queue;

      public MergeFiles() throws IOException {
        this.queue = new MergeQueue(factor, outFile, true);
      }

      public void close() throws IOException {
        queue.close();
      }

      /** Feeds every non-empty input file into the queue and merges once. */
      public void run() throws IOException {
        LOG.debug("merging files=" + inFiles.length);
        for (int i = 0; i < inFiles.length; i++) {
          Path inFile = inFiles[i];
          MergeStream ms =
            new MergeStream(new Reader(fs, inFile, memory/(factor+1), conf));
          if (ms.next())
            queue.put(ms);
        }
        queue.merge();
      }
    } // SequenceFile.Sorter.MergeFiles

    /** One sorted input stream positioned on its current raw record. */
    private class MergeStream {
      private Reader in;
      private DataOutputBuffer rawKey = null;     // current record's raw key bytes
      private ValueBytes rawValue = null;         // current record's raw value bytes

      public MergeStream(Reader reader) throws IOException {
        // All merged streams must agree on the sorter's key/value classes.
        if (reader.keyClass != keyClass)
          throw new IOException("wrong key class: " + reader.getKeyClass() +
                                " is not " + keyClass);
        if (reader.valClass != valClass)
          throw new IOException("wrong value class: "+reader.getValueClass()+
                                " is not " + valClass);
        this.in = reader;
        rawKey = new DataOutputBuffer();
        rawValue = in.createValueBytes();
      }

      /** Advances to the next record; returns false at end of stream. */
      public boolean next() throws IOException {
        rawKey.reset();
        int recordLength =
          in.nextRaw(rawKey, rawValue);
        return (recordLength >= 0);
      }
    } // SequenceFile.Sorter.MergeStream

    /** Priority queue of MergeStreams ordered by their current raw keys. */
    private class MergeQueue extends PriorityQueue {
      private Path outName;
      private FSDataOutputStream out;
      private FSDataOutputStream indexOut;
      private boolean done;
      private boolean
compress;
      private boolean blockCompress;
      private CompressionCodec codec = null;

      /**
       * Adds a stream to the queue.  The first stream fixes the compression
       * settings; every later stream must match them.
       */
      public void put(MergeStream stream) throws IOException {
        if (size() == 0) {
          compress = stream.in.isCompressed();
          blockCompress = stream.in.isBlockCompressed();
          codec = stream.in.getCompressionCodec();
        } else if (compress != stream.in.isCompressed() ||
            blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        }
        super.put(stream);
      }

      /**
       * @param size    maximum number of streams merged at once
       * @param outName where the merged output is written
       * @param done    true when this writes the final output (no index file)
       */
      public MergeQueue(int size, Path outName, boolean done)
        throws IOException {
        initialize(size);
        this.outName = outName;
        this.out = fs.create(this.outName, true, memory/(factor+1));
        if (!done) {
          this.indexOut = fs.create(outName.suffix(".index"), true,
              memory/(factor+1));
        }
        this.done = done;
      }

      /** Orders streams by comparing their current raw keys. */
      protected boolean lessThan(Object a, Object b) {
        MergeStream msa = (MergeStream)a;
        MergeStream msb = (MergeStream)b;
        // NOTE(review): source truncated here (page 1 of 5) — the rest of
        // this method is not visible in this chunk.
        return comparator.compare(msa.rawKey.ge

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -