
📄 StreamJob.java

📁 hadoop: Nutch cluster platform
💻 JAVA
📖 Page 1 of 2
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;

/** All the client-side work happens here.
 * (Jar packaging, MapRed job submission and monitoring)
 * @author Michel Tourn
 */
public class StreamJob {

  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());

  final static String REDUCE_NONE = "NONE";
  private boolean reducerNone_;

  public StreamJob(String[] argv, boolean mayExit) {
    argv_ = argv;
    mayExit_ = mayExit;
  }

  public void go() throws IOException {
    init();
    preProcessArgs();
    parseArgv();
    postProcessArgs();
    setJobConf();
    submitAndMonitorJob();
  }

  protected void init() {
    try {
      env_ = new Environment();
    } catch (IOException io) {
      throw new RuntimeException(io);
    }
  }

  void preProcessArgs() {
    verbose_ = false;
    addTaskEnvironment_ = "";
  }

  void postProcessArgs() throws IOException {
    if (cluster_ == null) {
      // hadoop-default.xml is standard, hadoop-local.xml is not.
      cluster_ = "default";
    }
    hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
    if (inputSpecs_.size() == 0) {
      fail("Required argument: -input <name>");
    }
    if (output_ == null) {
      fail("Required argument: -output ");
    }
    msg("addTaskEnvironment=" + addTaskEnvironment_);

    Iterator it = packageFiles_.iterator();
    while (it.hasNext()) {
      File f = new File((String) it.next());
      if (f.isFile()) {
        shippedCanonFiles_.add(f.getCanonicalPath());
      }
    }
    msg("shippedCanonFiles_=" + shippedCanonFiles_);

    // careful with class names..
    mapCmd_ = unqualifyIfLocalPath(mapCmd_);
    comCmd_ = unqualifyIfLocalPath(comCmd_);
    redCmd_ = unqualifyIfLocalPath(redCmd_);
  }

  String[] parseNameEqValue(String neqv) {
    String[] nv = neqv.split("=", 2);
    if (nv.length < 2) {
      fail("Invalid name=value spec: " + neqv);
    }
    msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
    return nv;
  }

  String unqualifyIfLocalPath(String cmd) throws IOException {
    if (cmd == null) {
      //
    } else {
      String prog = cmd;
      String args = "";
      int s = cmd.indexOf(" ");
      if (s != -1) {
        prog = cmd.substring(0, s);
        args = cmd.substring(s + 1);
      }
      String progCanon;
      try {
        progCanon = new File(prog).getCanonicalPath();
      } catch (IOException io) {
        progCanon = prog;
      }
      boolean shipped = shippedCanonFiles_.contains(progCanon);
      msg("shipped: " + shipped + " " + progCanon);
      if (shipped) {
        // Change path to simple filename.
        // That way when PipeMapRed calls Runtime.exec(),
        // it will look for the executable in Task's working dir.
        // And this is where TaskRunner unjars our job jar.
        prog = new File(prog).getName();
        if (args.length() > 0) {
          cmd = prog + " " + args;
        } else {
          cmd = prog;
        }
      }
    }
    msg("cmd=" + cmd);
    return cmd;
  }

  String getHadoopAliasConfFile() {
    return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
  }

  void parseArgv() {
    if (argv_.length == 0) {
      exitUsage(false);
    }
    int i = 0;
    while (i < argv_.length) {
      String s;
      if (argv_[i].equals("-verbose")) {
        verbose_ = true;
      } else if (argv_[i].equals("-info")) {
        detailedUsage_ = true;
      } else if (argv_[i].equals("-debug")) {
        debug_++;
      } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
        i++;
        inputSpecs_.add(s);
      } else if (argv_[i].equals("-inputtagged")) {
        inputTagged_ = true;
      } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
        i++;
        output_ = s;
      } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
        i++;
        mapsideoutURI_ = s;
      } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
        i++;
        mapCmd_ = s;
      } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
        i++;
        comCmd_ = s;
      } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
        i++;
        redCmd_ = s;
      } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
        i++;
        packageFiles_.add(s);
      } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
        i++;
        cluster_ = s;
      } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
        i++;
        configPath_.add(s);
      } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
        i++;
        userJobConfProps_.put("fs.default.name", s);
      } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
        i++;
        userJobConfProps_.put("mapred.job.tracker", s);
      } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
        i++;
        String[] nv = parseNameEqValue(s);
        userJobConfProps_.put(nv[0], nv[1]);
      } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
        i++;
        parseNameEqValue(s);
        if (addTaskEnvironment_.length() > 0) {
          addTaskEnvironment_ += " ";
        }
        addTaskEnvironment_ += s;
      } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
        i++;
        inReaderSpec_ = s;
      } else {
        System.err.println("Unexpected argument: " + argv_[i]);
        exitUsage(false);
      }
      i++;
    }
    if (detailedUsage_) {
      exitUsage(true);
    }
  }

  String optionArg(String[] args, int index, String arg, boolean argSet) {
    if (index >= args.length || !args[index].equals(arg)) {
      return null;
    }
    if (argSet) {
      throw new IllegalArgumentException("Can only have one " + arg + " option");
    }
    if (index >= args.length - 1) {
      throw new IllegalArgumentException("Expected argument after option " + args[index]);
    }
    return args[index + 1];
  }

  protected void msg(String msg) {
    if (verbose_) {
      System.out.println("STREAM: " + msg);
    }
  }

  public void exitUsage(boolean detailed) {
    //         1         2         3         4         5         6         7
    //1234567890123456789012345678901234567890123456789012345678901234567890123456789
    System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
    System.out.println("Options:");
    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
    System.out.println("  -mapper   <cmd>      The streaming command to run");
    System.out.println("  -combiner <cmd>      The streaming command to run");
    System.out.println("  -reducer  <cmd>      The streaming command to run");
    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
    // Only advertise the standard way: [--config dir] in our launcher
    //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
    //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
    System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
    System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
    System.out.println("  -inputreader <spec>  Optional.");
    System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
    System.out.println("  -verbose");
    System.out.println();
    if (!detailed) {
      System.out.println("For more details about these options:");
      System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
      fail("");
    }
    System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
    System.out.println("Default Map input format: a line is a record in UTF-8");
    System.out.println("  the key part ends at first TAB, the rest of the line is the value");
    System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
    System.out.println("  comma-separated name-values can be specified to configure the InputFormat");
    System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
    System.out.println("Map output format, reduce input/output format:");
    System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
    System.out.println();
    System.out.println("The files or directories named in the -file argument[s] end up in the");
    System.out.println("  working directory when the mapper and reducer are run.");
    System.out.println("  The location of this working directory is unspecified.");
    System.out.println();
    //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
    //System.out.println("  Hadoop clusters. ");
    //System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
    //System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
    //System.out.println();
    System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
    System.out.println("  Use -reducer " + REDUCE_NONE);
    System.out.println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
    System.out.println("  This speeds up processing, This also feels more like \"in-place\" processing");
    System.out.println("  because the input filename and the map input order are preserved");
    System.out.println("To specify a single side-effect output file");
    System.out.println("    -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]"); // -output for side-effects will be soon deprecated
    System.out.println("  If the jobtracker is local this is a local file");
    System.out.println("  This currently requires -reducer NONE");
    System.out.println();
    System.out.println("To set the number of reduce tasks (num. of output files):");
    System.out.println("  -jobconf mapred.reduce.tasks=10");
    System.out.println("To speed up the last reduces:");
    System.out.println("  -jobconf mapred.speculative.execution=true");
    System.out.println("  Do not use this along -reducer " + REDUCE_NONE);
    System.out.println("To name the job (appears in the JobTracker Web UI):");
    System.out.println("  -jobconf mapred.job.name='My Job' ");
    System.out.println("To specify that line-oriented input is in gzip format:");
    System.out.println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
    System.out.println("   -jobconf stream.recordreader.compression=gzip ");
    System.out.println("To change the local temp directory:");
    System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
    System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
    System.out.println("Additional local temp directories with -cluster local:");
    System.out.println("  -jobconf mapred.local.dir=/tmp/local");
    System.out.println("  -jobconf mapred.system.dir=/tmp/system");
    System.out.println("  -jobconf mapred.temp.dir=/tmp/temp");
    System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
    System.out.println("  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
    System.out.println("    [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
    System.out.println("For more details about jobconf parameters see:");
    System.out.println("  http://wiki.apache.org/lucene-hadoop/JobConfFile");
    System.out.println("To set an environment variable in a streaming command:");
    System.out.println("   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
    System.out.println();
    System.out.println("Shortcut:");
    System.out.println("   setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
    System.out.println();
    System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
    System.out.println("           -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
    System.out.println("  Ships a script, invokes the non-shipped perl interpreter");
    System.out.println("  Shipped files go to the working directory so filter.pl is found by perl");
    System.out.println("  Input files are all the daily logs for days in month 2006-04");
    fail("");
  }

  public void fail(String message) {
    if (mayExit_) {
      System.err.println(message);
      System.exit(1);
    } else {
      throw new IllegalArgumentException(message);
    }
  }

  // --------------------------------------------

  protected String getHadoopClientHome() {
    String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
    if (h == null) {
      //fail("Missing required environment variable: HADOOP_HOME");
      h = "UNDEF";
    }
    return h;
  }

  protected boolean isLocalHadoop() {
    boolean local;
    if (jobConf_ == null) {
      local = getClusterNick().equals("local");
    } else {
      local = StreamUtil.isLocalJobTracker(jobConf_);
    }
    return local;
  }

  protected String getClusterNick() {
    return cluster_;
  }

  /** @return path to the created Jar file or null if no files are necessary.
   */
  protected String packageJobJar() throws IOException {
    ArrayList unjarFiles = new ArrayList();

    // Runtime code: ship same version of code as self (job submitter code)
    // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar

    // First try an explicit spec: it's too hard to find our own location in this case:
    // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
    // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
    String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
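
The public entry points visible on this page are the StreamJob(String[] argv, boolean mayExit) constructor and go(). As orientation only, the following is a minimal, hypothetical driver sketch; the StreamJobDriver class name and the sample argument values are illustrative and not part of the Hadoop source, whose real launcher lives elsewhere in the streaming contrib.

// Hypothetical driver sketch: exercises only the StreamJob API shown above.
import java.io.IOException;

import org.apache.hadoop.streaming.StreamJob;

public class StreamJobDriver {
  public static void main(String[] args) throws IOException {
    // Sample arguments modeled on the usage text printed by exitUsage():
    // -input and -output are required; -mapper/-reducer are the streaming commands.
    String[] argv = {
        "-input", "/logs/0604*/*",
        "-output", "/logs/out",
        "-mapper", "/usr/local/bin/perl5 filter.pl",
        "-file", "/local/filter.pl",
        "-reducer", "NONE",          // skip the shuffle/sort/reduce step (see REDUCE_NONE)
        "-jobconf", "mapred.job.name=My Streaming Job",
        "-verbose"
    };
    // mayExit=true: argument errors go through fail(), which prints the message
    // and calls System.exit(1) instead of throwing IllegalArgumentException.
    StreamJob job = new StreamJob(argv, true);
    job.go();  // init -> parse args -> build JobConf -> submit and monitor the job
  }
}

In normal use the same options are passed on the command line via $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar, as the usage text in exitUsage() describes.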
