⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobinprogress.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
              reduces[target].getTIPId(), Values.REDUCE.name(),              System.currentTimeMillis());        }        return result;    }        /**     * Find a new task to run.     * @param tts The task tracker that is asking for a task     * @param clusterSize The number of task trackers in the cluster     * @param avgProgress The average progress of this kind of task in this job     * @param tasks The list of potential tasks to try     * @param firstTaskToTry The first index in tasks to check     * @param cachedTasks A list of tasks that would like to run on this node     * @return the index in tasks of the selected task (or -1 for no task)     */    private int findNewTask(TaskTrackerStatus tts,                             int clusterSize,                            double avgProgress,                            TaskInProgress[] tasks,                            int firstTaskToTry,                            List cachedTasks) {        String taskTracker = tts.getTrackerName();        //        // See if there is a split over a block that is stored on        // the TaskTracker checking in.  That means the block        // doesn't have to be transmitted from another node.        //        if (cachedTasks != null) {          Iterator i = cachedTasks.iterator();          while (i.hasNext()) {            TaskInProgress tip = (TaskInProgress)i.next();            i.remove();            if (tip.isRunnable() &&                 !tip.isRunning() &&                !tip.hasFailedOnMachine(taskTracker)) {              LOG.info("Choosing cached task " + tip.getTIPId());              int cacheTarget = tip.getIdWithinJob();              return cacheTarget;            }          }        }        //        // If there's no cached target, see if there's        // a std. task to run.        //        int failedTarget = -1;        int specTarget = -1;        for (int i = 0; i < tasks.length; i++) {          int realIdx = (i + firstTaskToTry) % tasks.length;           TaskInProgress task = tasks[realIdx];          if (task.isRunnable()) {            // if it failed here and we haven't tried every machine, we            // don't schedule it here.            boolean hasFailed = task.hasFailedOnMachine(taskTracker);            if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {              continue;            }            boolean isRunning = task.isRunning();            if (hasFailed) {              // failed tasks that aren't running can be scheduled as a last              // resort              if (!isRunning && failedTarget == -1) {                failedTarget = realIdx;              }            } else {              if (!isRunning) {                LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());                return realIdx;              } else if (specTarget == -1 &&                         task.hasSpeculativeTask(avgProgress)) {                specTarget = realIdx;              }            }          }        }        if (specTarget != -1) {          LOG.info("Choosing speculative task " +                     tasks[specTarget].getTIPId());        } else if (failedTarget != -1) {          LOG.info("Choosing failed task " +                     tasks[failedTarget].getTIPId());                  }        return specTarget != -1 ? specTarget : failedTarget;    }    /**     * A taskid assigned to this JobInProgress has reported in successfully.     */    public synchronized void completedTask(TaskInProgress tip,                                            TaskStatus status,                                           JobTrackerMetrics metrics) {        String taskid = status.getTaskId();        if (tip.isComplete()) {          LOG.info("Already complete TIP " + tip.getTIPId() +                    " has completed task " + taskid);          return;        } else {          LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +                    " successfully.");                    String taskTrackerName = status.getTaskTracker();                    if(status.getIsMap()){            JobHistory.MapAttempt.logStarted(profile.getJobId(),                 tip.getTIPId(), status.getTaskId(), status.getStartTime(),                 taskTrackerName);             JobHistory.MapAttempt.logFinished(profile.getJobId(),                 tip.getTIPId(), status.getTaskId(), status.getFinishTime(),                 taskTrackerName);             JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),                 Values.MAP.name(), status.getFinishTime());           }else{              JobHistory.ReduceAttempt.logStarted(profile.getJobId(),                   tip.getTIPId(), status.getTaskId(), status.getStartTime(),                   taskTrackerName);               JobHistory.ReduceAttempt.logFinished(profile.getJobId(),                   tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),                  status.getSortFinishTime(), status.getFinishTime(),                   taskTrackerName);               JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),                   Values.REDUCE.name(), status.getFinishTime());           }        }                tip.completed(taskid);        // updating the running/finished map/reduce counts        if (tip.isMapTask()){          runningMapTasks -= 1;          finishedMapTasks += 1;          metrics.completeMap();        } else{          runningReduceTasks -= 1;          finishedReduceTasks += 1;          metrics.completeReduce();        }                //        // Figure out whether the Job is done        //        boolean allDone = true;        for (int i = 0; i < maps.length; i++) {            if (! maps[i].isComplete()) {                allDone = false;                break;            }        }        if (allDone) {            if (tip.isMapTask()) {              this.status.setMapProgress(1.0f);                          }            for (int i = 0; i < reduces.length; i++) {                if (! reduces[i].isComplete()) {                    allDone = false;                    break;                }            }        }        //        // If all tasks are complete, then the job is done!        //        if (this.status.getRunState() == JobStatus.RUNNING && allDone) {            this.status.setRunState(JobStatus.SUCCEEDED);            this.status.setReduceProgress(1.0f);            this.finishTime = System.currentTimeMillis();            garbageCollect();            LOG.info("Job " + this.status.getJobId() +                      " has completed successfully.");            JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,                 this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);            metrics.completeJob();        }    }    /**     * Kill the job and all its component tasks.     */    public synchronized void kill() {        if (status.getRunState() != JobStatus.FAILED) {            this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);            this.finishTime = System.currentTimeMillis();            this.runningMapTasks = 0;            this.runningReduceTasks = 0;            //            // kill all TIPs.            //            for (int i = 0; i < maps.length; i++) {                maps[i].kill();            }            for (int i = 0; i < reduces.length; i++) {                reduces[i].kill();            }            JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,                 this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks,                 failedReduceTasks);            garbageCollect();        }    }    /**     * A task assigned to this JobInProgress has reported in as failed.     * Most of the time, we'll just reschedule execution.  However, after     * many repeated failures we may instead decide to allow the entire      * job to fail.     *     * Even if a task has reported as completed in the past, it might later     * be reported as failed.  That's because the TaskTracker that hosts a map     * task might die before the entire job can complete.  If that happens,     * we need to schedule reexecution so that downstream reduce tasks can      * obtain the map task's output.     */    private void failedTask(TaskInProgress tip, String taskid,                             TaskStatus status, String trackerName,                            boolean wasRunning, boolean wasComplete) {        tip.failedSubTask(taskid, trackerName);        boolean isRunning = tip.isRunning();        boolean isComplete = tip.isComplete();                //update running  count on task failure.        if (wasRunning && !isRunning) {          if (tip.isMapTask()){            runningMapTasks -= 1;          } else {            runningReduceTasks -= 1;          }        }                // the case when the map was complete but the task tracker went down.        if (wasComplete && !isComplete) {          if (tip.isMapTask()){            finishedMapTasks -= 1;          }        }                // update job history        String taskTrackerName = status.getTaskTracker();        if (status.getIsMap()) {          JobHistory.MapAttempt.logStarted(profile.getJobId(),               tip.getTIPId(), status.getTaskId(), status.getStartTime(),               taskTrackerName);           JobHistory.MapAttempt.logFailed(profile.getJobId(),               tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),              taskTrackerName, status.getDiagnosticInfo());         } else {          JobHistory.ReduceAttempt.logStarted(profile.getJobId(),               tip.getTIPId(), status.getTaskId(), status.getStartTime(),               taskTrackerName);           JobHistory.ReduceAttempt.logFailed(profile.getJobId(),               tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),              taskTrackerName, status.getDiagnosticInfo());         }                // After this, try to assign tasks with the one after this, so that        // the failed task goes to the end of the list.        if (tip.isMapTask()) {          firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;          failedMapTasks++;         } else {          firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;          failedReduceTasks++;         }                    //        // Check if we need to kill the job because of too many failures        //        if (tip.isFailed()) {            LOG.info("Aborting job " + profile.getJobId());            JobHistory.JobInfo.logFailed(profile.getJobId(),                 System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);            kill();        }        jobtracker.removeTaskEntry(taskid);        JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),             tip.isMapTask() ? Values.MAP.name():Values.REDUCE.name(),              System.currentTimeMillis(), status.getDiagnosticInfo()); }    /**     * Fail a task with a given reason, but without a status object.     * @author Owen O'Malley     * @param tip The task's tip     * @param taskid The task id     * @param reason The reason that the task failed     * @param trackerName The task tracker the task failed on     */    public void failedTask(TaskInProgress tip, String taskid,                            String reason, Phase phase,                            String hostname, String trackerName,                           JobTrackerMetrics metrics) {       TaskStatus status = new TaskStatus(taskid,                                          tip.isMapTask(),                                          0.0f,                                          TaskStatus.State.FAILED,                                          reason,                                          reason,                                          trackerName, phase);       updateTaskStatus(tip, status, metrics);       JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),            tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),            System.currentTimeMillis(), reason);     }                                      /**     * The job is dead.  We're now GC'ing it, getting rid of the job     * from all tables.  Be sure to remove all of this job's tasks     * from the various tables.     */    synchronized void garbageCollect() {      try {        // Definitely remove the local-disk copy of the job file        if (localJobFile != null) {            localFs.delete(localJobFile);            localJobFile = null;        }        if (localJarFile != null) {            localFs.delete(localJarFile);            localJarFile = null;        }        // JobClient always creates a new directory with job files        // so we remove that directory to cleanup        FileSystem fs = FileSystem.get(conf);        fs.delete(new Path(profile.getJobFile()).getParent());      } catch (IOException e) {        LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);      }    }    /**      * Return the TaskInProgress that matches the tipid.      */    public TaskInProgress getTaskInProgress(String tipid){      for (int i = 0; i < maps.length; i++) {        if (tipid.equals(maps[i].getTIPId())){          return maps[i];        }                     }      for (int i = 0; i < reduces.length; i++) {        if (tipid.equals(reduces[i].getTIPId())){          return reduces[i];        }      }      return null;    }        /**     * Find the details of someplace where a map has finished     * @param mapId the id of the map     * @return the task status of the completed task     */    public TaskStatus findFinishedMap(int mapId) {       TaskInProgress tip = maps[mapId];       if (tip.isComplete()) {         TaskStatus[] statuses = tip.getTaskStatuses();         for(int i=0; i < statuses.length; i++) {           if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) {             return statuses[i];           }         }       }       return null;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -