tasktracker.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 759 行 · 第 1/2 页
JAVA
759 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/******************************************************* * TaskTracker is a process that starts and tracks MR Tasks * in a networked environment. It contacts the JobTracker * for Task assignments and reporting results. * * @author Mike Cafarella *******************************************************/public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable { static final long WAIT_FOR_DONE = 3 * 1000; private long taskTimeout; static final int STALE_STATE = 1; public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskTracker"); private boolean running = true; String taskTrackerName; String localHostname; InetSocketAddress jobTrackAddr; int taskReportPort; int mapOutputPort; Server taskReportServer = null; Server mapOutputServer = null; InterTrackerProtocol jobClient; TreeMap tasks = null; TreeMap runningTasks = null; int mapTotal = 0; int reduceTotal = 0; boolean justStarted = true; static Random r = new Random(); FileSystem fs = null; static final String SUBDIR = "taskTracker"; private Configuration fConf; private MapOutputFile mapOutputFile; private int maxCurrentTasks; class MapOutputServer extends RPC.Server { private MapOutputServer(int port, int threads) { super(TaskTracker.this, fConf, port, threads, false); } public TaskTracker getTaskTracker() { return TaskTracker.this; } } /** * Start with the local machine name, and the default JobTracker */ public TaskTracker(Configuration conf) throws IOException { this(JobTracker.getAddress(conf), conf); } /** * Start with the local machine name, and the addr of the target JobTracker */ public TaskTracker(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2); this.fConf = conf; this.jobTrackAddr = jobTrackAddr; this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000); this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); initialize(); } /** * Do the real constructor work here. It's in a separate method * so we can call it again and "recycle" the object after calling * close(). */ void initialize() throws IOException { this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000); LOG.info("Starting tracker " + taskTrackerName); this.localHostname = InetAddress.getLocalHost().getHostName(); new JobConf(this.fConf).deleteLocalFiles(SUBDIR); // Clear out state tables this.tasks = new TreeMap(); this.runningTasks = new TreeMap(); this.mapTotal = 0; this.reduceTotal = 0; // port numbers this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050); this.mapOutputPort = this.fConf.getInt("mapred.task.tracker.output.port", 50040); // RPC initialization while (true) { try { this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf); this.taskReportServer.start(); break; } catch (BindException e) { LOG.info("Could not open report server at " + this.taskReportPort + ", trying new port"); this.taskReportPort++; } } while (true) { try { this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks); this.mapOutputServer.start(); break; } catch (BindException e) { LOG.info("Could not open mapoutput server at " + this.mapOutputPort + ", trying new port"); this.mapOutputPort++; } } // Clear out temporary files that might be lying around this.mapOutputFile.cleanupStorage(); this.justStarted = true; this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf); } /** * Close down the TaskTracker and all its components. We must also shutdown * any running tasks or threads, and cleanup disk space. A new TaskTracker * within the same process space might be restarted, so everything must be * clean. */ public synchronized void close() throws IOException { // // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose', // because calling jobHasFinished() may result in an edit to 'tasks'. // TreeMap tasksToClose = new TreeMap(); tasksToClose.putAll(tasks); for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); tip.jobHasFinished(); } // Wait for them to die and report in try { Thread.sleep(5000); } catch (InterruptedException ie) { } // // Shutdown local RPC servers. Do them // in parallel, as RPC servers can take a long // time to shutdown. (They need to wait a full // RPC timeout, which might be 10-30 seconds.) // new Thread() { public void run() { if (taskReportServer != null) { taskReportServer.stop(); taskReportServer = null; } } }.start(); if (mapOutputServer != null) { mapOutputServer.stop(); mapOutputServer = null; } // Clear local storage this.mapOutputFile.cleanupStorage(); } /** * The connection to the JobTracker, used by the TaskRunner * for locating remote files. */ public InterTrackerProtocol getJobClient() { return jobClient; } /** * Main service loop. Will stay in this loop forever. */ int offerService() throws Exception { long lastHeartbeat = 0; while (running) { long now = System.currentTimeMillis(); long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat); if (waitTime > 0) { try { Thread.sleep(waitTime); } catch (InterruptedException ie) { } continue; } // // Emit standard hearbeat message to check in with JobTracker // Vector taskReports = new Vector(); synchronized (this) { for (Iterator it = runningTasks.keySet().iterator(); it.hasNext(); ) { String taskid = (String) it.next(); TaskInProgress tip = (TaskInProgress) runningTasks.get(taskid); TaskStatus status = tip.createStatus(); taskReports.add(status); if (status.getRunState() != TaskStatus.RUNNING) { if (tip.getTask().isMapTask()) { mapTotal--; } else { reduceTotal--; } it.remove(); } } } // // Xmit the heartbeat // if (justStarted) { this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf); } int resultCode = jobClient.emitHeartbeat(new TaskTrackerStatus(taskTrackerName, localHostname, mapOutputPort, taskReports), justStarted); justStarted = false; if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) { return STALE_STATE; } // // Check if we should create a new Task // if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) { Task t = jobClient.pollForNewTask(taskTrackerName); if (t != null) { TaskInProgress tip = new TaskInProgress(t, this.fConf); synchronized (this) { tasks.put(t.getTaskId(), tip); if (t.isMapTask()) { mapTotal++; } else { reduceTotal++; } runningTasks.put(t.getTaskId(), tip); } tip.launchTask(); } } // // Kill any tasks that have not reported progress in the last X seconds. // synchronized (this) { for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); if ((tip.getRunState() == TaskStatus.RUNNING) && (System.currentTimeMillis() - tip.getLastProgressReport() > this.taskTimeout)) { LOG.info("Task " + tip.getTask().getTaskId() + " timed out. Killing."); tip.reportDiagnosticInfo("Timed out."); tip.killAndCleanup(); } } } // // Check for any Tasks that should be killed, even if // the containing Job is still ongoing. (This happens // with speculative execution, when one version of the // task finished before another // // // Check for any Tasks whose job may have ended // String toCloseId = jobClient.pollForTaskWithClosedJob(taskTrackerName); if (toCloseId != null) { synchronized (this) { TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId); tip.jobHasFinished(); } } lastHeartbeat = now; } return 0; } /** * The server retry loop. * This while-loop attempts to connect to the JobTracker. It only * loops when the old TaskTracker has gone bad (its state is * stale somehow) and we need to reinitialize everything. */ public void run() { try { while (running) { boolean staleState = false; try { // This while-loop attempts reconnects if we get network errors while (running && ! staleState) { try { if (offerService() == STALE_STATE) { staleState = true; } } catch (Exception ex) { LOG.log(Level.INFO, "Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying...", ex); try { Thread.sleep(5000); } catch (InterruptedException ie) { } } } } finally { close(); } LOG.info("Reinitializing local state"); initialize(); } } catch (IOException iex) { LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString()); return; } } /////////////////////////////////////////////////////// // TaskInProgress maintains all the info for a Task that // lives at this TaskTracker. It maintains the Task object, // its TaskStatus, and the TaskRunner. /////////////////////////////////////////////////////// class TaskInProgress { Task task; float progress; int runstate; String stateString = ""; long lastProgressReport; StringBuffer diagnosticInfo = new StringBuffer(); TaskRunner runner; boolean done = false; boolean wasKilled = false; private JobConf jobConf; /** */ public TaskInProgress(Task task, Configuration conf) throws IOException { this.task = task; this.lastProgressReport = System.currentTimeMillis();
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?