jobinprogress.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 507 行 · 第 1/2 页

JAVA
507
字号
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.util.LogFormatter;import java.io.*;import java.net.*;import java.util.*;import java.util.logging.*;///////////////////////////////////////////////////////// JobInProgress maintains all the info for keeping// a Job on the straight and narrow.  It keeps its JobProfile// and its latest JobStatus, plus a set of tables for // doing bookkeeping of its Tasks.///////////////////////////////////////////////////////class JobInProgress {    public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.JobInProgress");    JobProfile profile;    JobStatus status;    File localJobFile = null;    File localJarFile = null;    TaskInProgress maps[] = new TaskInProgress[0];    TaskInProgress reduces[] = new TaskInProgress[0];    int numMapTasks = 0;    int numReduceTasks = 0;    JobTracker jobtracker = null;    TreeMap cachedHints = new TreeMap();    long startTime;    long finishTime;    String deleteUponCompletion = null;    private JobConf conf;    boolean tasksInited = false;    /**     * Create a JobInProgress with the given job file, plus a handle     * to the tracker.     */    public JobInProgress(String jobFile, JobTracker jobtracker,                          Configuration default_conf) throws IOException {        String jobid = "job_" + jobtracker.createUniqueId();        String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;        this.jobtracker = jobtracker;        this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);        this.startTime = System.currentTimeMillis();        JobConf default_job_conf = new JobConf(default_conf);        this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,             jobid + ".xml");        this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR,             jobid + ".jar");        FileSystem fs = FileSystem.get(default_conf);        fs.copyToLocalFile(new File(jobFile), localJobFile);        conf = new JobConf(localJobFile);        this.profile = new JobProfile(conf.getUser(), jobid, jobFile, url,                                      conf.getJobName());        String jarFile = conf.getJar();        if (jarFile != null) {          fs.copyToLocalFile(new File(jarFile), localJarFile);          conf.setJar(localJarFile.getCanonicalPath());        }        this.numMapTasks = conf.getNumMapTasks();        this.numReduceTasks = conf.getNumReduceTasks();        //        // If a jobFile is in the systemDir, we can delete it (and        // its JAR) upon completion        //        File systemDir = conf.getSystemDir();        if (jobFile.startsWith(systemDir.getPath())) {            this.deleteUponCompletion = jobFile;        }    }    /**     * Construct the splits, etc.  This is invoked from an async     * thread so that split-computation doesn't block anyone.     */    public void initTasks() throws IOException {        if (tasksInited) {            return;        }        //        // construct input splits        //        String jobid = profile.getJobId();        String jobFile = profile.getJobFile();        JobConf jd = new JobConf(localJobFile);        FileSystem fs = FileSystem.get(conf);        String ifClassName = jd.get("mapred.input.format.class");        InputFormat inputFormat;        if (ifClassName != null && localJarFile != null) {          try {            ClassLoader loader =              new URLClassLoader(new URL[]{ localJarFile.toURL() });            Class inputFormatClass = loader.loadClass(ifClassName);            inputFormat = (InputFormat)inputFormatClass.newInstance();          } catch (Exception e) {            throw new IOException(e.toString());          }        } else {          inputFormat = jd.getInputFormat();        }        FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);        //        // sort splits by decreasing length, to reduce job's tail        //        Arrays.sort(splits, new Comparator() {            public int compare(Object a, Object b) {                long diff =                    ((FileSplit)b).getLength() - ((FileSplit)a).getLength();                return diff==0 ? 0 : (diff > 0 ? 1 : -1);            }        });        //        // adjust number of map tasks to actual number of splits        //        this.numMapTasks = splits.length;        // create a map task for each split        this.maps = new TaskInProgress[numMapTasks];        for (int i = 0; i < numMapTasks; i++) {            maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, this);        }        //        // Create reduce tasks        //        this.reduces = new TaskInProgress[numReduceTasks];        for (int i = 0; i < numReduceTasks; i++) {            reduces[i] = new TaskInProgress(jobFile, maps, i, jobtracker, conf, this);        }        //        // Obtain some tasktracker-cache information for the map task splits.        //        for (int i = 0; i < maps.length; i++) {            String hints[][] = fs.getFileCacheHints(splits[i].getFile(), splits[i].getStart(), splits[i].getLength());            cachedHints.put(maps[i].getTIPId(), hints);        }        this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);        tasksInited = true;    }    /**     * This is called by TaskInProgress objects.  The JobInProgress     * prefetches and caches a lot of these hints.  If the hint is     * not available, then we pass it through to the filesystem.     */    String[][] getFileCacheHints(String tipID, File f, long start, long len) throws IOException {        String results[][] = (String[][]) cachedHints.get(tipID);        if (tipID == null) {            FileSystem fs = FileSystem.get(conf);            results = fs.getFileCacheHints(f, start, len);            cachedHints.put(tipID, results);        }        return results;    }    /////////////////////////////////////////////////////    // Accessors for the JobInProgress    /////////////////////////////////////////////////////    public JobProfile getProfile() {        return profile;    }    public JobStatus getStatus() {        return status;    }    public long getStartTime() {        return startTime;    }    public long getFinishTime() {        return finishTime;    }    public int desiredMaps() {        return numMapTasks;    }    public int finishedMaps() {        int finishedCount = 0;        for (int i = 0; i < maps.length; i++) {            if (maps[i].isComplete()) {                finishedCount++;            }        }        return finishedCount;    }    public int desiredReduces() {        return numReduceTasks;    }    public int finishedReduces() {        int finishedCount = 0;        for (int i = 0; i < reduces.length; i++) {            if (reduces[i].isComplete()) {                finishedCount++;            }        }        return finishedCount;    }    /**     * Return a treeset of completed TaskInProgress objects     */    public Vector reportTasksInProgress(boolean shouldBeMap, boolean shouldBeComplete) {        Vector results = new Vector();        TaskInProgress tips[] = null;        if (shouldBeMap) {            tips = maps;        } else {            tips = reduces;        }        for (int i = 0; i < tips.length; i++) {            if (tips[i].isComplete() == shouldBeComplete) {                results.add(tips[i]);            }        }        return results;    }    ////////////////////////////////////////////////////    // Status update methods    ////////////////////////////////////////////////////    public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {        double oldProgress = tip.getProgress();   // save old progress        tip.updateStatus(status);                 // update tip        //

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?