⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tasktracker.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
            synchronized (TaskTracker.this) {               tasks.remove(taskId);               if (alwaysKeepTaskFiles ||                   (runstate == TaskStatus.State.FAILED &&                        keepFailedTaskFiles)) {                 return;               }               synchronized (this) {                 try {                    runner.close();                 } catch (Throwable ie) {                 }               }            }            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +                     JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +                    taskId);            }    }        // ///////////////////////////////////////////////////////////////    // TaskUmbilicalProtocol    /////////////////////////////////////////////////////////////////    /**     * Called upon startup by the child process, to fetch Task data.     */    public synchronized Task getTask(String taskid) throws IOException {        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);        if (tip != null) {            return (Task) tip.getTask();        } else {            return null;        }    }    /**     * Called periodically to report Task progress, from 0.0 to 1.0.     */    public synchronized void progress(String taskid, float progress, String state, Phase phase) throws IOException {        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);        if (tip != null) {          tip.reportProgress(progress, state, phase);        } else {          LOG.warn("Progress from unknown child task: "+taskid+". Ignored.");        }    }    /**     * Called when the task dies before completion, and we want to report back     * diagnostic info     */    public synchronized void reportDiagnosticInfo(String taskid, String info) throws IOException {        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);        if (tip != null) {          tip.reportDiagnosticInfo(info);        } else {          LOG.warn("Error from unknown child task: "+taskid+". Ignored.");        }    }    /** Child checking to see if we're alive.  Normally does nothing.*/    public synchronized boolean ping(String taskid) throws IOException {      return tasks.get(taskid) != null;    }    /**     * The task is done.     */    public synchronized void done(String taskid) throws IOException {        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);        if (tip != null) {          tip.reportDone();        } else {          LOG.warn("Unknown child task done: "+taskid+". Ignored.");        }    }    /** A child task had a local filesystem error.  Exit, so that no future     * jobs are accepted. */    public synchronized void fsError(String message) throws IOException {      LOG.fatal("FSError, exiting: "+ message);      running = false;    }    /////////////////////////////////////////////////////    //  Called by TaskTracker thread after task process ends    /////////////////////////////////////////////////////    /**     * The task is no longer running.  It may not have completed successfully     */    void reportTaskFinished(String taskid) {        TaskInProgress tip;        synchronized (this) {          tip = (TaskInProgress) tasks.get(taskid);        }        if (tip != null) {          tip.taskFinished();          synchronized(finishedCount) {              finishedCount[0]++;              finishedCount.notifyAll();          }        } else {          LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");        }    }    /**     * A completed map task's output has been lost.     */    public synchronized void mapOutputLost(String taskid,                                           String errorMsg) throws IOException {        TaskInProgress tip = (TaskInProgress) tasks.get(taskid);        if (tip != null) {          tip.mapOutputLost(errorMsg);        } else {          LOG.warn("Unknown child with bad map output: "+taskid+". Ignored.");        }    }        /**     *  The datastructure for initializing a job     */    static class RunningJob{      Path jobFile;      // keep this for later use      ArrayList tasks;      boolean localized;    }    /**      * The main() for child processes.      */    public static class Child {        public static void main(String[] args) throws Throwable {          //LogFactory.showTime(false);          LOG.info("Child starting");          JobConf defaultConf = new JobConf();          int port = Integer.parseInt(args[0]);          String taskid = args[1];          TaskUmbilicalProtocol umbilical =            (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,                                                TaskUmbilicalProtocol.versionID,                                                new InetSocketAddress(port),                                                 defaultConf);                      Task task = umbilical.getTask(taskid);          JobConf job = new JobConf(task.getJobFile());                    defaultConf.addFinalResource(new Path(task.getJobFile()));          startPinging(umbilical, taskid);        // start pinging parent          try {            // use job-specified working directory            FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());            task.run(job, umbilical);             // run the task          } catch (FSError e) {            LOG.fatal("FSError from child", e);            umbilical.fsError(e.getMessage());          } catch (Throwable throwable) {              LOG.warn("Error running child", throwable);              // Report back any failures, for diagnostic purposes              ByteArrayOutputStream baos = new ByteArrayOutputStream();              throwable.printStackTrace(new PrintStream(baos));              umbilical.reportDiagnosticInfo(taskid, baos.toString());          }        }        /** Periodically ping parent and exit when this fails.*/        private static void startPinging(final TaskUmbilicalProtocol umbilical,                                         final String taskid) {          Thread thread = new Thread(new Runnable() {              public void run() {                final int MAX_RETRIES = 3;                int remainingRetries = MAX_RETRIES;                while (true) {                  try {                    if (!umbilical.ping(taskid)) {                      LOG.warn("Parent died.  Exiting "+taskid);                      System.exit(66);                    }                    remainingRetries = MAX_RETRIES;                  } catch (Throwable t) {                    String msg = StringUtils.stringifyException(t);                    LOG.info("Ping exception: " + msg);                    remainingRetries -=1;                    if (remainingRetries == 0) {                      getCallStacks();                      LOG.warn("Last retry, killing "+taskid);                      System.exit(65);                    }                  }                  try {                    Thread.sleep(1000);                  } catch (InterruptedException e) {                  }                }              }            }, "Pinger for "+taskid);          thread.setDaemon(true);          thread.start();        }    }    /**     * Get the name for this task tracker.     * @return the string like "tracker_mymachine:50010"     */    String getName() {      return taskTrackerName;    }        /**     * Get the list of tasks that will be reported back to the      * job tracker in the next heartbeat cycle.     * @return a copy of the list of TaskStatus objects     */    synchronized List getRunningTaskStatuses() {      List result = new ArrayList(runningTasks.size());      Iterator itr = runningTasks.values().iterator();      while (itr.hasNext()) {        TaskInProgress tip = (TaskInProgress) itr.next();        result.add(tip.createStatus());      }      return result;    }        /**     * Get the default job conf for this tracker.     */    JobConf getJobConf() {      return fConf;    }        /**     * Check if the given local directories     * (and parent directories, if necessary) can be created.     * @param localDirs where the new TaskTracker should keep its local files.     * @throws DiskErrorException if all local directories are not writable     * @author hairong     */    private static void checkLocalDirs( String[] localDirs )             throws DiskErrorException {        boolean writable = false;                if( localDirs != null ) {            for (int i = 0; i < localDirs.length; i++) {                try {                    DiskChecker.checkDir( new File(localDirs[i]) );                    writable = true;                } catch( DiskErrorException e ) {                    LOG.warn("Task Tracker local " + e.getMessage() );                }            }        }        if( !writable )            throw new DiskErrorException(                     "all local directories are not writable" );    }        /**     * Is this task tracker idle?     * @return has this task tracker finished and cleaned up all of its tasks?     */    public synchronized boolean isIdle() {      return tasks.isEmpty();    }        /**     * Start the TaskTracker, point toward the indicated JobTracker     */    public static void main(String argv[]) throws Exception {        if (argv.length != 0) {            System.out.println("usage: TaskTracker");            System.exit(-1);        }        try {          JobConf conf=new JobConf();          new TaskTracker(conf).run();        } catch (IOException e) {            LOG.warn( "Can not start task tracker because "+                      StringUtils.stringifyException(e));            System.exit(-1);        }    }        /**     * This class is used in TaskTracker's Jetty to serve the map outputs     * to other nodes.     * @author Owen O'Malley     */    public static class MapOutputServlet extends HttpServlet {      public void doGet(HttpServletRequest request,                         HttpServletResponse response                       ) throws ServletException, IOException {        String mapId = request.getParameter("map");        String reduceId = request.getParameter("reduce");        if (mapId == null || reduceId == null) {          throw new IOException("map and reduce parameters are required");        }        ServletContext context = getServletContext();        int reduce = Integer.parseInt(reduceId);        byte[] buffer = new byte[64*1024];        OutputStream outStream = response.getOutputStream();        JobConf conf = (JobConf) context.getAttribute("conf");        FileSystem fileSys =           (FileSystem) context.getAttribute("local.file.system");        Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");        response.setContentLength((int) fileSys.getLength(filename));        InputStream inStream = null;        // true iff IOException was caused by attempt to access input        boolean isInputException = true;        try {          inStream = fileSys.open(filename);          try {            int len = inStream.read(buffer);            while (len > 0) {              try {                outStream.write(buffer, 0, len);              } catch (IOException ie) {                isInputException = false;                throw ie;              }              len = inStream.read(buffer);            }          } finally {            inStream.close();          }        } catch (IOException ie) {          TaskTracker tracker =             (TaskTracker) context.getAttribute("task.tracker");          Log log = (Log) context.getAttribute("log");          String errorMsg = ("getMapOutput(" + mapId + "," + reduceId +                              ") failed :\n"+                             StringUtils.stringifyException(ie));          log.warn(errorMsg);          if (isInputException) {            tracker.mapOutputLost(mapId, errorMsg);          }          response.sendError(HttpServletResponse.SC_GONE, errorMsg);          throw ie;        }         outStream.close();      }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -