jobinprogress.java
来自「Hadoop是一个用于在由廉价硬件构成的大型集群上运行应用程序的框架。Hadoop」· Java 代码 · 共 507 行 · 第 1/2 页
JAVA
507 行
// Update JobInProgress status // double progressDelta = tip.getProgress() - oldProgress; if (tip.isMapTask()) { if (maps.length == 0) { this.status.setMapProgress(1.0f); } else { this.status.mapProgress += (progressDelta / maps.length); } } else { if (reduces.length == 0) { this.status.setReduceProgress(1.0f); } else { this.status.reduceProgress += (progressDelta / reduces.length); } } } ///////////////////////////////////////////////////// // Create/manage tasks ///////////////////////////////////////////////////// /** * Return a MapTask, if appropriate, to run on the given tasktracker */ public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) { if (! tasksInited) { LOG.info("Cannot create task split for " + profile.getJobId()); return null; } Task t = null; int cacheTarget = -1; int stdTarget = -1; int specTarget = -1; // // We end up creating two tasks for the same bucket, because // we call obtainNewMapTask() really fast, twice in a row. // There's not enough time for the "recentTasks" // // // Compute avg progress through the map tasks // double avgProgress = status.mapProgress() / maps.length; // // See if there is a split over a block that is stored on // the TaskTracker checking in. That means the block // doesn't have to be transmitted from another node. // for (int i = 0; i < maps.length; i++) { if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) { if (cacheTarget < 0) { cacheTarget = i; break; } } } // // If there's no cached target, see if there's // a std. task to run. // if (cacheTarget < 0) { for (int i = 0; i < maps.length; i++) { if (maps[i].hasTask()) { if (stdTarget < 0) { stdTarget = i; break; } } } } // // If no cached-target and no std target, see if // there's a speculative task to run. 
// if (cacheTarget < 0 && stdTarget < 0) { for (int i = 0; i < maps.length; i++) { if (maps[i].hasSpeculativeTask(avgProgress)) { if (specTarget < 0) { specTarget = i; break; } } } } // // Run whatever we found // if (cacheTarget >= 0) { t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress); } else if (stdTarget >= 0) { t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress); } else if (specTarget >= 0) { t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress); } return t; } /** * Return a ReduceTask, if appropriate, to run on the given tasktracker. * We don't have cache-sensitivity for reduce tasks, as they * work on temporary MapRed files. */ public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) { if (! tasksInited) { LOG.info("Cannot create task split for " + profile.getJobId()); return null; } Task t = null; int stdTarget = -1; int specTarget = -1; double avgProgress = status.reduceProgress() / reduces.length; for (int i = 0; i < reduces.length; i++) { if (reduces[i].hasTask()) { if (stdTarget < 0) { stdTarget = i; } } else if (reduces[i].hasSpeculativeTask(avgProgress)) { if (specTarget < 0) { specTarget = i; } } } if (stdTarget >= 0) { t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress); } else if (specTarget >= 0) { t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress); } return t; } /** * A taskid assigned to this JobInProgress has reported in successfully. */ public synchronized void completedTask(TaskInProgress tip, String taskid) { LOG.info("Taskid '" + taskid + "' has finished successfully."); tip.completed(taskid); // // Figure out whether the Job is done // boolean allDone = true; for (int i = 0; i < maps.length; i++) { if (! maps[i].isComplete()) { allDone = false; break; } } if (allDone) { for (int i = 0; i < reduces.length; i++) { if (! reduces[i].isComplete()) { allDone = false; break; } } } // // If all tasks are complete, then the job is done! 
// if (status.getRunState() == JobStatus.RUNNING && allDone) { this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED); this.finishTime = System.currentTimeMillis(); } } /** * Kill the job and all its component tasks. */ public synchronized void kill() { if (status.getRunState() != JobStatus.FAILED) { this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED); this.finishTime = System.currentTimeMillis(); // // kill all TIPs. // for (int i = 0; i < maps.length; i++) { maps[i].kill(); } for (int i = 0; i < reduces.length; i++) { reduces[i].kill(); } } } /** * A task assigned to this JobInProgress has reported in as failed. * Most of the time, we'll just reschedule execution. However, after * many repeated failures we may instead decide to allow the entire * job to fail. * * Even if a task has reported as completed in the past, it might later * be reported as failed. That's because the TaskTracker that hosts a map * task might die before the entire job can complete. If that happens, * we need to schedule reexecution so that downstream reduce tasks can * obtain the map task's output. */ public void failedTask(TaskInProgress tip, String taskid, String trackerName) { tip.failedSubTask(taskid, trackerName); // // Check if we need to kill the job because of too many failures // if (tip.isFailed()) { LOG.info("Aborting job " + profile.getJobId()); kill(); } } /** * The job is dead. We're now GC'ing it, getting rid of the job * from all tables. Be sure to remove all of this job's tasks * from the various tables. */ public synchronized void garbageCollect() throws IOException { // // Remove this job from all tables // // Definitely remove the local-disk copy of the job file if (localJobFile != null) { localJobFile.delete(); localJobFile = null; } if (localJarFile != null) { localJarFile.delete(); localJarFile = null; } // // If the job file was in the temporary system directory, // we should delete it upon garbage collect. 
// if (deleteUponCompletion != null) { JobConf jd = new JobConf(deleteUponCompletion); FileSystem fs = FileSystem.get(conf); fs.delete(new File(jd.getJar())); fs.delete(new File(deleteUponCompletion)); deleteUponCompletion = null; } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?