TaskInProgress.java
来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 453 行 · 第 1/2 页
JAVA
453 行
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.util.*;import java.util.logging.*;////////////////////////////////////////////////////////// TaskInProgress maintains all the info needed for a// Task in the lifetime of its owning Job. A given Task// might be speculatively executed or reexecuted, so we// need a level of indirection above the running-id itself.//// A given TaskInProgress contains multiple taskids,// 0 or more of which might be executing at any one time.// (That's what allows speculative execution.) A taskid// is now *never* recycled. A TIP allocates enough taskids// to account for all the speculation and failures it will// ever have to handle. 
Once those are up, the TIP is dead.//////////////////////////////////////////////////////////class TaskInProgress { static final int MAX_TASK_EXECS = 1; static final int MAX_TASK_FAILURES = 4; static final double SPECULATIVE_GAP = 0.2; static final long SPECULATIVE_LAG = 60 * 1000; public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskInProgress"); // Defines the TIP private String jobFile = null; private FileSplit split = null; private String hints[][] = null; private TaskInProgress predecessors[] = null; private int partition; private JobTracker jobtracker; private String id; private String totalTaskIds[]; private JobInProgress job; // Status of the TIP private int numTaskFailures = 0; private double progress = 0; private String state = ""; private long startTime = 0; private int completes = 0; private boolean failed = false; private TreeSet usableTaskIds = new TreeSet(); private TreeSet recentTasks = new TreeSet(); private JobConf conf; private TreeMap taskDiagnosticData = new TreeMap(); private TreeMap taskStatuses = new TreeMap(); private TreeSet machinesWhereFailed = new TreeSet(); private TreeSet tasksReportedClosed = new TreeSet(); /** * Constructor for MapTask */ public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) { this.jobFile = jobFile; this.split = split; this.jobtracker = jobtracker; this.job = job; this.conf = conf; init(); } /** * Constructor for ReduceTask */ public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) { this.jobFile = jobFile; this.predecessors = predecessors; this.partition = partition; this.jobtracker = jobtracker; this.job = job; this.conf = conf; init(); } /** * Initialization common to Map and Reduce */ void init() { this.startTime = System.currentTimeMillis(); this.id = "tip_" + jobtracker.createUniqueId(); this.totalTaskIds = new String[MAX_TASK_EXECS + 
MAX_TASK_FAILURES]; for (int i = 0; i < totalTaskIds.length; i++) { if (isMapTask()) { totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId(); } else { totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId(); } usableTaskIds.add(totalTaskIds[i]); } } //////////////////////////////////// // Accessors, info, profiles, etc. //////////////////////////////////// /** * Return the parent job */ public JobInProgress getJob() { return job; } /** * Return an ID for this task, not its component taskid-threads */ public String getTIPId() { return this.id; } /** * Whether this is a map task */ public boolean isMapTask() { return split != null; } /** */ public boolean isComplete() { return (completes > 0); } /** */ public boolean isComplete(String taskid) { TaskStatus status = (TaskStatus) taskStatuses.get(taskid); if (status == null) { return false; } return ((completes > 0) && (status.getRunState() == TaskStatus.SUCCEEDED)); } /** */ public boolean isFailed() { return failed; } /** * Number of times the TaskInProgress has failed. */ public int numTaskFailures() { return numTaskFailures; } /** * Get the overall progress (from 0 to 1.0) for this TIP */ public double getProgress() { return progress; } /** * Returns whether a component task-thread should be * closed because the containing JobInProgress has completed. */ public boolean shouldCloseForClosedJob(String taskid) { // If the thing has never been closed, // and it belongs to this TIP, // and this TIP is somehow FINISHED, // then true TaskStatus ts = (TaskStatus) taskStatuses.get(taskid); if ((ts != null) && (! tasksReportedClosed.contains(taskid)) && (job.getStatus().getRunState() != JobStatus.RUNNING)) { tasksReportedClosed.add(taskid); return true; } else { return false; } } /** * A TaskInProgress might be speculatively executed, and so * can have many taskids simultaneously. Reduce tasks rely on knowing * their predecessor ids, so they can be sure that all the previous * work has been completed. 
* * But we don't know ahead of time which task id will actually be * the one that completes for a given Map task. We don't want the * Reduce task to have to be recreated after Map-completion, or check * in with the JobTracker. So instead, each TaskInProgress preallocates * all the task-ids it could ever want to run simultaneously. Then the * Reduce task can be told about all the ids task-ids for a given Map * TaskInProgress. If any of the Map TIP's tasks complete, the Reduce * task will know all is well, and can continue. * * Most of the time, only a small number of the possible task-ids will * ever be used. */ public String[] getAllPossibleTaskIds() { return totalTaskIds; } /** * Creates a "status report" for this task. Includes the * task ID and overall status, plus reports for all the * component task-threads that have ever been started. */ TaskReport generateSingleReport() { ArrayList diagnostics = new ArrayList(); for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) { diagnostics.addAll((Vector)i.next()); } return new TaskReport (getTIPId(), (float)progress, state,
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?