📄 tasktracker.java
字号:
synchronized (TaskTracker.this) { tasks.remove(taskId); if (alwaysKeepTaskFiles || (runstate == TaskStatus.State.FAILED && keepFailedTaskFiles)) { return; } synchronized (this) { try { runner.close(); } catch (Throwable ie) { } } } this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR + taskId); } } // /////////////////////////////////////////////////////////////// // TaskUmbilicalProtocol ///////////////////////////////////////////////////////////////// /** * Called upon startup by the child process, to fetch Task data. */ public synchronized Task getTask(String taskid) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { return (Task) tip.getTask(); } else { return null; } } /** * Called periodically to report Task progress, from 0.0 to 1.0. */ public synchronized void progress(String taskid, float progress, String state, Phase phase) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { tip.reportProgress(progress, state, phase); } else { LOG.warn("Progress from unknown child task: "+taskid+". Ignored."); } } /** * Called when the task dies before completion, and we want to report back * diagnostic info */ public synchronized void reportDiagnosticInfo(String taskid, String info) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { tip.reportDiagnosticInfo(info); } else { LOG.warn("Error from unknown child task: "+taskid+". Ignored."); } } /** Child checking to see if we're alive. Normally does nothing.*/ public synchronized boolean ping(String taskid) throws IOException { return tasks.get(taskid) != null; } /** * The task is done. */ public synchronized void done(String taskid) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { tip.reportDone(); } else { LOG.warn("Unknown child task done: "+taskid+". 
Ignored."); } } /** A child task had a local filesystem error. Exit, so that no future * jobs are accepted. */ public synchronized void fsError(String message) throws IOException { LOG.fatal("FSError, exiting: "+ message); running = false; } ///////////////////////////////////////////////////// // Called by TaskTracker thread after task process ends ///////////////////////////////////////////////////// /** * The task is no longer running. It may not have completed successfully */ void reportTaskFinished(String taskid) { TaskInProgress tip; synchronized (this) { tip = (TaskInProgress) tasks.get(taskid); } if (tip != null) { tip.taskFinished(); synchronized(finishedCount) { finishedCount[0]++; finishedCount.notifyAll(); } } else { LOG.warn("Unknown child task finshed: "+taskid+". Ignored."); } } /** * A completed map task's output has been lost. */ public synchronized void mapOutputLost(String taskid, String errorMsg) throws IOException { TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { tip.mapOutputLost(errorMsg); } else { LOG.warn("Unknown child with bad map output: "+taskid+". Ignored."); } } /** * The datastructure for initializing a job */ static class RunningJob{ Path jobFile; // keep this for later use ArrayList tasks; boolean localized; } /** * The main() for child processes. 
*/ public static class Child { public static void main(String[] args) throws Throwable { //LogFactory.showTime(false); LOG.info("Child starting"); JobConf defaultConf = new JobConf(); int port = Integer.parseInt(args[0]); String taskid = args[1]; TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID, new InetSocketAddress(port), defaultConf); Task task = umbilical.getTask(taskid); JobConf job = new JobConf(task.getJobFile()); defaultConf.addFinalResource(new Path(task.getJobFile())); startPinging(umbilical, taskid); // start pinging parent try { // use job-specified working directory FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory()); task.run(job, umbilical); // run the task } catch (FSError e) { LOG.fatal("FSError from child", e); umbilical.fsError(e.getMessage()); } catch (Throwable throwable) { LOG.warn("Error running child", throwable); // Report back any failures, for diagnostic purposes ByteArrayOutputStream baos = new ByteArrayOutputStream(); throwable.printStackTrace(new PrintStream(baos)); umbilical.reportDiagnosticInfo(taskid, baos.toString()); } } /** Periodically ping parent and exit when this fails.*/ private static void startPinging(final TaskUmbilicalProtocol umbilical, final String taskid) { Thread thread = new Thread(new Runnable() { public void run() { final int MAX_RETRIES = 3; int remainingRetries = MAX_RETRIES; while (true) { try { if (!umbilical.ping(taskid)) { LOG.warn("Parent died. 
Exiting "+taskid); System.exit(66); } remainingRetries = MAX_RETRIES; } catch (Throwable t) { String msg = StringUtils.stringifyException(t); LOG.info("Ping exception: " + msg); remainingRetries -=1; if (remainingRetries == 0) { getCallStacks(); LOG.warn("Last retry, killing "+taskid); System.exit(65); } } try { Thread.sleep(1000); } catch (InterruptedException e) { } } } }, "Pinger for "+taskid); thread.setDaemon(true); thread.start(); } } /** * Get the name for this task tracker. * @return the string like "tracker_mymachine:50010" */ String getName() { return taskTrackerName; } /** * Get the list of tasks that will be reported back to the * job tracker in the next heartbeat cycle. * @return a copy of the list of TaskStatus objects */ synchronized List getRunningTaskStatuses() { List result = new ArrayList(runningTasks.size()); Iterator itr = runningTasks.values().iterator(); while (itr.hasNext()) { TaskInProgress tip = (TaskInProgress) itr.next(); result.add(tip.createStatus()); } return result; } /** * Get the default job conf for this tracker. */ JobConf getJobConf() { return fConf; } /** * Check if the given local directories * (and parent directories, if necessary) can be created. * @param localDirs where the new TaskTracker should keep its local files. * @throws DiskErrorException if all local directories are not writable * @author hairong */ private static void checkLocalDirs( String[] localDirs ) throws DiskErrorException { boolean writable = false; if( localDirs != null ) { for (int i = 0; i < localDirs.length; i++) { try { DiskChecker.checkDir( new File(localDirs[i]) ); writable = true; } catch( DiskErrorException e ) { LOG.warn("Task Tracker local " + e.getMessage() ); } } } if( !writable ) throw new DiskErrorException( "all local directories are not writable" ); } /** * Is this task tracker idle? * @return has this task tracker finished and cleaned up all of its tasks? 
*/
  public synchronized boolean isIdle() {
    return tasks.isEmpty();
  }

  /**
   * Start the TaskTracker, point toward the indicated JobTracker.
   * Takes no arguments; prints a usage line and exits with -1 otherwise,
   * and also exits with -1 if construction or {@code run()} throws an
   * IOException.
   */
  public static void main(String argv[]) throws Exception {
    if (argv.length != 0) {
      System.out.println("usage: TaskTracker");
      System.exit(-1);
    }
    try {
      JobConf conf=new JobConf();
      new TaskTracker(conf).run();
    } catch (IOException e) {
      LOG.warn( "Can not start task tracker because "+
                StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

  /**
   * This class is used in TaskTracker's Jetty to serve the map outputs
   * to other nodes.
   * @author Owen O'Malley
   */
  public static class MapOutputServlet extends HttpServlet {

    /**
     * Streams one reduce partition of one map's output.
     *
     * <p>Request parameters: {@code map} (map task id) and {@code reduce}
     * (integer partition number). The file served is
     * {@code conf.getLocalPath(map + "/part-" + reduce + ".out")}.
     *
     * <p>On an IOException the failure is logged and {@code SC_GONE} is sent.
     * The exception is then rethrown. If the failure came from locating or
     * reading the input (rather than writing to the client), the tracker is
     * also told via {@code mapOutputLost}.
     *
     * @throws IOException if a parameter is missing or malformed, or if
     *         serving the file fails
     */
    public void doGet(HttpServletRequest request,
                      HttpServletResponse response
                      ) throws ServletException, IOException {
      String mapId = request.getParameter("map");
      String reduceId = request.getParameter("reduce");
      if (mapId == null || reduceId == null) {
        throw new IOException("map and reduce parameters are required");
      }
      // mapId comes straight from the HTTP request and is spliced into a
      // local file path below; task ids are single path components, so
      // anything that could climb out of the map's own directory is refused.
      if (mapId.indexOf('/') >= 0 || mapId.indexOf('\\') >= 0 ||
          mapId.indexOf("..") >= 0) {
        throw new IOException("invalid map parameter: " + mapId);
      }
      ServletContext context = getServletContext();
      int reduce = Integer.parseInt(reduceId);
      byte[] buffer = new byte[64*1024];
      OutputStream outStream = response.getOutputStream();
      JobConf conf = (JobConf) context.getAttribute("conf");
      FileSystem fileSys =
        (FileSystem) context.getAttribute("local.file.system");
      InputStream inStream = null;
      // true iff IOException was caused by attempt to access input
      boolean isInputException = true;
      try {
        // Locating the file and reading its length are part of accessing the
        // input, so failures here are reported like a failed open/read.
        Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
        long fileLength = fileSys.getLength(filename);
        // setContentLength takes an int: declare the length only when it
        // fits, rather than sending a truncated, wrong Content-Length.
        if (fileLength <= Integer.MAX_VALUE) {
          response.setContentLength((int) fileLength);
        }
        inStream = fileSys.open(filename);
        try {
          int len = inStream.read(buffer);
          while (len > 0) {
            try {
              outStream.write(buffer, 0, len);
            } catch (IOException ie) {
              // The client side failed, not the map output.
              isInputException = false;
              throw ie;
            }
            len = inStream.read(buffer);
          }
        } finally {
          inStream.close();
        }
      } catch (IOException ie) {
        TaskTracker tracker =
          (TaskTracker) context.getAttribute("task.tracker");
        Log log = (Log) context.getAttribute("log");
        String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +
                           ") failed :\n"+
                           StringUtils.stringifyException(ie));
        log.warn(errorMsg);
        if (isInputException) {
          tracker.mapOutputLost(mapId, errorMsg);
        }
        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
        throw ie;
      }
      outStream.close();
    }
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -