📄 jobinprogress.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;import org.apache.hadoop.mapred.JobHistory.Values ; import java.io.*;import java.net.*;import java.util.*;///////////////////////////////////////////////////////// JobInProgress maintains all the info for keeping// a Job on the straight and narrow. It keeps its JobProfile// and its latest JobStatus, plus a set of tables for // doing bookkeeping of its Tasks.///////////////////////////////////////////////////////class JobInProgress { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress"); JobProfile profile; JobStatus status; Path localJobFile = null; Path localJarFile = null; TaskInProgress maps[] = new TaskInProgress[0]; TaskInProgress reduces[] = new TaskInProgress[0]; int numMapTasks = 0; int numReduceTasks = 0; int runningMapTasks = 0; int runningReduceTasks = 0; int finishedMapTasks = 0; int finishedReduceTasks = 0; int failedMapTasks = 0 ; int failedReduceTasks = 0 ; JobTracker jobtracker = null; HashMap hostToMaps = new HashMap(); long startTime; long finishTime; private JobConf conf; private int firstMapToTry = 0; private int firstReduceToTry = 0; boolean tasksInited = false; private LocalFileSystem localFs; private String uniqueString; /** * Create a JobInProgress with the given job file, plus a handle * to the tracker. */ public JobInProgress(String jobFile, JobTracker jobtracker, Configuration default_conf) throws IOException { uniqueString = jobtracker.createUniqueId(); String jobid = "job_" + uniqueString; String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid; this.jobtracker = jobtracker; this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP); this.startTime = System.currentTimeMillis(); this.localFs = (LocalFileSystem)FileSystem.getNamed("local", default_conf); JobConf default_job_conf = new JobConf(default_conf); this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR +"/"+jobid + ".xml"); this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR +"/"+ jobid + ".jar"); FileSystem fs = FileSystem.get(default_conf); fs.copyToLocalFile(new Path(jobFile), localJobFile); conf = new JobConf(localJobFile); this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url, conf.getJobName()); String jarFile = conf.getJar(); if (jarFile != null) { fs.copyToLocalFile(new Path(jarFile), localJarFile); conf.setJar(localJarFile.toString()); } this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), System.currentTimeMillis(), jobFile); } /** * Construct the splits, etc. This is invoked from an async * thread so that split-computation doesn't block anyone. */ public synchronized void initTasks() throws IOException { if (tasksInited) { return; } // // construct input splits // String jobFile = profile.getJobFile(); FileSystem fs = FileSystem.get(conf); if (localJarFile != null) { ClassLoader loader = new URLClassLoader(new URL[]{ localFs.pathToFile(localJarFile).toURL() }); conf.setClassLoader(loader); } InputFormat inputFormat = conf.getInputFormat(); FileSplit[] splits = inputFormat.getSplits(fs, conf, numMapTasks); // // sort splits by decreasing length, to reduce job's tail // Arrays.sort(splits, new Comparator() { public int compare(Object a, Object b) { long diff = ((FileSplit)b).getLength() - ((FileSplit)a).getLength(); return diff==0 ? 0 : (diff > 0 ? 1 : -1); } }); // // adjust number of map tasks to actual number of splits // this.numMapTasks = splits.length; // if no split is returned, job is considered completed and successful if (numMapTasks == 0) { this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED); tasksInited = true; return; } // create a map task for each split this.maps = new TaskInProgress[numMapTasks]; for (int i = 0; i < numMapTasks; i++) { maps[i] = new TaskInProgress(uniqueString, jobFile, splits[i], jobtracker, conf, this, i); } // // Create reduce tasks // this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(uniqueString, jobFile, numMapTasks, i, jobtracker, conf, this); } // // Obtain some tasktracker-cache information for the map task splits. // for (int i = 0; i < maps.length; i++) { String hints[][] = fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(), splits[i].getLength()); if (hints != null) { for (int k = 0; k < hints.length; k++) { for (int j = 0; j < hints[k].length; j++) { ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]); if (hostMaps == null) { hostMaps = new ArrayList(); hostToMaps.put(hints[k][j], hostMaps); } hostMaps.add(maps[i]); } } } } this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING); tasksInited = true; JobHistory.JobInfo.logStarted(profile.getJobId(), System.currentTimeMillis(), numMapTasks, numReduceTasks); } ///////////////////////////////////////////////////// // Accessors for the JobInProgress ///////////////////////////////////////////////////// public JobProfile getProfile() { return profile; } public JobStatus getStatus() { return status; } public long getStartTime() { return startTime; } public long getFinishTime() { return finishTime; } public int desiredMaps() { return numMapTasks; } public int finishedMaps() { return finishedMapTasks; } public int desiredReduces() { return numReduceTasks; } public synchronized int runningMaps() { return runningMapTasks; } public synchronized int runningReduces() { return runningReduceTasks; } public int finishedReduces() { return finishedReduceTasks; } /** * Get the list of map tasks * @return the raw array of maps for this job */ TaskInProgress[] getMapTasks() { return maps; } /** * Get the list of reduce tasks * @return the raw array of reduce tasks for this job */ TaskInProgress[] getReduceTasks() { return reduces; } /** * Get the job configuration * @return the job's configuration */ JobConf getJobConf() { return conf; } /** * Return a treeset of completed TaskInProgress objects */ public Vector reportTasksInProgress(boolean shouldBeMap, boolean shouldBeComplete) { Vector results = new Vector(); TaskInProgress tips[] = null; if (shouldBeMap) { tips = maps; } else { tips = reduces; } for (int i = 0; i < tips.length; i++) { if (tips[i].isComplete() == shouldBeComplete) { results.add(tips[i]); } } return results; } //////////////////////////////////////////////////// // Status update methods //////////////////////////////////////////////////// public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus status, JobTrackerMetrics metrics) { double oldProgress = tip.getProgress(); // save old progress boolean wasRunning = tip.isRunning(); boolean wasComplete = tip.isComplete(); boolean change = tip.updateStatus(status); if (change) { TaskStatus.State state = status.getRunState(); if (state == TaskStatus.State.SUCCEEDED) { completedTask(tip, status, metrics); } else if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) { // Tell the job to fail the relevant task failedTask(tip, status.getTaskId(), status, status.getTaskTracker(), wasRunning, wasComplete); } } // // Update JobInProgress status // LOG.debug("Taking progress for " + tip.getTIPId() + " from " + oldProgress + " to " + tip.getProgress()); double progressDelta = tip.getProgress() - oldProgress; if (tip.isMapTask()) { if (maps.length == 0) { this.status.setMapProgress(1.0f); } else { this.status.setMapProgress((float) (this.status.mapProgress() + progressDelta / maps.length)); } } else { if (reduces.length == 0) { this.status.setReduceProgress(1.0f); } else { this.status.setReduceProgress ((float) (this.status.reduceProgress() + (progressDelta / reduces.length))); } } } ///////////////////////////////////////////////////// // Create/manage tasks ///////////////////////////////////////////////////// /** * Return a MapTask, if appropriate, to run on the given tasktracker */ public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) { if (! tasksInited) { LOG.info("Cannot create task split for " + profile.getJobId()); return null; } ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost()); double avgProgress = status.mapProgress() / maps.length; int target = findNewTask(tts, clusterSize, avgProgress, maps, firstMapToTry, mapCache); if (target == -1) { return null; } boolean wasRunning = maps[target].isRunning(); Task result = maps[target].getTaskToRun(tts.getTrackerName()); if (!wasRunning) { runningMapTasks += 1; JobHistory.Task.logStarted(profile.getJobId(), maps[target].getTIPId(), Values.MAP.name(), System.currentTimeMillis()); } return result; } /** * Return a ReduceTask, if appropriate, to run on the given tasktracker. * We don't have cache-sensitivity for reduce tasks, as they * work on temporary MapRed files. */ public Task obtainNewReduceTask(TaskTrackerStatus tts, int clusterSize) { if (! tasksInited) { LOG.info("Cannot create task split for " + profile.getJobId()); return null; } double avgProgress = status.reduceProgress() / reduces.length; int target = findNewTask(tts, clusterSize, avgProgress, reduces, firstReduceToTry, null); if (target == -1) { return null; } boolean wasRunning = reduces[target].isRunning(); Task result = reduces[target].getTaskToRun(tts.getTrackerName()); if (!wasRunning) { runningReduceTasks += 1; JobHistory.Task.logStarted(profile.getJobId(),
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -