tasktracker.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 759 行 · 第 1/2 页
JAVA
759 行
this.jobConf = new JobConf(conf);
            this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
            localizeTask(task);
        }

        /**
         * Some fields in the Task object need to be made machine-specific.
         * So here, edit the Task's fields appropriately.
         *
         * Copies the job file into this task's local directory and points the
         * task at the local copy.  When the job names a jar, the jar is copied
         * as well and the local job file is rewritten so that its jar entry
         * names the local jar.
         *
         * @param t the task to localize
         * @throws IOException if a copy or the rewrite of the job file fails
         */
        void localizeTask(Task t) throws IOException {
            File localJobFile =
                this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
            File localJarFile =
                this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");

            String jobFile = t.getJobFile();
            fs.copyToLocalFile(new File(jobFile), localJobFile);
            t.setJobFile(localJobFile.getCanonicalPath());

            JobConf jc = new JobConf(localJobFile);
            String jarFile = jc.getJar();
            if (jarFile != null) {
                fs.copyToLocalFile(new File(jarFile), localJarFile);
                jc.setJar(localJarFile.getCanonicalPath());

                // Persist the updated jar location back into the local job file.
                BufferedOutputStream out =
                    new BufferedOutputStream(new FileOutputStream(localJobFile));
                try {
                    jc.write(out);
                } finally {
                    out.close();
                }
            }
        }

        /**
         * @return the task tracked by this object
         */
        public Task getTask() {
            return task;
        }

        /**
         * Builds a status record from the current task id, map/reduce kind,
         * progress, run state, diagnostic text and state string (an empty
         * string when no state string has been reported).  Any diagnostic text
         * accumulated so far is handed to the returned status and the buffer
         * is then replaced by an empty one.
         *
         * @return a new status describing this task
         */
        public TaskStatus createStatus() {
            TaskStatus status =
                new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate,
                               diagnosticInfo.toString(),
                               (stateString == null) ? "" : stateString);
            if (diagnosticInfo.length() > 0) {
                diagnosticInfo = new StringBuffer();
            }
            return status;
        }

        /**
         * Kick off the task execution.
         *
         * Resets progress and diagnostics, marks the task RUNNING, then
         * creates and starts the runner.
         *
         * @throws IOException declared for callers; NOTE(review): presumably
         *         raised by runner creation -- confirm against Task
         */
        public synchronized void launchTask() throws IOException {
            this.progress = 0.0f;
            this.runstate = TaskStatus.RUNNING;
            this.diagnosticInfo = new StringBuffer();
            this.runner = task.createRunner(TaskTracker.this);
            this.runner.start();
        }

        /**
         * The task is reporting its progress.
         *
         * Logs the report, records the progress value and state string, marks
         * the task RUNNING and stamps the time of the report.
         *
         * @param p     the reported progress
         * @param state the reported state string
         */
        public synchronized void reportProgress(float p, String state) {
            LOG.info(task.getTaskId()+" "+p+"% "+state);
            this.progress = p;
            this.runstate = TaskStatus.RUNNING;
            this.lastProgressReport = System.currentTimeMillis();
            this.stateString = state;
        }

        /**
         * @return the time, in milliseconds, of the last progress report
         */
        public long getLastProgressReport() {
            return lastProgressReport;
        }

        /**
         * @return the current run state of the task
         */
        public int getRunState() {
            return runstate;
        }

        /**
         * The task has reported some diagnostic info about its status.
         *
         * @param info text appended to the pending diagnostic buffer
         */
        public synchronized void reportDiagnosticInfo(String info) {
            this.diagnosticInfo.append(info);
        }

        /**
         * The task is reporting that it's done running.
         */
        public synchronized void reportDone() {
            LOG.info("Task " + task.getTaskId() + " is done.");
            this.progress = 1.0f;
            this.done = true;
            // Wake up taskFinished() if it is waiting for this report.
            notifyAll();
        }

        /**
         * The task has actually finished running.
         *
         * Waits up to WAIT_FOR_DONE milliseconds for the task to report as
         * done, then sets the run state to SUCCEEDED or FAILED accordingly.
         * If the task failed or was killed, it is cleaned up right away.
         *
         * The wait uses wait() on this object so that the monitor is released
         * while waiting; otherwise reportDone(), which needs the same monitor,
         * could never run during the wait.  For the same reason the caller
         * must not hold the TaskTracker monitor when calling this method.
         */
        public void taskFinished() {
            boolean interrupted = false;
            boolean needCleanup;

            synchronized (this) {
                long start = System.currentTimeMillis();

                //
                // Wait until task reports as done.  If it hasn't reported in,
                // wait for (at most) a second and check again.
                //
                while (! done) {
                    long remaining = WAIT_FOR_DONE - (System.currentTimeMillis() - start);
                    if (remaining <= 0) {
                        break;
                    }
                    try {
                        // remaining > 0 here, so this is never wait(0) (wait forever).
                        wait(Math.min(remaining, 1000L));
                    } catch (InterruptedException ie) {
                        // Keep waiting, but remember to restore the interrupt status.
                        interrupted = true;
                    }
                }

                //
                // Change state to success or failure, depending on whether
                // task was 'done' before terminating
                //
                if (done) {
                    runstate = TaskStatus.SUCCEEDED;
                } else {
                    runstate = TaskStatus.FAILED;
                }

                //
                // If the task has failed, or if the task was killAndCleanup()'ed,
                // we should clean up right away.  We only wait to cleanup
                // if the task succeeded, and its results might be useful
                // later on to downstream job processing.
                //
                needCleanup = wasKilled || runstate == TaskStatus.FAILED;
            }

            // cleanup() takes the TaskTracker monitor before this object's
            // monitor, so it must be called without holding this monitor.
            if (needCleanup) {
                try {
                    cleanup();
                } catch (IOException ie) {
                    // Best effort: the task is over either way.
                    LOG.log(Level.WARNING, "Error cleaning up " + task.getTaskId(), ie);
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * We no longer need anything from this task, as the job has
         * finished.  If the task is still running, kill it (and clean up
         * once it has terminated); otherwise clean up immediately.
         *
         * @throws IOException if killing or cleaning up the task fails
         */
        public void jobHasFinished() throws IOException {
            boolean stillRunning;
            synchronized (this) {
                stillRunning = (getRunState() == TaskStatus.RUNNING);
                if (stillRunning) {
                    killAndCleanup();
                }
            }
            // Outside this object's monitor: cleanup() locks the TaskTracker first.
            if (! stillRunning) {
                cleanup();
            }
        }

        /**
         * This task has run on too long, and should be killed.
         *
         * Only acts when the task is RUNNING: marks it as killed and kills
         * the runner.  The cleanup itself happens in taskFinished(), which
         * cleans up any task marked as killed.
         *
         * @throws IOException declared for callers; NOTE(review): presumably
         *         raised by the runner's kill() -- confirm
         */
        public synchronized void killAndCleanup() throws IOException {
            if (runstate == TaskStatus.RUNNING) {
                wasKilled = true;
                runner.kill();
            }
        }

        /**
         * The map output has been lost.
         *
         * A SUCCEEDED task is switched to FAILED and put back into the
         * tracker's running tasks so that the change is included in the next
         * heartbeat.  In any other state only a warning is logged.
         *
         * @throws IOException declared for callers; nothing in this method
         *         visibly throws it
         */
        public synchronized void mapOutputLost() throws IOException {
            if (runstate == TaskStatus.SUCCEEDED) {
                LOG.info("Reporting output lost:"+task.getTaskId());
                runstate = TaskStatus.FAILED;    // change status to failure
                synchronized (TaskTracker.this) {   // force into next heartbeat
                    runningTasks.put(task.getTaskId(), this);
                    mapTotal++;
                }
            } else {
                LOG.warning("Output already reported lost:"+task.getTaskId());
            }
        }

        /**
         * We no longer need anything from this task.  Either the
         * controlling job is all done and the files have been copied
         * away, or the task failed and we don't need the remains.
         *
         * Removes the task from the tracker's task table, closes the runner
         * (if one was ever created) and deletes the task's local files.
         * Locks are taken in the order TaskTracker, then this object, which
         * is the order used by the tracker's own synchronized methods.
         *
         * @throws IOException if deleting the local files fails
         */
        void cleanup() throws IOException {
            String taskId = task.getTaskId();
            synchronized (TaskTracker.this) {
                synchronized (this) {
                    tasks.remove(taskId);
                    // runner is only assigned by launchTask(); it is null for
                    // a task that was never launched.
                    if (runner != null) {
                        try {
                            runner.close();
                        } catch (IOException ie) {
                            // Best effort: still delete the local files below.
                            LOG.log(Level.WARNING, "Error closing runner for " + taskId, ie);
                        }
                    }
                    this.jobConf.deleteLocalFiles(SUBDIR + File.separator + taskId);
                }
            }
        }
    }

    /////////////////////////////////////////////////////////////////
    // MapOutputProtocol
    /////////////////////////////////////////////////////////////////

    /**
     * Creates a map output file descriptor for the given map task, reduce
     * task and partition, configured with this tracker's configuration.
     *
     * @param mapTaskId    id of the map task
     * @param reduceTaskId id of the reduce task
     * @param partition    the partition number
     * @return the configured map output file
     */
    public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
                                 IntWritable partition) {
        MapOutputFile mapOutputFile =
            new MapOutputFile(mapTaskId, reduceTaskId, partition.get());
        mapOutputFile.setConf(this.fConf);
        return mapOutputFile;
    }

    /////////////////////////////////////////////////////////////////
    // TaskUmbilicalProtocol
    /////////////////////////////////////////////////////////////////

    /**
     * Called upon startup by the child process, to fetch Task data.
     *
     * @param taskid id of the task to look up
     * @return the task, or null if the id is unknown
     */
    public synchronized Task getTask(String taskid) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
            return (Task) tip.getTask();
        } else {
            return null;
        }
    }

    /**
     * Called periodically to report Task progress, from 0.0 to 1.0.
     * Reports for unknown task ids are logged and ignored.
     *
     * @param taskid   id of the reporting task
     * @param progress the reported progress
     * @param state    the reported state string
     */
    public synchronized void progress(String taskid, float progress, String state)
        throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
            tip.reportProgress(progress, state);
        } else {
            LOG.warning("Progress from unknown child task: "+taskid+". Ignored.");
        }
    }

    /**
     * Called when the task dies before completion, and we want to report back
     * diagnostic info.  Reports for unknown task ids are logged and ignored.
     *
     * @param taskid id of the reporting task
     * @param info   the diagnostic text
     */
    public synchronized void reportDiagnosticInfo(String taskid, String info)
        throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
            tip.reportDiagnosticInfo(info);
        } else {
            LOG.warning("Error from unknown child task: "+taskid+". Ignored.");
        }
    }

    /**
     * Child checking to see if we're alive.  Normally does nothing.
     *
     * @param taskid id of the pinging task
     * @throws IOException if the task id is unknown, to force the child to exit
     */
    public synchronized void ping(String taskid) throws IOException {
        if (tasks.get(taskid) == null) {
            throw new IOException("No such task id."); // force child exit
        }
    }

    /**
     * The task is done.  Reports for unknown task ids are logged and ignored.
     *
     * @param taskid id of the finished task
     */
    public synchronized void done(String taskid) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
            tip.reportDone();
        } else {
            LOG.warning("Unknown child task done: "+taskid+". Ignored.");
        }
    }

    /**
     * A child task had a local filesystem error.  Exit, so that no future
     * jobs are accepted.
     *
     * Logs the error and clears the running flag.
     * NOTE(review): presumably the tracker's main loop checks that flag --
     * confirm against the part of the file outside this excerpt.
     *
     * @param message description of the filesystem error
     */
    public synchronized void fsError(String message) throws IOException {
        LOG.severe("FSError, exiting: "+ message);
        running = false;
    }

    /////////////////////////////////////////////////////
    //  Called by TaskTracker thread after task process ends
    /////////////////////////////////////////////////////

    /**
     * The task is no longer running.  It may not have completed successfully.
     *
     * Only the lookup is done under the tracker's monitor.  The call to
     * taskFinished() may wait for the child's "done" report, which arrives
     * through done(String) and needs this monitor, so the monitor must not be
     * held during that wait.
     *
     * @param taskid id of the task whose process ended
     */
    void reportTaskFinished(String taskid) {
        TaskInProgress tip;
        synchronized (this) {
            tip = (TaskInProgress) tasks.get(taskid);
        }
        if (tip != null) {
            tip.taskFinished();
        } else {
            LOG.warning("Unknown child task finshed: "+taskid+". Ignored.");
        }
    }

    /**
     * A completed map task's output has been lost.  Reports for unknown task
     * ids are logged and ignored.
     *
     * @param taskid id of the map task whose output was lost
     */
    public synchronized void mapOutputLost(String taskid) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
            tip.mapOutputLost();
        } else {
            LOG.warning("Unknown child with bad map output: "+taskid+". Ignored.");
        }
    }

    /**
     * The main() for child processes.
     */
    public static class Child {

        /**
         * Connects to the parent over the umbilical protocol, fetches the
         * task, starts pinging the parent and runs the task.  Failures of
         * the task are reported back to the parent.
         *
         * @param args args[0] is the parent's port, args[1] is the task id
         * @throws Throwable if setup fails before the task starts running
         */
        public static void main(String[] args) throws Throwable {
            LogFormatter.showTime(false);
            LOG.info("Child starting");

            Configuration conf = new Configuration();
            int port = Integer.parseInt(args[0]);
            String taskid = args[1];
            TaskUmbilicalProtocol umbilical =
                (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                                    new InetSocketAddress(port), conf);

            Task task = umbilical.getTask(taskid);
            if (task == null) {
                // The parent answers null for an unknown id; fail with a
                // clear message instead of a NullPointerException.
                throw new IOException("No such task id: " + taskid);
            }
            JobConf job = new JobConf(task.getJobFile());

            conf.addFinalResource(new File(task.getJobFile()));

            startPinging(umbilical, taskid);        // start pinging parent

            try {
                // If the user set a working directory, use it
                String workDir = job.getWorkingDirectory();
                if (workDir != null) {
                    FileSystem file_sys = FileSystem.get(job);
                    file_sys.setWorkingDirectory(new File(workDir));
                }
                task.run(job, umbilical);           // run the task
            } catch (FSError e) {
                LOG.log(Level.SEVERE, "FSError from child", e);
                umbilical.fsError(e.getMessage());
            } catch (Throwable throwable) {
                LOG.log(Level.WARNING, "Error running child", throwable);
                // Report back any failures, for diagnostic purposes
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                throwable.printStackTrace(new PrintStream(baos));
                umbilical.reportDiagnosticInfo(taskid, baos.toString());
            }
        }

        /**
         * Periodically ping parent and exit when this fails.
         *
         * Runs in a daemon thread that pings once a second and exits the
         * process with status 1 on the first failed ping.
         *
         * @param umbilical connection to the parent
         * @param taskid    id of this child's task
         */
        private static void startPinging(final TaskUmbilicalProtocol umbilical,
                                         final String taskid) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    while (true) {
                        try {
                            umbilical.ping(taskid);
                        } catch (Throwable t) {
                            LOG.log(Level.WARNING, "Parent died. Exiting "+taskid, t);
                            System.exit(1);
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // Ignored on purpose: just ping again right away.
                        }
                    }
                }
            }, "Pinger for "+taskid);
            thread.setDaemon(true);
            thread.start();
        }
    }

    /**
     * Start the TaskTracker, point toward the indicated JobTracker.
     *
     * @param argv must be empty; otherwise the usage is printed and the
     *             process exits with status -1
     */
    public static void main(String argv[]) throws IOException {
        if (argv.length != 0) {
            System.out.println("usage: TaskTracker");
            System.exit(-1);
        }

        TaskTracker tt = new TaskTracker(new Configuration());
        tt.run();
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?