📄 jobtracker.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.commons.logging.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.ipc.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.StringUtils;import java.io.*;import java.net.*;import java.text.NumberFormat;import java.util.*;import org.apache.hadoop.metrics.MetricsRecord;import org.apache.hadoop.metrics.Metrics;/******************************************************* * JobTracker is the central location for submitting and * tracking MR jobs in a network environment. * * @author Mike Cafarella *******************************************************/public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol { static long JOBINIT_SLEEP_INTERVAL = 2000; static long RETIRE_JOB_INTERVAL; static long RETIRE_JOB_CHECK_INTERVAL; static float TASK_ALLOC_EPSILON; static float PAD_FRACTION; static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3; /** * Used for formatting the id numbers */ private static NumberFormat idFormat = NumberFormat.getInstance(); static { idFormat.setMinimumIntegerDigits(4); idFormat.setGroupingUsed(false); } private int nextJobId = 1; public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker"); private static JobTracker tracker = null; private static boolean runTracker = true; public static void startTracker(Configuration conf) throws IOException { if (tracker != null) throw new IOException("JobTracker already running."); runTracker = true; while (runTracker) { try { tracker = new JobTracker(conf); break; } catch (IOException e) { LOG.warn("Starting tracker", e); } try { Thread.sleep(1000); } catch (InterruptedException e) { } } if (runTracker) { tracker.offerService(); } } public static JobTracker getTracker() { return tracker; } public static void stopTracker() throws IOException { if (tracker == null) throw new IOException("Trying to stop JobTracker that is not running."); runTracker = false; tracker.close(); tracker = null; } public long getProtocolVersion(String protocol, long clientVersion) { if (protocol.equals(InterTrackerProtocol.class.getName())) { return InterTrackerProtocol.versionID; } else { return JobSubmissionProtocol.versionID; } } /** * A thread to timeout tasks that have been assigned to task trackers, * but that haven't reported back yet. * Note that I included a stop() method, even though there is no place * where JobTrackers are cleaned up. * @author Owen O'Malley */ private class ExpireLaunchingTasks implements Runnable { private volatile boolean shouldRun = true; /** * This is a map of the tasks that have been assigned to task trackers, * but that have not yet been seen in a status report. * map: task-id (String) -> time-assigned (Long) */ private Map launchingTasks = new LinkedHashMap(); public void run() { while (shouldRun) { try { // Every 3 minutes check for any tasks that are overdue Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3); long now = System.currentTimeMillis(); LOG.debug("Starting launching task sweep"); synchronized (JobTracker.this) { synchronized (launchingTasks) { Iterator itr = launchingTasks.entrySet().iterator(); while (itr.hasNext()) { Map.Entry pair = (Map.Entry) itr.next(); String taskId = (String) pair.getKey(); long age = now - ((Long) pair.getValue()).longValue(); LOG.info(taskId + " is " + age + " ms debug."); if (age > TASKTRACKER_EXPIRY_INTERVAL) { LOG.info("Launching task " + taskId + " timed out."); TaskInProgress tip = null; tip = (TaskInProgress) taskidToTIPMap.get(taskId); if (tip != null) { JobInProgress job = tip.getJob(); String trackerName = getAssignedTracker(taskId); TaskTrackerStatus trackerStatus = getTaskTracker(trackerName); job.failedTask(tip, taskId, "Error launching task", tip.isMapTask()?Phase.MAP:Phase.STARTING, trackerStatus.getHost(), trackerName, myMetrics); } itr.remove(); } else { // the tasks are sorted by start time, so once we find // one that we want to keep, we are done for this cycle. break; } } } } } catch (InterruptedException ie) { // all done return; } catch (Exception e) { LOG.error("Expire Launching Task Thread got exception: " + StringUtils.stringifyException(e)); } } } public void addNewTask(String taskName) { synchronized (launchingTasks) { launchingTasks.put(taskName, new Long(System.currentTimeMillis())); } } public void removeTask(String taskName) { synchronized (launchingTasks) { launchingTasks.remove(taskName); } } public void stop() { shouldRun = false; } } /////////////////////////////////////////////////////// // Used to expire TaskTrackers that have gone down /////////////////////////////////////////////////////// class ExpireTrackers implements Runnable { boolean shouldRun = true; public ExpireTrackers() { } /** * The run method lives for the life of the JobTracker, and removes TaskTrackers * that have not checked in for some time. */ public void run() { while (shouldRun) { try { // // Thread runs periodically to check whether trackers should be expired. // The sleep interval must be no more than half the maximum expiry time // for a task tracker. // Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3); // // Loop through all expired items in the queue // synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { long now = System.currentTimeMillis(); TaskTrackerStatus leastRecent = null; while ((trackerExpiryQueue.size() > 0) && ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { // Remove profile from head of queue trackerExpiryQueue.remove(leastRecent); String trackerName = leastRecent.getTrackerName(); // Figure out if last-seen time should be updated, or if tracker is dead TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); // Items might leave the taskTracker set through other means; the // status stored in 'taskTrackers' might be null, which means the // tracker has already been destroyed. if (newProfile != null) { if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { // Remove completely updateTaskTrackerStatus(trackerName, null); lostTaskTracker(leastRecent.getTrackerName(), leastRecent.getHost()); } else { // Update time by inserting latest profile trackerExpiryQueue.add(newProfile); } } } } } } catch (Exception t) { LOG.error("Tracker Expiry Thread got exception: " + StringUtils.stringifyException(t)); } } } /** * Stop the tracker on next iteration */ public void stopTracker() { shouldRun = false; } } /////////////////////////////////////////////////////// // Used to remove old finished Jobs that have been around for too long /////////////////////////////////////////////////////// class RetireJobs implements Runnable { boolean shouldRun = true; public RetireJobs() { } /** * The run method lives for the life of the JobTracker, * and removes Jobs that are not still running, but which * finished a long time ago. */ public void run() { while (shouldRun) { try { Thread.sleep(RETIRE_JOB_CHECK_INTERVAL); } catch (InterruptedException ie) { } synchronized (jobs) { synchronized (jobsByArrival) { synchronized (jobInitQueue) { for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) { String jobid = (String) it.next(); JobInProgress job = (JobInProgress) jobs.get(jobid); if (job.getStatus().getRunState() != JobStatus.RUNNING && job.getStatus().getRunState() != JobStatus.PREP && (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) { it.remove(); jobInitQueue.remove(job); jobsByArrival.remove(job); } } } } } } } public void stopRetirer() { shouldRun = false; } } ///////////////////////////////////////////////////////////////// // Used to init new jobs that have just been created ///////////////////////////////////////////////////////////////// class JobInitThread implements Runnable { boolean shouldRun = true; public JobInitThread() { } public void run() { while (shouldRun) { JobInProgress job = null; synchronized (jobInitQueue) { if (jobInitQueue.size() > 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -