📄 TaskTracker.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.commons.logging.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.regex.Pattern;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.net.DNS;

/*******************************************************
 * TaskTracker is a process that starts and tracks MR Tasks
 * in a networked environment.  It contacts the JobTracker
 * for Task assignments and reporting results.
 *
 * @author Mike Cafarella
 *******************************************************/
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, Runnable {
  static final long WAIT_FOR_DONE = 3 * 1000;
  private long taskTimeout;
  private int httpPort;

  static final int STALE_STATE = 1;

  public static final Log LOG =
    LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");

  private boolean running = true;

  String taskTrackerName;
  String localHostname;
  InetSocketAddress jobTrackAddr;

  String taskReportBindAddress;
  int taskReportPort;

  Server taskReportServer = null;
  InterTrackerProtocol jobClient;

  StatusHttpServer server = null;

  boolean shuttingDown = false;

  TreeMap tasks = null;
  /**
   * Map from taskId -> TaskInProgress.
   */
  TreeMap runningTasks = null;
  Map runningJobs = null;
  int mapTotal = 0;
  int reduceTotal = 0;
  boolean justStarted = true;

  //dir -> DF
  Map localDirsDf = new HashMap();
  long minSpaceStart = 0;
  //must have this much space free to start new tasks
  boolean acceptNewTasks = true;
  long minSpaceKill = 0;
  //if we run under this limit, kill one task
  //and make sure we never receive any new jobs
  //until all the old tasks have been cleaned up.
  //this is if a machine is so full it's only good
  //for serving map output to the other nodes

  static Random r = new Random();
  FileSystem fs = null;
  private static final String SUBDIR = "taskTracker";
  private static final String CACHEDIR = "archive";
  private static final String JOBCACHE = "jobcache";
  private JobConf fConf;
  private MapOutputFile mapOutputFile;
  private int maxCurrentTasks;
  private int failures;
  private int finishedCount[] = new int[1];

  private class TaskTrackerMetrics {
    private MetricsRecord metricsRecord = null;

    private long totalTasksCompleted = 0L;

    TaskTrackerMetrics() {
      metricsRecord = Metrics.createRecord("mapred", "tasktracker");
    }

    synchronized void completeTask() {
      if (metricsRecord != null) {
        metricsRecord.setMetric("tasks-completed", ++totalTasksCompleted);
        metricsRecord.setMetric("maps-running", mapTotal);
        metricsRecord.setMetric("reduce-running", reduceTotal);
        metricsRecord.update();
      }
    }
  }

  private TaskTrackerMetrics myMetrics = null;

  /**
   * A list of tips that should be cleaned up.
   */
  private BlockingQueue tasksToCleanup = new BlockingQueue();

  /**
   * A daemon-thread that pulls tips off the list of things to cleanup.
   */
  private Thread taskCleanupThread = new Thread(new Runnable() {
      public void run() {
        while (true) {
          try {
            TaskInProgress tip = (TaskInProgress) tasksToCleanup.take();
            tip.jobHasFinished();
          } catch (Throwable except) {
            LOG.warn(StringUtils.stringifyException(except));
          }
        }
      }
    });

  {
    taskCleanupThread.setDaemon(true);
    taskCleanupThread.start();
  }

  static String getCacheSubdir() {
    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
  }

  static String getJobCacheSubdir() {
    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
  }

  public long getProtocolVersion(String protocol, long clientVersion) {
    return TaskUmbilicalProtocol.versionID;
  }

  /**
   * Do the real constructor work here.  It's in a separate method
   * so we can call it again and "recycle" the object after calling
   * close().
   */
  synchronized void initialize() throws IOException {
    // use configured nameserver & interface to get local hostname
    this.localHostname =
      DNS.getDefaultHost(fConf.get("mapred.tasktracker.dns.interface", "default"),
                         fConf.get("mapred.tasktracker.dns.nameserver", "default"));

    //check local disk
    checkLocalDirs(this.fConf.getLocalDirs());
    fConf.deleteLocalFiles(SUBDIR);

    // Clear out state tables
    this.tasks = new TreeMap();
    this.runningTasks = new TreeMap();
    this.runningJobs = new TreeMap();
    this.mapTotal = 0;
    this.reduceTotal = 0;
    this.acceptNewTasks = true;

    this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
    this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);

    this.myMetrics = new TaskTrackerMetrics();

    // port numbers
    this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
    // bind address
    this.taskReportBindAddress = this.fConf.get("mapred.task.tracker.report.bindAddress", "0.0.0.0");

    // RPC initialization
    while (true) {
      try {
        this.taskReportServer = RPC.getServer(this, this.taskReportBindAddress,
                                              this.taskReportPort, maxCurrentTasks,
                                              false, this.fConf);
        this.taskReportServer.start();
        break;
      } catch (BindException e) {
        LOG.info("Could not open report server at " + this.taskReportPort +
                 ", trying new port");
        this.taskReportPort++;
      }
    }
    this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportPort;
    LOG.info("Starting tracker " + taskTrackerName);

    // Clear out temporary files that might be lying around
    this.mapOutputFile.cleanupStorage();
    this.justStarted = true;

    this.jobClient = (InterTrackerProtocol)
      RPC.waitForProxy(InterTrackerProtocol.class,
                       InterTrackerProtocol.versionID,
                       jobTrackAddr, this.fConf);

    this.running = true;
  }

  // initialize the job directory
  private void localizeJob(TaskInProgress tip) throws IOException {
    Path localJarFile = null;
    Task t = tip.getTask();
    Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
                                 (t.getJobId() + Path.SEPARATOR + "job.xml"));
    RunningJob rjob = null;
    synchronized (runningJobs) {
      if (!runningJobs.containsKey(t.getJobId())) {
        rjob = new RunningJob();
        rjob.localized = false;
        rjob.tasks = new ArrayList();
        rjob.jobFile = localJobFile;
        rjob.tasks.add(tip);
        runningJobs.put(t.getJobId(), rjob);
      } else {
        rjob = (RunningJob) runningJobs.get(t.getJobId());
        // keep this for later use when we just get a jobid to delete
        // the data for
        rjob.tasks.add(tip);
      }
    }
    synchronized (rjob) {
      if (!rjob.localized) {
        localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
                                (t.getJobId()) + Path.SEPARATOR + "job.jar");
        String jobFile = t.getJobFile();
        fs.copyToLocalFile(new Path(jobFile), localJobFile);
        JobConf localJobConf = new JobConf(localJobFile);
        String jarFile = localJobConf.getJar();
        if (jarFile != null) {
          fs.copyToLocalFile(new Path(jarFile), localJarFile);
          localJobConf.setJar(localJarFile.toString());
          FileSystem localFs = FileSystem.getNamed("local", fConf);
          OutputStream out = localFs.create(localJobFile);
          try {
            localJobConf.write(out);
          } finally {
            out.close();
          }
          // also unjar the job.jar files in workdir
          File workDir = new File(
              new File(localJobFile.toString()).getParent(), "work");
          workDir.mkdirs();
          RunJar.unJar(new File(localJarFile.toString()), workDir);
        }
        rjob.localized = true;
      }
    }
    launchTaskForJob(tip, new JobConf(rjob.jobFile));
  }

  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException {
    synchronized (tip) {
      try {
        tip.setJobConf(jobConf);
        tip.launchTask();
      } catch (Throwable ie) {
        tip.runstate = TaskStatus.State.FAILED;
        try {
          tip.cleanup();
        } catch (Throwable ie2) {
          // Ignore it, we are just trying to cleanup.
        }
        String error = StringUtils.stringifyException(ie);
        tip.reportDiagnosticInfo(error);
        LOG.info(error);
      }
    }
  }

  public synchronized void shutdown() throws IOException {
    shuttingDown = true;
    close();
    if (this.server != null) {
      try {
        LOG.info("Shutting down StatusHttpServer");
        this.server.stop();
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  /**
   * Close down the TaskTracker and all its components.  We must also shutdown
   * any running tasks or threads, and cleanup disk space.  A new TaskTracker
   * within the same process space might be restarted, so everything must be
   * clean.
   */
  public synchronized void close() throws IOException {
    //
    // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
    // because calling jobHasFinished() may result in an edit to 'tasks'.
    //
    TreeMap tasksToClose = new TreeMap();
    tasksToClose.putAll(tasks);
    for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) {
      TaskInProgress tip = (TaskInProgress) it.next();
      tip.jobHasFinished();
    }

    // Shutdown local RPC servers.  Do them
    // in parallel, as RPC servers can take a long
    // time to shutdown.  (They need to wait a full
    // RPC timeout, which might be 10-30 seconds.)
    new Thread() {
      public void run() {
        if (taskReportServer != null) {
          taskReportServer.stop();
          taskReportServer = null;
        }
      }
    }.start();

    this.running = false;

    // Clear local storage
    this.mapOutputFile.cleanupStorage();
  }
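A side note on the RPC initialization in initialize() above: the tracker keeps incrementing taskReportPort and retrying until its report server binds. The snippet below is a minimal, standalone sketch of the same retry-on-BindException idea, written against plain java.net.ServerSocket rather than Hadoop's RPC.getServer so it compiles and runs on its own; the class name PortProbe and the method bindWithRetry are invented for this illustration and are not part of Hadoop.

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class PortProbe {
  /**
   * Bind a ServerSocket at preferredPort, incrementing the port number on
   * every BindException -- the same strategy TaskTracker.initialize() uses
   * for its task report server.
   */
  static ServerSocket bindWithRetry(String bindAddress, int preferredPort) throws IOException {
    int port = preferredPort;
    while (true) {
      ServerSocket socket = new ServerSocket();          // unbound socket
      try {
        socket.bind(new InetSocketAddress(bindAddress, port));
        return socket;                                   // bound successfully
      } catch (BindException e) {
        socket.close();                                  // release the failed socket
        System.out.println("Could not open server at " + port + ", trying new port");
        port++;                                          // try the next port, like the report-server loop
      }
    }
  }

  public static void main(String[] args) throws IOException {
    ServerSocket s = bindWithRetry("0.0.0.0", 50050);
    System.out.println("Bound to port " + s.getLocalPort());
    s.close();
  }
}

One consequence of this design, visible in the listing above, is that the tracker's advertised name ("tracker_" + localHostname + ":" + taskReportPort) can only be computed after the bind loop settles on a port, which is why taskTrackerName is assigned immediately after the loop.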