⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testsequencefile.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.io;import java.io.*;import java.util.*;import junit.framework.TestCase;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.SequenceFile.CompressionType;import org.apache.hadoop.conf.*;/** Support for flat files of binary key/value pairs. */public class TestSequenceFile extends TestCase {  private static Log LOG = SequenceFile.LOG;  private static Configuration conf = new Configuration();    public TestSequenceFile(String name) { super(name); }  /** Unit tests for SequenceFile. */  public void testSequenceFile() throws Exception {    int count = 1024 * 10;    int megabytes = 1;    int factor = 5;    Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq");    Path recordCompressedFile =       new Path(System.getProperty("test.build.data",".")+"/test.rc.seq");    Path blockCompressedFile =       new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");     int seed = new Random().nextInt();    LOG.info("Seed = " + seed);    FileSystem fs = new LocalFileSystem(conf);    try {        //LOG.setLevel(Level.FINE);        // SequenceFile.Writer        writeTest(fs, count, seed, file, CompressionType.NONE);        readTest(fs, count, seed, file);        sortTest(fs, count, megabytes, factor, false, file);        checkSort(fs, count, seed, file);        sortTest(fs, count, megabytes, factor, true, file);        checkSort(fs, count, seed, file);        mergeTest(fs, count, seed, file, CompressionType.NONE, false,             factor, megabytes);        checkSort(fs, count, seed, file);        mergeTest(fs, count, seed, file, CompressionType.NONE, true,             factor, megabytes);        checkSort(fs, count, seed, file);                // SequenceFile.RecordCompressWriter        writeTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD);        readTest(fs, count, seed, recordCompressedFile);        sortTest(fs, count, megabytes, factor, false, recordCompressedFile);        checkSort(fs, count, seed, recordCompressedFile);        sortTest(fs, count, megabytes, factor, true, recordCompressedFile);        checkSort(fs, count, seed, recordCompressedFile);        mergeTest(fs, count, seed, recordCompressedFile,             CompressionType.RECORD, false, factor, megabytes);        checkSort(fs, count, seed, recordCompressedFile);        mergeTest(fs, count, seed, recordCompressedFile,             CompressionType.RECORD, true, factor, megabytes);        checkSort(fs, count, seed, recordCompressedFile);                // SequenceFile.BlockCompressWriter        writeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK);        readTest(fs, count, seed, blockCompressedFile);        sortTest(fs, count, megabytes, factor, false, blockCompressedFile);        checkSort(fs, count, seed, blockCompressedFile);        sortTest(fs, count, megabytes, factor, true, blockCompressedFile);        checkSort(fs, count, seed, blockCompressedFile);        mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,             false, factor, megabytes);        checkSort(fs, count, seed, blockCompressedFile);        mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,             true, factor, megabytes);        checkSort(fs, count, seed, blockCompressedFile);        } finally {        fs.close();    }  }  private static void writeTest(FileSystem fs, int count, int seed, Path file,       CompressionType compressionType)    throws IOException {    fs.delete(file);    LOG.info("creating " + count + " records with " + compressionType +              " compression");    SequenceFile.Writer writer =       SequenceFile.createWriter(fs, conf, file,           RandomDatum.class, RandomDatum.class, compressionType);    RandomDatum.Generator generator = new RandomDatum.Generator(seed);    for (int i = 0; i < count; i++) {      generator.next();      RandomDatum key = generator.getKey();      RandomDatum value = generator.getValue();      writer.append(key, value);    }    writer.close();  }  private static void readTest(FileSystem fs, int count, int seed, Path file)    throws IOException {    LOG.debug("reading " + count + " records");    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);    RandomDatum.Generator generator = new RandomDatum.Generator(seed);    RandomDatum k = new RandomDatum();    RandomDatum v = new RandomDatum();    DataOutputBuffer rawKey = new DataOutputBuffer();    SequenceFile.ValueBytes rawValue = reader.createValueBytes();        for (int i = 0; i < count; i++) {      generator.next();      RandomDatum key = generator.getKey();      RandomDatum value = generator.getValue();      try {        if ((i%5) == 10) {          // Testing 'raw' apis          rawKey.reset();          reader.nextRaw(rawKey, rawValue);        } else {          // Testing 'non-raw' apis           if ((i%2) == 0) {            reader.next(k);            reader.getCurrentValue(v);          } else {            reader.next(k, v);          }          // Sanity check          if (!k.equals(key))            throw new RuntimeException("wrong key at " + i);          if (!v.equals(value))            throw new RuntimeException("wrong value at " + i);        }      } catch (IOException ioe) {        LOG.info("Problem on row " + i);        LOG.info("Expected value = " + value);        LOG.info("Expected len = " + value.getLength());        LOG.info("Actual value = " + v);        LOG.info("Actual len = " + v.getLength());        LOG.info("Key equals: " + k.equals(key));        LOG.info("value equals: " + v.equals(value));        throw ioe;      }    }    reader.close();  }  private static void sortTest(FileSystem fs, int count, int megabytes,                                int factor, boolean fast, Path file)    throws IOException {    fs.delete(new Path(file+".sorted"));    SequenceFile.Sorter sorter = newSorter(fs, fast, megabytes, factor);    LOG.debug("sorting " + count + " records");    sorter.sort(file, file.suffix(".sorted"));    LOG.info("done sorting " + count + " debug");  }  private static void checkSort(FileSystem fs, int count, int seed, Path file)    throws IOException {    LOG.info("sorting " + count + " records in memory for debug");    RandomDatum.Generator generator = new RandomDatum.Generator(seed);    SortedMap map = new TreeMap();    for (int i = 0; i < count; i++) {      generator.next();      RandomDatum key = generator.getKey();      RandomDatum value = generator.getValue();      map.put(key, value);    }    LOG.debug("checking order of " + count + " records");    RandomDatum k = new RandomDatum();    RandomDatum v = new RandomDatum();    Iterator iterator = map.entrySet().iterator();    SequenceFile.Reader reader =      new SequenceFile.Reader(fs, file.suffix(".sorted"), conf);    for (int i = 0; i < count; i++) {      Map.Entry entry = (Map.Entry)iterator.next();      RandomDatum key = (RandomDatum)entry.getKey();      RandomDatum value = (RandomDatum)entry.getValue();      reader.next(k, v);      if (!k.equals(key))        throw new RuntimeException("wrong key at " + i);      if (!v.equals(value))        throw new RuntimeException("wrong value at " + i);    }    reader.close();    LOG.debug("sucessfully checked " + count + " records");  }  private static void mergeTest(FileSystem fs, int count, int seed, Path file,                                 CompressionType compressionType,                                boolean fast, int factor, int megabytes)    throws IOException {    LOG.debug("creating "+factor+" files with "+count/factor+" records");    SequenceFile.Writer[] writers = new SequenceFile.Writer[factor];    Path[] names = new Path[factor];    Path[] sortedNames = new Path[factor];        for (int i = 0; i < factor; i++) {      names[i] = file.suffix("."+i);      sortedNames[i] = names[i].suffix(".sorted");      fs.delete(names[i]);      fs.delete(sortedNames[i]);      writers[i] = SequenceFile.createWriter(fs, conf, names[i],           RandomDatum.class, RandomDatum.class, compressionType);    }    RandomDatum.Generator generator = new RandomDatum.Generator(seed);    for (int i = 0; i < count; i++) {      generator.next();      RandomDatum key = generator.getKey();      RandomDatum value = generator.getValue();      writers[i%factor].append(key, value);    }    for (int i = 0; i < factor; i++)      writers[i].close();    for (int i = 0; i < factor; i++) {      LOG.debug("sorting file " + i + " with " + count/factor + " records");      newSorter(fs, fast, megabytes, factor).sort(names[i], sortedNames[i]);    }    LOG.info("merging " + factor + " files with " + count/factor + " debug");    fs.delete(new Path(file+".sorted"));    newSorter(fs, fast, megabytes, factor)      .merge(sortedNames, file.suffix(".sorted"));  }  private static SequenceFile.Sorter newSorter(FileSystem fs,                                                boolean fast,                                               int megabytes, int factor) {    SequenceFile.Sorter sorter =       fast      ? new SequenceFile.Sorter(fs, new RandomDatum.Comparator(),RandomDatum.class, conf)      : new SequenceFile.Sorter(fs, RandomDatum.class, RandomDatum.class, conf);    sorter.setMemory(megabytes * 1024*1024);    sorter.setFactor(factor);    return sorter;  }  /** For debugging and testing. */  public static void main(String[] args) throws Exception {    int count = 1024 * 1024;    int megabytes = 1;    int factor = 10;    boolean create = true;    boolean rwonly = false;    boolean check = false;    boolean fast = false;    boolean merge = false;    String compressType = "NONE";    Path file = null;    int seed = new Random().nextInt();    String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +        "[-count N] " +         "[-seed #] [-check] [-compressType <NONE|RECORD|BLOCK>] " +        "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +        " file";    if (args.length == 0) {        System.err.println(usage);        System.exit(-1);    }        FileSystem fs = FileSystem.parseArgs(args, 0, conf);          try {      for (int i=0; i < args.length; ++i) {       // parse command line          if (args[i] == null) {              continue;          } else if (args[i].equals("-count")) {              count = Integer.parseInt(args[++i]);          } else if (args[i].equals("-megabytes")) {              megabytes = Integer.parseInt(args[++i]);          } else if (args[i].equals("-factor")) {            factor = Integer.parseInt(args[++i]);          } else if (args[i].equals("-seed")) {            seed = Integer.parseInt(args[++i]);          } else if (args[i].equals("-rwonly")) {              rwonly = true;          } else if (args[i].equals("-nocreate")) {              create = false;          } else if (args[i].equals("-check")) {              check = true;          } else if (args[i].equals("-fast")) {              fast = true;          } else if (args[i].equals("-merge")) {              merge = true;          } else if (args[i].equals("-compressType")) {              compressType = args[++i];          } else {              // file is required parameter              file = new Path(args[i]);          }        }        LOG.info("count = " + count);        LOG.info("megabytes = " + megabytes);        LOG.info("factor = " + factor);        LOG.info("create = " + create);        LOG.info("seed = " + seed);        LOG.info("rwonly = " + rwonly);        LOG.info("check = " + check);        LOG.info("fast = " + fast);        LOG.info("merge = " + merge);        LOG.info("compressType = " + compressType);        LOG.info("file = " + file);        if (rwonly && (!create || merge || fast)) {          System.err.println(usage);          System.exit(-1);        }        CompressionType compressionType =           CompressionType.valueOf(compressType);        if (rwonly || (create && !merge)) {            writeTest(fs, count, seed, file, compressionType);            readTest(fs, count, seed, file);        }        if (!rwonly) {          if (merge) {            mergeTest(fs, count, seed, file, compressionType,                 fast, factor, megabytes);          } else {            sortTest(fs, count, megabytes, factor, fast, file);          }        }            if (check) {            checkSort(fs, count, seed, file);        }      } finally {          fs.close();      }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -