⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobclient.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        FileSystem fs = getFs();        // try getting the md5 of the archives        URI[] tarchives = DistributedCache.getCacheArchives(job);        URI[] tfiles = DistributedCache.getCacheFiles(job);        if ((tarchives != null) || (tfiles != null)) {          // prepare these archives for md5 checksums          if (tarchives != null) {            String md5Archives = StringUtils.byteToHexString(DistributedCache                .createMD5(tarchives[0], job));            for (int i = 1; i < tarchives.length; i++) {              md5Archives = md5Archives                  + ","                  + StringUtils.byteToHexString(DistributedCache                      .createMD5(tarchives[i], job));            }            DistributedCache.setArchiveMd5(job, md5Archives);            //job.set("mapred.cache.archivemd5", md5Archives);          }          if (tfiles != null) {            String md5Files = StringUtils.byteToHexString(DistributedCache                .createMD5(tfiles[0], job));            for (int i = 1; i < tfiles.length; i++) {              md5Files = md5Files                  + ","                  + StringUtils.byteToHexString(DistributedCache                      .createMD5(tfiles[i], job));            }            DistributedCache.setFileMd5(job, md5Files);            //"mapred.cache.filemd5", md5Files);          }        }               String originalJarPath = job.getJar();        short replication = (short)job.getInt("mapred.submit.replication", 10);        if (originalJarPath != null) {           // copy jar to JobTracker's fs          job.setJar(submitJarFile.toString());          fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);          fs.setReplication(submitJarFile, replication);        }        // Set the user's name and working directory        String user = System.getProperty("user.name");        job.setUser(user != null ? user : "Dr Who");        if (job.getWorkingDirectory() == null) {          job.setWorkingDirectory(fs.getWorkingDirectory());                  }        FileSystem userFileSys = FileSystem.get(job);        Path[] inputDirs = job.getInputPaths();        boolean[] validDirs =           job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs);        for(int i=0; i < validDirs.length; ++i) {          if (!validDirs[i]) {            String msg = "Input directory " + inputDirs[i] +                          " in " + userFileSys.getName() + " is invalid.";            LOG.error(msg);            throw new IOException(msg);          }        }        // Check the output specification        job.getOutputFormat().checkOutputSpecs(fs, job);        // Write job file to JobTracker's fs                FSDataOutputStream out = fs.create(submitJobFile, replication);        try {          job.write(out);        } finally {          out.close();        }        //        // Now, actually submit the job (using the submit name)        //        JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString());        if (status != null) {            return new NetworkedJob(status);        } else {            throw new IOException("Could not launch job");        }    }    /**     * Get an RunningJob object to track an ongoing job.  Returns     * null if the id does not correspond to any known job.     */    public RunningJob getJob(String jobid) throws IOException {        JobStatus status = jobSubmitClient.getJobStatus(jobid);        if (status != null) {            return new NetworkedJob(status);        } else {            return null;        }    }    public ClusterStatus getClusterStatus() throws IOException {      return jobSubmitClient.getClusterStatus();    }        public JobStatus[] jobsToComplete() throws IOException {	return jobSubmitClient.jobsToComplete();    }        /** Utility that submits a job, then polls for progress until the job is     * complete. */    public static void runJob(JobConf job) throws IOException {      JobClient jc = new JobClient(job);      boolean error = true;      RunningJob running = null;      String lastReport = null;      final int MAX_RETRIES = 5;      int retries = MAX_RETRIES;      try {        running = jc.submitJob(job);        String jobId = running.getJobID();        LOG.info("Running job: " + jobId);        while (true) {          try {            Thread.sleep(1000);          } catch (InterruptedException e) {}          try {            if (running.isComplete()) {              break;            }            running = jc.getJob(jobId);            String report =               (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+               " reduce " +                StringUtils.formatPercent(running.reduceProgress(), 0));            if (!report.equals(lastReport)) {              LOG.info(report);              lastReport = report;            }            retries = MAX_RETRIES;          } catch (IOException ie) {            if (--retries == 0) {              LOG.warn("Final attempt failed, killing job.");              throw ie;            }            LOG.info("Communication problem with server: " +                     StringUtils.stringifyException(ie));          }        }        if (!running.isSuccessful()) {          throw new IOException("Job failed!");        }        LOG.info("Job complete: " + jobId);        error = false;      } finally {        if (error && (running != null)) {          running.killJob();        }        jc.close();      }    }    static Configuration getConfiguration(String jobTrackerSpec)    {      Configuration conf = new Configuration();      if(jobTrackerSpec != null) {                if(jobTrackerSpec.indexOf(":") >= 0) {          conf.set("mapred.job.tracker", jobTrackerSpec);        } else {          String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";          URL validate = conf.getResource(classpathFile);          if(validate == null) {            throw new RuntimeException(classpathFile + " not found on CLASSPATH");          }          conf.addFinalResource(classpathFile);        }      }      return conf;    }            public int run(String[] argv) throws Exception {        // TODO Auto-generated method stub        if (argv.length < 2) {            System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]");            System.exit(-1);        }        // initialize JobClient        init();                // Process args        String submitJobFile = null;        String jobid = null;        boolean getStatus = false;        boolean killJob = false;        for (int i = 0; i < argv.length; i++) {            if ("-submit".equals(argv[i])) {                submitJobFile = argv[i+1];                i++;            } else if ("-status".equals(argv[i])) {                jobid = argv[i+1];                getStatus = true;                i++;            } else if ("-kill".equals(argv[i])) {                jobid = argv[i+1];                killJob = true;                i++;            }        }        // Submit the request        int exitCode = -1;        try {            if (submitJobFile != null) {                RunningJob job = submitJob(submitJobFile);                System.out.println("Created job " + job.getJobID());            } else if (getStatus) {                RunningJob job = getJob(jobid);                if (job == null) {                    System.out.println("Could not find job " + jobid);                } else {                    System.out.println();                    System.out.println(job);                    exitCode = 0;                }            } else if (killJob) {                RunningJob job = getJob(jobid);                if (job == null) {                    System.out.println("Could not find job " + jobid);                } else {                    job.killJob();                    System.out.println("Killed job " + jobid);                    exitCode = 0;                }            }        } finally {            close();        }        return exitCode;    }        /**     */    public static void main(String argv[]) throws Exception {        int res = new JobClient().doMain(new Configuration(), argv);        System.exit(res);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -