⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tasktracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    /**     * Start with the local machine name, and the default JobTracker     */    public TaskTracker(JobConf conf) throws IOException {      maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);      this.fConf = conf;      this.jobTrackAddr = JobTracker.getAddress(conf);      this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);      this.mapOutputFile = new MapOutputFile();      this.mapOutputFile.setConf(conf);      int httpPort = conf.getInt("tasktracker.http.port", 50060);      String httpBindAddress = conf.get("tasktracker.http.bindAddress", "0.0.0.0");      this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true);      int workerThreads = conf.getInt("tasktracker.http.threads", 40);      server.setThreads(1, workerThreads);      // let the jsp pages get to the task tracker, config, and other relevant      // objects      FileSystem local = FileSystem.getNamed("local", conf);      server.setAttribute("task.tracker", this);      server.setAttribute("local.file.system", local);      server.setAttribute("conf", conf);      server.setAttribute("log", LOG);      server.addServlet("mapOutput", "mapOutput", MapOutputServlet.class);      server.start();      this.httpPort = server.getPort();      initialize();    }    /**     * The connection to the JobTracker, used by the TaskRunner      * for locating remote files.     */    public InterTrackerProtocol getJobClient() {      return jobClient;    }    /**     * Are we running under killall-less operating system.     */    private static boolean isWindows =       System.getProperty("os.name").startsWith("Windows");        /**     * Get the call stacks for all java processes on this system.     * Obviously, this is only useful for debugging.     */    private static void getCallStacks() {      if (LOG.isDebugEnabled() && !isWindows) {         try {          Process proc =             Runtime.getRuntime().exec("killall -QUIT java");          proc.waitFor();        } catch (IOException ie) {          LOG.warn(StringUtils.stringifyException(ie));        } catch (InterruptedException ie) {}      }    }    /**Return the DFS filesystem     * @return     */    public FileSystem getFileSystem(){      return fs;    }        /**     * Main service loop.  Will stay in this loop forever.     */    int offerService() throws Exception {        long lastHeartbeat = 0;        this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);        while (running && !shuttingDown) {            long now = System.currentTimeMillis();            long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);            if (waitTime > 0) {                try {                    // sleeps for the wait time, wakes up if a task is finished.		    synchronized(finishedCount) {                        if (finishedCount[0] == 0) {			    finishedCount.wait(waitTime);                        }                        finishedCount[0] = 0;                    }                } catch (InterruptedException ie) {               }	    }            lastHeartbeat = now;            //            // Emit standard hearbeat message to check in with JobTracker            //            Vector taskReports = new Vector();            synchronized (this) {                for (Iterator it = runningTasks.values().iterator();                      it.hasNext(); ) {                    TaskInProgress tip = (TaskInProgress) it.next();                    TaskStatus status = tip.createStatus();                    taskReports.add(status);                }            }            //            // Xmit the heartbeat            //                        TaskTrackerStatus status =               new TaskTrackerStatus(taskTrackerName, localHostname,                                     httpPort, taskReports,                                     failures);             int resultCode = jobClient.emitHeartbeat(status, justStarted);            synchronized (this) {              for (Iterator it = taskReports.iterator();                   it.hasNext(); ) {                  TaskStatus taskStatus = (TaskStatus) it.next();                  if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {                      if (taskStatus.getIsMap()) {                          mapTotal--;                      } else {                          reduceTotal--;                      }                      myMetrics.completeTask();                      runningTasks.remove(taskStatus.getTaskId());                  }              }            }            justStarted = false;                          if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {                return STALE_STATE;            }            //            // Check if we should createRecord a new Task            //            try {              if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {                  checkLocalDirs(fConf.getLocalDirs());                                    if (enoughFreeSpace(minSpaceStart)) {                    Task t = jobClient.pollForNewTask(taskTrackerName);                    if (t != null) {                      startNewTask(t);                    }                  }              }            } catch (DiskErrorException de ) {                LOG.warn("Exiting task tracker because "+de.getMessage());                jobClient.reportTaskTrackerError(taskTrackerName,                         "DiskErrorException", de.getMessage());                return STALE_STATE;            } catch (IOException ie) {              LOG.info("Problem launching task: " +                        StringUtils.stringifyException(ie));            }            //            // Kill any tasks that have not reported progress in the last X seconds.            //            synchronized (this) {                for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {                    TaskInProgress tip = (TaskInProgress) it.next();                    long timeSinceLastReport = System.currentTimeMillis() -                                                tip.getLastProgressReport();                    if ((tip.getRunState() == TaskStatus.State.RUNNING) &&                        (timeSinceLastReport > this.taskTimeout) &&                        !tip.wasKilled) {                        String msg = "Task failed to report status for " +                                     (timeSinceLastReport / 1000) +                                      " seconds. Killing.";                        LOG.info(tip.getTask().getTaskId() + ": " + msg);                        getCallStacks();                        tip.reportDiagnosticInfo(msg);                        try {                          tip.killAndCleanup(true);                        } catch (IOException ie) {                          LOG.info("Problem cleaning task up: " +                                   StringUtils.stringifyException(ie));                        }                    }                }            }            //            // Check for any Tasks that should be killed, even if            // the containing Job is still ongoing.  (This happens            // with speculative execution, when one version of the            // task finished before another            //            //            // Check for any Tasks whose job may have ended            //            try {            String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);            if (toCloseIds != null) {              synchronized (this) {                for (int i = 0; i < toCloseIds.length; i++) {                  Object tip = tasks.get(toCloseIds[i]);                  synchronized(runningJobs){                    runningJobs.remove(((TaskInProgress)                	 	  tasks.get(toCloseIds[i])).getTask().getJobId());                  }                  if (tip != null) {                    tasksToCleanup.put(tip);                  } else {                    LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);                  }                }              }            }            } catch (IOException ie) {              LOG.info("Problem getting closed tasks: " +                       StringUtils.stringifyException(ie));            }                        //Check if we're dangerously low on disk space            // If so, kill jobs to free up space and make sure            // we don't accept any new tasks            // Try killing the reduce jobs first, since I believe they            // use up most space            // Then pick the one with least progress                        if (!enoughFreeSpace(minSpaceKill)) {              acceptNewTasks=false;               //we give up! do not accept new tasks until              //all the ones running have finished and they're all cleared up              synchronized (this) {                TaskInProgress killMe = null;                for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {                  TaskInProgress tip = (TaskInProgress) it.next();                  if ((tip.getRunState() == TaskStatus.State.RUNNING) &&                      !tip.wasKilled) {                        	                    if (killMe == null) {                      killMe = tip;                    } else if (!tip.getTask().isMapTask()) {                      //reduce task, give priority                      if (killMe.getTask().isMapTask() ||                           (tip.getTask().getProgress().get() <                            killMe.getTask().getProgress().get())) {                        killMe = tip;                      }                    } else if (killMe.getTask().isMapTask() &&                               tip.getTask().getProgress().get() <                                killMe.getTask().getProgress().get()) {                      //map task, only add if the progress is lower                      killMe = tip;                    }                  }                }                if (killMe!=null) {                  String msg = "Tasktracker running out of space. Killing task.";                  LOG.info(killMe.getTask().getTaskId() + ": " + msg);                  killMe.reportDiagnosticInfo(msg);                  try {                    killMe.killAndCleanup(true);                  } catch (IOException ie) {                    LOG.info("Problem cleaning task up: " +                             StringUtils.stringifyException(ie));                  }                }              }            }            //we've cleaned up, resume normal operation            if (!acceptNewTasks && tasks.isEmpty()) {                acceptNewTasks=true;            }        }        return 0;    }    /**     * Check if all of the local directories have enough     * free space     *      * If not, do not try to get a new task assigned      * @return     * @throws IOException      */    private boolean enoughFreeSpace(long minSpace) throws IOException {      String[] localDirs = fConf.getLocalDirs();      for (int i = 0; i < localDirs.length; i++) {        DF df = null;        if (localDirsDf.containsKey(localDirs[i])) {          df = (DF) localDirsDf.get(localDirs[i]);        } else {          df = new DF(new File(localDirs[i]), fConf);          localDirsDf.put(localDirs[i], df);        }        if (df.getAvailable() < minSpace)          return false;      }      return true;    }        /**     * Start a new task.     * All exceptions are handled locally, so that we don't mess up the     * task tracker.     */    private void startNewTask(Task t) {      TaskInProgress tip = new TaskInProgress(t, this.fConf);      synchronized (this) {        tasks.put(t.getTaskId(), tip);        runningTasks.put(t.getTaskId(), tip);        boolean isMap = t.isMapTask();        if (isMap) {          mapTotal++;        } else {          reduceTotal++;        }      }      try {    	  localizeJob(tip);      } catch (IOException ie) {        String msg = ("Error initializing " + tip.getTask().getTaskId() +                       ":\n" + StringUtils.stringifyException(ie));        LOG.warn(msg);        tip.reportDiagnosticInfo(msg);        try {          tip.killAndCleanup(true);        } catch (IOException ie2) {          LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +                   StringUtils.stringifyException(ie2));                  }      }    }        /**     * The server retry loop.       * This while-loop attempts to connect to the JobTracker.  It only      * loops when the old TaskTracker has gone bad (its state is     * stale somehow) and we need to reinitialize everything.     */    public void run() {        try {            while (running && !shuttingDown) {                boolean staleState = false;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -