taskinprogress.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 453 行 · 第 1/2 页

JAVA
453
字号
         (String[])diagnostics.toArray(new String[diagnostics.size()]));    }    ////////////////////////////////////////////////    // Update methods, usually invoked by the owning    // job.    ////////////////////////////////////////////////    /**     * A status message from a client has arrived.     * It updates the status of a single component-thread-task,     * which might result in an overall TaskInProgress status update.     */    public void updateStatus(TaskStatus status) {        String taskid = status.getTaskId();        String diagInfo = status.getDiagnosticInfo();        if (diagInfo != null && diagInfo.length() > 0) {            LOG.info("Error from "+taskid+": "+diagInfo);            Vector diagHistory = (Vector) taskDiagnosticData.get(taskid);            if (diagHistory == null) {                diagHistory = new Vector();                taskDiagnosticData.put(taskid, diagHistory);            }            diagHistory.add(diagInfo);        }        taskStatuses.put(taskid, status);        // Recompute progress        recomputeProgress();    }    /**     * Indicate that one of the taskids in this TaskInProgress     * has failed.     */    public void failedSubTask(String taskid, String trackerName) {        //        // Note the failure and its location        //        LOG.info("Task '" + taskid + "' has been lost.");        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);        if (status != null) {            status.setRunState(TaskStatus.FAILED);        }        this.recentTasks.remove(taskid);        if (this.completes > 0) {            this.completes--;        }        numTaskFailures++;        if (numTaskFailures >= MAX_TASK_FAILURES) {            LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");            kill();        }        machinesWhereFailed.add(trackerName);        // Ask JobTracker to forget about this task        jobtracker.removeTaskEntry(taskid);        recomputeProgress();    }    /**     * Indicate that one of the taskids in this TaskInProgress     * has successfully completed!     */    public void completed(String taskid) {        LOG.info("Task '" + taskid + "' has completed.");        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);        status.setRunState(TaskStatus.SUCCEEDED);        recentTasks.remove(taskid);        //        // Now that the TIP is complete, the other speculative         // subtasks will be closed when the owning tasktracker         // reports in and calls shouldClose() on this object.        //        this.completes++;        recomputeProgress();    }    /**     * The TIP's been ordered kill()ed.     */    public void kill() {        if (isComplete() || failed) {            return;        }        this.failed = true;        recomputeProgress();    }    /**     * This method is called whenever there's a status change     * for one of the TIP's sub-tasks.  It recomputes the overall      * progress for the TIP.  We examine all sub-tasks and find      * the one that's most advanced (and non-failed).     */    void recomputeProgress() {        if (isComplete()) {            this.progress = 1;        } else if (failed) {            this.progress = 0;        } else {            double bestProgress = 0;            String bestState = "";            for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) {                String taskid = (String) it.next();                TaskStatus status = (TaskStatus) taskStatuses.get(taskid);                if (status.getRunState() == TaskStatus.SUCCEEDED) {                    bestProgress = 1;                    bestState = status.getStateString();                    break;                } else if (status.getRunState() == TaskStatus.RUNNING) {                  if (status.getProgress() >= bestProgress) {                    bestProgress = status.getProgress();                    bestState = status.getStateString();                  }                }            }            this.progress = bestProgress;            this.state = bestState;        }    }    /////////////////////////////////////////////////    // "Action" methods that actually require the TIP    // to do something.    /////////////////////////////////////////////////    /**     * Return whether this TIP has an DFS cache-driven task      * to run at the given taskTracker.     */    boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) {        if (failed || isComplete() || recentTasks.size() > 0) {            return false;        } else {            try {                if (isMapTask()) {                    if (hints == null) {                        hints = job.getFileCacheHints(getTIPId(), split.getFile(), split.getStart(), split.getLength());                    }                    if (hints != null) {                        for (int i = 0; i < hints.length; i++) {                            for (int j = 0; j < hints[i].length; j++) {                                if (hints[i][j].equals(tts.getHost())) {                                    return true;                                }                            }                        }                    }                }            } catch (IOException ie) {            }            return false;        }    }    /**     * Return whether this TIP has a non-speculative task to run     */    boolean hasTask() {        if (failed || isComplete() || recentTasks.size() > 0) {            return false;        } else {            for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) {                TaskStatus ts = (TaskStatus) it.next();                if (ts.getRunState() == TaskStatus.RUNNING) {                    return false;                }            }            return true;        }    }    /**     * Return whether the TIP has a speculative task to run.  We     * only launch a speculative task if the current TIP is really     * far behind, and has been behind for a non-trivial amount of      * time.     */    boolean hasSpeculativeTask(double averageProgress) {        //        // REMIND - mjc - these constants should be examined        // in more depth eventually...        //        if (isMapTask() &&            recentTasks.size() <= MAX_TASK_EXECS &&            conf.getSpeculativeExecution() &&            (averageProgress - progress >= SPECULATIVE_GAP) &&            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {            return true;        }        return false;    }        /**     * Return a Task that can be sent to a TaskTracker for execution.     */    public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {        Task t = null;        if (hasTaskWithCacheHit(taskTracker, tts) ||            hasTask() ||             hasSpeculativeTask(avgProgress)) {            String taskid = (String) usableTaskIds.first();            usableTaskIds.remove(taskid);            if (isMapTask()) {                t = new MapTask(jobFile, taskid, split);            } else {                String mapIdPredecessors[][] = new String[predecessors.length][];                for (int i = 0; i < mapIdPredecessors.length; i++) {                    mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds();                }                t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition);            }            t.setConf(conf);            recentTasks.add(taskid);            // Ask JobTracker to note that the task exists            jobtracker.createTaskEntry(taskid, taskTracker, this);        }        return t;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?