StreamJob.java
    System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
    if (runtimeClasses == null) {
      runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
    }
    if (runtimeClasses == null) {
      throw new IOException("runtime classes not found: " + getClass().getPackage());
    } else {
      msg("Found runtime classes in: " + runtimeClasses);
    }
    if (isLocalHadoop()) {
      // don't package class files (they might get unpackaged in "." and then
      //  hide the intended CLASSPATH entry)
      // we still package everything else (so that scripts and executable are found in
      //  Task workdir like distributed Hadoop)
    } else {
      if (new File(runtimeClasses).isDirectory()) {
        packageFiles_.add(runtimeClasses);
      } else {
        unjarFiles.add(runtimeClasses);
      }
    }
    if (packageFiles_.size() + unjarFiles.size() == 0) {
      return null;
    }
    String tmp = jobConf_.get("stream.tmpdir"); //, "/tmp/${user.name}/"
    File tmpDir = (tmp == null) ? null : new File(tmp);
    // tmpDir=null means OS default tmp dir
    File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
        + " tmpDir=" + tmpDir);
    if (debug_ == 0) {
      jobJar.deleteOnExit();
    }
    JarBuilder builder = new JarBuilder();
    if (verbose_) {
      builder.setVerbose(true);
    }
    String jobJarName = jobJar.getAbsolutePath();
    builder.merge(packageFiles_, unjarFiles, jobJarName);
    return jobJarName;
  }

  protected void setUserJobConfProps(boolean doEarlyProps) {
    Iterator it = userJobConfProps_.keySet().iterator();
    while (it.hasNext()) {
      String key = (String) it.next();
      String val = (String) userJobConfProps_.get(key);
      boolean earlyName = key.equals("fs.default.name");
      earlyName |= key.equals("stream.shipped.hadoopstreaming");
      if (doEarlyProps == earlyName) {
        msg("xxxJobConf: set(" + key + ", " + val + ") early=" + doEarlyProps);
        jobConf_.set(key, val);
      }
    }
  }

  protected void setJobConf() throws IOException {
    msg("hadoopAliasConf_ = " + hadoopAliasConf_);
    config_ = new Configuration();
    if (!cluster_.equals("default")) {
      config_.addFinalResource(new Path(getHadoopAliasConfFile()));
    } else {
      // use only defaults: hadoop-default.xml and hadoop-site.xml
    }
    Iterator it = configPath_.iterator();
    while (it.hasNext()) {
      String pathName = (String) it.next();
      config_.addFinalResource(new Path(pathName));
    }
    testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));

    // general MapRed job properties
    jobConf_ = new JobConf(config_);
    setUserJobConfProps(true);
    // The correct FS must be set before this is called!
    // (to resolve local vs. dfs drive letter differences)
    // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
    for (int i = 0; i < inputSpecs_.size(); i++) {
      addInputSpec((String) inputSpecs_.get(i), i);
    }
    jobConf_.setBoolean("stream.inputtagged", inputTagged_);
    jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());

    Class fmt;
    if (testMerge_ && false == hasSimpleInputSpecs_) {
      // this ignores -inputreader
      fmt = MergerInputFormat.class;
    } else {
      // need to keep this case to support custom -inputreader
      // and their parameters ,n=v,n=v
      fmt = StreamInputFormat.class;
    }
    jobConf_.setInputFormat(fmt);

    // for SequenceFile, input classes may be overriden in getRecordReader
    jobConf_.setInputKeyClass(Text.class);
    jobConf_.setInputValueClass(Text.class);
    jobConf_.setOutputKeyClass(Text.class);
    jobConf_.setOutputValueClass(Text.class);

    jobConf_.set("stream.addenvironment", addTaskEnvironment_);

    String defaultPackage = this.getClass().getPackage().getName();

    Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
    if (c != null) {
      jobConf_.setMapperClass(c);
    } else {
      jobConf_.setMapperClass(PipeMapper.class);
      jobConf_.set("stream.map.streamprocessor", mapCmd_);
    }

    if (comCmd_ != null) {
      c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setCombinerClass(c);
      } else {
        jobConf_.setCombinerClass(PipeCombiner.class);
        jobConf_.set("stream.combine.streamprocessor", comCmd_);
      }
    }

    reducerNone_ = false;
    if (redCmd_ != null) {
      reducerNone_ = redCmd_.equals(REDUCE_NONE);
      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setReducerClass(c);
      } else {
        jobConf_.setReducerClass(PipeReducer.class);
        jobConf_.set("stream.reduce.streamprocessor", redCmd_);
      }
    }

    if (inReaderSpec_ != null) {
      String[] args = inReaderSpec_.split(",");
      String readerClass = args[0];
      // this argument can only be a Java class
      c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
      if (c != null) {
        jobConf_.set("stream.recordreader.class", c.getName());
      } else {
        fail("-inputreader: class not found: " + readerClass);
      }
      for (int i = 1; i < args.length; i++) {
        String[] nv = args[i].split("=", 2);
        String k = "stream.recordreader." + nv[0];
        String v = (nv.length > 1) ? nv[1] : "";
        jobConf_.set(k, v);
      }
    }

    // output setup is done late so we can customize for reducerNone_
    //jobConf_.setOutputDir(new File(output_));
    setOutputSpec();

    if (testMerge_) {
      fmt = MuxOutputFormat.class;
    } else {
      fmt = StreamOutputFormat.class;
    }
    jobConf_.setOutputFormat(fmt);

    // last, allow user to override anything
    // (although typically used with properties we didn't touch)
    setUserJobConfProps(false);

    jar_ = packageJobJar();
    if (jar_ != null) {
      jobConf_.setJar(jar_);
    }

    if (verbose_) {
      listJobConfProperties();
    }

    msg("submitting to jobconf: " + getJobTrackerHostPort());
  }

  protected void listJobConfProperties() {
    msg("==== JobConf properties:");
    Iterator it = jobConf_.entries();
    TreeMap sorted = new TreeMap();
    while (it.hasNext()) {
      Map.Entry en = (Map.Entry) it.next();
      sorted.put(en.getKey(), en.getValue());
    }
    it = sorted.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry en = (Map.Entry) it.next();
      msg(en.getKey() + "=" + en.getValue());
    }
    msg("====");
  }

  /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
  protected void addInputSpec(String inSpec, int index) {
    if (!testMerge_) {
      jobConf_.addInputPath(new Path(inSpec));
    } else {
      CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
      msg("Parsed -input:\n" + spec.toTableString());
      if (index == 0) {
        hasSimpleInputSpecs_ = (spec.paths_.length == 0);
        msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
      }
      String primary = spec.primarySpec();
      if (!seenPrimary_.add(primary)) {
        // this won't detect glob overlaps and noncanonical path variations
        fail("Primary used in multiple -input spec: " + primary);
      }
      jobConf_.addInputPath(new Path(primary));
      // during Job execution, will reparse into a CompoundDirSpec
      jobConf_.set("stream.inputspecs." + index, inSpec);
    }
  }

  /** uses output_ and mapsideoutURI_ */
  protected void setOutputSpec() throws IOException {
    CompoundDirSpec spec = new CompoundDirSpec(output_, false);
    msg("Parsed -output:\n" + spec.toTableString());
    String primary = spec.primarySpec();
    String channel0;
    // TODO simplify cases, encapsulate in a StreamJobConf
    if (!reducerNone_) {
      channel0 = primary;
    } else {
      if (mapsideoutURI_ != null) {
        // user can override in case this is in a difft filesystem..
        try {
          URI uri = new URI(mapsideoutURI_);
          if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
            if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
              fail("Must be absolute: " + mapsideoutURI_);
            }
          } else if (uri.getScheme().equals("socket")) {
            // ok
          } else {
            fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
          }
        } catch (URISyntaxException e) {
          throw (IOException) new IOException().initCause(e);
        }
      } else {
        mapsideoutURI_ = primary;
      }
      // an empty reduce output named "part-00002" will go here and not collide.
      channel0 = primary + ".NONE";
      // the side-effect of the first split of an input named "part-00002"
      // will go in this directory
      jobConf_.set("stream.sideoutput.dir", primary);
      // oops if user overrides low-level this isn't set yet :-(
      boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
      // just a guess user may prefer remote..
      jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
    }
    // a path in fs.name.default filesystem
    System.out.println(channel0);
    System.out.println(new Path(channel0));
    jobConf_.setOutputPath(new Path(channel0));
    // will reparse remotely
    jobConf_.set("stream.outputspec", output_);
    if (null != mapsideoutURI_) {
      // a path in "jobtracker's filesystem"
      // overrides sideoutput.dir
      jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
    }
  }

  protected String getJobTrackerHostPort() {
    return jobConf_.get("mapred.job.tracker");
  }

  protected void jobInfo() {
    if (isLocalHadoop()) {
      LOG.info("Job running in-process (local Hadoop)");
    } else {
      String hp = getJobTrackerHostPort();
      LOG.info("To kill this job, run:");
      LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill "
          + jobId_);
      //LOG.info("Job file: " + running_.getJobFile());
      LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
    }
  }

  // Based on JobClient
  public void submitAndMonitorJob() throws IOException {

    if (jar_ != null && isLocalHadoop()) {
      // getAbs became required when shell and subvm have different working dirs...
      File wd = new File(".").getAbsoluteFile();
      StreamUtil.unJar(new File(jar_), wd);
    }

    // if jobConf_ changes must recreate a JobClient
    jc_ = new JobClient(jobConf_);
    boolean error = true;
    running_ = null;
    String lastReport = null;
    try {
      running_ = jc_.submitJob(jobConf_);
      jobId_ = running_.getJobID();

      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
      LOG.info("Running job: " + jobId_);
      jobInfo();

      while (!running_.isComplete()) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        running_ = jc_.getJob(jobId_);
        String report = null;
        report = " map " + Math.round(running_.mapProgress() * 100) + "% reduce "
            + Math.round(running_.reduceProgress() * 100) + "%";
        if (!report.equals(lastReport)) {
          LOG.info(report);
          lastReport = report;
        }
      }
      if (!running_.isSuccessful()) {
        jobInfo();
        throw new IOException("Job not Successful!");
      }
      LOG.info("Job complete: " + jobId_);
      LOG.info("Output: " + output_);
      error = false;
    } finally {
      if (error && (running_ != null)) {
        LOG.info("killJob...");
        running_.killJob();
      }
      jc_.close();
    }
  }

  protected boolean mayExit_;
  protected String[] argv_;
  protected boolean verbose_;
  protected boolean detailedUsage_;
  protected int debug_;

  protected Environment env_;

  protected String jar_;
  protected boolean localHadoop_;
  protected Configuration config_;
  protected JobConf jobConf_;
  protected JobClient jc_;

  // command-line arguments
  protected ArrayList inputSpecs_ = new ArrayList(); // <String>
  protected boolean inputTagged_ = false;
  protected TreeSet seenPrimary_ = new TreeSet(); // <String>
  protected boolean hasSimpleInputSpecs_;
  protected ArrayList packageFiles_ = new ArrayList(); // <String>
  protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
  //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
  protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
  protected String output_;
  protected String mapsideoutURI_;
  protected String mapCmd_;
  protected String comCmd_;
  protected String redCmd_;
  protected String cluster_;
  protected ArrayList configPath_ = new ArrayList(); // <String>
  protected String hadoopAliasConf_;
  protected String inReaderSpec_;
  protected boolean testMerge_;

  // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
  // encoding "a=b c=d"
  protected String addTaskEnvironment_;

  protected boolean outputSingleNode_;
  protected long minRecWrittenToEnableSkip_;

  protected RunningJob running_;
  protected String jobId_;
}
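
The -inputreader handling in setJobConf() above packs the reader class and its parameters into stream.recordreader.* configuration keys. The standalone sketch below re-traces that split(",") / split("=", 2) loop outside of Hadoop so the resulting key/value layout can be inspected in isolation. It is an illustration only: the spec string "StreamXmlRecordReader,begin=<page>,end=</page>" is an example value chosen here (not taken from this file), a plain Map stands in for JobConf, and the class-name token is stored as-is rather than resolved through StreamUtil.goodClassOrNull as the real code does.

import java.util.LinkedHashMap;
import java.util.Map;

// Standalone re-trace of the -inputreader parsing shown in setJobConf() above.
public class InputReaderSpecDemo {
  public static void main(String[] args) {
    // Example spec of the form "ReaderClass,n=v,n=v" (value chosen for illustration).
    String inReaderSpec = "StreamXmlRecordReader,begin=<page>,end=</page>";
    Map<String, String> conf = new LinkedHashMap<String, String>(); // stands in for JobConf

    String[] parts = inReaderSpec.split(",");
    // First token: the record reader class name (the real code resolves it to a Class first).
    conf.put("stream.recordreader.class", parts[0]);
    // Remaining tokens: n=v pairs, each becoming a stream.recordreader.<n> key.
    for (int i = 1; i < parts.length; i++) {
      String[] nv = parts[i].split("=", 2);
      String k = "stream.recordreader." + nv[0];
      String v = (nv.length > 1) ? nv[1] : "";
      conf.put(k, v);
    }

    // Prints: {stream.recordreader.class=StreamXmlRecordReader,
    //          stream.recordreader.begin=<page>, stream.recordreader.end=</page>}
    System.out.println(conf);
  }
}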