taskinprogress.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 453 行 · 第 1/2 页
JAVA
453 行
(String[])diagnostics.toArray(new String[diagnostics.size()])); } //////////////////////////////////////////////// // Update methods, usually invoked by the owning // job. //////////////////////////////////////////////// /** * A status message from a client has arrived. * It updates the status of a single component-thread-task, * which might result in an overall TaskInProgress status update. */ public void updateStatus(TaskStatus status) { String taskid = status.getTaskId(); String diagInfo = status.getDiagnosticInfo(); if (diagInfo != null && diagInfo.length() > 0) { LOG.info("Error from "+taskid+": "+diagInfo); Vector diagHistory = (Vector) taskDiagnosticData.get(taskid); if (diagHistory == null) { diagHistory = new Vector(); taskDiagnosticData.put(taskid, diagHistory); } diagHistory.add(diagInfo); } taskStatuses.put(taskid, status); // Recompute progress recomputeProgress(); } /** * Indicate that one of the taskids in this TaskInProgress * has failed. */ public void failedSubTask(String taskid, String trackerName) { // // Note the failure and its location // LOG.info("Task '" + taskid + "' has been lost."); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); if (status != null) { status.setRunState(TaskStatus.FAILED); } this.recentTasks.remove(taskid); if (this.completes > 0) { this.completes--; } numTaskFailures++; if (numTaskFailures >= MAX_TASK_FAILURES) { LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times."); kill(); } machinesWhereFailed.add(trackerName); // Ask JobTracker to forget about this task jobtracker.removeTaskEntry(taskid); recomputeProgress(); } /** * Indicate that one of the taskids in this TaskInProgress * has successfully completed! */ public void completed(String taskid) { LOG.info("Task '" + taskid + "' has completed."); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); status.setRunState(TaskStatus.SUCCEEDED); recentTasks.remove(taskid); // // Now that the TIP is complete, the other speculative // subtasks will be closed when the owning tasktracker // reports in and calls shouldClose() on this object. // this.completes++; recomputeProgress(); } /** * The TIP's been ordered kill()ed. */ public void kill() { if (isComplete() || failed) { return; } this.failed = true; recomputeProgress(); } /** * This method is called whenever there's a status change * for one of the TIP's sub-tasks. It recomputes the overall * progress for the TIP. We examine all sub-tasks and find * the one that's most advanced (and non-failed). */ void recomputeProgress() { if (isComplete()) { this.progress = 1; } else if (failed) { this.progress = 0; } else { double bestProgress = 0; String bestState = ""; for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) { String taskid = (String) it.next(); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); if (status.getRunState() == TaskStatus.SUCCEEDED) { bestProgress = 1; bestState = status.getStateString(); break; } else if (status.getRunState() == TaskStatus.RUNNING) { if (status.getProgress() >= bestProgress) { bestProgress = status.getProgress(); bestState = status.getStateString(); } } } this.progress = bestProgress; this.state = bestState; } } ///////////////////////////////////////////////// // "Action" methods that actually require the TIP // to do something. ///////////////////////////////////////////////// /** * Return whether this TIP has an DFS cache-driven task * to run at the given taskTracker. */ boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) { if (failed || isComplete() || recentTasks.size() > 0) { return false; } else { try { if (isMapTask()) { if (hints == null) { hints = job.getFileCacheHints(getTIPId(), split.getFile(), split.getStart(), split.getLength()); } if (hints != null) { for (int i = 0; i < hints.length; i++) { for (int j = 0; j < hints[i].length; j++) { if (hints[i][j].equals(tts.getHost())) { return true; } } } } } } catch (IOException ie) { } return false; } } /** * Return whether this TIP has a non-speculative task to run */ boolean hasTask() { if (failed || isComplete() || recentTasks.size() > 0) { return false; } else { for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) { TaskStatus ts = (TaskStatus) it.next(); if (ts.getRunState() == TaskStatus.RUNNING) { return false; } } return true; } } /** * Return whether the TIP has a speculative task to run. We * only launch a speculative task if the current TIP is really * far behind, and has been behind for a non-trivial amount of * time. */ boolean hasSpeculativeTask(double averageProgress) { // // REMIND - mjc - these constants should be examined // in more depth eventually... // if (isMapTask() && recentTasks.size() <= MAX_TASK_EXECS && conf.getSpeculativeExecution() && (averageProgress - progress >= SPECULATIVE_GAP) && (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { return true; } return false; } /** * Return a Task that can be sent to a TaskTracker for execution. */ public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) { Task t = null; if (hasTaskWithCacheHit(taskTracker, tts) || hasTask() || hasSpeculativeTask(avgProgress)) { String taskid = (String) usableTaskIds.first(); usableTaskIds.remove(taskid); if (isMapTask()) { t = new MapTask(jobFile, taskid, split); } else { String mapIdPredecessors[][] = new String[predecessors.length][]; for (int i = 0; i < mapIdPredecessors.length; i++) { mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds(); } t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition); } t.setConf(conf); recentTasks.add(taskid); // Ask JobTracker to note that the task exists jobtracker.createTaskEntry(taskid, taskTracker, this); } return t; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?