jobinprogress.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 507 行 · 第 1/2 页

JAVA
507
字号
        // Update JobInProgress status
        //
        double progressDelta = tip.getProgress() - oldProgress;
        if (tip.isMapTask()) {
          if (maps.length == 0) {
            this.status.setMapProgress(1.0f);
          } else {
            this.status.mapProgress += (progressDelta / maps.length);
          }
        } else {
          if (reduces.length == 0) {
            this.status.setReduceProgress(1.0f);
          } else {
            this.status.reduceProgress += (progressDelta / reduces.length);
          }
        }
    }

    /////////////////////////////////////////////////////
    // Create/manage tasks
    /////////////////////////////////////////////////////

    /**
     * Return a MapTask, if appropriate, to run on the given tasktracker.
     *
     * Candidate maps are tried in priority order:
     * <ol>
     *   <li>a map with a split over a block stored on the requesting
     *       tracker ({@code hasTaskWithCacheHit}), so the block does not
     *       have to be transmitted from another node;</li>
     *   <li>otherwise, the first map for which {@code hasTask()} is true;</li>
     *   <li>otherwise, the first map eligible for speculative execution
     *       given the job's average map progress.</li>
     * </ol>
     *
     * @param taskTracker name of the tracker asking for work
     * @param tts         status most recently reported by that tracker
     * @return the task to run, or {@code null} if the job's tasks are not
     *         initialized yet or no map currently has work to hand out
     */
    public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
        if (! tasksInited) {
            LOG.info("Cannot create task split for " + profile.getJobId());
            return null;
        }

        // NOTE(review): an earlier comment warned that calling this method
        // twice in quick succession can create two tasks for the same split,
        // because "recentTasks" has not been updated yet. This method is not
        // synchronized; confirm that the caller serializes calls.

        // status.mapProgress is already the mean progress over all map tasks.
        // Each per-task update adds progressDelta / maps.length to it, so the
        // mean is used directly rather than dividing by maps.length a second time.
        double avgProgress = status.mapProgress();

        int target = -1;
        for (int i = 0; i < maps.length; i++) {
            if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) {
                target = i;
                break;
            }
        }
        if (target < 0) {
            target = indexOfFirstRunnable(maps);
        }
        if (target < 0) {
            target = indexOfFirstSpeculative(maps, avgProgress);
        }

        if (target < 0) {
            return null;
        }
        return maps[target].getTaskToRun(taskTracker, tts, avgProgress);
    }

    /**
     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
     * We don't have cache-sensitivity for reduce tasks, as they
     * work on temporary MapRed files.
     *
     * A reduce for which {@code hasTask()} is true is preferred. Only when
     * there is none is a speculative candidate considered.
     *
     * @param taskTracker name of the tracker asking for work
     * @param tts         status most recently reported by that tracker
     * @return the task to run, or {@code null} if the job's tasks are not
     *         initialized yet or no reduce currently has work to hand out
     */
    public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
        if (! tasksInited) {
            LOG.info("Cannot create task split for " + profile.getJobId());
            return null;
        }

        // As with maps, status.reduceProgress is already accumulated as a
        // mean (progressDelta / reduces.length per update), so it is not
        // divided by reduces.length again.
        double avgProgress = status.reduceProgress();

        int target = indexOfFirstRunnable(reduces);
        if (target < 0) {
            target = indexOfFirstSpeculative(reduces, avgProgress);
        }

        if (target < 0) {
            return null;
        }
        return reduces[target].getTaskToRun(taskTracker, tts, avgProgress);
    }

    /** Index of the first TIP whose {@code hasTask()} is true, or -1. */
    private static int indexOfFirstRunnable(TaskInProgress[] tips) {
        for (int i = 0; i < tips.length; i++) {
            if (tips[i].hasTask()) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Index of the first TIP whose {@code hasSpeculativeTask(avgProgress)}
     * is true, or -1.
     */
    private static int indexOfFirstSpeculative(TaskInProgress[] tips,
                                               double avgProgress) {
        for (int i = 0; i < tips.length; i++) {
            if (tips[i].hasSpeculativeTask(avgProgress)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * A taskid assigned to this JobInProgress has reported in successfully.
     *
     * Marks the task complete in its TIP. If the job is still RUNNING and
     * every map and reduce TIP is now complete, the job status becomes
     * SUCCEEDED (progress 1.0/1.0) and the finish time is recorded.
     *
     * @param tip    the TIP that owns the finished task
     * @param taskid id of the task attempt that finished
     */
    public synchronized void completedTask(TaskInProgress tip, String taskid) {
        LOG.info("Taskid '" + taskid + "' has finished successfully.");
        tip.completed(taskid);

        // Reduces are only scanned when every map is already complete.
        boolean allDone = allComplete(maps) && allComplete(reduces);

        //
        // If all tasks are complete, then the job is done!
        //
        if (status.getRunState() == JobStatus.RUNNING && allDone) {
            this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
            this.finishTime = System.currentTimeMillis();
        }
    }

    /** True when every TIP in {@code tips} reports {@code isComplete()}. */
    private static boolean allComplete(TaskInProgress[] tips) {
        for (int i = 0; i < tips.length; i++) {
            if (! tips[i].isComplete()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Kill the job and all its component tasks.
     *
     * If the job is not already FAILED, its status is replaced with a FAILED
     * status (progress 1.0/1.0), the finish time is recorded, and every map
     * and reduce TIP is killed. Calling this on an already FAILED job does
     * nothing.
     *
     * NOTE(review): a job whose status is SUCCEEDED is also overwritten to
     * FAILED here; confirm callers never kill completed jobs.
     */
    public synchronized void kill() {
        if (status.getRunState() != JobStatus.FAILED) {
            this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
            this.finishTime = System.currentTimeMillis();

            //
            // kill all TIPs.
            //
            for (int i = 0; i < maps.length; i++) {
                maps[i].kill();
            }
            for (int i = 0; i < reduces.length; i++) {
                reduces[i].kill();
            }
        }
    }

    /**
     * A task assigned to this JobInProgress has reported in as failed.
     * Most of the time, we'll just reschedule execution.  However, after
     * many repeated failures we may instead decide to allow the entire
     * job to fail.
     *
     * Even if a task has reported as completed in the past, it might later
     * be reported as failed.  That's because the TaskTracker that hosts a map
     * task might die before the entire job can complete.  If that happens,
     * we need to schedule reexecution so that downstream reduce tasks can
     * obtain the map task's output.
     *
     * The failure is recorded on the TIP. If the TIP then reports
     * {@code isFailed()}, the whole job is killed.
     *
     * @param tip         the TIP that owns the failed task
     * @param taskid      id of the failed task attempt
     * @param trackerName tracker on which the attempt failed
     */
    public void failedTask(TaskInProgress tip, String taskid, String trackerName) {
        // NOTE(review): this method is not synchronized; only kill() below
        // takes this object's lock.
        tip.failedSubTask(taskid, trackerName);

        //
        // Check if we need to kill the job because of too many failures
        //
        if (tip.isFailed()) {
            LOG.info("Aborting job " + profile.getJobId());
            kill();
        }
    }

    /**
     * The job is dead.  We're now GC'ing it, getting rid of the job
     * from all tables.  Be sure to remove all of this job's tasks
     * from the various tables.
     *
     * Deletes the local copies of the job file and jar (if any). If
     * {@code deleteUponCompletion} is set, it also deletes that job file and
     * the jar it names from the FileSystem obtained from {@code conf}, then
     * clears the field.
     *
     * @throws IOException if the FileSystem cannot be obtained or a delete fails
     */
    public synchronized void garbageCollect() throws IOException {
        //
        // Remove this job from all tables
        //

        // Definitely remove the local-disk copy of the job file
        if (localJobFile != null) {
            localJobFile.delete();
            localJobFile = null;
        }
        if (localJarFile != null) {
            localJarFile.delete();
            localJarFile = null;
        }

        //
        // If the job file was in the temporary system directory,
        // we should delete it upon garbage collect.
        //
        if (deleteUponCompletion != null) {
            JobConf jd = new JobConf(deleteUponCompletion);
            FileSystem fs = FileSystem.get(conf);
            String jar = jd.getJar();
            // getJar() can be null when the job conf names no jar. In that case
            // new File(null) would throw NullPointerException and the job file
            // below would never be deleted.
            if (jar != null) {
                fs.delete(new File(jar));
            }
            fs.delete(new File(deleteUponCompletion));
            deleteUponCompletion = null;
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?