
📄 streamjob.java

📁 hadoop: Nutch cluster platform
💻 JAVA
📖 Page 1 of 2
    System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
    if (runtimeClasses == null) {
      runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
    }
    if (runtimeClasses == null) {
      throw new IOException("runtime classes not found: " + getClass().getPackage());
    } else {
      msg("Found runtime classes in: " + runtimeClasses);
    }
    if (isLocalHadoop()) {
      // don't package class files (they might get unpackaged in "." and then
      //  hide the intended CLASSPATH entry)
      // we still package everything else (so that scripts and executable are found in
      //  Task workdir like distributed Hadoop)
    } else {
      if (new File(runtimeClasses).isDirectory()) {
        packageFiles_.add(runtimeClasses);
      } else {
        unjarFiles.add(runtimeClasses);
      }
    }
    if (packageFiles_.size() + unjarFiles.size() == 0) {
      return null;
    }
    String tmp = jobConf_.get("stream.tmpdir"); //, "/tmp/${user.name}/"
    File tmpDir = (tmp == null) ? null : new File(tmp);
    // tmpDir=null means OS default tmp dir
    File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
        + " tmpDir=" + tmpDir);
    if (debug_ == 0) {
      jobJar.deleteOnExit();
    }
    JarBuilder builder = new JarBuilder();
    if (verbose_) {
      builder.setVerbose(true);
    }
    String jobJarName = jobJar.getAbsolutePath();
    builder.merge(packageFiles_, unjarFiles, jobJarName);
    return jobJarName;
  }

  protected void setUserJobConfProps(boolean doEarlyProps) {
    Iterator it = userJobConfProps_.keySet().iterator();
    while (it.hasNext()) {
      String key = (String) it.next();
      String val = (String) userJobConfProps_.get(key);
      boolean earlyName = key.equals("fs.default.name");
      earlyName |= key.equals("stream.shipped.hadoopstreaming");
      if (doEarlyProps == earlyName) {
        msg("xxxJobConf: set(" + key + ", " + val + ") early=" + doEarlyProps);
        jobConf_.set(key, val);
      }
    }
  }

  protected void setJobConf() throws IOException {
    msg("hadoopAliasConf_ = " + hadoopAliasConf_);
    config_ = new Configuration();
    if (!cluster_.equals("default")) {
      config_.addFinalResource(new Path(getHadoopAliasConfFile()));
    } else {
      // use only defaults: hadoop-default.xml and hadoop-site.xml
    }
    Iterator it = configPath_.iterator();
    while (it.hasNext()) {
      String pathName = (String) it.next();
      config_.addFinalResource(new Path(pathName));
    }
    testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));

    // general MapRed job properties
    jobConf_ = new JobConf(config_);
    setUserJobConfProps(true);

    // The correct FS must be set before this is called!
    // (to resolve local vs. dfs drive letter differences)
    // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
    for (int i = 0; i < inputSpecs_.size(); i++) {
      addInputSpec((String) inputSpecs_.get(i), i);
    }
    jobConf_.setBoolean("stream.inputtagged", inputTagged_);
    jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());

    Class fmt;
    if (testMerge_ && false == hasSimpleInputSpecs_) {
      // this ignores -inputreader
      fmt = MergerInputFormat.class;
    } else {
      // need to keep this case to support custom -inputreader
      // and their parameters ,n=v,n=v
      fmt = StreamInputFormat.class;
    }
    jobConf_.setInputFormat(fmt);

    // for SequenceFile, input classes may be overriden in getRecordReader
    jobConf_.setInputKeyClass(Text.class);
    jobConf_.setInputValueClass(Text.class);
    jobConf_.setOutputKeyClass(Text.class);
    jobConf_.setOutputValueClass(Text.class);

    jobConf_.set("stream.addenvironment", addTaskEnvironment_);

    String defaultPackage = this.getClass().getPackage().getName();
    Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
    if (c != null) {
      jobConf_.setMapperClass(c);
    } else {
      jobConf_.setMapperClass(PipeMapper.class);
      jobConf_.set("stream.map.streamprocessor", mapCmd_);
    }

    if (comCmd_ != null) {
      c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setCombinerClass(c);
      } else {
        jobConf_.setCombinerClass(PipeCombiner.class);
        jobConf_.set("stream.combine.streamprocessor", comCmd_);
      }
    }

    reducerNone_ = false;
    if (redCmd_ != null) {
      reducerNone_ = redCmd_.equals(REDUCE_NONE);
      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setReducerClass(c);
      } else {
        jobConf_.setReducerClass(PipeReducer.class);
        jobConf_.set("stream.reduce.streamprocessor", redCmd_);
      }
    }

    if (inReaderSpec_ != null) {
      String[] args = inReaderSpec_.split(",");
      String readerClass = args[0];
      // this argument can only be a Java class
      c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
      if (c != null) {
        jobConf_.set("stream.recordreader.class", c.getName());
      } else {
        fail("-inputreader: class not found: " + readerClass);
      }
      for (int i = 1; i < args.length; i++) {
        String[] nv = args[i].split("=", 2);
        String k = "stream.recordreader." + nv[0];
        String v = (nv.length > 1) ? nv[1] : "";
        jobConf_.set(k, v);
      }
    }

    // output setup is done late so we can customize for reducerNone_
    //jobConf_.setOutputDir(new File(output_));
    setOutputSpec();
    if (testMerge_) {
      fmt = MuxOutputFormat.class;
    } else {
      fmt = StreamOutputFormat.class;
    }
    jobConf_.setOutputFormat(fmt);

    // last, allow user to override anything
    // (although typically used with properties we didn't touch)
    setUserJobConfProps(false);

    jar_ = packageJobJar();
    if (jar_ != null) {
      jobConf_.setJar(jar_);
    }
    if (verbose_) {
      listJobConfProperties();
    }
    msg("submitting to jobconf: " + getJobTrackerHostPort());
  }

  protected void listJobConfProperties() {
    msg("==== JobConf properties:");
    Iterator it = jobConf_.entries();
    TreeMap sorted = new TreeMap();
    while (it.hasNext()) {
      Map.Entry en = (Map.Entry) it.next();
      sorted.put(en.getKey(), en.getValue());
    }
    it = sorted.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry en = (Map.Entry) it.next();
      msg(en.getKey() + "=" + en.getValue());
    }
    msg("====");
  }

  /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
  protected void addInputSpec(String inSpec, int index) {
    if (!testMerge_) {
      jobConf_.addInputPath(new Path(inSpec));
    } else {
      CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
      msg("Parsed -input:\n" + spec.toTableString());
      if (index == 0) {
        hasSimpleInputSpecs_ = (spec.paths_.length == 0);
        msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
      }
      String primary = spec.primarySpec();
      if (!seenPrimary_.add(primary)) {
        // this won't detect glob overlaps and noncanonical path variations
        fail("Primary used in multiple -input spec: " + primary);
      }
      jobConf_.addInputPath(new Path(primary));
      // during Job execution, will reparse into a CompoundDirSpec
      jobConf_.set("stream.inputspecs." + index, inSpec);
    }
  }

  /** uses output_ and mapsideoutURI_ */
  protected void setOutputSpec() throws IOException {
    CompoundDirSpec spec = new CompoundDirSpec(output_, false);
    msg("Parsed -output:\n" + spec.toTableString());
    String primary = spec.primarySpec();
    String channel0;
    // TODO simplify cases, encapsulate in a StreamJobConf
    if (!reducerNone_) {
      channel0 = primary;
    } else {
      if (mapsideoutURI_ != null) {
        // user can override in case this is in a difft filesystem..
        try {
          URI uri = new URI(mapsideoutURI_);
          if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
            if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
              fail("Must be absolute: " + mapsideoutURI_);
            }
          } else if (uri.getScheme().equals("socket")) {
            // ok
          } else {
            fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
          }
        } catch (URISyntaxException e) {
          throw (IOException) new IOException().initCause(e);
        }
      } else {
        mapsideoutURI_ = primary;
      }
      // an empty reduce output named "part-00002" will go here and not collide.
      channel0 = primary + ".NONE";
      // the side-effect of the first split of an input named "part-00002"
      // will go in this directory
      jobConf_.set("stream.sideoutput.dir", primary);
      // oops if user overrides low-level this isn't set yet :-(
      boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
      // just a guess user may prefer remote..
      jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
    }
    // a path in fs.name.default filesystem
    System.out.println(channel0);
    System.out.println(new Path(channel0));
    jobConf_.setOutputPath(new Path(channel0));
    // will reparse remotely
    jobConf_.set("stream.outputspec", output_);
    if (null != mapsideoutURI_) {
      // a path in "jobtracker's filesystem"
      // overrides sideoutput.dir
      jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
    }
  }

  protected String getJobTrackerHostPort() {
    return jobConf_.get("mapred.job.tracker");
  }

  protected void jobInfo() {
    if (isLocalHadoop()) {
      LOG.info("Job running in-process (local Hadoop)");
    } else {
      String hp = getJobTrackerHostPort();
      LOG.info("To kill this job, run:");
      LOG.info(getHadoopClientHome() + "/bin/hadoop job  -Dmapred.job.tracker=" + hp + " -kill "
          + jobId_);
      //LOG.info("Job file: " + running_.getJobFile() );
      LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
    }
  }

  // Based on JobClient
  public void submitAndMonitorJob() throws IOException {
    if (jar_ != null && isLocalHadoop()) {
      // getAbs became required when shell and subvm have different working dirs...
      File wd = new File(".").getAbsoluteFile();
      StreamUtil.unJar(new File(jar_), wd);
    }
    // if jobConf_ changes must recreate a JobClient
    jc_ = new JobClient(jobConf_);
    boolean error = true;
    running_ = null;
    String lastReport = null;
    try {
      running_ = jc_.submitJob(jobConf_);
      jobId_ = running_.getJobID();
      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
      LOG.info("Running job: " + jobId_);
      jobInfo();
      while (!running_.isComplete()) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        running_ = jc_.getJob(jobId_);
        String report = null;
        report = " map " + Math.round(running_.mapProgress() * 100) + "%  reduce "
            + Math.round(running_.reduceProgress() * 100) + "%";
        if (!report.equals(lastReport)) {
          LOG.info(report);
          lastReport = report;
        }
      }
      if (!running_.isSuccessful()) {
        jobInfo();
        throw new IOException("Job not Successful!");
      }
      LOG.info("Job complete: " + jobId_);
      LOG.info("Output: " + output_);
      error = false;
    } finally {
      if (error && (running_ != null)) {
        LOG.info("killJob...");
        running_.killJob();
      }
      jc_.close();
    }
  }

  protected boolean mayExit_;
  protected String[] argv_;
  protected boolean verbose_;
  protected boolean detailedUsage_;
  protected int debug_;

  protected Environment env_;

  protected String jar_;
  protected boolean localHadoop_;
  protected Configuration config_;
  protected JobConf jobConf_;
  protected JobClient jc_;

  // command-line arguments
  protected ArrayList inputSpecs_ = new ArrayList(); // <String>
  protected boolean inputTagged_ = false;
  protected TreeSet seenPrimary_ = new TreeSet(); // <String>
  protected boolean hasSimpleInputSpecs_;
  protected ArrayList packageFiles_ = new ArrayList(); // <String>
  protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
  //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
  protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
  protected String output_;
  protected String mapsideoutURI_;
  protected String mapCmd_;
  protected String comCmd_;
  protected String redCmd_;
  protected String cluster_;
  protected ArrayList configPath_ = new ArrayList(); // <String>
  protected String hadoopAliasConf_;
  protected String inReaderSpec_;
  protected boolean testMerge_;

  // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
  // encoding "a=b c=d"
  protected String addTaskEnvironment_;

  protected boolean outputSingleNode_;
  protected long minRecWrittenToEnableSkip_;

  protected RunningJob running_;
  protected String jobId_;
}
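A note on the setJobConf() logic above: each -mapper/-combiner/-reducer argument is first tried as a Java class via StreamUtil.goodClassOrNull, and only when no class resolves is it treated as an external command to be run by PipeMapper/PipeCombiner/PipeReducer with the command recorded under stream.*.streamprocessor. The following is a minimal, self-contained sketch of that resolution rule, not part of StreamJob; the class name MapperSpecDemo and the simplified goodClassOrNull here are hypothetical stand-ins (the real helper also tries the name inside StreamJob's own package).

// Illustrative sketch only: mirrors the "-mapper" resolution rule used in setJobConf().
public class MapperSpecDemo {

  // Return the named class if it can be loaded, otherwise null (treat the argument as a command).
  static Class<?> goodClassOrNull(String name) {
    try {
      return Class.forName(name);
    } catch (ClassNotFoundException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    String mapCmd = (args.length > 0) ? args[0] : "/bin/cat";
    Class<?> c = goodClassOrNull(mapCmd);
    if (c != null) {
      // a loadable Java mapper class: it would be set directly on the JobConf
      System.out.println("setMapperClass(" + c.getName() + ")");
    } else {
      // not a class: it would be run as an external process under PipeMapper
      System.out.println("setMapperClass(PipeMapper), stream.map.streamprocessor=" + mapCmd);
    }
  }
}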
