⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 testtextinputformat.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import java.io.*;import java.util.*;import junit.framework.TestCase;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.*;import org.apache.hadoop.io.compress.*;public class TestTextInputFormat extends TestCase {  private static final Log LOG =    LogFactory.getLog(TestTextInputFormat.class.getName());  private static int MAX_LENGTH = 10000;    private static JobConf defaultConf = new JobConf();  private static FileSystem localFs = null;   static {    try {      localFs = FileSystem.getNamed("local", defaultConf);    } catch (IOException e) {      throw new RuntimeException("init failure", e);    }  }  private static Path workDir =     new Path(new Path(System.getProperty("test.build.data", "."), "data"),             "TestTextInputFormat");    public void testFormat() throws Exception {    JobConf job = new JobConf();    Path file = new Path(workDir, "test.txt");    Reporter reporter = new Reporter() {        public void setStatus(String status) throws IOException {}        public void progress() throws IOException {}      };        int seed = new Random().nextInt();    LOG.info("seed = "+seed);    Random random = new Random(seed);    localFs.delete(workDir);    job.setInputPath(workDir);    // for a variety of lengths    for (int length = 0; length < MAX_LENGTH;         length+= random.nextInt(MAX_LENGTH/10)+1) {      LOG.debug("creating; entries = " + length);      // create a file with length entries      Writer writer = new OutputStreamWriter(localFs.create(file));      try {        for (int i = 0; i < length; i++) {          writer.write(Integer.toString(i));          writer.write("\n");        }      } finally {        writer.close();      }      // try splitting the file in a variety of sizes      TextInputFormat format = new TextInputFormat();      format.configure(job);      LongWritable key = new LongWritable();      Text value = new Text();      for (int i = 0; i < 3; i++) {        int numSplits = random.nextInt(MAX_LENGTH/20)+1;        LOG.debug("splitting: requesting = " + numSplits);        FileSplit[] splits = format.getSplits(localFs, job, numSplits);        LOG.debug("splitting: got =        " + splits.length);        // check each split        BitSet bits = new BitSet(length);        for (int j = 0; j < splits.length; j++) {          LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" +                   splits[j].getLength());          RecordReader reader =            format.getRecordReader(localFs, splits[j], job, reporter);          try {            int count = 0;            while (reader.next(key, value)) {              int v = Integer.parseInt(value.toString());              LOG.debug("read " + v);              if (bits.get(v)) {                LOG.warn("conflict with " + v +                          " in split " + j +                         " at position "+reader.getPos());              }              assertFalse("Key in multiple partitions.", bits.get(v));              bits.set(v);              count++;            }            LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);          } finally {            reader.close();          }        }        assertEquals("Some keys in no partition.", length, bits.cardinality());      }    }  }  private InputStream makeStream(String str) throws IOException {    Text text = new Text(str);    return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());  }    public void testUTF8() throws Exception {    InputStream in = makeStream("abcd\u20acbdcd\u20ac");    ByteArrayOutputStream out = new ByteArrayOutputStream();    TextInputFormat.readLine(in, out);    Text line = new Text();    line.set(out.toByteArray());    assertEquals("readLine changed utf8 characters",                  "abcd\u20acbdcd\u20ac", line.toString());    in = makeStream("abc\u200axyz");    out.reset();    TextInputFormat.readLine(in, out);    line.set(out.toByteArray());    assertEquals("split on fake newline", "abc\u200axyz", line.toString());  }  public void testNewLines() throws Exception {    InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");    ByteArrayOutputStream out = new ByteArrayOutputStream();    TextInputFormat.readLine(in, out);    assertEquals("line1 length", 1, out.size());    out.reset();    TextInputFormat.readLine(in, out);    assertEquals("line2 length", 2, out.size());    out.reset();    TextInputFormat.readLine(in, out);    assertEquals("line3 length", 0, out.size());    out.reset();    TextInputFormat.readLine(in, out);    assertEquals("line4 length", 3, out.size());    out.reset();    TextInputFormat.readLine(in, out);    assertEquals("line5 length", 4, out.size());    out.reset();    TextInputFormat.readLine(in, out);    assertEquals("line5 length", 5, out.size());    assertEquals("end of file", 0, TextInputFormat.readLine(in, out));  }    private static void writeFile(FileSystem fs, Path name,                                 CompressionCodec codec,                                String contents) throws IOException {    OutputStream stm;    if (codec == null) {      stm = fs.create(name);    } else {      stm = codec.createOutputStream(fs.create(name));    }    stm.write(contents.getBytes());    stm.close();  }    private static class VoidReporter implements Reporter {    public void progress() {}    public void setStatus(String msg) {}  }  private static final Reporter voidReporter = new VoidReporter();    private static List<Text> readSplit(InputFormat format,                                       FileSplit split,                                       JobConf job) throws IOException {    List<Text> result = new ArrayList<Text>();    RecordReader reader = format.getRecordReader(localFs, split, job,                                                 voidReporter);    LongWritable key = (LongWritable) reader.createKey();    Text value = (Text) reader.createValue();    while (reader.next(key, value)) {      result.add(value);      value = (Text) reader.createValue();    }    return result;  }    /**   * Test using the gzip codec for reading   */  public static void testGzip() throws IOException {    CompressionCodec gzip = new GzipCodec();    localFs.delete(workDir);    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,               "the quick\nbrown\nfox jumped\nover\n the lazy\n dog\n");    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,              "this is a test\nof gzip\n");    JobConf job = new JobConf();    job.setInputPath(workDir);    TextInputFormat format = new TextInputFormat();    format.configure(job);    FileSplit[] splits = format.getSplits(localFs, job, 100);    assertEquals("compressed splits == 2", 2, splits.length);    List<Text> results = readSplit(format, splits[0], job);    assertEquals("splits[0] length", 6, results.size());    assertEquals("splits[0][5]", " dog", results.get(5).toString());    results = readSplit(format, splits[1], job);    assertEquals("splits[1] length", 2, results.size());    assertEquals("splits[1][0]", "this is a test",                  results.get(0).toString());        assertEquals("splits[1][1]", "of gzip",                  results.get(1).toString());      }    public static void main(String[] args) throws Exception {    new TestTextInputFormat().testFormat();  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -