📄 jobtracker.java
字号:
if (taskIds != null) { ArrayList list = new ArrayList(); for (Iterator it = taskIds.iterator(); it.hasNext(); ) { String taskId = (String) it.next(); TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); if (tip.shouldCloseForClosedJob(taskId)) { // // This is how the JobTracker ends a task at the TaskTracker. // It may be successfully completed, or may be killed in // mid-execution. // list.add(taskId); } } return (String[]) list.toArray(new String[list.size()]); } return null; } /** * A TaskTracker wants to know the physical locations of completed, but not * yet closed, tasks. This exists so the reduce task thread can locate * map task outputs. */ public synchronized MapOutputLocation[] locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) { ArrayList result = new ArrayList(mapTasksNeeded.length); JobInProgress job = getJob(jobId); for (int i = 0; i < mapTasksNeeded.length; i++) { TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]); if (status != null) { String trackerId = (String) taskidToTrackerMap.get(status.getTaskId()); TaskTrackerStatus tracker; synchronized (taskTrackers) { tracker = (TaskTrackerStatus) taskTrackers.get(trackerId); } result.add(new MapOutputLocation(status.getTaskId(), mapTasksNeeded[i], tracker.getHost(), tracker.getHttpPort())); } } return (MapOutputLocation[]) result.toArray(new MapOutputLocation[result.size()]); } /** * Grab the local fs name */ public synchronized String getFilesystemName() throws IOException { return fs.getName(); } public void reportTaskTrackerError(String taskTracker, String errorClass, String errorMessage) throws IOException { LOG.warn("Report from " + taskTracker + ": " + errorMessage); } //////////////////////////////////////////////////// // JobSubmissionProtocol //////////////////////////////////////////////////// /** * JobTracker.submitJob() kicks off a new job. * * Create a 'JobInProgress' object, which contains both JobProfile * and JobStatus. Those two sub-objects are sometimes shipped outside * of the JobTracker. But JobInProgress adds info that's useful for * the JobTracker alone. * * We add the JIP to the jobInitQueue, which is processed * asynchronously to handle split-computation and build up * the right TaskTracker/Block mapping. */ public synchronized JobStatus submitJob(String jobFile) throws IOException { totalSubmissions++; JobInProgress job = new JobInProgress(jobFile, this, this.conf); synchronized (jobs) { synchronized (jobsByArrival) { synchronized (jobInitQueue) { jobs.put(job.getProfile().getJobId(), job); jobsByArrival.add(job); jobInitQueue.add(job); jobInitQueue.notifyAll(); } } } myMetrics.submitJob(); return job.getStatus(); } public synchronized ClusterStatus getClusterStatus() { synchronized (taskTrackers) { return new ClusterStatus(taskTrackers.size(), totalMaps, totalReduces, maxCurrentTasks); } } public synchronized void killJob(String jobid) { JobInProgress job = (JobInProgress) jobs.get(jobid); job.kill(); } public synchronized JobProfile getJobProfile(String jobid) { JobInProgress job = (JobInProgress) jobs.get(jobid); if (job != null) { return job.getProfile(); } else { return null; } } public synchronized JobStatus getJobStatus(String jobid) { JobInProgress job = (JobInProgress) jobs.get(jobid); if (job != null) { return job.getStatus(); } else { return null; } } public synchronized TaskReport[] getMapTaskReports(String jobid) { JobInProgress job = (JobInProgress) jobs.get(jobid); if (job == null) { return new TaskReport[0]; } else { Vector reports = new Vector(); Vector completeMapTasks = job.reportTasksInProgress(true, true); for (Iterator it = completeMapTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } Vector incompleteMapTasks = job.reportTasksInProgress(true, false); for (Iterator it = incompleteMapTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]); } } public synchronized TaskReport[] getReduceTaskReports(String jobid) { JobInProgress job = (JobInProgress) jobs.get(jobid); if (job == null) { return new TaskReport[0]; } else { Vector reports = new Vector(); Vector completeReduceTasks = job.reportTasksInProgress(false, true); for (Iterator it = completeReduceTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } Vector incompleteReduceTasks = job.reportTasksInProgress(false, false); for (Iterator it = incompleteReduceTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]); } } /** * Get the diagnostics for a given task * @param jobId the id of the job * @param tipId the id of the tip * @param taskId the id of the task * @return a list of the diagnostic messages */ public synchronized List<String> getTaskDiagnostics(String jobId, String tipId, String taskId) { JobInProgress job = (JobInProgress) jobs.get(jobId); if (job == null) { throw new IllegalArgumentException("Job " + jobId + " not found."); } TaskInProgress tip = job.getTaskInProgress(tipId); if (tip == null) { throw new IllegalArgumentException("TIP " + tipId + " not found."); } return tip.getDiagnosticInfo(taskId); } /** Get all the TaskStatuses from the tipid. */ TaskStatus[] getTaskStatuses(String jobid, String tipid){ JobInProgress job = (JobInProgress) jobs.get(jobid); if (job == null){ return new TaskStatus[0]; } TaskInProgress tip = (TaskInProgress) job.getTaskInProgress(tipid); if (tip == null){ return new TaskStatus[0]; } return tip.getTaskStatuses(); } /** * Get tracker name for a given task id. * @param taskId the name of the task * @return The name of the task tracker */ public synchronized String getAssignedTracker(String taskId) { return (String) taskidToTrackerMap.get(taskId); } public JobStatus[] jobsToComplete() { Vector v = new Vector(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); if (status.getRunState() == JobStatus.RUNNING || status.getRunState() == JobStatus.PREP) { status.setStartTime(jip.getStartTime()); status.setUsername(jip.getProfile().getUser()); v.add(status); } } return (JobStatus[]) v.toArray(new JobStatus[v.size()]); } /////////////////////////////////////////////////////////////// // JobTracker methods /////////////////////////////////////////////////////////////// public JobInProgress getJob(String jobid) { return (JobInProgress) jobs.get(jobid); } /** * Grab random num for job id */ String createUniqueId() { return idFormat.format(nextJobId++); } //////////////////////////////////////////////////// // Methods to track all the TaskTrackers //////////////////////////////////////////////////// /** * Accept and process a new TaskTracker profile. We might * have known about the TaskTracker previously, or it might * be brand-new. All task-tracker structures have already * been updated. Just process the contained tasks and any * jobs that might be affected. */ void updateTaskStatuses(TaskTrackerStatus status) { for (Iterator it = status.taskReports(); it.hasNext(); ) { TaskStatus report = (TaskStatus) it.next(); report.setTaskTracker(status.getTrackerName()); String taskId = report.getTaskId(); TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); if (tip == null) { LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskId()); } else { expireLaunchingTasks.removeTask(taskId); tip.getJob().updateTaskStatus(tip, report, myMetrics); } } } /** * We lost the task tracker! All task-tracker structures have * already been updated. Just process the contained tasks and any * jobs that might be affected. */ void lostTaskTracker(String trackerName, String hostname) { LOG.info("Lost tracker '" + trackerName + "'"); TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName); trackerToTaskMap.remove(trackerName); if (lostTasks != null) { for (Iterator it = lostTasks.iterator(); it.hasNext(); ) { String taskId = (String) it.next(); TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); // Completed reduce tasks never need to be failed, because // their outputs go to dfs if (tip.isMapTask() || !tip.isComplete()) { JobInProgress job = tip.getJob(); // if the job is done, we don't want to change anything if (job.getStatus().getRunState() == JobStatus.RUNNING) { job.failedTask(tip, taskId, "Lost task tracker", Phase.MAP, hostname, trackerName, myMetrics); } } } } } //////////////////////////////////////////////////////////// // main() //////////////////////////////////////////////////////////// /** * Start the JobTracker process. This is used only for debugging. As a rule, * JobTracker should be run as part of the DFS Namenode process. */ public static void main(String argv[]) throws IOException, InterruptedException { if (argv.length != 0) { System.out.println("usage: JobTracker"); System.exit(-1); } Configuration conf=new Configuration(); startTracker(conf); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -