⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tasktracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                try {                    // This while-loop attempts reconnects if we get network errors                    while (running && ! staleState && !shuttingDown ) {                        try {                            if (offerService() == STALE_STATE) {                                staleState = true;                            }                        } catch (Exception ex) {                            if (!shuttingDown) {                                LOG.info("Lost connection to JobTracker [" +                                        jobTrackAddr + "].  Retrying...", ex);                                try {                                    Thread.sleep(5000);                                } catch (InterruptedException ie) {                                }                            }                        }                    }                } finally {                    close();                }                if (shuttingDown) { return; }                LOG.info("Reinitializing local state");                initialize();            }        } catch (IOException iex) {            LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString());            return;        }    }    /**     * This class implements a queue that is put between producer and      * consumer threads. It will grow without bound.     * @author Owen O'Malley     */    static private class BlockingQueue {      private List queue;            /**       * Create an empty queue.       */      public BlockingQueue() {        queue = new ArrayList();      }             /**       * Put the given object at the back of the queue.       * @param obj       */      public void put(Object obj) {        synchronized (queue) {          queue.add(obj);          queue.notify();        }      }            /**       * Take the object at the front of the queue.       * It blocks until there is an object available.       * @return the head of the queue       */      public Object take() {        synchronized (queue) {          while (queue.isEmpty()) {            try {              queue.wait();            } catch (InterruptedException ie) {}          }          Object result = queue.get(0);          queue.remove(0);          return result;        }      }    }        ///////////////////////////////////////////////////////    // TaskInProgress maintains all the info for a Task that    // lives at this TaskTracker.  It maintains the Task object,    // its TaskStatus, and the TaskRunner.    ///////////////////////////////////////////////////////    class TaskInProgress {        Task task;        float progress;        TaskStatus.State runstate;        long lastProgressReport;        StringBuffer diagnosticInfo = new StringBuffer();        TaskRunner runner;        boolean done = false;        boolean wasKilled = false;        private JobConf defaultJobConf;        private JobConf localJobConf;        private boolean keepFailedTaskFiles;        private boolean alwaysKeepTaskFiles;        private TaskStatus taskStatus ;         private boolean keepJobFiles;        /**         */        public TaskInProgress(Task task, JobConf conf) {            this.task = task;            this.progress = 0.0f;            this.runstate = TaskStatus.State.UNASSIGNED;            this.lastProgressReport = System.currentTimeMillis();            this.defaultJobConf = conf;            localJobConf = null;            taskStatus = new TaskStatus(task.getTaskId(),                 task.isMapTask(),                progress, runstate,                 diagnosticInfo.toString(),                 "initializing",                   getName(), task.isMapTask()?Phase.MAP:Phase.SHUFFLE);             keepJobFiles = false;        }                private void localizeTask(Task task) throws IOException{            Path localTaskDir =              new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR                    + JOBCACHE + Path.SEPARATOR                    + task.getJobId()), task.getTaskId());           FileSystem localFs = FileSystem.getNamed("local", fConf);           localFs.mkdirs(localTaskDir);           Path localTaskFile = new Path(localTaskDir, "job.xml");           task.setJobFile(localTaskFile.toString());           localJobConf.set("mapred.local.dir",                    fConf.get("mapred.local.dir"));                       localJobConf.set("mapred.task.id", task.getTaskId());           keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();           task.localizeConfiguration(localJobConf);           OutputStream out = localFs.create(localTaskFile);           try {             localJobConf.write(out);           } finally {             out.close();           }            task.setConf(localJobConf);            String keepPattern = localJobConf.getKeepTaskFilesPattern();            if (keepPattern != null) {                keepJobFiles = true;                alwaysKeepTaskFiles =                 Pattern.matches(keepPattern, task.getTaskId());            } else {              alwaysKeepTaskFiles = false;            }        }                /**         */        public Task getTask() {            return task;        }        public void setJobConf(JobConf lconf){            this.localJobConf = lconf;            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();        }                /**         */        public synchronized TaskStatus createStatus() {          taskStatus.setProgress(progress);          taskStatus.setRunState(runstate);          taskStatus.setDiagnosticInfo(diagnosticInfo.toString());                    if (diagnosticInfo.length() > 0) {              diagnosticInfo = new StringBuffer();          }          return taskStatus;        }        /**         * Kick off the task execution         */        public synchronized void launchTask() throws IOException {            localizeTask(task);            this.runstate = TaskStatus.State.RUNNING;            this.runner = task.createRunner(TaskTracker.this);            this.runner.start();            this.taskStatus.setStartTime(System.currentTimeMillis());        }        /**         * The task is reporting its progress         */        public synchronized void reportProgress(float p, String state, Phase newPhase) {            LOG.info(task.getTaskId()+" "+p+"% "+state);            this.progress = p;            this.runstate = TaskStatus.State.RUNNING;            this.lastProgressReport = System.currentTimeMillis();            Phase oldPhase = taskStatus.getPhase() ;            if( oldPhase != newPhase ){              // sort phase started              if( newPhase == Phase.SORT ){                this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());              }else if( newPhase == Phase.REDUCE){                this.taskStatus.setSortFinishTime(System.currentTimeMillis());              }            }            this.taskStatus.setStateString(state);        }        /**         */        public long getLastProgressReport() {            return lastProgressReport;        }        /**         */        public TaskStatus.State getRunState() {            return runstate;        }        /**         * The task has reported some diagnostic info about its status         */        public synchronized void reportDiagnosticInfo(String info) {            this.diagnosticInfo.append(info);        }        /**         * The task is reporting that it's done running         */        public synchronized void reportDone() {            LOG.info("Task " + task.getTaskId() + " is done.");            this.progress = 1.0f;            this.taskStatus.setFinishTime(System.currentTimeMillis());            this.done = true;        }        /**         * The task has actually finished running.         */        public void taskFinished() {            long start = System.currentTimeMillis();            //            // Wait until task reports as done.  If it hasn't reported in,            // wait for a second and try again.            //            while (! done && (System.currentTimeMillis() - start < WAIT_FOR_DONE)) {                try {                    Thread.sleep(1000);                } catch (InterruptedException ie) {                }            }            //            // Change state to success or failure, depending on whether            // task was 'done' before terminating            //            boolean needCleanup = false;            synchronized (this) {              if (done) {                  runstate = TaskStatus.State.SUCCEEDED;              } else {                  if (!wasKilled) {                    failures += 1;                    runstate = TaskStatus.State.FAILED;                  } else {                    runstate = TaskStatus.State.KILLED;                  }                  progress = 0.0f;              }              this.taskStatus.setFinishTime(System.currentTimeMillis());              needCleanup = (runstate == TaskStatus.State.FAILED) |                            (runstate == TaskStatus.State.KILLED);            }            //            // If the task has failed, or if the task was killAndCleanup()'ed,            // we should clean up right away.  We only wait to cleanup            // if the task succeeded, and its results might be useful            // later on to downstream job processing.            //            if (needCleanup) {                try {                    cleanup();                } catch (IOException ie) {                }            }        }        /**         * We no longer need anything from this task, as the job has         * finished.  If the task is still running, kill it (and clean up         */        public synchronized void jobHasFinished() throws IOException {        	             if (getRunState() == TaskStatus.State.RUNNING) {                killAndCleanup(false);            } else {                cleanup();            }            if (keepJobFiles)              return;            // delete the job diretory for this task             // since the job is done/failed            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +                     JOBCACHE + Path.SEPARATOR +  task.getJobId());        }        /**         * Something went wrong and the task must be killed.         * @param wasFailure was it a failure (versus a kill request)?         */        public synchronized void killAndCleanup(boolean wasFailure                                                ) throws IOException {            if (runstate == TaskStatus.State.RUNNING) {                wasKilled = true;                if (wasFailure) {                  failures += 1;                }                runner.kill();            } else if (runstate == TaskStatus.State.UNASSIGNED) {              if (wasFailure) {                failures += 1;                runstate = TaskStatus.State.FAILED;              } else {                runstate = TaskStatus.State.KILLED;              }            }        }        /**         * The map output has been lost.         */        public synchronized void mapOutputLost(String failure                                               ) throws IOException {            if (runstate == TaskStatus.State.SUCCEEDED) {              LOG.info("Reporting output lost:"+task.getTaskId());              runstate = TaskStatus.State.FAILED;    // change status to failure              progress = 0.0f;              reportDiagnosticInfo("Map output lost, rescheduling: " +                                    failure);              runningTasks.put(task.getTaskId(), this);              mapTotal++;            } else {              LOG.warn("Output already reported lost:"+task.getTaskId());            }        }        /**         * We no longer need anything from this task.  Either the          * controlling job is all done and the files have been copied         * away, or the task failed and we don't need the remains.         */        void cleanup() throws IOException {            String taskId = task.getTaskId();            LOG.debug("Cleaning up " + taskId);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -