taskinprogress.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 453 行 · 第 1/2 页

JAVA
453
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.util.*;import java.util.logging.*;////////////////////////////////////////////////////////// TaskInProgress maintains all the info needed for a// Task in the lifetime of its owning Job.  A given Task// might be speculatively executed or reexecuted, so we// need a level of indirection above the running-id itself.//// A given TaskInProgress contains multiple taskids,// 0 or more of which might be executing at any one time.// (That's what allows speculative execution.)  A taskid// is now *never* recycled.  A TIP allocates enough taskids// to account for all the speculation and failures it will// ever have to handle.  Once those are up, the TIP is dead.//////////////////////////////////////////////////////////class TaskInProgress {    static final int MAX_TASK_EXECS = 1;    static final int MAX_TASK_FAILURES = 4;        static final double SPECULATIVE_GAP = 0.2;    static final long SPECULATIVE_LAG = 60 * 1000;    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskInProgress");    // Defines the TIP    private String jobFile = null;    private FileSplit split = null;    private String hints[][] = null;    private TaskInProgress predecessors[] = null;    private int partition;    private JobTracker jobtracker;    private String id;    private String totalTaskIds[];    private JobInProgress job;    // Status of the TIP    private int numTaskFailures = 0;    private double progress = 0;    private String state = "";    private long startTime = 0;    private int completes = 0;    private boolean failed = false;    private TreeSet usableTaskIds = new TreeSet();    private TreeSet recentTasks = new TreeSet();    private JobConf conf;        private TreeMap taskDiagnosticData = new TreeMap();    private TreeMap taskStatuses = new TreeMap();    private TreeSet machinesWhereFailed = new TreeSet();    private TreeSet tasksReportedClosed = new TreeSet();    /**     * Constructor for MapTask     */    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) {        this.jobFile = jobFile;        this.split = split;        this.jobtracker = jobtracker;        this.job = job;        this.conf = conf;        init();    }            /**     * Constructor for ReduceTask     */    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) {        this.jobFile = jobFile;        this.predecessors = predecessors;        this.partition = partition;        this.jobtracker = jobtracker;        this.job = job;        this.conf = conf;        init();    }    /**     * Initialization common to Map and Reduce     */    void init() {        this.startTime = System.currentTimeMillis();        this.id = "tip_" + jobtracker.createUniqueId();        this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];        for (int i = 0; i < totalTaskIds.length; i++) {            if (isMapTask()) {                totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId();            } else {                totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId();            }            usableTaskIds.add(totalTaskIds[i]);        }    }    ////////////////////////////////////    // Accessors, info, profiles, etc.    ////////////////////////////////////    /**     * Return the parent job     */    public JobInProgress getJob() {        return job;    }    /**     * Return an ID for this task, not its component taskid-threads     */    public String getTIPId() {        return this.id;    }    /**     * Whether this is a map task     */    public boolean isMapTask() {        return split != null;    }    /**     */    public boolean isComplete() {        return (completes > 0);    }    /**     */    public boolean isComplete(String taskid) {        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);        if (status == null) {            return false;        }        return ((completes > 0) && (status.getRunState() == TaskStatus.SUCCEEDED));    }    /**     */    public boolean isFailed() {        return failed;    }    /**     * Number of times the TaskInProgress has failed.     */    public int numTaskFailures() {        return numTaskFailures;    }    /**     * Get the overall progress (from 0 to 1.0) for this TIP     */    public double getProgress() {        return progress;    }    /**     * Returns whether a component task-thread should be      * closed because the containing JobInProgress has completed.     */    public boolean shouldCloseForClosedJob(String taskid) {        // If the thing has never been closed,        // and it belongs to this TIP,        // and this TIP is somehow FINISHED,        // then true        TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);        if ((ts != null) &&            (! tasksReportedClosed.contains(taskid)) &&            (job.getStatus().getRunState() != JobStatus.RUNNING)) {            tasksReportedClosed.add(taskid);            return true;        } else {            return false;        }    }    /**     * A TaskInProgress might be speculatively executed, and so     * can have many taskids simultaneously.  Reduce tasks rely on knowing     * their predecessor ids, so they can be sure that all the previous     * work has been completed.     *     * But we don't know ahead of time which task id will actually be     * the one that completes for a given Map task.  We don't want the     * Reduce task to have to be recreated after Map-completion, or check     * in with the JobTracker.  So instead, each TaskInProgress preallocates     * all the task-ids it could ever want to run simultaneously.  Then the     * Reduce task can be told about all the ids task-ids for a given Map      * TaskInProgress.  If any of the Map TIP's tasks complete, the Reduce     * task will know all is well, and can continue.     *     * Most of the time, only a small number of the possible task-ids will     * ever be used.     */    public String[] getAllPossibleTaskIds() {        return totalTaskIds;    }    /**     * Creates a "status report" for this task.  Includes the     * task ID and overall status, plus reports for all the     * component task-threads that have ever been started.     */    TaskReport generateSingleReport() {      ArrayList diagnostics = new ArrayList();      for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {        diagnostics.addAll((Vector)i.next());      }      return new TaskReport        (getTIPId(), (float)progress, state,

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?