📄 tasktracker.java
字号:
/** * Start with the local machine name, and the default JobTracker */ public TaskTracker(JobConf conf) throws IOException { maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2); this.fConf = conf; this.jobTrackAddr = JobTracker.getAddress(conf); this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000); this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); int httpPort = conf.getInt("tasktracker.http.port", 50060); String httpBindAddress = conf.get("tasktracker.http.bindAddress", "0.0.0.0"); this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true); int workerThreads = conf.getInt("tasktracker.http.threads", 40); server.setThreads(1, workerThreads); // let the jsp pages get to the task tracker, config, and other relevant // objects FileSystem local = FileSystem.getNamed("local", conf); server.setAttribute("task.tracker", this); server.setAttribute("local.file.system", local); server.setAttribute("conf", conf); server.setAttribute("log", LOG); server.addServlet("mapOutput", "mapOutput", MapOutputServlet.class); server.start(); this.httpPort = server.getPort(); initialize(); } /** * The connection to the JobTracker, used by the TaskRunner * for locating remote files. */ public InterTrackerProtocol getJobClient() { return jobClient; } /** * Are we running under killall-less operating system. */ private static boolean isWindows = System.getProperty("os.name").startsWith("Windows"); /** * Get the call stacks for all java processes on this system. * Obviously, this is only useful for debugging. */ private static void getCallStacks() { if (LOG.isDebugEnabled() && !isWindows) { try { Process proc = Runtime.getRuntime().exec("killall -QUIT java"); proc.waitFor(); } catch (IOException ie) { LOG.warn(StringUtils.stringifyException(ie)); } catch (InterruptedException ie) {} } } /**Return the DFS filesystem * @return */ public FileSystem getFileSystem(){ return fs; } /** * Main service loop. Will stay in this loop forever. */ int offerService() throws Exception { long lastHeartbeat = 0; this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf); while (running && !shuttingDown) { long now = System.currentTimeMillis(); long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat); if (waitTime > 0) { try { // sleeps for the wait time, wakes up if a task is finished. synchronized(finishedCount) { if (finishedCount[0] == 0) { finishedCount.wait(waitTime); } finishedCount[0] = 0; } } catch (InterruptedException ie) { } } lastHeartbeat = now; // // Emit standard hearbeat message to check in with JobTracker // Vector taskReports = new Vector(); synchronized (this) { for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); TaskStatus status = tip.createStatus(); taskReports.add(status); } } // // Xmit the heartbeat // TaskTrackerStatus status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, taskReports, failures); int resultCode = jobClient.emitHeartbeat(status, justStarted); synchronized (this) { for (Iterator it = taskReports.iterator(); it.hasNext(); ) { TaskStatus taskStatus = (TaskStatus) it.next(); if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { if (taskStatus.getIsMap()) { mapTotal--; } else { reduceTotal--; } myMetrics.completeTask(); runningTasks.remove(taskStatus.getTaskId()); } } } justStarted = false; if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) { return STALE_STATE; } // // Check if we should createRecord a new Task // try { if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) { checkLocalDirs(fConf.getLocalDirs()); if (enoughFreeSpace(minSpaceStart)) { Task t = jobClient.pollForNewTask(taskTrackerName); if (t != null) { startNewTask(t); } } } } catch (DiskErrorException de ) { LOG.warn("Exiting task tracker because "+de.getMessage()); jobClient.reportTaskTrackerError(taskTrackerName, "DiskErrorException", de.getMessage()); return STALE_STATE; } catch (IOException ie) { LOG.info("Problem launching task: " + StringUtils.stringifyException(ie)); } // // Kill any tasks that have not reported progress in the last X seconds. // synchronized (this) { for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); long timeSinceLastReport = System.currentTimeMillis() - tip.getLastProgressReport(); if ((tip.getRunState() == TaskStatus.State.RUNNING) && (timeSinceLastReport > this.taskTimeout) && !tip.wasKilled) { String msg = "Task failed to report status for " + (timeSinceLastReport / 1000) + " seconds. Killing."; LOG.info(tip.getTask().getTaskId() + ": " + msg); getCallStacks(); tip.reportDiagnosticInfo(msg); try { tip.killAndCleanup(true); } catch (IOException ie) { LOG.info("Problem cleaning task up: " + StringUtils.stringifyException(ie)); } } } } // // Check for any Tasks that should be killed, even if // the containing Job is still ongoing. (This happens // with speculative execution, when one version of the // task finished before another // // // Check for any Tasks whose job may have ended // try { String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName); if (toCloseIds != null) { synchronized (this) { for (int i = 0; i < toCloseIds.length; i++) { Object tip = tasks.get(toCloseIds[i]); synchronized(runningJobs){ runningJobs.remove(((TaskInProgress) tasks.get(toCloseIds[i])).getTask().getJobId()); } if (tip != null) { tasksToCleanup.put(tip); } else { LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]); } } } } } catch (IOException ie) { LOG.info("Problem getting closed tasks: " + StringUtils.stringifyException(ie)); } //Check if we're dangerously low on disk space // If so, kill jobs to free up space and make sure // we don't accept any new tasks // Try killing the reduce jobs first, since I believe they // use up most space // Then pick the one with least progress if (!enoughFreeSpace(minSpaceKill)) { acceptNewTasks=false; //we give up! do not accept new tasks until //all the ones running have finished and they're all cleared up synchronized (this) { TaskInProgress killMe = null; for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); if ((tip.getRunState() == TaskStatus.State.RUNNING) && !tip.wasKilled) { if (killMe == null) { killMe = tip; } else if (!tip.getTask().isMapTask()) { //reduce task, give priority if (killMe.getTask().isMapTask() || (tip.getTask().getProgress().get() < killMe.getTask().getProgress().get())) { killMe = tip; } } else if (killMe.getTask().isMapTask() && tip.getTask().getProgress().get() < killMe.getTask().getProgress().get()) { //map task, only add if the progress is lower killMe = tip; } } } if (killMe!=null) { String msg = "Tasktracker running out of space. Killing task."; LOG.info(killMe.getTask().getTaskId() + ": " + msg); killMe.reportDiagnosticInfo(msg); try { killMe.killAndCleanup(true); } catch (IOException ie) { LOG.info("Problem cleaning task up: " + StringUtils.stringifyException(ie)); } } } } //we've cleaned up, resume normal operation if (!acceptNewTasks && tasks.isEmpty()) { acceptNewTasks=true; } } return 0; } /** * Check if all of the local directories have enough * free space * * If not, do not try to get a new task assigned * @return * @throws IOException */ private boolean enoughFreeSpace(long minSpace) throws IOException { String[] localDirs = fConf.getLocalDirs(); for (int i = 0; i < localDirs.length; i++) { DF df = null; if (localDirsDf.containsKey(localDirs[i])) { df = (DF) localDirsDf.get(localDirs[i]); } else { df = new DF(new File(localDirs[i]), fConf); localDirsDf.put(localDirs[i], df); } if (df.getAvailable() < minSpace) return false; } return true; } /** * Start a new task. * All exceptions are handled locally, so that we don't mess up the * task tracker. */ private void startNewTask(Task t) { TaskInProgress tip = new TaskInProgress(t, this.fConf); synchronized (this) { tasks.put(t.getTaskId(), tip); runningTasks.put(t.getTaskId(), tip); boolean isMap = t.isMapTask(); if (isMap) { mapTotal++; } else { reduceTotal++; } } try { localizeJob(tip); } catch (IOException ie) { String msg = ("Error initializing " + tip.getTask().getTaskId() + ":\n" + StringUtils.stringifyException(ie)); LOG.warn(msg); tip.reportDiagnosticInfo(msg); try { tip.killAndCleanup(true); } catch (IOException ie2) { LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" + StringUtils.stringifyException(ie2)); } } } /** * The server retry loop. * This while-loop attempts to connect to the JobTracker. It only * loops when the old TaskTracker has gone bad (its state is * stale somehow) and we need to reinitialize everything. */ public void run() { try { while (running && !shuttingDown) { boolean staleState = false;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -