
📄 TestStreamedMerge.java

📁 hadoop: Nutch cluster platform
💻 JAVA
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.LineNumberInputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.DFSShell;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * This JUnit test is not pure-Java and is not run as
 * part of the standard ant test* targets.
 * Two ways to run this:<pre>
 * 1. main(), a Java application.
 * 2. cd src/contrib/streaming/
 *    ant \
 *     [-Dfs.default.name=h:p] \
 *     [-Dhadoop.test.localoutputfile=/tmp/fifo] \
 *     test-unix
 * </pre>
 * @author michel
 */
public class TestStreamedMerge extends TestCase {

  public TestStreamedMerge() throws IOException {
    UtilTest utilTest = new UtilTest(getClass().getName());
    utilTest.checkUserDir();
    //utilTest.redirectIfAntJunit();
  }

  final static int NAME_PORT = 8200;
  final static int SOC_PORT = 1888;

  void addInput(String path, String contents) throws IOException {
    OutputStream out = fs_.create(new Path(path));
    DataOutputStream dout = new DataOutputStream(out);
    dout.write(contents.getBytes("UTF-8"));
    dout.close();
    System.err.println("addInput done: " + path);
  }

  String createInputs(boolean tag) throws IOException {
    fs_.delete(new Path("/input/"));

    // i18n() replaces some ASCII with multibyte UTF-8 chars
    addInput("/input/part-00", i18n("k1\tv1\n" + "k3\tv5\n"));
    addInput("/input/part-01", i18n("k1\tv2\n" + "k2\tv4\n"));
    addInput("/input/part-02", i18n("k1\tv3\n"));
    addInput("/input/part-03", "");

    // Tags are one-based: ">1" corresponds to part-00, etc.
    // Expected result is the merge-sort order of the records.
    // Keys are compared as Strings and ties are broken by stream index.
    // For example (k1; stream 2) < (k1; stream 3).
    String expect = i18n(
        unt(">1\tk1\tv1\n", tag) +
        unt(">2\tk1\tv2\n", tag) +
        unt(">3\tk1\tv3\n", tag) +
        unt(">2\tk2\tv4\n", tag) +
        unt(">1\tk3\tv5\n", tag)
    );
    return expect;
  }

  String unt(String line, boolean keepTag) {
    return keepTag ? line : line.substring(line.indexOf('\t') + 1);
  }

  String i18n(String c) {
    c = c.replace('k', '\u20ac'); // Euro sign, in UTF-8: E282AC
    c = c.replace('v', '\u00a2'); // Cent sign, in UTF-8: C2A2; UTF-16 contains null
    // "\ud800\udc00" // A surrogate pair, U+10000. OK also works
    return c;
  }

  void lsr() {
    try {
      System.out.println("lsr /");
      DFSShell shell = new DFSShell();
      shell.setConf(conf_);
      shell.init();
      shell.ls("/", true);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  void printSampleInput() {
    try {
      System.out.println("cat /input/part-00");
      String content = StreamUtil.slurpHadoop(new Path("/input/part-00"), fs_);
      System.out.println(content);
      System.out.println("cat done.");
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  void callStreaming(String argSideOutput, boolean inputTagged) throws IOException {
    String[] testargs = new String[] {
        //"-jobconf", "stream.debug=1",
        "-verbose",
        "-jobconf", "stream.testmerge=1",
        "-input", "+/input/part-00 | /input/part-01 | /input/part-02",
        "-mapper", StreamUtil.localizeBin("/bin/cat"),
        "-reducer", "NONE",
        "-output", "/my.output",
        "-mapsideoutput", argSideOutput,
        "-dfs", conf_.get("fs.default.name"),
        "-jt", "local",
        "-jobconf", "stream.sideoutput.localfs=true",
    };
    ArrayList argList = new ArrayList();
    argList.addAll(Arrays.asList(testargs));
    if (inputTagged) {
      argList.add("-inputtagged");
    }
    testargs = (String[]) argList.toArray(new String[0]);
    String sss = StreamUtil.collate(argList, " ");
    System.out.println("bin/hadoop jar build/hadoop-streaming.jar " + sss);
    //HadoopStreaming.main(testargs);
    StreamJob job = new StreamJob(testargs, false);
    job.go();
  }

  SideEffectConsumer startSideEffectConsumer(StringBuffer outBuf) {
    SideEffectConsumer t = new SideEffectConsumer(outBuf) {
      ServerSocket listen;
      Socket client;
      InputStream in;

      InputStream connectInputStream() throws IOException {
        listen = new ServerSocket(SOC_PORT);
        client = listen.accept();
        in = client.getInputStream();
        return in;
      }

      void close() throws IOException {
        listen.close();
        System.out.println("@@@listen closed");
      }
    };
    t.start();
    return t;
  }

  abstract class SideEffectConsumer extends Thread {

    SideEffectConsumer(StringBuffer buf) {
      buf_ = buf;
      setDaemon(true);
    }

    abstract InputStream connectInputStream() throws IOException;

    abstract void close() throws IOException;

    public void run() {
      try {
        in_ = connectInputStream();
        while (true) {
          byte[] b = UTF8ByteArrayUtils.readLine(in_);
          if (b == null) {
            break;
          }
          buf_.append(new String(b, "UTF-8"));
          buf_.append('\n');
        }
        in_.close();
      } catch (IOException io) {
        throw new RuntimeException(io);
      }
    }

    InputStream in_;
    StringBuffer buf_;
  }

  public void testMain() throws IOException {
    boolean success = false;
    String base = new File(".").getAbsolutePath();
    System.setProperty("hadoop.log.dir", base + "/logs");
    conf_ = new Configuration();
    String overrideFS = StreamUtil.getBoundAntProperty("fs.default.name", null);
    MiniDFSCluster cluster = null;
    try {
      if (overrideFS == null) {
        cluster = new MiniDFSCluster(NAME_PORT, conf_, false);
        fs_ = cluster.getFileSystem();
      } else {
        System.out.println("overrideFS: " + overrideFS);
        conf_.set("fs.default.name", overrideFS);
        fs_ = FileSystem.get(conf_);
      }
      doAllTestJobs();
      success = true;
    } catch (IOException io) {
      io.printStackTrace();
    } finally {
      try {
        fs_.close();
      } catch (IOException io) {
      }
      if (cluster != null) {
        cluster.shutdown();
        System.out.println("cluster.shutdown(); DONE");
      }
      System.out.println(getClass().getName() + ": success=" + success);
    }
  }

  void doAllTestJobs() throws IOException {
    goSocketTagged(true, false);
    goSocketTagged(false, false);
    goSocketTagged(true, true);
  }

  void goSocketTagged(boolean socket, boolean inputTagged) throws IOException {
    System.out.println("***** goSocketTagged: " + socket + ", " + inputTagged);
    String expect = createInputs(inputTagged);
    lsr();
    printSampleInput();

    StringBuffer outputBuf = new StringBuffer();
    String sideOutput = null;
    File f = null;
    SideEffectConsumer consumer = null;
    if (socket) {
      consumer = startSideEffectConsumer(outputBuf);
      sideOutput = "socket://localhost:" + SOC_PORT + "/";
    } else {
      String userOut = StreamUtil.getBoundAntProperty(
          "hadoop.test.localoutputfile", null);
      if (userOut != null) {
        f = new File(userOut);
        // don't delete so they can mkfifo
        maybeFifoOutput_ = true;
      } else {
        f = new File("localoutputfile");
        f.delete();
        maybeFifoOutput_ = false;
      }
      String s = new Path(f.getAbsolutePath()).toString();
      if (!s.startsWith("/")) {
        s = "/" + s; // Windows "file:/C:/"
      }
      sideOutput = "file:" + s;
    }
    System.out.println("sideOutput=" + sideOutput);
    callStreaming(sideOutput, inputTagged);
    String output;
    if (socket) {
      try {
        consumer.join();
        consumer.close();
      } catch (InterruptedException e) {
        throw (IOException) new IOException().initCause(e);
      }
      output = outputBuf.toString();
    } else {
      if (maybeFifoOutput_) {
        System.out.println("assertEquals will fail.");
        output = "potential FIFO: not retrieving to avoid blocking on open() "
            + f.getAbsoluteFile();
      } else {
        output = StreamUtil.slurp(f.getAbsoluteFile());
      }
    }
    lsr();

    System.out.println("output=|" + output + "|");
    System.out.println("expect=|" + expect + "|");
    assertEquals(expect, output);
  }

  Configuration conf_;
  FileSystem fs_;
  boolean maybeFifoOutput_;

  public static void main(String[] args) throws Throwable {
    TestStreamedMerge test = new TestStreamedMerge();
    test.testMain();
  }
}
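The ordering that createInputs() encodes is worth seeing in isolation: records from all input streams are merged by key (compared as Strings), and equal keys fall back to the one-based stream index, so (k1, stream 1) precedes (k1, stream 2). Below is a minimal, self-contained sketch of that comparator logic. It is not part of the original test; the MergeOrderSketch class and the Tagged record are invented here purely for illustration.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MergeOrderSketch {

  // Hypothetical record type: one tagged line, i.e. (stream index, key, value).
  record Tagged(int stream, String key, String value) {}

  public static void main(String[] args) {
    List<Tagged> records = new ArrayList<>(List.of(
        new Tagged(1, "k1", "v1"), new Tagged(1, "k3", "v5"),  // /input/part-00
        new Tagged(2, "k1", "v2"), new Tagged(2, "k2", "v4"),  // /input/part-01
        new Tagged(3, "k1", "v3")));                           // /input/part-02

    // Keys are compared as Strings; ties are broken by the stream index.
    records.sort(Comparator.comparing(Tagged::key)
        .thenComparingInt(Tagged::stream));

    // Prints the same sequence as the "expect" string built in createInputs():
    // >1 k1 v1, >2 k1 v2, >3 k1 v3, >2 k2 v4, >1 k3 v5
    for (Tagged t : records) {
      System.out.println(">" + t.stream() + "\t" + t.key() + "\t" + t.value());
    }
  }
}

Running the sketch prints the five records in exactly the order of the expect string in createInputs(), before the i18n() character substitution is applied.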
