📄 taskinprogress.java
字号:
* @return the list of diagnostics for that task */ synchronized List<String> getDiagnosticInfo(String taskId) { return taskDiagnosticData.get(taskId); } //////////////////////////////////////////////// // Update methods, usually invoked by the owning // job. //////////////////////////////////////////////// /** * A status message from a client has arrived. * It updates the status of a single component-thread-task, * which might result in an overall TaskInProgress status update. * @return has the task changed its state noticably? */ synchronized boolean updateStatus(TaskStatus status) { String taskid = status.getTaskId(); String diagInfo = status.getDiagnosticInfo(); TaskStatus oldStatus = (TaskStatus) taskStatuses.get(taskid); boolean changed = true; if (diagInfo != null && diagInfo.length() > 0) { LOG.info("Error from "+taskid+": "+diagInfo); List diagHistory = (List) taskDiagnosticData.get(taskid); if (diagHistory == null) { diagHistory = new ArrayList(); taskDiagnosticData.put(taskid, diagHistory); } diagHistory.add(diagInfo); } if (oldStatus != null) { TaskStatus.State oldState = oldStatus.getRunState(); TaskStatus.State newState = status.getRunState(); // The task is not allowed to move from completed back to running. // We have seen out of order status messagesmoving tasks from complete // to running. This is a spot fix, but it should be addressed more // globally. if (newState == TaskStatus.State.RUNNING && (oldState == TaskStatus.State.FAILED || oldState == TaskStatus.State.KILLED || oldState == TaskStatus.State.SUCCEEDED)) { return false; } changed = oldState != newState; } taskStatuses.put(taskid, status); // Recompute progress recomputeProgress(); return changed; } /** * Indicate that one of the taskids in this TaskInProgress * has failed. */ public void failedSubTask(String taskid, String trackerName) { // // Note the failure and its location // LOG.info("Task '" + taskid + "' has been lost."); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); if (status != null) { status.setRunState(TaskStatus.State.FAILED); // tasktracker went down and failed time was not reported. if( 0 == status.getFinishTime() ){ status.setFinishTime(System.currentTimeMillis()); } } this.recentTasks.remove(taskid); if (this.completes > 0) { this.completes--; } numTaskFailures++; if (numTaskFailures >= MAX_TASK_FAILURES) { LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times."); kill(); } machinesWhereFailed.add(trackerName); } /** * Indicate that one of the taskids in this TaskInProgress * has successfully completed! */ public void completed(String taskid) { LOG.info("Task '" + taskid + "' has completed."); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); status.setRunState(TaskStatus.State.SUCCEEDED); recentTasks.remove(taskid); // // Now that the TIP is complete, the other speculative // subtasks will be closed when the owning tasktracker // reports in and calls shouldClose() on this object. // this.completes++; recomputeProgress(); } /** * Get the Status of the tasks managed by this TIP */ public TaskStatus[] getTaskStatuses() { return (TaskStatus[])taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]); } /** * The TIP's been ordered kill()ed. */ public void kill() { if (isComplete() || failed) { return; } this.failed = true; killed = true; recomputeProgress(); } /** * Was the task killed? * @return true if the task killed */ public boolean wasKilled() { return killed; } /** * This method is called whenever there's a status change * for one of the TIP's sub-tasks. It recomputes the overall * progress for the TIP. We examine all sub-tasks and find * the one that's most advanced (and non-failed). */ void recomputeProgress() { if (isComplete()) { this.progress = 1; this.execFinishTime = System.currentTimeMillis(); } else if (failed) { this.progress = 0; this.execFinishTime = System.currentTimeMillis(); } else { double bestProgress = 0; String bestState = ""; for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) { String taskid = (String) it.next(); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); if (status.getRunState() == TaskStatus.State.SUCCEEDED) { bestProgress = 1; bestState = status.getStateString(); break; } else if (status.getRunState() == TaskStatus.State.RUNNING) { if (status.getProgress() >= bestProgress) { bestProgress = status.getProgress(); bestState = status.getStateString(); } } } this.progress = bestProgress; this.state = bestState; } } ///////////////////////////////////////////////// // "Action" methods that actually require the TIP // to do something. ///////////////////////////////////////////////// /** * Return whether this TIP still needs to run */ boolean isRunnable() { return !failed && (completes == 0); } /** * Return whether the TIP has a speculative task to run. We * only launch a speculative task if the current TIP is really * far behind, and has been behind for a non-trivial amount of * time. */ boolean hasSpeculativeTask(double averageProgress) { // // REMIND - mjc - these constants should be examined // in more depth eventually... // if (isMapTask() && recentTasks.size() <= MAX_TASK_EXECS && runSpeculative && (averageProgress - progress >= SPECULATIVE_GAP) && (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { return true; } return false; } /** * Return a Task that can be sent to a TaskTracker for execution. */ public Task getTaskToRun(String taskTracker) { Task t = null; if( 0 == execStartTime ){ // assume task starts running now execStartTime = System.currentTimeMillis(); } String taskid = (String) usableTaskIds.first(); usableTaskIds.remove(taskid); String jobId = job.getProfile().getJobId(); if (isMapTask()) { t = new MapTask(jobId, jobFile, taskid, partition, split); } else { t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps); } t.setConf(conf); recentTasks.add(taskid); // Ask JobTracker to note that the task exists jobtracker.createTaskEntry(taskid, taskTracker, this); return t; } /** * Has this task already failed on this machine? * @param tracker The task tracker name * @return Has it failed? */ public boolean hasFailedOnMachine(String tracker) { return machinesWhereFailed.contains(tracker); } /** * Get the number of machines where this task has failed. * @return the size of the failed machine set */ public int getNumberOfFailedMachines() { return machinesWhereFailed.size(); } /** * Get the id of this map or reduce task. * @return The index of this tip in the maps/reduces lists. */ public int getIdWithinJob() { return partition; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -