📄 jobinprogress.java
字号:
reduces[target].getTIPId(), Values.REDUCE.name(), System.currentTimeMillis());
    }
    return result;
  }

  /**
   * Find a new task to run.
   *
   * Selection order: a runnable, not-running cached task that has not failed
   * on this tracker; then the first runnable, not-running task (scanning
   * circularly from firstTaskToTry) that has not failed on this tracker;
   * then a running task that qualifies for speculative execution; and, as a
   * last resort, a not-running task that already failed on this tracker
   * (considered only once it has failed on at least clusterSize machines).
   *
   * Note that every entry examined in cachedTasks is removed from that list,
   * whether or not it ends up being chosen.
   *
   * @param tts The task tracker that is asking for a task
   * @param clusterSize The number of task trackers in the cluster
   * @param avgProgress The average progress of this kind of task in this job
   * @param tasks The list of potential tasks to try
   * @param firstTaskToTry The first index in tasks to check
   * @param cachedTasks A list of tasks that would like to run on this node
   *                    (may be null)
   * @return the index in tasks of the selected task (or -1 for no task)
   */
  private int findNewTask(TaskTrackerStatus tts,
                          int clusterSize,
                          double avgProgress,
                          TaskInProgress[] tasks,
                          int firstTaskToTry,
                          List cachedTasks) {
    String taskTracker = tts.getTrackerName();

    //
    // See if there is a split over a block that is stored on
    // the TaskTracker checking in.  That means the block
    // doesn't have to be transmitted from another node.
    //
    if (cachedTasks != null) {
      Iterator i = cachedTasks.iterator();
      while (i.hasNext()) {
        TaskInProgress tip = (TaskInProgress)i.next();
        // Each candidate is consumed from the cache list as it is inspected.
        i.remove();
        if (tip.isRunnable() &&
            !tip.isRunning() &&
            !tip.hasFailedOnMachine(taskTracker)) {
          LOG.info("Choosing cached task " + tip.getTIPId());
          int cacheTarget = tip.getIdWithinJob();
          return cacheTarget;
        }
      }
    }

    //
    // If there's no cached target, see if there's
    // a std. task to run.
    //
    int failedTarget = -1;
    int specTarget = -1;
    for (int i = 0; i < tasks.length; i++) {
      // Circular scan that begins at firstTaskToTry.
      int realIdx = (i + firstTaskToTry) % tasks.length;
      TaskInProgress task = tasks[realIdx];
      if (task.isRunnable()) {
        // if it failed here and we haven't tried every machine, we
        // don't schedule it here.
        boolean hasFailed = task.hasFailedOnMachine(taskTracker);
        if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
          continue;
        }
        boolean isRunning = task.isRunning();
        if (hasFailed) {
          // failed tasks that aren't running can be scheduled as a last
          // resort
          if (!isRunning && failedTarget == -1) {
            failedTarget = realIdx;
          }
        } else {
          if (!isRunning) {
            LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());
            return realIdx;
          } else if (specTarget == -1 &&
                     task.hasSpeculativeTask(avgProgress)) {
            specTarget = realIdx;
          }
        }
      }
    }
    if (specTarget != -1) {
      LOG.info("Choosing speculative task " +
               tasks[specTarget].getTIPId());
    } else if (failedTarget != -1) {
      LOG.info("Choosing failed task " +
               tasks[failedTarget].getTIPId());
    }
    // A speculative candidate takes precedence over a previously-failed one.
    return specTarget != -1 ? specTarget : failedTarget;
  }

  /**
   * A taskid assigned to this JobInProgress has reported in successfully.
   *
   * If the TIP is already complete this only logs the event and returns.
   * Otherwise the attempt and task are written to the job history, the TIP
   * is marked completed, the running/finished counters and metrics are
   * updated, and the job is marked SUCCEEDED once every map and reduce TIP
   * is complete while the job is still RUNNING.
   *
   * @param tip The task-in-progress the finished attempt belongs to
   * @param status The status reported for the finished attempt
   * @param metrics The metrics object to notify of the completion
   */
  public synchronized void completedTask(TaskInProgress tip,
                                         TaskStatus status,
                                         JobTrackerMetrics metrics) {
    String taskid = status.getTaskId();
    if (tip.isComplete()) {
      LOG.info("Already complete TIP " + tip.getTIPId() +
               " has completed task " + taskid);
      return;
    } else {
      LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
               " successfully.");

      String taskTrackerName = status.getTaskTracker();

      if(status.getIsMap()){
        JobHistory.MapAttempt.logStarted(profile.getJobId(),
            tip.getTIPId(), status.getTaskId(), status.getStartTime(),
            taskTrackerName);
        JobHistory.MapAttempt.logFinished(profile.getJobId(),
            tip.getTIPId(), status.getTaskId(), status.getFinishTime(),
            taskTrackerName);
        JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
            Values.MAP.name(), status.getFinishTime());
      }else{
        JobHistory.ReduceAttempt.logStarted(profile.getJobId(),
            tip.getTIPId(), status.getTaskId(), status.getStartTime(),
            taskTrackerName);
        JobHistory.ReduceAttempt.logFinished(profile.getJobId(),
            tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
            status.getSortFinishTime(), status.getFinishTime(),
            taskTrackerName);
        JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
            Values.REDUCE.name(), status.getFinishTime());
      }
    }

    tip.completed(taskid);
    // updating the running/finished map/reduce counts
    if (tip.isMapTask()){
      runningMapTasks -= 1;
      finishedMapTasks += 1;
      metrics.completeMap();
    } else{
      runningReduceTasks -= 1;
      finishedReduceTasks += 1;
      metrics.completeReduce();
    }

    //
    // Figure out whether the Job is done
    //
    boolean allDone = true;
    for (int i = 0; i < maps.length; i++) {
      if (! maps[i].isComplete()) {
        allDone = false;
        break;
      }
    }
    if (allDone) {
      if (tip.isMapTask()) {
        this.status.setMapProgress(1.0f);
      }
      // Reduces are only inspected once every map is known to be complete.
      for (int i = 0; i < reduces.length; i++) {
        if (! reduces[i].isComplete()) {
          allDone = false;
          break;
        }
      }
    }

    //
    // If all tasks are complete, then the job is done!
    //
    if (this.status.getRunState() == JobStatus.RUNNING && allDone) {
      this.status.setRunState(JobStatus.SUCCEEDED);
      this.status.setReduceProgress(1.0f);
      this.finishTime = System.currentTimeMillis();
      garbageCollect();
      LOG.info("Job " + this.status.getJobId() +
               " has completed successfully.");
      JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,
          this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks,
          failedReduceTasks);
      metrics.completeJob();
    }
  }

  /**
   * Kill the job and all its component tasks.
   *
   * Does nothing when the job's run state is already FAILED. Otherwise the
   * status is replaced by a FAILED status with both progress values at 1.0,
   * the running counters are zeroed, every map and reduce TIP is killed, the
   * job history is written and the job's files are garbage collected.
   */
  public synchronized void kill() {
    if (status.getRunState() != JobStatus.FAILED) {
      this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f,
                                  JobStatus.FAILED);
      this.finishTime = System.currentTimeMillis();
      this.runningMapTasks = 0;
      this.runningReduceTasks = 0;
      //
      // kill all TIPs.
      //
      for (int i = 0; i < maps.length; i++) {
        maps[i].kill();
      }
      for (int i = 0; i < reduces.length; i++) {
        reduces[i].kill();
      }
      JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,
          this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks,
          failedReduceTasks);
      garbageCollect();
    }
  }

  /**
   * A task assigned to this JobInProgress has reported in as failed.
   * Most of the time, we'll just reschedule execution.  However, after
   * many repeated failures we may instead decide to allow the entire
   * job to fail.
   *
   * Even if a task has reported as completed in the past, it might later
   * be reported as failed.  That's because the TaskTracker that hosts a map
   * task might die before the entire job can complete.  If that happens,
   * we need to schedule reexecution so that downstream reduce tasks can
   * obtain the map task's output.
   *
   * @param tip The task-in-progress the failed attempt belongs to
   * @param taskid The id of the failed attempt
   * @param status The status reported for the failed attempt
   * @param trackerName The task tracker the attempt failed on
   * @param wasRunning Whether the TIP was running before this failure was
   *                   recorded (as supplied by the caller)
   * @param wasComplete Whether the TIP was complete before this failure was
   *                    recorded (as supplied by the caller)
   */
  private void failedTask(TaskInProgress tip, String taskid,
                          TaskStatus status, String trackerName,
                          boolean wasRunning, boolean wasComplete) {
    tip.failedSubTask(taskid, trackerName);
    // Re-read the TIP state after the failure so it can be compared with
    // the before-values passed in by the caller.
    boolean isRunning = tip.isRunning();
    boolean isComplete = tip.isComplete();

    //update running  count on task failure.
    if (wasRunning && !isRunning) {
      if (tip.isMapTask()){
        runningMapTasks -= 1;
      } else {
        runningReduceTasks -= 1;
      }
    }

    // the case when the map was complete but the task tracker went down.
    if (wasComplete && !isComplete) {
      if (tip.isMapTask()){
        finishedMapTasks -= 1;
      }
    }

    // update job history
    String taskTrackerName = status.getTaskTracker();
    if (status.getIsMap()) {
      JobHistory.MapAttempt.logStarted(profile.getJobId(),
          tip.getTIPId(), status.getTaskId(), status.getStartTime(),
          taskTrackerName);
      JobHistory.MapAttempt.logFailed(profile.getJobId(),
          tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
          taskTrackerName, status.getDiagnosticInfo());
    } else {
      JobHistory.ReduceAttempt.logStarted(profile.getJobId(),
          tip.getTIPId(), status.getTaskId(), status.getStartTime(),
          taskTrackerName);
      JobHistory.ReduceAttempt.logFailed(profile.getJobId(),
          tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
          taskTrackerName, status.getDiagnosticInfo());
    }

    // After this, try to assign tasks with the one after this, so that
    // the failed task goes to the end of the list.
    if (tip.isMapTask()) {
      firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
      failedMapTasks++;
    } else {
      firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
      failedReduceTasks++;
    }

    //
    // Check if we need to kill the job because of too many failures
    //
    if (tip.isFailed()) {
      LOG.info("Aborting job " + profile.getJobId());
      JobHistory.JobInfo.logFailed(profile.getJobId(),
          System.currentTimeMillis(), this.finishedMapTasks,
          this.finishedReduceTasks);
      kill();
    }

    jobtracker.removeTaskEntry(taskid);
    JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
        tip.isMapTask() ? Values.MAP.name():Values.REDUCE.name(),
        System.currentTimeMillis(), status.getDiagnosticInfo());
  }

  /**
   * Fail a task with a given reason, but without a status object.
   *
   * A FAILED TaskStatus is synthesized from the arguments (with the reason
   * used for both of its text fields) and routed through updateTaskStatus;
   * the failure is then written to the job history.
   *
   * @author Owen O'Malley
   * @param tip The task's tip
   * @param taskid The task id
   * @param reason The reason that the task failed
   * @param phase The phase to record in the synthesized status
   * @param hostname Not referenced by this method
   * @param trackerName The task tracker the task failed on
   * @param metrics The metrics object passed on to updateTaskStatus
   */
  public void failedTask(TaskInProgress tip, String taskid,
                         String reason, Phase phase,
                         String hostname, String trackerName,
                         JobTrackerMetrics metrics) {
    TaskStatus status = new TaskStatus(taskid,
                                       tip.isMapTask(),
                                       0.0f,
                                       TaskStatus.State.FAILED,
                                       reason,
                                       reason,
                                       trackerName, phase);
    updateTaskStatus(tip, status, metrics);
    JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
        tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
        System.currentTimeMillis(), reason);
  }

  /**
   * The job is dead.  We're now GC'ing it, getting rid of the job
   * from all tables.  Be sure to remove all of this job's tasks
   * from the various tables.
   *
   * Deletes the local job file and local jar file (when set) and the parent
   * directory of the submitted job file. An IOException raised by any of
   * these steps is logged as a warning and ends the cleanup; it is not
   * propagated to the caller.
   */
  synchronized void garbageCollect() {
    try {
      // Definitely remove the local-disk copy of the job file
      if (localJobFile != null) {
        localFs.delete(localJobFile);
        localJobFile = null;
      }
      if (localJarFile != null) {
        localFs.delete(localJarFile);
        localJarFile = null;
      }
      // JobClient always creates a new directory with job files
      // so we remove that directory to cleanup
      FileSystem fs = FileSystem.get(conf);
      fs.delete(new Path(profile.getJobFile()).getParent());
    } catch (IOException e) {
      LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
    }
  }

  /**
   * Return the TaskInProgress that matches the tipid.
   *
   * Maps are searched first, then reduces.
   *
   * @param tipid the TIP id to look for
   * @return the matching map or reduce TIP, or null when none matches
   */
  public TaskInProgress getTaskInProgress(String tipid){
    for (int i = 0; i < maps.length; i++) {
      if (tipid.equals(maps[i].getTIPId())){
        return maps[i];
      }
    }
    for (int i = 0; i < reduces.length; i++) {
      if (tipid.equals(reduces[i].getTIPId())){
        return reduces[i];
      }
    }
    return null;
  }

  /**
   * Find the details of someplace where a map has finished
   * @param mapId the id of the map (used directly as an index into maps)
   * @return the task status of the completed task, or null when the map is
   *         not complete or has no SUCCEEDED attempt
   */
  public TaskStatus findFinishedMap(int mapId) {
    TaskInProgress tip = maps[mapId];
    if (tip.isComplete()) {
      TaskStatus[] statuses = tip.getTaskStatuses();
      for(int i=0; i < statuses.length; i++) {
        if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) {
          return statuses[i];
        }
      }
    }
    return null;
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -