⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 taskinprogress.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.commons.logging.*;import org.apache.hadoop.util.*;import java.text.NumberFormat;import java.io.*;import java.util.*;////////////////////////////////////////////////////////// TaskInProgress maintains all the info needed for a// Task in the lifetime of its owning Job.  A given Task// might be speculatively executed or reexecuted, so we// need a level of indirection above the running-id itself.//// A given TaskInProgress contains multiple taskids,// 0 or more of which might be executing at any one time.// (That's what allows speculative execution.)  A taskid// is now *never* recycled.  A TIP allocates enough taskids// to account for all the speculation and failures it will// ever have to handle.  Once those are up, the TIP is dead.//////////////////////////////////////////////////////////class TaskInProgress {    static final int MAX_TASK_EXECS = 1;    static final int MAX_TASK_FAILURES = 4;        static final double SPECULATIVE_GAP = 0.2;    static final long SPECULATIVE_LAG = 60 * 1000;    private static NumberFormat idFormat = NumberFormat.getInstance();    static {      idFormat.setMinimumIntegerDigits(6);      idFormat.setGroupingUsed(false);    }    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskInProgress");    // Defines the TIP    private String jobFile = null;    private FileSplit split = null;    private int numMaps;    private int partition;    private JobTracker jobtracker;    private String id;    private String totalTaskIds[];    private JobInProgress job;    // Status of the TIP    private int numTaskFailures = 0;    private double progress = 0;    private String state = "";    private long startTime = 0;    private long execStartTime = 0 ;    private long execFinishTime = 0 ;    private int completes = 0;    private boolean failed = false;    private boolean killed = false;    private TreeSet usableTaskIds = new TreeSet();    private TreeSet recentTasks = new TreeSet();    private JobConf conf;    private boolean runSpeculative;    private Map<String,List<String>> taskDiagnosticData = new TreeMap();    /**     * Map from taskId -> TaskStatus     */    private TreeMap taskStatuses = new TreeMap();    private TreeSet machinesWhereFailed = new TreeSet();    private TreeSet tasksReportedClosed = new TreeSet();    /**     * Constructor for MapTask     */    public TaskInProgress(String uniqueString, String jobFile, FileSplit split,                           JobTracker jobtracker, JobConf conf,                           JobInProgress job, int partition) {        this.jobFile = jobFile;        this.split = split;        this.jobtracker = jobtracker;        this.job = job;        this.conf = conf;        this.partition = partition;        init(uniqueString);    }            /**     * Constructor for ReduceTask     */    public TaskInProgress(String uniqueString, String jobFile,                           int numMaps,                           int partition, JobTracker jobtracker, JobConf conf,                          JobInProgress job) {        this.jobFile = jobFile;        this.numMaps = numMaps;        this.partition = partition;        this.jobtracker = jobtracker;        this.job = job;        this.conf = conf;        init(uniqueString);    }    /**     * Make a unique name for this TIP.     * @param uniqueBase The unique name of the job     * @return The unique string for this tip     */    private String makeUniqueString(String uniqueBase) {      StringBuffer result = new StringBuffer();      result.append(uniqueBase);      if (isMapTask()) {        result.append("_m_");      } else {        result.append("_r_");      }      result.append(idFormat.format(partition));      return result.toString();    }        /**     * Initialization common to Map and Reduce     */    void init(String jobUniqueString) {        this.startTime = System.currentTimeMillis();        this.runSpeculative = conf.getSpeculativeExecution();        String uniqueString = makeUniqueString(jobUniqueString);        this.id = "tip_" + uniqueString;        this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];        for (int i = 0; i < totalTaskIds.length; i++) {          totalTaskIds[i] = "task_" + uniqueString + "_" + i;          usableTaskIds.add(totalTaskIds[i]);        }    }    ////////////////////////////////////    // Accessors, info, profiles, etc.    ////////////////////////////////////    /**     * Return the parent job     */    public JobInProgress getJob() {        return job;    }    /**     * Return an ID for this task, not its component taskid-threads     */    public String getTIPId() {        return this.id;    }    /**     * Whether this is a map task     */    public boolean isMapTask() {        return split != null;    }        /**     * Is this tip currently running any tasks?     * @return true if any tasks are running     */    public boolean isRunning() {      return !recentTasks.isEmpty();    }        /**     */    public boolean isComplete() {        return (completes > 0);    }    /**     */    public boolean isComplete(String taskid) {        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);        if (status == null) {            return false;        }        return ((completes > 0) &&                 (status.getRunState() == TaskStatus.State.SUCCEEDED));    }    /**     */    public boolean isFailed() {        return failed;    }    /**     * Number of times the TaskInProgress has failed.     */    public int numTaskFailures() {        return numTaskFailures;    }    /**     * Get the overall progress (from 0 to 1.0) for this TIP     */    public double getProgress() {        return progress;    }    /**     * Returns whether a component task-thread should be      * closed because the containing JobInProgress has completed.     */    public boolean shouldCloseForClosedJob(String taskid) {        // If the thing has never been closed,        // and it belongs to this TIP,        // and this TIP is somehow FINISHED,        // then true        TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);        if ((ts != null) &&            (! tasksReportedClosed.contains(taskid)) &&            (job.getStatus().getRunState() != JobStatus.RUNNING)) {            tasksReportedClosed.add(taskid);            return true;        } else {            return false;        }    }    /**     * Creates a "status report" for this task.  Includes the     * task ID and overall status, plus reports for all the     * component task-threads that have ever been started.     */    synchronized TaskReport generateSingleReport() {      ArrayList diagnostics = new ArrayList();      for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {        diagnostics.addAll((List)i.next());      }      TaskReport report = new TaskReport      (getTIPId(), (float)progress, state,          (String[])diagnostics.toArray(new String[diagnostics.size()]),          execStartTime, execFinishTime);            return report ;    }    /**     * Get the diagnostic messages for a given task within this tip.     * @param taskId the id of the required task

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -