StreamJob.java
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;

/** All the client-side work happens here.
 * (Jar packaging, MapRed job submission and monitoring)
 * @author Michel Tourn
 */
public class StreamJob {

  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());

  final static String REDUCE_NONE = "NONE";
  private boolean reducerNone_;

  public StreamJob(String[] argv, boolean mayExit) {
    argv_ = argv;
    mayExit_ = mayExit;
  }

  public void go() throws IOException {
    init();
    preProcessArgs();
    parseArgv();
    postProcessArgs();
    setJobConf();
    submitAndMonitorJob();
  }

  protected void init() {
    try {
      env_ = new Environment();
    } catch (IOException io) {
      throw new RuntimeException(io);
    }
  }

  void preProcessArgs() {
    verbose_ = false;
    addTaskEnvironment_ = "";
  }

  void postProcessArgs() throws IOException {
    if (cluster_ == null) {
      // hadoop-default.xml is standard, hadoop-local.xml is not.
      cluster_ = "default";
    }
    hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
    if (inputSpecs_.size() == 0) {
      fail("Required argument: -input <name>");
    }
    if (output_ == null) {
      fail("Required argument: -output ");
    }
    msg("addTaskEnvironment=" + addTaskEnvironment_);

    Iterator it = packageFiles_.iterator();
    while (it.hasNext()) {
      File f = new File((String) it.next());
      if (f.isFile()) {
        shippedCanonFiles_.add(f.getCanonicalPath());
      }
    }
    msg("shippedCanonFiles_=" + shippedCanonFiles_);

    // careful with class names..
    mapCmd_ = unqualifyIfLocalPath(mapCmd_);
    comCmd_ = unqualifyIfLocalPath(comCmd_);
    redCmd_ = unqualifyIfLocalPath(redCmd_);
  }

  String[] parseNameEqValue(String neqv) {
    String[] nv = neqv.split("=", 2);
    if (nv.length < 2) {
      fail("Invalid name=value spec: " + neqv);
    }
    msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
    return nv;
  }

  String unqualifyIfLocalPath(String cmd) throws IOException {
    if (cmd == null) {
      //
    } else {
      String prog = cmd;
      String args = "";
      int s = cmd.indexOf(" ");
      if (s != -1) {
        prog = cmd.substring(0, s);
        args = cmd.substring(s + 1);
      }
      String progCanon;
      try {
        progCanon = new File(prog).getCanonicalPath();
      } catch (IOException io) {
        progCanon = prog;
      }
      boolean shipped = shippedCanonFiles_.contains(progCanon);
      msg("shipped: " + shipped + " " + progCanon);
      if (shipped) {
        // Change path to simple filename.
        // That way when PipeMapRed calls Runtime.exec(),
        // it will look for the executable in the Task's working dir.
        // And this is where TaskRunner unjars our job jar.
        prog = new File(prog).getName();
        if (args.length() > 0) {
          cmd = prog + " " + args;
        } else {
          cmd = prog;
        }
      }
    }
    msg("cmd=" + cmd);
    return cmd;
  }

  String getHadoopAliasConfFile() {
    return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
  }

  void parseArgv() {
    if (argv_.length == 0) {
      exitUsage(false);
    }
    int i = 0;
    while (i < argv_.length) {
      String s;
      if (argv_[i].equals("-verbose")) {
        verbose_ = true;
      } else if (argv_[i].equals("-info")) {
        detailedUsage_ = true;
      } else if (argv_[i].equals("-debug")) {
        debug_++;
      } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
        i++;
        inputSpecs_.add(s);
      } else if (argv_[i].equals("-inputtagged")) {
        inputTagged_ = true;
      } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
        i++;
        output_ = s;
      } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
        i++;
        mapsideoutURI_ = s;
      } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
        i++;
        mapCmd_ = s;
      } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
        i++;
        comCmd_ = s;
      } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
        i++;
        redCmd_ = s;
      } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
        i++;
        packageFiles_.add(s);
      } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
        i++;
        cluster_ = s;
      } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
        i++;
        configPath_.add(s);
      } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
        i++;
        userJobConfProps_.put("fs.default.name", s);
      } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
        i++;
        userJobConfProps_.put("mapred.job.tracker", s);
      } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
        i++;
        String[] nv = parseNameEqValue(s);
        userJobConfProps_.put(nv[0], nv[1]);
      } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
        i++;
        parseNameEqValue(s);
        if (addTaskEnvironment_.length() > 0) {
          addTaskEnvironment_ += " ";
        }
        addTaskEnvironment_ += s;
      } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
        i++;
        inReaderSpec_ = s;
      } else {
        System.err.println("Unexpected argument: " + argv_[i]);
        exitUsage(false);
      }
      i++;
    }
    if (detailedUsage_) {
      exitUsage(true);
    }
  }

  String optionArg(String[] args, int index, String arg, boolean argSet) {
    if (index >= args.length || !args[index].equals(arg)) {
      return null;
    }
    if (argSet) {
      throw new IllegalArgumentException("Can only have one " + arg + " option");
    }
    if (index >= args.length - 1) {
      throw new IllegalArgumentException("Expected argument after option " + args[index]);
    }
    return args[index + 1];
  }

  protected void msg(String msg) {
    if (verbose_) {
      System.out.println("STREAM: " + msg);
    }
  }

  public void exitUsage(boolean detailed) {
    //         1         2         3         4         5         6         7
    //1234567890123456789012345678901234567890123456789012345678901234567890123456789
    System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
    System.out.println("Options:");
    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
    System.out.println("  -mapper   <cmd>      The streaming command to run");
    System.out.println("  -combiner <cmd>      The streaming command to run");
    System.out.println("  -reducer  <cmd>      The streaming command to run");
    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
    // Only advertise the standard way: [--config dir] in our launcher
    //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
    //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
    System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
    System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
    System.out.println("  -inputreader <spec>  Optional.");
    System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
    System.out.println("  -verbose");
    System.out.println();
    if (!detailed) {
      System.out.println("For more details about these options:");
      System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
      fail("");
    }
    System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
    System.out.println("Default Map input format: a line is a record in UTF-8");
    System.out.println("  the key part ends at first TAB, the rest of the line is the value");
    System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
    System.out.println("  comma-separated name-values can be specified to configure the InputFormat");
    System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
    System.out.println("Map output format, reduce input/output format:");
    System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
    System.out.println();
    System.out.println("The files or directories named in the -file argument[s] end up in the");
    System.out.println("  working directory when the mapper and reducer are run.");
    System.out.println("  The location of this working directory is unspecified.");
    System.out.println();
    //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
    //System.out.println("  Hadoop clusters. ");
    //System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
    //System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
    //System.out.println();
    System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
    System.out.println("  Use -reducer " + REDUCE_NONE);
    System.out.println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
    System.out.println("  This speeds up processing. It also feels more like \"in-place\" processing");
    System.out.println("  because the input filename and the map input order are preserved");
    System.out.println("To specify a single side-effect output file");
    System.out.println("    -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]"); //-output for side-effects will soon be deprecated
    System.out.println("    If the jobtracker is local this is a local file");
    System.out.println("    This currently requires -reducer NONE");
    System.out.println();
    System.out.println("To set the number of reduce tasks (num. of output files):");
    System.out.println("  -jobconf mapred.reduce.tasks=10");
    System.out.println("To speed up the last reduces:");
    System.out.println("  -jobconf mapred.speculative.execution=true");
    System.out.println("  Do not use this along with -reducer " + REDUCE_NONE);
    System.out.println("To name the job (appears in the JobTracker Web UI):");
    System.out.println("  -jobconf mapred.job.name='My Job' ");
    System.out.println("To specify that line-oriented input is in gzip format:");
    System.out.println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
    System.out.println("  -jobconf stream.recordreader.compression=gzip ");
    System.out.println("To change the local temp directory:");
    System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
    System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
    System.out.println("Additional local temp directories with -cluster local:");
    System.out.println("  -jobconf mapred.local.dir=/tmp/local");
    System.out.println("  -jobconf mapred.system.dir=/tmp/system");
    System.out.println("  -jobconf mapred.temp.dir=/tmp/temp");
    System.out.println("Use a custom hadoopStreaming build alongside a standard hadoop install:");
    System.out.println("  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
    System.out.println("    [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
    System.out.println("For more details about jobconf parameters see:");
    System.out.println("  http://wiki.apache.org/lucene-hadoop/JobConfFile");
    System.out.println("To set an environment variable in a streaming command:");
    System.out.println("  -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
    System.out.println();
    System.out.println("Shortcut:");
    System.out.println("  setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
    System.out.println();
    System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
    System.out.println("           -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
    System.out.println("  Ships a script, invokes the non-shipped perl interpreter");
    System.out.println("  Shipped files go to the working directory so filter.pl is found by perl");
    System.out.println("  Input files are all the daily logs for days in month 2006-04");
    fail("");
  }

  public void fail(String message) {
    if (mayExit_) {
      System.err.println(message);
      System.exit(1);
    } else {
      throw new IllegalArgumentException(message);
    }
  }

  // --------------------------------------------

  protected String getHadoopClientHome() {
    String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
    if (h == null) {
      //fail("Missing required environment variable: HADOOP_HOME");
      h = "UNDEF";
    }
    return h;
  }

  protected boolean isLocalHadoop() {
    boolean local;
    if (jobConf_ == null) {
      local = getClusterNick().equals("local");
    } else {
      local = StreamUtil.isLocalJobTracker(jobConf_);
    }
    return local;
  }

  protected String getClusterNick() {
    return cluster_;
  }

  /** @return path to the created Jar file or null if no files are necessary.
   */
  protected String packageJobJar() throws IOException {
    ArrayList unjarFiles = new ArrayList();

    // Runtime code: ship same version of code as self (job submitter code)
    // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar

    // First try an explicit spec: it's too hard to find our own location in this case:
    // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
    // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
    String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
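
For context, the class is meant to be constructed with the raw command-line arguments and driven through go(), which runs the init / argument-parsing / JobConf-setup / submission sequence shown above. The following is a minimal, illustrative launcher sketch: the class name StreamJobLauncher and its exception handling are assumptions made for the example, and only the StreamJob(String[], boolean) constructor and go() are taken from the source itself.

package org.apache.hadoop.streaming;

import java.io.IOException;

// Hypothetical launcher, not part of StreamJob.java; shown only to
// illustrate how the class above is driven.
public class StreamJobLauncher {
  public static void main(String[] args) throws IOException {
    // mayExit = true: on a bad argument StreamJob prints usage via fail()
    // and calls System.exit(1) instead of throwing.
    StreamJob job = new StreamJob(args, true);
    // go() runs init(), pre/parse/post-processes the args, builds the
    // JobConf, then submits and monitors the MapReduce job.
    job.go();
  }
}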