jobtracker.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 910 行 · 第 1/3 页

JAVA
910
字号
                        continue;                    }                    Task t = job.obtainNewMapTask(taskTracker, tts);                    if (t != null) {                        return t;                    }                    //                    // Beyond the highest-priority task, reserve a little                     // room for failures and speculative executions; don't                     // schedule tasks to the hilt.                    //                    totalNeededMaps += job.desiredMaps();                    double padding = 0;                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {                        padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);                    }                    if (totalNeededMaps + padding >= totalCapacity) {                        break;                    }                }            }            //            // Same thing, but for reduce tasks            //            if ((numReduces < maxCurrentTasks) &&                (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {                int totalNeededReduces = 0;                for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {                    JobInProgress job = (JobInProgress) it.next();                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {                        continue;                    }                    Task t = job.obtainNewReduceTask(taskTracker, tts);                    if (t != null) {                        return t;                    }                    //                    // Beyond the highest-priority task, reserve a little                     // room for failures and speculative executions; don't                     // schedule tasks to the hilt.                    
//                    totalNeededReduces += job.desiredReduces();                    double padding = 0;                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {                        padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);                    }                    if (totalNeededReduces + padding >= totalCapacity) {                        break;                    }                }            }        }        return null;    }    /**     * A tracker wants to know if any of its Tasks have been     * closed (because the job completed, whether successfully or not)     */    public synchronized String pollForTaskWithClosedJob(String taskTracker) {        TreeSet taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);        if (taskIds != null) {            for (Iterator it = taskIds.iterator(); it.hasNext(); ) {                String taskId = (String) it.next();                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);                if (tip.shouldCloseForClosedJob(taskId)) {                    //                     // This is how the JobTracker ends a task at the TaskTracker.                    // It may be successfully completed, or may be killed in                    // mid-execution.                    //                    return taskId;                }            }        }        return null;    }    /**     * A TaskTracker wants to know the physical locations of completed, but not     * yet closed, tasks.  This exists so the reduce task thread can locate     * map task outputs.     
*/    public synchronized MapOutputLocation[] locateMapOutputs(String taskId, String[][] mapTasksNeeded) {        ArrayList v = new ArrayList();        for (int i = 0; i < mapTasksNeeded.length; i++) {            for (int j = 0; j < mapTasksNeeded[i].length; j++) {                TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(mapTasksNeeded[i][j]);                if (tip != null && tip.isComplete(mapTasksNeeded[i][j])) {                    String trackerId = (String) taskidToTrackerMap.get(mapTasksNeeded[i][j]);                    TaskTrackerStatus tracker;                    synchronized (taskTrackers) {                      tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);                    }                    v.add(new MapOutputLocation(mapTasksNeeded[i][j], tracker.getHost(), tracker.getPort()));                    break;                }            }        }        // randomly shuffle results to load-balance map output requests        Collections.shuffle(v);        return (MapOutputLocation[]) v.toArray(new MapOutputLocation[v.size()]);    }    /**     * Grab the local fs name     */    public synchronized String getFilesystemName() throws IOException {        return fs.getName();    }    ////////////////////////////////////////////////////    // JobSubmissionProtocol    ////////////////////////////////////////////////////    /**     * JobTracker.submitJob() kicks off a new job.       *     * Create a 'JobInProgress' object, which contains both JobProfile     * and JobStatus.  Those two sub-objects are sometimes shipped outside     * of the JobTracker.  But JobInProgress adds info that's useful for     * the JobTracker alone.     *     * We add the JIP to the jobInitQueue, which is processed      * asynchronously to handle split-computation and build up     * the right TaskTracker/Block mapping.     
*/    public synchronized JobStatus submitJob(String jobFile) throws IOException {        totalSubmissions++;        JobInProgress job = new JobInProgress(jobFile, this, this.conf);        synchronized (jobs) {            synchronized (jobsByArrival) {                synchronized (jobInitQueue) {                    jobs.put(job.getProfile().getJobId(), job);                    jobsByArrival.add(job);                    jobInitQueue.add(job);                    jobInitQueue.notifyAll();                }            }        }        return job.getStatus();    }    public synchronized ClusterStatus getClusterStatus() {        synchronized (taskTrackers) {          return new ClusterStatus(taskTrackers.size(),                                   totalMaps,                                   totalReduces,                                   maxCurrentTasks);                  }    }        public synchronized void killJob(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        job.kill();    }    public synchronized JobProfile getJobProfile(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job != null) {            return job.getProfile();        } else {            return null;        }    }    public synchronized JobStatus getJobStatus(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job != null) {            return job.getStatus();        } else {            return null;        }    }    public synchronized TaskReport[] getMapTaskReports(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job == null) {            return new TaskReport[0];        } else {            Vector reports = new Vector();            Vector completeMapTasks = job.reportTasksInProgress(true, true);            for (Iterator it = completeMapTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                
reports.add(tip.generateSingleReport());            }            Vector incompleteMapTasks = job.reportTasksInProgress(true, false);            for (Iterator it = incompleteMapTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);        }    }    public synchronized TaskReport[] getReduceTaskReports(String jobid) {        JobInProgress job = (JobInProgress) jobs.get(jobid);        if (job == null) {            return new TaskReport[0];        } else {            Vector reports = new Vector();            Vector completeReduceTasks = job.reportTasksInProgress(false, true);            for (Iterator it = completeReduceTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            Vector incompleteReduceTasks = job.reportTasksInProgress(false, false);            for (Iterator it = incompleteReduceTasks.iterator(); it.hasNext(); ) {                TaskInProgress tip = (TaskInProgress) it.next();                reports.add(tip.generateSingleReport());            }            return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]);        }    }    ///////////////////////////////////////////////////////////////    // JobTracker methods    ///////////////////////////////////////////////////////////////    public JobInProgress getJob(String jobid) {        return (JobInProgress) jobs.get(jobid);    }    /**     * Grab random num for task id     */    String createUniqueId() {        return "" + Integer.toString(Math.abs(r.nextInt()),36);    }    ////////////////////////////////////////////////////    // Methods to track all the TaskTrackers    ////////////////////////////////////////////////////    /**     * Accept and process a new TaskTracker profile. 
 We might     * have known about the TaskTracker previously, or it might     * be brand-new.  All task-tracker structures have already     * been updated.  Just process the contained tasks and any     * jobs that might be affected.     */    void updateTaskStatuses(TaskTrackerStatus status) {        for (Iterator it = status.taskReports(); it.hasNext(); ) {            TaskStatus report = (TaskStatus) it.next();            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(report.getTaskId());            if (tip == null) {                LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());            } else {                JobInProgress job = tip.getJob();                job.updateTaskStatus(tip, report);                if (report.getRunState() == TaskStatus.SUCCEEDED) {                    job.completedTask(tip, report.getTaskId());                } else if (report.getRunState() == TaskStatus.FAILED) {                    // Tell the job to fail the relevant task                    job.failedTask(tip, report.getTaskId(), status.getTrackerName());                }            }        }    }    /**     * We lost the task tracker!  All task-tracker structures have      * already been updated.  Just process the contained tasks and any     * jobs that might be affected.     
*/
    void lostTaskTracker(String trackerName) {
        LOG.info("Lost tracker '" + trackerName + "'");
        TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName);
        trackerToTaskMap.remove(trackerName);
        if (lostTasks == null) {
            return;
        }
        for (Iterator it = lostTasks.iterator(); it.hasNext(); ) {
            String taskId = (String) it.next();
            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
            if (tip == null) {
                // The tracker's entry has already been removed above, so an
                // exception here would leave every remaining task of this
                // tracker un-failed.  Log the unknown id and keep going.
                LOG.info("Serious problem.  While handling lost tracker '" + trackerName
                         + "', cannot find taskid " + taskId);
                continue;
            }
            // Tell the job to fail the relevant task
            JobInProgress job = tip.getJob();
            job.failedTask(tip, taskId, trackerName);
        }
    }

    ////////////////////////////////////////////////////////////
    // main()
    ////////////////////////////////////////////////////////////
    /**
     * Start the JobTracker process.  This is used only for debugging.  As a rule,
     * JobTracker should be run as part of the DFS Namenode process.
     *
     * @param argv must be empty; otherwise a usage line is printed and the
     *             process exits with status -1
     */
    public static void main(String argv[]) throws IOException, InterruptedException {
        if (argv.length != 0) {
            System.out.println("usage: JobTracker");
            System.exit(-1);
        }
        startTracker(new Configuration());
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?