📄 taskinprogress.java
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.commons.logging.*;import org.apache.hadoop.util.*;import java.text.NumberFormat;import java.io.*;import java.util.*;////////////////////////////////////////////////////////// TaskInProgress maintains all the info needed for a// Task in the lifetime of its owning Job. A given Task// might be speculatively executed or reexecuted, so we// need a level of indirection above the running-id itself.//// A given TaskInProgress contains multiple taskids,// 0 or more of which might be executing at any one time.// (That's what allows speculative execution.) A taskid// is now *never* recycled. A TIP allocates enough taskids// to account for all the speculation and failures it will// ever have to handle. Once those are up, the TIP is dead.//////////////////////////////////////////////////////////class TaskInProgress { static final int MAX_TASK_EXECS = 1; static final int MAX_TASK_FAILURES = 4; static final double SPECULATIVE_GAP = 0.2; static final long SPECULATIVE_LAG = 60 * 1000; private static NumberFormat idFormat = NumberFormat.getInstance(); static { idFormat.setMinimumIntegerDigits(6); idFormat.setGroupingUsed(false); } public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskInProgress"); // Defines the TIP private String jobFile = null; private FileSplit split = null; private int numMaps; private int partition; private JobTracker jobtracker; private String id; private String totalTaskIds[]; private JobInProgress job; // Status of the TIP private int numTaskFailures = 0; private double progress = 0; private String state = ""; private long startTime = 0; private long execStartTime = 0 ; private long execFinishTime = 0 ; private int completes = 0; private boolean failed = false; private boolean killed = false; private TreeSet usableTaskIds = new TreeSet(); private TreeSet recentTasks = new TreeSet(); private JobConf conf; private boolean runSpeculative; private Map<String,List<String>> taskDiagnosticData = new TreeMap(); /** * Map from taskId -> TaskStatus */ private TreeMap taskStatuses = new TreeMap(); private TreeSet machinesWhereFailed = new TreeSet(); private TreeSet tasksReportedClosed = new TreeSet(); /** * Constructor for MapTask */ public TaskInProgress(String uniqueString, String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job, int partition) { this.jobFile = jobFile; this.split = split; this.jobtracker = jobtracker; this.job = job; this.conf = conf; this.partition = partition; init(uniqueString); } /** * Constructor for ReduceTask */ public TaskInProgress(String uniqueString, String jobFile, int numMaps, int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) { this.jobFile = jobFile; this.numMaps = numMaps; this.partition = partition; this.jobtracker = jobtracker; this.job = job; this.conf = conf; init(uniqueString); } /** * Make a unique name for this TIP. * @param uniqueBase The unique name of the job * @return The unique string for this tip */ private String makeUniqueString(String uniqueBase) { StringBuffer result = new StringBuffer(); result.append(uniqueBase); if (isMapTask()) { result.append("_m_"); } else { result.append("_r_"); } result.append(idFormat.format(partition)); return result.toString(); } /** * Initialization common to Map and Reduce */ void init(String jobUniqueString) { this.startTime = System.currentTimeMillis(); this.runSpeculative = conf.getSpeculativeExecution(); String uniqueString = makeUniqueString(jobUniqueString); this.id = "tip_" + uniqueString; this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES]; for (int i = 0; i < totalTaskIds.length; i++) { totalTaskIds[i] = "task_" + uniqueString + "_" + i; usableTaskIds.add(totalTaskIds[i]); } } //////////////////////////////////// // Accessors, info, profiles, etc. //////////////////////////////////// /** * Return the parent job */ public JobInProgress getJob() { return job; } /** * Return an ID for this task, not its component taskid-threads */ public String getTIPId() { return this.id; } /** * Whether this is a map task */ public boolean isMapTask() { return split != null; } /** * Is this tip currently running any tasks? * @return true if any tasks are running */ public boolean isRunning() { return !recentTasks.isEmpty(); } /** */ public boolean isComplete() { return (completes > 0); } /** */ public boolean isComplete(String taskid) { TaskStatus status = (TaskStatus) taskStatuses.get(taskid); if (status == null) { return false; } return ((completes > 0) && (status.getRunState() == TaskStatus.State.SUCCEEDED)); } /** */ public boolean isFailed() { return failed; } /** * Number of times the TaskInProgress has failed. */ public int numTaskFailures() { return numTaskFailures; } /** * Get the overall progress (from 0 to 1.0) for this TIP */ public double getProgress() { return progress; } /** * Returns whether a component task-thread should be * closed because the containing JobInProgress has completed. */ public boolean shouldCloseForClosedJob(String taskid) { // If the thing has never been closed, // and it belongs to this TIP, // and this TIP is somehow FINISHED, // then true TaskStatus ts = (TaskStatus) taskStatuses.get(taskid); if ((ts != null) && (! tasksReportedClosed.contains(taskid)) && (job.getStatus().getRunState() != JobStatus.RUNNING)) { tasksReportedClosed.add(taskid); return true; } else { return false; } } /** * Creates a "status report" for this task. Includes the * task ID and overall status, plus reports for all the * component task-threads that have ever been started. */ synchronized TaskReport generateSingleReport() { ArrayList diagnostics = new ArrayList(); for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) { diagnostics.addAll((List)i.next()); } TaskReport report = new TaskReport (getTIPId(), (float)progress, state, (String[])diagnostics.toArray(new String[diagnostics.size()]), execStartTime, execFinishTime); return report ; } /** * Get the diagnostic messages for a given task within this tip. * @param taskId the id of the required task
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -