📄 jobclient.java
字号:
FileSystem fs = getFs(); // try getting the md5 of the archives URI[] tarchives = DistributedCache.getCacheArchives(job); URI[] tfiles = DistributedCache.getCacheFiles(job); if ((tarchives != null) || (tfiles != null)) { // prepare these archives for md5 checksums if (tarchives != null) { String md5Archives = StringUtils.byteToHexString(DistributedCache .createMD5(tarchives[0], job)); for (int i = 1; i < tarchives.length; i++) { md5Archives = md5Archives + "," + StringUtils.byteToHexString(DistributedCache .createMD5(tarchives[i], job)); } DistributedCache.setArchiveMd5(job, md5Archives); //job.set("mapred.cache.archivemd5", md5Archives); } if (tfiles != null) { String md5Files = StringUtils.byteToHexString(DistributedCache .createMD5(tfiles[0], job)); for (int i = 1; i < tfiles.length; i++) { md5Files = md5Files + "," + StringUtils.byteToHexString(DistributedCache .createMD5(tfiles[i], job)); } DistributedCache.setFileMd5(job, md5Files); //"mapred.cache.filemd5", md5Files); } } String originalJarPath = job.getJar(); short replication = (short)job.getInt("mapred.submit.replication", 10); if (originalJarPath != null) { // copy jar to JobTracker's fs job.setJar(submitJarFile.toString()); fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile); fs.setReplication(submitJarFile, replication); } // Set the user's name and working directory String user = System.getProperty("user.name"); job.setUser(user != null ? user : "Dr Who"); if (job.getWorkingDirectory() == null) { job.setWorkingDirectory(fs.getWorkingDirectory()); } FileSystem userFileSys = FileSystem.get(job); Path[] inputDirs = job.getInputPaths(); boolean[] validDirs = job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs); for(int i=0; i < validDirs.length; ++i) { if (!validDirs[i]) { String msg = "Input directory " + inputDirs[i] + " in " + userFileSys.getName() + " is invalid."; LOG.error(msg); throw new IOException(msg); } } // Check the output specification job.getOutputFormat().checkOutputSpecs(fs, job); // Write job file to JobTracker's fs FSDataOutputStream out = fs.create(submitJobFile, replication); try { job.write(out); } finally { out.close(); } // // Now, actually submit the job (using the submit name) // JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString()); if (status != null) { return new NetworkedJob(status); } else { throw new IOException("Could not launch job"); } } /** * Get an RunningJob object to track an ongoing job. Returns * null if the id does not correspond to any known job. */ public RunningJob getJob(String jobid) throws IOException { JobStatus status = jobSubmitClient.getJobStatus(jobid); if (status != null) { return new NetworkedJob(status); } else { return null; } } public ClusterStatus getClusterStatus() throws IOException { return jobSubmitClient.getClusterStatus(); } public JobStatus[] jobsToComplete() throws IOException { return jobSubmitClient.jobsToComplete(); } /** Utility that submits a job, then polls for progress until the job is * complete. */ public static void runJob(JobConf job) throws IOException { JobClient jc = new JobClient(job); boolean error = true; RunningJob running = null; String lastReport = null; final int MAX_RETRIES = 5; int retries = MAX_RETRIES; try { running = jc.submitJob(job); String jobId = running.getJobID(); LOG.info("Running job: " + jobId); while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) {} try { if (running.isComplete()) { break; } running = jc.getJob(jobId); String report = (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+ " reduce " + StringUtils.formatPercent(running.reduceProgress(), 0)); if (!report.equals(lastReport)) { LOG.info(report); lastReport = report; } retries = MAX_RETRIES; } catch (IOException ie) { if (--retries == 0) { LOG.warn("Final attempt failed, killing job."); throw ie; } LOG.info("Communication problem with server: " + StringUtils.stringifyException(ie)); } } if (!running.isSuccessful()) { throw new IOException("Job failed!"); } LOG.info("Job complete: " + jobId); error = false; } finally { if (error && (running != null)) { running.killJob(); } jc.close(); } } static Configuration getConfiguration(String jobTrackerSpec) { Configuration conf = new Configuration(); if(jobTrackerSpec != null) { if(jobTrackerSpec.indexOf(":") >= 0) { conf.set("mapred.job.tracker", jobTrackerSpec); } else { String classpathFile = "hadoop-" + jobTrackerSpec + ".xml"; URL validate = conf.getResource(classpathFile); if(validate == null) { throw new RuntimeException(classpathFile + " not found on CLASSPATH"); } conf.addFinalResource(classpathFile); } } return conf; } public int run(String[] argv) throws Exception { // TODO Auto-generated method stub if (argv.length < 2) { System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]"); System.exit(-1); } // initialize JobClient init(); // Process args String submitJobFile = null; String jobid = null; boolean getStatus = false; boolean killJob = false; for (int i = 0; i < argv.length; i++) { if ("-submit".equals(argv[i])) { submitJobFile = argv[i+1]; i++; } else if ("-status".equals(argv[i])) { jobid = argv[i+1]; getStatus = true; i++; } else if ("-kill".equals(argv[i])) { jobid = argv[i+1]; killJob = true; i++; } } // Submit the request int exitCode = -1; try { if (submitJobFile != null) { RunningJob job = submitJob(submitJobFile); System.out.println("Created job " + job.getJobID()); } else if (getStatus) { RunningJob job = getJob(jobid); if (job == null) { System.out.println("Could not find job " + jobid); } else { System.out.println(); System.out.println(job); exitCode = 0; } } else if (killJob) { RunningJob job = getJob(jobid); if (job == null) { System.out.println("Could not find job " + jobid); } else { job.killJob(); System.out.println("Killed job " + jobid); exitCode = 0; } } } finally { close(); } return exitCode; } /** */ public static void main(String argv[]) throws Exception { int res = new JobClient().doMain(new Configuration(), argv); System.exit(res); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -