📄 jobtracker.java
字号:
taskidToTrackerMap.put(taskid, taskTracker); // tracker --> taskid TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker); if (taskset == null) { taskset = new TreeSet(); trackerToTaskMap.put(taskTracker, taskset); } taskset.add(taskid); // taskid --> TIP taskidToTIPMap.put(taskid, tip); } void removeTaskEntry(String taskid) { // taskid --> tracker String tracker = (String) taskidToTrackerMap.remove(taskid); // tracker --> taskid TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker); if (trackerSet != null) { trackerSet.remove(taskid); } // taskid --> TIP taskidToTIPMap.remove(taskid); } /////////////////////////////////////////////////////// // Accessors for objects that want info on jobs, tasks, // trackers, etc. /////////////////////////////////////////////////////// public int getTotalSubmissions() { return totalSubmissions; } public String getJobTrackerMachine() { return localMachine; } public int getTrackerPort() { return port; } public int getInfoPort() { return infoPort; } public long getStartTime() { return startTime; } public Vector runningJobs() { Vector v = new Vector(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); if (status.getRunState() == JobStatus.RUNNING) { v.add(jip); } } return v; } public Vector failedJobs() { Vector v = new Vector(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); if (status.getRunState() == JobStatus.FAILED) { v.add(jip); } } return v; } public Vector completedJobs() { Vector v = new Vector(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); if (status.getRunState() == JobStatus.SUCCEEDED) { v.add(jip); } } return v; } public Collection taskTrackers() { synchronized (taskTrackers) { return taskTrackers.values(); } } public TaskTrackerStatus getTaskTracker(String trackerID) { synchronized (taskTrackers) { return (TaskTrackerStatus) taskTrackers.get(trackerID); } } //////////////////////////////////////////////////// // InterTrackerProtocol //////////////////////////////////////////////////// /** * Update the last recorded status for the given task tracker. * It assumes that the taskTrackers are locked on entry. * @author Owen O'Malley * @param trackerName The name of the tracker * @param status The new status for the task tracker * @return Was an old status found? */ private boolean updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) { TaskTrackerStatus oldStatus = (TaskTrackerStatus) taskTrackers.get(trackerName); if (oldStatus != null) { totalMaps -= oldStatus.countMapTasks(); totalReduces -= oldStatus.countReduceTasks(); if (status == null) { taskTrackers.remove(trackerName); } } if (status != null) { totalMaps += status.countMapTasks(); totalReduces += status.countReduceTasks(); taskTrackers.put(trackerName, status); } return oldStatus != null; } /** * Process incoming heartbeat messages from the task trackers. */ public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) { String trackerName = trackerStatus.getTrackerName(); trackerStatus.setLastSeen(System.currentTimeMillis()); synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { boolean seenBefore = updateTaskTrackerStatus(trackerName, trackerStatus); if (initialContact) { // If it's first contact, then clear out any state hanging around if (seenBefore) { lostTaskTracker(trackerName, trackerStatus.getHost()); } } else { // If not first contact, there should be some record of the tracker if (!seenBefore) { LOG.warn("Status from unknown Tracker : " + trackerName); taskTrackers.remove(trackerName); return InterTrackerProtocol.UNKNOWN_TASKTRACKER; } } if (initialContact) { trackerExpiryQueue.add(trackerStatus); } } } updateTaskStatuses(trackerStatus); //LOG.info("Got heartbeat from "+trackerName); return InterTrackerProtocol.TRACKERS_OK; } /** * A tracker wants to know if there's a Task to run. Returns * a task we'd like the TaskTracker to execute right now. * * Eventually this function should compute load on the various TaskTrackers, * and incorporate knowledge of DFS file placement. But for right now, it * just grabs a single item out of the pending task list and hands it back. */ public synchronized Task pollForNewTask(String taskTracker) { // // Compute average map and reduce task numbers across pool // int remainingReduceLoad = 0; int remainingMapLoad = 0; int numTaskTrackers; TaskTrackerStatus tts; synchronized (taskTrackers) { numTaskTrackers = taskTrackers.size(); tts = (TaskTrackerStatus) taskTrackers.get(taskTracker); } if (tts == null) { LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker); return null; } int totalCapacity = numTaskTrackers * maxCurrentTasks; synchronized(jobsByArrival){ for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) { JobInProgress job = (JobInProgress) it.next(); if (job.getStatus().getRunState() == JobStatus.RUNNING) { int totalMapTasks = job.desiredMaps(); int totalReduceTasks = job.desiredReduces(); remainingMapLoad += (totalMapTasks - job.finishedMaps()); remainingReduceLoad += (totalReduceTasks - job.finishedReduces()); } } } // find out the maximum number of maps or reduces that we are willing // to run on any node. int maxMapLoad = 0; int maxReduceLoad = 0; if (numTaskTrackers > 0) { maxMapLoad = Math.min(maxCurrentTasks, (int) Math.ceil((double) remainingMapLoad / numTaskTrackers)); maxReduceLoad = Math.min(maxCurrentTasks, (int) Math.ceil((double) remainingReduceLoad / numTaskTrackers)); } // // Get map + reduce counts for the current tracker. // int numMaps = tts.countMapTasks(); int numReduces = tts.countReduceTasks(); // // In the below steps, we allocate first a map task (if appropriate), // and then a reduce task if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We hand a task to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // synchronized (jobsByArrival) { if (numMaps < maxMapLoad) { int totalNeededMaps = 0; for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) { JobInProgress job = (JobInProgress) it.next(); if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = job.obtainNewMapTask(tts, numTaskTrackers); if (t != null) { expireLaunchingTasks.addNewTask(t.getTaskId()); myMetrics.launchMap(); return t; } // // Beyond the highest-priority task, reserve a little // room for failures and speculative executions; don't // schedule tasks to the hilt. // totalNeededMaps += job.desiredMaps(); int padding = 0; if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) { padding = Math.min(maxCurrentTasks, (int)(totalNeededMaps * PAD_FRACTION)); } if (totalMaps + padding >= totalCapacity) { break; } } } // // Same thing, but for reduce tasks // if (numReduces < maxReduceLoad) { int totalNeededReduces = 0; for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) { JobInProgress job = (JobInProgress) it.next(); if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(tts, numTaskTrackers); if (t != null) { expireLaunchingTasks.addNewTask(t.getTaskId()); myMetrics.launchReduce(); return t; } // // Beyond the highest-priority task, reserve a little // room for failures and speculative executions; don't // schedule tasks to the hilt. // totalNeededReduces += job.desiredReduces(); int padding = 0; if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) { padding = Math.min(maxCurrentTasks, (int) (totalNeededReduces * PAD_FRACTION)); } if (totalReduces + padding >= totalCapacity) { break; } } } } return null; } /** * A tracker wants to know if any of its Tasks have been * closed (because the job completed, whether successfully or not) */ public synchronized String[] pollForTaskWithClosedJob(String taskTracker) { TreeSet taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -