⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 taskrunner.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.mapred;import org.apache.commons.logging.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.util.*;import org.apache.hadoop.filecache.*;import java.io.*;import java.util.jar.*;import java.util.Vector;import java.util.Enumeration;import java.net.URI;/** Base class that runs a task in a separate process.  Tasks are run in a * separate process in order to isolate the map/reduce system code from bugs in * user supplied map and reduce functions. */abstract class TaskRunner extends Thread {  public static final Log LOG =    LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");  boolean killed = false;  private Process process;  private Task t;  private TaskTracker tracker;  protected JobConf conf;  public TaskRunner(Task t, TaskTracker tracker, JobConf conf) {    this.t = t;    this.tracker = tracker;    this.conf = conf;  }  public Task getTask() { return t; }  public TaskTracker getTracker() { return tracker; }  /** Called to assemble this task's input.  This method is run in the parent   * process before the child is spawned.  It should not execute user code,   * only system code. */  public boolean prepare() throws IOException {return true;}  /** Called when this task's output is no longer needed.  * This method is run in the parent process after the child exits.  It should  * not execute user code, only system code.  */  public void close() throws IOException {}  private String stringifyPathArray(Path[] p){	  if (p == null){      return null;    }    String str = p[0].toString();    for (int i = 1; i < p.length; i++){      str = str + "," + p[i].toString();    }    return str;  }    public final void run() {    try {            //before preparing the job localize       //all the archives            URI[] archives = DistributedCache.getCacheArchives(conf);      URI[] files = DistributedCache.getCacheFiles(conf);      if ((archives != null) || (files != null)) {        if (archives != null) {          String[] md5 = DistributedCache.getArchiveMd5(conf);          Path[] p = new Path[archives.length];          for (int i = 0; i < archives.length;i++){            p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);          }          DistributedCache.setLocalArchives(conf, stringifyPathArray(p));        }        if ((files != null)) {          String[] md5 = DistributedCache.getFileMd5(conf);          Path[] p = new Path[files.length];          for (int i = 0; i < files.length;i++){           p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker              .getCacheSubdir()), false, md5[i]);          }          DistributedCache.setLocalFiles(conf, stringifyPathArray(p));        }                // sets the paths to local archives and paths        Path localTaskFile = new Path(t.getJobFile());        FileSystem localFs = FileSystem.getNamed("local", conf);        localFs.delete(localTaskFile);        OutputStream out = localFs.create(localTaskFile);        try {          conf.write(out);        } finally {          out.close();        }      }            if (! prepare()) {        return;      }      String sep = System.getProperty("path.separator");      StringBuffer classPath = new StringBuffer();      // start with same classpath as parent process      classPath.append(System.getProperty("java.class.path"));      classPath.append(sep);      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");      workDir.mkdirs();	        String jar = conf.getJar();      if (jar != null) {           	  // if jar exists, it into workDir        File[] libs = new File(workDir, "lib").listFiles();        if (libs != null) {          for (int i = 0; i < libs.length; i++) {            classPath.append(sep);            // add libs from jar to classpath            classPath.append(libs[i]);          }        }        classPath.append(sep);        classPath.append(new File(workDir, "classes"));        classPath.append(sep);        classPath.append(workDir);      }      //  Build exec child jmv args.      Vector vargs = new Vector(8);      File jvm =                                  // use same jvm as parent        new File(new File(System.getProperty("java.home"), "bin"), "java");      vargs.add(jvm.toString());      // Add child java ops.  Also, mapred.child.heap.size has been superceded      // by // mapred.child.java.opts.  Manage case where both are present      // letting the mapred.child.heap.size win over any setting of heap size in      // mapred.child.java.opts (Emit a warning that heap.size is deprecated).      //      // The following symbols if present in mapred.child.java.opts value are      // replaced:      // + @taskid@ is interpolated with value of TaskID.      // + Replaces @port@ with mapred.task.tracker.report.port + 1.      // Other occurrences of @ will not be altered.      //      // Example with multiple arguments and substitutions, showing      // jvm GC logging, and start of a passwordless JVM JMX agent so can      // connect with jconsole and the likes to watch child memory, threads      // and get thread dumps.      //      //     <name>mapred.child.optional.jvm.args</name>      //     <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \      //     -Dcom.sun.management.jmxremote.authenticate=false \      //     -Dcom.sun.management.jmxremote.ssl=false \      //     -Dcom.sun.management.jmxremote.port=@port@      //     </value>      //      String javaOpts = handleDeprecatedHeapSize(          conf.get("mapred.child.java.opts", "-Xmx200m"),          conf.get("mapred.child.heap.size"));      javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());      int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;      javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));      String [] javaOptsSplit = javaOpts.split(" ");      for (int i = 0; i < javaOptsSplit.length; i++) {         vargs.add(javaOptsSplit[i]);      }      // Add classpath.      vargs.add("-classpath");      vargs.add(classPath.toString());      // Add main class and its arguments       vargs.add(TaskTracker.Child.class.getName());  // main of Child      vargs.add(tracker.taskReportPort + "");        // pass umbilical port      vargs.add(t.getTaskId());                      // pass task identifier      // Run java      runChild((String[])vargs.toArray(new String[0]), workDir);    } catch (FSError e) {      LOG.fatal("FSError", e);      try {        tracker.fsError(e.getMessage());      } catch (IOException ie) {        LOG.fatal(t.getTaskId()+" reporting FSError", ie);      }    } catch (Throwable throwable) {      LOG.warn(t.getTaskId()+" Child Error", throwable);      ByteArrayOutputStream baos = new ByteArrayOutputStream();      throwable.printStackTrace(new PrintStream(baos));      try {        tracker.reportDiagnosticInfo(t.getTaskId(), baos.toString());      } catch (IOException e) {        LOG.warn(t.getTaskId()+" Reporting Diagnostics", e);      }    } finally {      try{        URI[] archives = DistributedCache.getCacheArchives(conf);        URI[] files = DistributedCache.getCacheFiles(conf);        if (archives != null){          for (int i = 0; i < archives.length; i++){            DistributedCache.releaseCache(archives[i], conf);          }        }        if (files != null){          for(int i = 0; i < files.length; i++){            DistributedCache.releaseCache(files[i], conf);          }        }      }catch(IOException ie){        LOG.warn("Error releasing caches : Cache files might not have been cleaned up");      }      tracker.reportTaskFinished(t.getTaskId());    }  }    /**   * Handle deprecated mapred.child.heap.size.   * If present, interpolate into mapred.child.java.opts value with   * warning.   * @param javaOpts Value of mapred.child.java.opts property.   * @param heapSize Value of mapred.child.heap.size property.   * @return A <code>javaOpts</code> with <code>heapSize</code>   * interpolated if present.   */  private String handleDeprecatedHeapSize(String javaOpts,          final String heapSize) {    if (heapSize == null || heapSize.length() <= 0) {        return javaOpts;    }    final String MX = "-Xmx";    int index = javaOpts.indexOf(MX);    if (index < 0) {        javaOpts = javaOpts + " " + MX + heapSize;    } else {        int end = javaOpts.indexOf(" ", index + MX.length());        javaOpts = javaOpts.substring(0, index + MX.length()) +            heapSize + ((end < 0)? "": javaOpts.substring(end));    }    LOG.warn("mapred.child.heap.size is deprecated. Use " +        "mapred.child.java.opt instead. Meantime, mapred.child.heap.size " +        "is interpolated into mapred.child.java.opt: " + javaOpts);    return javaOpts;  }  /**   * Replace <code>toFind</code> with <code>replacement</code>.   * When hadoop moves to JDK1.5, replace this method with   * String#replace (Of is commons-lang available, replace with   * StringUtils#replace).    * @param text String to do replacements in.   * @param toFind String to find.   * @param replacement String to replace <code>toFind</code> with.   * @return A String with all instances of <code>toFind</code>   * replaced by <code>replacement</code> (The original   * <code>text</code> is returned if <code>toFind</code> is not   * found in <code>text<code>).   */  private static String replaceAll(String text, final String toFind,      final String replacement) {    if (text ==  null || toFind ==  null || replacement ==  null) {      throw new IllegalArgumentException("Text " + text + " or toFind " +        toFind + " or replacement " + replacement + " are null.");    }    int offset = 0;    for (int index = text.indexOf(toFind); index >= 0;          index = text.indexOf(toFind, offset)) {      offset = index + toFind.length();      text = text.substring(0, index) + replacement +          text.substring(offset);            }    return text;  }  /**   * Run the child process   */  private void runChild(String[] args, File dir) throws IOException {    this.process = Runtime.getRuntime().exec(args, null, dir);    try {      new Thread() {        public void run() {          logStream(process.getErrorStream());    // copy log output        }      }.start();              logStream(process.getInputStream());        // normally empty            int exit_code = process.waitFor();           if (!killed && exit_code != 0) {        throw new IOException("Task process exit with nonzero status of " +                              exit_code + ".");      }          } catch (InterruptedException e) {      throw new IOException(e.toString());    } finally {      kill();    }  }  /**   * Kill the child process   */  public void kill() {      if (process != null) {          process.destroy();      }      killed = true;  }  /**   */  private void logStream(InputStream output) {    try {      BufferedReader in = new BufferedReader(new InputStreamReader(output));      String line;      while ((line = in.readLine()) != null) {        LOG.info(t.getTaskId()+" "+line);      }    } catch (IOException e) {      LOG.warn(t.getTaskId()+" Error reading child output", e);    } finally {      try {        output.close();      } catch (IOException e) {        LOG.warn(t.getTaskId()+" Error closing child output", e);      }    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -