
📄 JobInProgress.java

📁 hadoop: Nutch cluster platform
💻 Java
📖 Page 1 of 2
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
import org.apache.hadoop.mapred.JobHistory.Values;

import java.io.*;
import java.net.*;
import java.util.*;

///////////////////////////////////////////////////////
// JobInProgress maintains all the info for keeping
// a Job on the straight and narrow.  It keeps its JobProfile
// and its latest JobStatus, plus a set of tables for
// doing bookkeeping of its Tasks.
///////////////////////////////////////////////////////
class JobInProgress {
    private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress");

    JobProfile profile;
    JobStatus status;
    Path localJobFile = null;
    Path localJarFile = null;

    TaskInProgress maps[] = new TaskInProgress[0];
    TaskInProgress reduces[] = new TaskInProgress[0];
    int numMapTasks = 0;
    int numReduceTasks = 0;
    int runningMapTasks = 0;
    int runningReduceTasks = 0;
    int finishedMapTasks = 0;
    int finishedReduceTasks = 0;
    int failedMapTasks = 0;
    int failedReduceTasks = 0;
    JobTracker jobtracker = null;
    HashMap hostToMaps = new HashMap();

    long startTime;
    long finishTime;

    private JobConf conf;
    private int firstMapToTry = 0;
    private int firstReduceToTry = 0;
    boolean tasksInited = false;

    private LocalFileSystem localFs;
    private String uniqueString;

    /**
     * Create a JobInProgress with the given job file, plus a handle
     * to the tracker.
     */
    public JobInProgress(String jobFile, JobTracker jobtracker,
                         Configuration default_conf) throws IOException {
        uniqueString = jobtracker.createUniqueId();
        String jobid = "job_" + uniqueString;
        String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
            + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
        this.jobtracker = jobtracker;
        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
        this.startTime = System.currentTimeMillis();
        this.localFs = (LocalFileSystem)FileSystem.getNamed("local", default_conf);

        JobConf default_job_conf = new JobConf(default_conf);
        this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
                                                          + "/" + jobid + ".xml");
        this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
                                                          + "/" + jobid + ".jar");
        FileSystem fs = FileSystem.get(default_conf);
        fs.copyToLocalFile(new Path(jobFile), localJobFile);
        conf = new JobConf(localJobFile);
        this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,
                                      conf.getJobName());
        String jarFile = conf.getJar();
        if (jarFile != null) {
          fs.copyToLocalFile(new Path(jarFile), localJarFile);
          conf.setJar(localJarFile.toString());
        }

        this.numMapTasks = conf.getNumMapTasks();
        this.numReduceTasks = conf.getNumReduceTasks();

        JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(),
            System.currentTimeMillis(), jobFile);
    }

    /**
     * Construct the splits, etc.  This is invoked from an async
     * thread so that split-computation doesn't block anyone.
     */
    public synchronized void initTasks() throws IOException {
        if (tasksInited) {
            return;
        }

        //
        // construct input splits
        //
        String jobFile = profile.getJobFile();

        FileSystem fs = FileSystem.get(conf);
        if (localJarFile != null) {
            ClassLoader loader =
              new URLClassLoader(new URL[]{ localFs.pathToFile(localJarFile).toURL() });
            conf.setClassLoader(loader);
        }
        InputFormat inputFormat = conf.getInputFormat();

        FileSplit[] splits = inputFormat.getSplits(fs, conf, numMapTasks);

        //
        // sort splits by decreasing length, to reduce job's tail
        //
        Arrays.sort(splits, new Comparator() {
            public int compare(Object a, Object b) {
                long diff =
                    ((FileSplit)b).getLength() - ((FileSplit)a).getLength();
                return diff==0 ? 0 : (diff > 0 ? 1 : -1);
            }
        });

        //
        // adjust number of map tasks to actual number of splits
        //
        this.numMapTasks = splits.length;

        // if no split is returned, job is considered completed and successful
        if (numMapTasks == 0) {
            this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
            tasksInited = true;
            return;
        }

        // create a map task for each split
        this.maps = new TaskInProgress[numMapTasks];
        for (int i = 0; i < numMapTasks; i++) {
            maps[i] = new TaskInProgress(uniqueString, jobFile, splits[i],
                                         jobtracker, conf, this, i);
        }

        //
        // Create reduce tasks
        //
        this.reduces = new TaskInProgress[numReduceTasks];
        for (int i = 0; i < numReduceTasks; i++) {
            reduces[i] = new TaskInProgress(uniqueString, jobFile,
                                            numMapTasks, i,
                                            jobtracker, conf, this);
        }

        //
        // Obtain some tasktracker-cache information for the map task splits.
        //
        for (int i = 0; i < maps.length; i++) {
            String hints[][] =
              fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(),
                                   splits[i].getLength());
            if (hints != null) {
              for (int k = 0; k < hints.length; k++) {
                for (int j = 0; j < hints[k].length; j++) {
                  ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]);
                  if (hostMaps == null) {
                    hostMaps = new ArrayList();
                    hostToMaps.put(hints[k][j], hostMaps);
                  }
                  hostMaps.add(maps[i]);
                }
              }
            }
        }

        this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
        tasksInited = true;

        JobHistory.JobInfo.logStarted(profile.getJobId(), System.currentTimeMillis(), numMapTasks, numReduceTasks);
    }

    /////////////////////////////////////////////////////
    // Accessors for the JobInProgress
    /////////////////////////////////////////////////////
    public JobProfile getProfile() {
        return profile;
    }
    public JobStatus getStatus() {
        return status;
    }
    public long getStartTime() {
        return startTime;
    }
    public long getFinishTime() {
        return finishTime;
    }
    public int desiredMaps() {
        return numMapTasks;
    }
    public int finishedMaps() {
        return finishedMapTasks;
    }
    public int desiredReduces() {
        return numReduceTasks;
    }
    public synchronized int runningMaps() {
        return runningMapTasks;
    }
    public synchronized int runningReduces() {
        return runningReduceTasks;
    }
    public int finishedReduces() {
        return finishedReduceTasks;
    }

    /**
     * Get the list of map tasks
     * @return the raw array of maps for this job
     */
    TaskInProgress[] getMapTasks() {
      return maps;
    }

    /**
     * Get the list of reduce tasks
     * @return the raw array of reduce tasks for this job
     */
    TaskInProgress[] getReduceTasks() {
      return reduces;
    }

    /**
     * Get the job configuration
     * @return the job's configuration
     */
    JobConf getJobConf() {
      return conf;
    }

    /**
     * Return a treeset of completed TaskInProgress objects
     */
    public Vector reportTasksInProgress(boolean shouldBeMap, boolean shouldBeComplete) {
        Vector results = new Vector();
        TaskInProgress tips[] = null;
        if (shouldBeMap) {
            tips = maps;
        } else {
            tips = reduces;
        }
        for (int i = 0; i < tips.length; i++) {
            if (tips[i].isComplete() == shouldBeComplete) {
                results.add(tips[i]);
            }
        }
        return results;
    }

    ////////////////////////////////////////////////////
    // Status update methods
    ////////////////////////////////////////////////////
    public synchronized void updateTaskStatus(TaskInProgress tip,
                                              TaskStatus status,
                                              JobTrackerMetrics metrics) {
        double oldProgress = tip.getProgress();   // save old progress
        boolean wasRunning = tip.isRunning();
        boolean wasComplete = tip.isComplete();
        boolean change = tip.updateStatus(status);
        if (change) {
          TaskStatus.State state = status.getRunState();
          if (state == TaskStatus.State.SUCCEEDED) {
            completedTask(tip, status, metrics);
          } else if (state == TaskStatus.State.FAILED ||
                     state == TaskStatus.State.KILLED) {
            // Tell the job to fail the relevant task
            failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
                       wasRunning, wasComplete);
          }
        }

        //
        // Update JobInProgress status
        //
        LOG.debug("Taking progress for " + tip.getTIPId() + " from " +
                  oldProgress + " to " + tip.getProgress());
        double progressDelta = tip.getProgress() - oldProgress;
        if (tip.isMapTask()) {
          if (maps.length == 0) {
            this.status.setMapProgress(1.0f);
          } else {
            this.status.setMapProgress((float) (this.status.mapProgress() +
                                                progressDelta / maps.length));
          }
        } else {
          if (reduces.length == 0) {
            this.status.setReduceProgress(1.0f);
          } else {
            this.status.setReduceProgress
                ((float) (this.status.reduceProgress() +
                          (progressDelta / reduces.length)));
          }
        }
    }

    /////////////////////////////////////////////////////
    // Create/manage tasks
    /////////////////////////////////////////////////////
    /**
     * Return a MapTask, if appropriate, to run on the given tasktracker
     */
    public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) {
      if (! tasksInited) {
        LOG.info("Cannot create task split for " + profile.getJobId());
        return null;
      }
      ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
      double avgProgress = status.mapProgress() / maps.length;
      int target = findNewTask(tts, clusterSize, avgProgress,
                               maps, firstMapToTry, mapCache);
      if (target == -1) {
        return null;
      }
      boolean wasRunning = maps[target].isRunning();
      Task result = maps[target].getTaskToRun(tts.getTrackerName());
      if (!wasRunning) {
        runningMapTasks += 1;
        JobHistory.Task.logStarted(profile.getJobId(),
            maps[target].getTIPId(), Values.MAP.name(),
            System.currentTimeMillis());
      }
      return result;
    }

    /**
     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
     * We don't have cache-sensitivity for reduce tasks, as they
     * work on temporary MapRed files.
     */
    public Task obtainNewReduceTask(TaskTrackerStatus tts,
                                    int clusterSize) {
        if (! tasksInited) {
            LOG.info("Cannot create task split for " + profile.getJobId());
            return null;
        }
        double avgProgress = status.reduceProgress() / reduces.length;
        int target = findNewTask(tts, clusterSize, avgProgress,
                                 reduces, firstReduceToTry, null);
        if (target == -1) {
          return null;
        }
        boolean wasRunning = reduces[target].isRunning();
        Task result = reduces[target].getTaskToRun(tts.getTrackerName());
        if (!wasRunning) {
          runningReduceTasks += 1;
          JobHistory.Task.logStarted(profile.getJobId(),
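The listing above stops mid-call at the page break; the rest of obtainNewReduceTask and the remainder of the class continue on page 2. For orientation, the sketch below shows roughly how a JobTracker-side caller might drive this class, based only on the constructor and public methods visible on this page. It is a hypothetical illustration, not part of Hadoop: the class name JobInProgressUsageSketch, the method scheduleOneTask, and all parameter values are placeholders.

// Hypothetical usage sketch -- NOT part of JobInProgress.java.
// Placed in the same package because JobInProgress is package-private.
package org.apache.hadoop.mapred;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

class JobInProgressUsageSketch {

    // Walks the lifecycle implied by the methods above: construct the job,
    // initialize its tasks, then hand out work when a tasktracker asks for it.
    static Task scheduleOneTask(String jobFile, JobTracker tracker,
                                Configuration conf, TaskTrackerStatus tts,
                                int clusterSize) throws IOException {
        // 1. Register the job; the constructor localizes job.xml and job.jar
        //    and writes the "submitted" record to JobHistory.
        JobInProgress job = new JobInProgress(jobFile, tracker, conf);

        // 2. Compute input splits and build the map/reduce TaskInProgress
        //    arrays (the real JobTracker invokes this from an async thread).
        job.initTasks();

        // 3. On a tasktracker heartbeat, prefer a map task (cache-aware via
        //    hostToMaps), then fall back to a reduce task; null means there
        //    is nothing to schedule for this tracker right now.
        Task t = job.obtainNewMapTask(tts, clusterSize);
        if (t == null) {
            t = job.obtainNewReduceTask(tts, clusterSize);
        }
        return t;
    }
}

In the actual 0.x-era JobTracker these steps are spread across the job-submission path and the heartbeat handler rather than collapsed into one method; the sketch only compresses them to show the call order.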
