jobtracker.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 910 行 · 第 1/3 页
JAVA
910 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;/******************************************************* * JobTracker is the central location for submitting and * tracking MR jobs in a network environment. * * @author Mike Cafarella *******************************************************/public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol { static long JOBINIT_SLEEP_INTERVAL = 2000; static long RETIRE_JOB_INTERVAL; static long RETIRE_JOB_CHECK_INTERVAL; static float TASK_ALLOC_EPSILON; static float PAD_FRACTION; static float MIN_SLOTS_FOR_PADDING; public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.JobTracker"); private static JobTracker tracker = null; public static void startTracker(Configuration conf) throws IOException { if (tracker != null) throw new IOException("JobTracker already running."); while (true) { try { tracker = new JobTracker(conf); break; } catch (IOException e) { LOG.log(Level.WARNING, "Starting tracker", e); } try { Thread.sleep(1000); } catch (InterruptedException e) { } } tracker.offerService(); } public static JobTracker getTracker() { return tracker; } /////////////////////////////////////////////////////// // Used to expire TaskTrackers that have gone down /////////////////////////////////////////////////////// class ExpireTrackers implements Runnable { boolean shouldRun = true; public ExpireTrackers() { } /** * The run method lives for the life of the JobTracker, and removes TaskTrackers * that have not checked in for some time. */ public void run() { while (shouldRun) { // // Thread runs periodically to check whether trackers should be expired. // The sleep interval must be no more than half the maximum expiry time // for a task tracker. // try { Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3); } catch (InterruptedException ie) { } // // Loop through all expired items in the queue // synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { long now = System.currentTimeMillis(); TaskTrackerStatus leastRecent = null; while ((trackerExpiryQueue.size() > 0) && ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { // Remove profile from head of queue trackerExpiryQueue.remove(leastRecent); String trackerName = leastRecent.getTrackerName(); // Figure out if last-seen time should be updated, or if tracker is dead TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); // Items might leave the taskTracker set through other means; the // status stored in 'taskTrackers' might be null, which means the // tracker has already been destroyed. if (newProfile != null) { if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { // Remove completely updateTaskTrackerStatus(trackerName, null); lostTaskTracker(leastRecent.getTrackerName()); } else { // Update time by inserting latest profile trackerExpiryQueue.add(newProfile); } } } } } } } /** * Stop the tracker on next iteration */ public void stopTracker() { shouldRun = false; } } /////////////////////////////////////////////////////// // Used to remove old finished Jobs that have been around for too long /////////////////////////////////////////////////////// class RetireJobs implements Runnable { boolean shouldRun = true; public RetireJobs() { } /** * The run method lives for the life of the JobTracker, * and removes Jobs that are not still running, but which * finished a long time ago. */ public void run() { while (shouldRun) { try { Thread.sleep(RETIRE_JOB_CHECK_INTERVAL); } catch (InterruptedException ie) { } synchronized (jobs) { synchronized (jobInitQueue) { synchronized (jobsByArrival) { for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) { String jobid = (String) it.next(); JobInProgress job = (JobInProgress) jobs.get(jobid); if (job.getStatus().getRunState() != JobStatus.RUNNING && job.getStatus().getRunState() != JobStatus.PREP && (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) { it.remove(); jobInitQueue.remove(job); jobsByArrival.remove(job); } } } } } } } public void stopRetirer() { shouldRun = false; } } ///////////////////////////////////////////////////////////////// // Used to init new jobs that have just been created ///////////////////////////////////////////////////////////////// class JobInitThread implements Runnable { boolean shouldRun = true; public JobInitThread() { } public void run() { while (shouldRun) { JobInProgress job = null; synchronized (jobInitQueue) { if (jobInitQueue.size() > 0) { job = (JobInProgress) jobInitQueue.elementAt(0); jobInitQueue.remove(job); } else { try { jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL); } catch (InterruptedException iex) { } } } try { if (job != null) { job.initTasks(); } } catch (Exception e) { LOG.log(Level.WARNING, "job init failed", e); job.kill(); } } } public void stopIniter() { shouldRun = false; } } ///////////////////////////////////////////////////////////////// // The real JobTracker //////////////////////////////////////////////////////////////// int port; String localMachine; long startTime; int totalSubmissions = 0; Random r = new Random(); private int maxCurrentTasks; // // Properties to maintain while running Jobs and Tasks: // // 1. Each Task is always contained in a single Job. A Job succeeds when all its // Tasks are complete. // // 2. Every running or successful Task is assigned to a Tracker. Idle Tasks are not. // // 3. When a Tracker fails, all of its assigned Tasks are marked as failures. // // 4. A Task might need to be reexecuted if it (or the machine it's hosted on) fails // before the Job is 100% complete. Sometimes an upstream Task can fail without // reexecution if all downstream Tasks that require its output have already obtained // the necessary files. // // All the known jobs. (jobid->JobInProgress) TreeMap jobs = new TreeMap(); Vector jobsByArrival = new Vector(); // All the known TaskInProgress items, mapped to by taskids (taskid->TIP) TreeMap taskidToTIPMap = new TreeMap(); // (taskid --> trackerID) TreeMap taskidToTrackerMap = new TreeMap(); // (trackerID->TreeSet of taskids running at that tracker) TreeMap trackerToTaskMap = new TreeMap(); // // Watch and expire TaskTracker objects using these structures. // We can map from Name->TaskTrackerStatus, or we can expire by time. // int totalMaps = 0; int totalReduces = 0; private TreeMap taskTrackers = new TreeMap(); Vector jobInitQueue = new Vector(); ExpireTrackers expireTrackers = new ExpireTrackers(); RetireJobs retireJobs = new RetireJobs(); JobInitThread initJobs = new JobInitThread(); /** * It might seem like a bug to maintain a TreeSet of status objects, * which can be updated at any time. But that's not what happens! We * only update status objects in the taskTrackers table. Status objects * are never updated once they enter the expiry queue. Instead, we wait * for them to expire and remove them from the expiry queue. If a status * object has been updated in the taskTracker table, the latest status is * reinserted. Otherwise, we assume the tracker has expired. */ TreeSet trackerExpiryQueue = new TreeSet(new Comparator() { public int compare(Object o1, Object o2) { TaskTrackerStatus p1 = (TaskTrackerStatus) o1; TaskTrackerStatus p2 = (TaskTrackerStatus) o2; if (p1.getLastSeen() < p2.getLastSeen()) { return -1; } else if (p1.getLastSeen() > p2.getLastSeen()) { return 1; } else { return (p1.getTrackerName().compareTo(p2.getTrackerName())); } } }); // Used to provide an HTML view on Job, Task, and TaskTracker structures JobTrackerInfoServer infoServer; int infoPort; Server interTrackerServer; // Some jobs are stored in a local system directory. We can delete // the files when we're done with the job. static final String SUBDIR = "jobTracker"; FileSystem fs; File systemDir; private Configuration conf;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?