tasktracker.java

来自「Hadoop是一个用于运行应用程序在大型集群的廉价硬件设备上的框架。Hadoop」· Java 代码 · 共 759 行 · 第 1/2 页

JAVA
759
字号
            this.jobConf = new JobConf(conf);
            // NOTE(review): these three statements are the tail of a definition whose
            // header lies outside this view (presumably the task-in-progress
            // constructor) -- confirm against the full file.
            // Clear whatever is already stored locally under SUBDIR/<task id>, then
            // rewrite the task so that it refers to local copies of its job files.
            this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
            localizeTask(task);
        }

        /**
         * Some fields in the Task object need to be made machine-specific.
         * So here, edit the Task's fields appropriately.
         *
         * Concretely: the job file named by the task is copied (through
         * <code>fs</code>) to a local "job.xml" under SUBDIR/&lt;task id&gt;, and the
         * task is pointed at that local copy.  If the job configuration names a
         * jar, the jar is copied to a local "job.jar" in the same place, the
         * configuration's jar entry is updated to the local path, and the local
         * job file is rewritten with the updated configuration.
         *
         * @param t the task whose job file reference is rewritten
         * @throws IOException if copying or writing any of the files fails
         */
        void localizeTask(Task t) throws IOException {
            File localJobFile =
              this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.xml");
            File localJarFile =
              this.jobConf.getLocalFile(SUBDIR+File.separator+t.getTaskId(), "job.jar");

            // Fetch the job description and make the task refer to the local copy.
            String jobFile = t.getJobFile();
            fs.copyToLocalFile(new File(jobFile), localJobFile);
            t.setJobFile(localJobFile.getCanonicalPath());

            // Re-read the configuration from the local copy to find the job jar.
            JobConf jc = new JobConf(localJobFile);
            String jarFile = jc.getJar();
            if (jarFile != null) {
              fs.copyToLocalFile(new File(jarFile), localJarFile);
              jc.setJar(localJarFile.getCanonicalPath());

              // Persist the updated jar path by overwriting the local job file.
              BufferedOutputStream out =
                new BufferedOutputStream(new FileOutputStream(localJobFile));
              try {
                jc.write(out);
              } finally {
                out.close();
              }
            }
        }

        /**
         * Returns the Task held by this object.
         *
         * @return the <code>task</code> field
         */
        public Task getTask() {
            return task;
        }

        /**
         * Builds a TaskStatus snapshot from the current task id, map/reduce
         * flag, progress, run state, accumulated diagnostic text and state
         * string (an empty string is used when the state string is null).
         *
         * If any diagnostic text had accumulated, the buffer is replaced by a
         * fresh one afterwards, so each piece of diagnostic text is handed out
         * in one status only.
         *
         * NOTE(review): this method is not synchronized, while
         * reportDiagnosticInfo(String) appends to the same buffer under the
         * object's monitor -- confirm that callers provide the needed locking.
         *
         * @return a new TaskStatus describing the current state
         */
        public TaskStatus createStatus() {
            TaskStatus status = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString);
            if (diagnosticInfo.length() > 0) {
                diagnosticInfo = new StringBuffer();
            }
            return status;
        }

        /**
         * Kick off the task execution.
         *
         * Resets progress to 0, marks the task RUNNING, clears the diagnostic
         * buffer, then creates a runner for the task and starts it.
         *
         * @throws IOException declared by the signature; the visible body does
         *         not throw it directly
         */
        public synchronized void launchTask() throws IOException {
            this.progress = 0.0f;
            this.runstate = TaskStatus.RUNNING;
            this.diagnosticInfo = new StringBuffer();
            this.runner = task.createRunner(TaskTracker.this);
            this.runner.start();
        }

        /**
         * The task is reporting its progress.
         *
         * Logs the report, records the progress value and state string, marks
         * the task RUNNING and stamps the time of this report.
         *
         * @param p     the reported progress value
         * @param state the reported state description
         */
        public synchronized void reportProgress(float p, String state) {
            LOG.info(task.getTaskId()+" "+p+"% "+state);
            this.progress = p;
            this.runstate = TaskStatus.RUNNING;
            this.lastProgressReport = System.currentTimeMillis();
            this.stateString = state;
        }

        /**
         * Returns the time of the most recent progress report.
         *
         * @return the System.currentTimeMillis() value recorded by the last
         *         call to reportProgress
         */
        public long getLastProgressReport() {
            return lastProgressReport;
        }

        /**
         * Returns the current run state.
         *
         * @return the <code>runstate</code> field (compared against TaskStatus
         *         constants elsewhere in this class)
         */
        public int getRunState() {
            return runstate;
        }

        /**
         * The task has reported some diagnostic info about its status.
         * The text is appended to the diagnostic buffer that createStatus()
         * later reads.
         *
         * @param info the diagnostic text to append
         */
        public synchronized void reportDiagnosticInfo(String info) {
            this.diagnosticInfo.append(info);
        }

        /**
         * The task is reporting that it's done running.
         * Logs the event, sets progress to 1.0 and raises the <code>done</code>
         * flag that taskFinished() inspects.
         */
        public synchronized void reportDone() {
            LOG.info("Task " + task.getTaskId() + " is done.");
            this.progress = 1.0f;
            this.done = true;
        }

        /**
         * The task has actually finished running.
         *
         * Waits up to WAIT_FOR_DONE milliseconds for the <code>done</code> flag,
         * then sets the run state to SUCCEEDED or FAILED accordingly, and
         * cleans up immediately if the task failed or had been killed.
         *
         * NOTE(review): this method is synchronized and waits with
         * Thread.sleep, which does not release the monitor; reportDone() is
         * synchronized on the same object, so it cannot set <code>done</code>
         * while this loop is sleeping.  Confirm that this wait can really
         * observe a late "done" report.
         */
        public synchronized void taskFinished() {
            long start = System.currentTimeMillis();

            //
            // Wait until task reports as done.  If it hasn't reported in,
            // wait for a second and try again.
            //
            while (! done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Interruption is ignored; the loop condition is simply re-checked.
                }
            }

            //
            // Change state to success or failure, depending on whether
            // task was 'done' before terminating
            //
            if (done) {
                runstate = TaskStatus.SUCCEEDED;
            } else {
                runstate = TaskStatus.FAILED;
            }

            //
            // If the task has failed, or if the task was killAndCleanup()'ed,
            // we should clean up right away.  We only wait to cleanup
            // if the task succeeded, and its results might be useful
            // later on to downstream job processing.
            //
            if (wasKilled || runstate == TaskStatus.FAILED) {
                try {
                    cleanup();
                } catch (IOException ie) {
                    // A failure during cleanup is ignored here (nothing is logged).
                }
            }
        }

        /**
         * We no longer need anything from this task, as the job has
         * finished.  If the task is still running, kill it (cleanup then
         * happens from taskFinished(), which cleans up killed tasks);
         * otherwise clean up right away.
         *
         * @throws IOException if killAndCleanup() or cleanup() throws
         */
        public synchronized void jobHasFinished() throws IOException {
            if (getRunState() == TaskStatus.RUNNING) {
                killAndCleanup();
            } else {
                cleanup();
            }
        }

        /**
         * This task has run on too long, and should be killed.
         *
         * Only acts when the task is RUNNING: it records that the task was
         * killed and asks the runner to kill it.  This method does not call
         * cleanup() itself; taskFinished() does so when <code>wasKilled</code>
         * is set.
         *
         * @throws IOException declared by the signature; the visible body does
         *         not throw it directly
         */
        public synchronized void killAndCleanup() throws IOException {
            if (runstate == TaskStatus.RUNNING) {
                wasKilled = true;
                runner.kill();
            }
        }

        /**
         * The map output has been lost.
         *
         * If the task had SUCCEEDED, it is marked FAILED and re-registered in
         * the tracker's <code>runningTasks</code> (with <code>mapTotal</code>
         * incremented) so that the change is reported.  In any other state
         * only a warning is logged.
         *
         * @throws IOException declared by the signature; the visible body does
         *         not throw it directly
         */
        public synchronized void mapOutputLost() throws IOException {
            if (runstate == TaskStatus.SUCCEEDED) {
              LOG.info("Reporting output lost:"+task.getTaskId());
              runstate = TaskStatus.FAILED;       // change status to failure
              // Lock order here: this task object first, then the enclosing tracker.
              synchronized (TaskTracker.this) {   // force into next heartbeat
                runningTasks.put(task.getTaskId(), this);
                mapTotal++;
              }
            } else {
              LOG.warning("Output already reported lost:"+task.getTaskId());
            }
        }

        /**
         * We no longer need anything from this task.  Either the
         * controlling job is all done and the files have been copied
         * away, or the task failed and we don't need the remains.
         *
         * Removes the task from the tracker's <code>tasks</code> map, closes
         * the runner (ignoring an IOException from close), and deletes the
         * task's local files under SUBDIR/&lt;task id&gt;.
         *
         * @throws IOException if deleting the local files fails
         */
        synchronized void cleanup() throws IOException {
            tasks.remove(task.getTaskId());
            try {
                runner.close();
            } catch (IOException ie) {
                // A failure to close the runner is ignored so that the local
                // files are still deleted below.
            }
            this.jobConf.deleteLocalFiles(SUBDIR + File.separator + task.getTaskId());
        }
    }

    /////////////////////////////////////////////////////////////////
    // MapOutputProtocol
    /////////////////////////////////////////////////////////////////

    /**
     * Creates a MapOutputFile describing the output of the given map task
     * for the given reduce task and partition, configured with this
     * tracker's <code>fConf</code>.
     *
     * @param mapTaskId    id of the map task that produced the output
     * @param reduceTaskId id of the reduce task requesting it
     * @param partition    the partition number, wrapped in an IntWritable
     * @return the newly created, configured MapOutputFile
     */
    public MapOutputFile getFile(String mapTaskId, String reduceTaskId,
      IntWritable partition) {
        MapOutputFile mapOutputFile = new MapOutputFile(mapTaskId, reduceTaskId,
            partition.get());
        mapOutputFile.setConf(this.fConf);
        return mapOutputFile;
    }

    /////////////////////////////////////////////////////////////////
    // TaskUmbilicalProtocol
    /////////////////////////////////////////////////////////////////

    /**
     * Called upon startup by the child process, to fetch Task data.
     *
     * @param taskid id of the task to look up in <code>tasks</code>
     * @return the Task registered under that id, or null if there is none
     * @throws IOException declared by the signature; the visible body does
     *         not throw it directly
     */
    public synchronized Task getTask(String taskid) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
            return (Task) tip.getTask();
        } else {
            return null;
        }
    }

    /**
     * Called periodically to report Task progress, from 0.0 to 1.0.
     * Reports for an unknown task id are logged and ignored.
     *
     * @param taskid   id of the reporting task
     * @param progress the reported progress value
     * @param state    the reported state description
     * @throws IOException declared by the signature; the visible body does
     *         not throw it directly
     */
    public synchronized void progress(String taskid, float progress, String state) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
          tip.reportProgress(progress, state);
        } else {
          LOG.warning("Progress from unknown child task: "+taskid+". Ignored.");
        }
    }

    /**
     * Called when the task dies before completion, and we want to report back
     * diagnostic info.  Reports for an unknown task id are logged and ignored.
     *
     * @param taskid id of the reporting task
     * @param info   the diagnostic text
     * @throws IOException declared by the signature; the visible body does
     *         not throw it directly
     */
    public synchronized void reportDiagnosticInfo(String taskid, String info) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
          tip.reportDiagnosticInfo(info);
        } else {
          LOG.warning("Error from unknown child task: "+taskid+". Ignored.");
        }
    }

    /**
     * Child checking to see if we're alive.  Normally does nothing.
     *
     * @param taskid id of the pinging task
     * @throws IOException if no task is registered under that id; the child's
     *         pinger thread treats any failure of this call as a signal to exit
     */
    public synchronized void ping(String taskid) throws IOException {
      if (tasks.get(taskid) == null) {
        throw new IOException("No such task id."); // force child exit
      }
    }

    /**
     * The task is done.  Reports for an unknown task id are logged and
     * ignored.
     *
     * @param taskid id of the task reporting completion
     * @throws IOException declared by the signature; the visible body does
     *         not throw it directly
     */
    public synchronized void done(String taskid) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
          tip.reportDone();
        } else {
          LOG.warning("Unknown child task done: "+taskid+". Ignored.");
        }
    }

    /**
     * A child task had a local filesystem error.  Exit, so that no future
     * jobs are accepted.
     *
     * The visible code logs the message and clears the <code>running</code>
     * flag; presumably the tracker's main loop, which is not visible here,
     * stops once that flag is false -- confirm.
     *
     * @param message description of the filesystem error
     * @throws IOException declared by the signature; the visible body does
     *         not throw it directly
     */
    public synchronized void fsError(String message) throws IOException {
      LOG.severe("FSError, exiting: "+ message);
      running = false;
    }

    /////////////////////////////////////////////////////
    //  Called by TaskTracker thread after task process ends
    /////////////////////////////////////////////////////

    /**
     * The task is no longer running.  It may not have completed successfully.
     * An unknown task id is logged and ignored.
     *
     * Note that this holds the tracker's monitor for the whole duration of
     * TaskInProgress.taskFinished(), including any waiting done there.
     *
     * @param taskid id of the task whose process has ended
     */
    synchronized void reportTaskFinished(String taskid) {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
          tip.taskFinished();
        } else {
          LOG.warning("Unknown child task finshed: "+taskid+". Ignored.");
        }
    }

    /**
     * A completed map task's output has been lost.
     * An unknown task id is logged and ignored.
     *
     * @param taskid id of the map task whose output was lost
     * @throws IOException if TaskInProgress.mapOutputLost() throws
     */
    public synchronized void mapOutputLost(String taskid) throws IOException {
        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
        if (tip != null) {
          tip.mapOutputLost();
        } else {
          LOG.warning("Unknown child with bad map output: "+taskid+". Ignored.");
        }
    }

    /**
     * The main() for child processes.
     */
    public static class Child {
        /**
         * Entry point of a child process.
         *
         * Connects back to the parent over TaskUmbilicalProtocol, fetches the
         * Task for the given id, starts the pinger thread, and runs the task.
         * An FSError is logged and reported through fsError(); any other
         * Throwable is logged and its stack trace is sent back through
         * reportDiagnosticInfo().
         *
         * @param args args[0] is the port of the parent's umbilical server,
         *             args[1] is the task id
         * @throws Throwable if setup, or reporting a failure back to the
         *         parent, itself fails
         */
        public static void main(String[] args) throws Throwable {
          LogFormatter.showTime(false);
          LOG.info("Child starting");

          Configuration conf = new Configuration();
          int port = Integer.parseInt(args[0]);
          String taskid = args[1];
          // Only a port is given for the parent's address; no host is specified.
          TaskUmbilicalProtocol umbilical =
            (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                                new InetSocketAddress(port), conf);

          Task task = umbilical.getTask(taskid);
          JobConf job = new JobConf(task.getJobFile());

          conf.addFinalResource(new File(task.getJobFile()));

          startPinging(umbilical, taskid);        // start pinging parent

          try {
              // If the user set a working directory, use it
              String workDir = job.getWorkingDirectory();
              if (workDir != null) {
                FileSystem file_sys = FileSystem.get(job);
                file_sys.setWorkingDirectory(new File(workDir));
              }
              task.run(job, umbilical);           // run the task
          } catch (FSError e) {
            LOG.log(Level.SEVERE, "FSError from child", e);
            umbilical.fsError(e.getMessage());
          } catch (Throwable throwable) {
              LOG.log(Level.WARNING, "Error running child", throwable);
              // Report back any failures, for diagnostic purposes
              ByteArrayOutputStream baos = new ByteArrayOutputStream();
              throwable.printStackTrace(new PrintStream(baos));
              umbilical.reportDiagnosticInfo(taskid, baos.toString());
          }
        }

        /**
         * Periodically ping parent and exit when this fails.
         *
         * Starts a daemon thread named "Pinger for &lt;taskid&gt;" that calls
         * ping() and then sleeps for one second, forever.  If ping() throws
         * anything, the failure is logged and the process exits with status 1.
         *
         * @param umbilical the proxy used to reach the parent
         * @param taskid    id of this child's task, sent with every ping
         */
        private static void startPinging(final TaskUmbilicalProtocol umbilical,
                                         final String taskid) {
          Thread thread = new Thread(new Runnable() {
              public void run() {
                while (true) {
                  try {
                    umbilical.ping(taskid);
                  } catch (Throwable t) {
                    LOG.log(Level.WARNING, "Parent died.  Exiting "+taskid, t);
                    System.exit(1);
                  }
                  try {
                    Thread.sleep(1000);
                  } catch (InterruptedException e) {
                    // Interruption is ignored; the next ping simply happens sooner.
                  }
                }
              }
            }, "Pinger for "+taskid);
          thread.setDaemon(true);
          thread.start();
        }
    }

    /**
     * Start the TaskTracker, point toward the indicated JobTracker.
     *
     * No command-line arguments are accepted: if any are given, a usage
     * message is printed and the process exits with status -1.  Otherwise a
     * TaskTracker is built from a default Configuration and run.
     *
     * @param argv command-line arguments; must be empty
     * @throws IOException if constructing or running the TaskTracker fails
     */
    public static void main(String argv[]) throws IOException {
        if (argv.length != 0) {
            System.out.println("usage: TaskTracker");
            System.exit(-1);
        }

        TaskTracker tt = new TaskTracker(new Configuration());
        tt.run();
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?