📄 JobClient.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.commons.logging.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.filecache.*;

import java.io.*;
import java.net.*;
import java.util.*;

/*******************************************************
 * JobClient interacts with the JobTracker network interface.
 * This object implements the job-control interface, and
 * should be the primary method by which user programs interact
 * with the networked job system.
 *
 * @author Mike Cafarella
 *******************************************************/
public class JobClient extends ToolBase implements MRConstants {
    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");

    static long MAX_JOBPROFILE_AGE = 1000 * 2;

    /**
     * A NetworkedJob is an implementation of RunningJob.  It holds
     * a JobProfile object to provide some info, and interacts with the
     * remote service to provide certain functionality.
     */
    class NetworkedJob implements RunningJob {
        JobProfile profile;
        JobStatus status;
        long statustime;

        /**
         * We store a JobProfile and a timestamp for when we last
         * acquired the job profile.  If the job is null, then we cannot
         * perform any of the tasks.  The job might be null if the JobTracker
         * has completely forgotten about the job.  (eg, 24 hours after the
         * job completes.)
         */
        public NetworkedJob(JobStatus job) throws IOException {
            this.status = job;
            this.profile = jobSubmitClient.getJobProfile(job.getJobId());
            this.statustime = System.currentTimeMillis();
        }

        /**
         * Some methods rely on having a recent job profile object.  Refresh
         * it, if necessary
         */
        synchronized void ensureFreshStatus() throws IOException {
            if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
                this.status = jobSubmitClient.getJobStatus(profile.getJobId());
                this.statustime = System.currentTimeMillis();
            }
        }

        /**
         * An identifier for the job
         */
        public String getJobID() {
            return profile.getJobId();
        }

        /**
         * The name of the job file
         */
        public String getJobFile() {
            return profile.getJobFile();
        }

        /**
         * A URL where the job's status can be seen
         */
        public String getTrackingURL() {
            return profile.getURL().toString();
        }

        /**
         * A float between 0.0 and 1.0, indicating the % of map work
         * completed.
         */
        public float mapProgress() throws IOException {
            ensureFreshStatus();
            return status.mapProgress();
        }

        /**
         * A float between 0.0 and 1.0, indicating the % of reduce work
         * completed.
         */
        public float reduceProgress() throws IOException {
            ensureFreshStatus();
            return status.reduceProgress();
        }

        /**
         * Returns immediately whether the whole job is done yet or not.
         */
        public synchronized boolean isComplete() throws IOException {
            ensureFreshStatus();
            return (status.getRunState() == JobStatus.SUCCEEDED ||
                    status.getRunState() == JobStatus.FAILED);
        }

        /**
         * True iff job completed successfully.
         */
        public synchronized boolean isSuccessful() throws IOException {
            ensureFreshStatus();
            return status.getRunState() == JobStatus.SUCCEEDED;
        }

        /**
         * Blocks until the job is finished
         */
        public synchronized void waitForCompletion() throws IOException {
            while (!isComplete()) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
            }
        }

        /**
         * Tells the service to terminate the current job.
         */
        public synchronized void killJob() throws IOException {
            jobSubmitClient.killJob(getJobID());
        }

        /**
         * Dump stats to screen
         */
        public String toString() {
            try {
                ensureFreshStatus();
            } catch (IOException e) {
            }
            return "Job: " + profile.getJobId() + "\n" +
                   "file: " + profile.getJobFile() + "\n" +
                   "tracking URL: " + profile.getURL() + "\n" +
                   "map() completion: " + status.mapProgress() + "\n" +
                   "reduce() completion: " + status.reduceProgress();
        }
    }

    JobSubmissionProtocol jobSubmitClient;
    FileSystem fs = null;

    static Random r = new Random();

    /**
     * Build a job client, connect to the default job tracker
     */
    public JobClient() {
    }

    public JobClient(Configuration conf) throws IOException {
        setConf(conf);
        init();
    }

    public void init() throws IOException {
        String tracker = conf.get("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
            this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
            this.jobSubmitClient = (JobSubmissionProtocol)
                RPC.getProxy(JobSubmissionProtocol.class,
                             JobSubmissionProtocol.versionID,
                             JobTracker.getAddress(conf), conf);
        }
    }

    /**
     * Build a job client, connect to the indicated job tracker.
     */
    public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        this.jobSubmitClient = (JobSubmissionProtocol)
            RPC.getProxy(JobSubmissionProtocol.class,
                         JobSubmissionProtocol.versionID,
                         jobTrackAddr, conf);
    }

    /**
     */
    public synchronized void close() throws IOException {
    }

    /**
     * Get a filesystem handle.  We need this to prepare jobs
     * for submission to the MapReduce system.
     */
    public synchronized FileSystem getFs() throws IOException {
        if (this.fs == null) {
            String fsName = jobSubmitClient.getFilesystemName();
            this.fs = FileSystem.getNamed(fsName, this.conf);
        }
        return fs;
    }

    /**
     * Submit a job to the MR system
     */
    public RunningJob submitJob(String jobFile) throws IOException {
        // Load in the submitted job details
        JobConf job = new JobConf(jobFile);
        return submitJob(job);
    }

    /**
     * Submit a job to the MR system
     */
    public RunningJob submitJob(JobConf job) throws IOException {
        //
        // First figure out what fs the JobTracker is using.  Copy the
        // job to it, under a temporary name.  This allows DFS to work,
        // and under the local fs also provides UNIX-like object loading
        // semantics.  (that is, if the job file is deleted right after
        // submission, we can still run the submission to completion)
        //

        // Create a number of filenames in the JobTracker's fs namespace
        Path submitJobDir = new Path(job.getSystemDir(), "submit_" +
                                     Integer.toString(Math.abs(r.nextInt()), 36));
        Path submitJobFile = new Path(submitJobDir, "job.xml");
        Path submitJarFile = new Path(submitJobDir, "job.jar");
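
For reference, here is a minimal sketch of how a user program might drive this class. It is not part of JobClient.java and relies only on the constructors and methods visible in the listing above (JobClient(Configuration), submitJob(String), RunningJob.waitForCompletion(), isSuccessful(), close()); the class name SubmitJobExample and the path "path/to/job.xml" are placeholders for a job file prepared elsewhere.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitJobExample {
    public static void main(String[] args) throws IOException {
        // Picks up mapred.job.tracker: "local" for LocalJobRunner,
        // otherwise the address of a remote JobTracker.
        Configuration conf = new Configuration();

        // init() chooses LocalJobRunner or an RPC proxy to the JobTracker.
        JobClient client = new JobClient(conf);

        // Loads the JobConf from the given job file and submits it;
        // "path/to/job.xml" is a placeholder, not a path from the listing.
        RunningJob job = client.submitJob("path/to/job.xml");

        // Blocks, polling isComplete() every 5 seconds.
        job.waitForCompletion();
        System.out.println(job.isSuccessful() ? "Job succeeded" : "Job failed");

        client.close();
    }
}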