
📄 PipeMapRed.java

📁 hadoop: Nutch cluster platform
💻 Java
📖 Page 1 of 2
  // Export every JobConf property into the subprocess environment,
  // expanding variable references and sanitizing the names.
  void addJobConfToEnvironment(JobConf conf, Properties env) {
    if (debug_) {
      logprintln("addJobConfToEnvironment: begin");
    }
    Iterator it = conf.entries();
    while (it.hasNext()) {
      Map.Entry en = (Map.Entry) it.next();
      String name = (String) en.getKey();
      //String value = (String)en.getValue(); // does not apply variable expansion
      String value = conf.get(name); // does variable expansion
      name = safeEnvVarName(name);
      envPut(env, name, value);
    }
    if (debug_) {
      logprintln("addJobConfToEnvironment: end");
    }
  }

  // Replace every character outside [0-9A-Za-z] with '_' so the result
  // is a legal environment variable name.
  String safeEnvVarName(String var) {
    StringBuffer safe = new StringBuffer();
    int len = var.length();
    for (int i = 0; i < len; i++) {
      char c = var.charAt(i);
      char s;
      if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
        s = c;
      } else {
        s = '_';
      }
      safe.append(s);
    }
    return safe.toString();
  }

  void addEnvironment(Properties env, String nameVals) {
    // encoding "a=b c=d" from StreamJob
    if (nameVals == null) return;
    String[] nv = nameVals.split(" ");
    for (int i = 0; i < nv.length; i++) {
      String[] pair = nv[i].split("=", 2);
      if (pair.length != 2) {
        logprintln("Skip ev entry:" + nv[i]);
      } else {
        envPut(env, pair[0], pair[1]);
      }
    }
  }

  void envPut(Properties env, String name, String value) {
    if (debug_) {
      logprintln("Add  ev entry:" + name + "=" + value);
    }
    env.put(name, value);
  }

  /** .. and if successful: delete the task log */
  void appendLogToJobLog(String status) {
    if (jobLog_ == null) {
      return; // not using a common joblog
    }
    if (log_ != null) {
      StreamUtil.exec("/bin/rm " + LOGNAME, log_);
    }
    // TODO socket-based aggregator (in JobTrackerInfoServer)
  }

  void startOutputThreads(OutputCollector output, Reporter reporter) {
    outThread_ = new MROutputThread(output, reporter);
    outThread_.start();
    errThread_ = new MRErrorThread(reporter);
    errThread_.start();
  }

  // Wait for the subprocess to exit, then give each drain thread
  // up to joinDelay_ ms to finish.
  void waitOutputThreads() {
    try {
      sim.waitFor();
      if (outThread_ != null) {
        outThread_.join(joinDelay_);
      }
      if (errThread_ != null) {
        errThread_.join(joinDelay_);
      }
    } catch (InterruptedException e) {
      //ignore
    }
  }

  /**
   * Split a line into key and value. Assume the delimiter is a tab.
   * @param line a byte array of line containing UTF-8 bytes
   * @param key key of a record
   * @param val value of a record
   * @throws IOException
   */
  void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
    int pos = -1;
    if (keyCols_ != ALL_COLS) {
      pos = UTF8ByteArrayUtils.findTab(line);
    }
    try {
      if (pos == -1) {
        key.set(line);
        val.set("");
      } else {
        UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
      }
    } catch (CharacterCodingException e) {
      LOG.warn(StringUtils.stringifyException(e));
    }
  }

  // Drains the subprocess's stdout: each line is either appended to the
  // side-effect file or split into a key/value pair and collected.
  class MROutputThread extends Thread {

    MROutputThread(OutputCollector output, Reporter reporter) {
      setDaemon(true);
      this.output = output;
      this.reporter = reporter;
    }

    public void run() {
      try {
        Text key = new Text();
        Text val = new Text();
        // 3/4 Tool to Hadoop
        while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
          // 4/4 Hadoop out
          if (optSideEffect_) {
            sideEffectOut_.write(answer);
            sideEffectOut_.write('\n');
          } else {
            splitKeyVal(answer, key, val);
            output.collect(key, val);
          }
          numRecWritten_++;
          long now = System.currentTimeMillis();
          if (now - lastStdoutReport > reporterOutDelay_) {
            lastStdoutReport = now;
            String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
            reporter.setStatus(hline);
            logprintln(hline);
            logflush();
          }
        }
      } catch (IOException io) {
        io.printStackTrace(log_);
      }
      logprintln("MROutputThread done");
    }

    OutputCollector output;
    Reporter reporter;
    byte[] answer;
    long lastStdoutReport = 0;
  }

  // Drains the subprocess's stderr into the task log; the first few lines
  // (then one line per reporterErrDelay_ ms) also become the task status.
  class MRErrorThread extends Thread {

    public MRErrorThread(Reporter reporter) {
      this.reporter = reporter;
      setDaemon(true);
    }

    public void run() {
      byte[] line;
      try {
        long num = 0;
        while ((line = UTF8ByteArrayUtils.readLine((InputStream) clientErr_)) != null) {
          num++;
          String lineStr = new String(line, "UTF-8");
          logprintln(lineStr);
          long now = System.currentTimeMillis();
          if (num < 20 || (now - lastStderrReport > reporterErrDelay_)) {
            lastStderrReport = now;
            String hline = "MRErr: " + lineStr;
            System.err.println(hline);
            reporter.setStatus(hline);
          }
        }
      } catch (IOException io) {
        logStackTrace(io);
      }
    }

    long lastStderrReport = 0;
    Reporter reporter;
  }

  // Close the pipe to the subprocess, drain its remaining output and
  // finalize any side-effect file (rename temporary output into place).
  public void mapRedFinished() {
    logprintln("mapRedFinished");
    try {
      if (!doPipe_) return;
      try {
        if (clientOut_ != null) {
          clientOut_.close();
        }
      } catch (IOException io) {
      }
      waitOutputThreads();
      try {
        if (optSideEffect_) {
          logprintln("closing " + sideEffectURI_);
          if (sideEffectOut_ != null) sideEffectOut_.close();
          logprintln("closed  " + sideEffectURI_);
          if (sideEffectURI_.getScheme().equals("file")) {
            logprintln("size  " + new File(sideEffectURI_).length());
          }
          if (useSingleSideOutputURI_) {
            // With sideEffectPath_ we wrote in-place.
            // Possibly a named pipe set up by user or a socket.
          } else {
            boolean del = sideFs_.delete(sideEffectPathFinal_);
            logprintln("deleted  (" + del + ") " + sideEffectPathFinal_);
            sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), sideEffectPathFinal_);
            logprintln("renamed  " + sideEffectPathFinal_);
          }
        }
      } catch (IOException io) {
        io.printStackTrace();
      }
      if (sim != null) sim.destroy();
    } catch (RuntimeException e) {
      logStackTrace(e);
      throw e;
    }
    if (debugFailLate_) {
      throw new RuntimeException("debugFailLate_");
    }
  }

  void maybeLogRecord() {
    if (numRecRead_ >= nextRecReadLog_) {
      String info = numRecInfo();
      logprintln(info);
      logflush();
      System.err.println(info);
      //nextRecReadLog_ *= 10;
      nextRecReadLog_ += 100;
    }
  }

  // Build a human-readable snapshot of the task state for error messages.
  public String getContext() {
    String s = numRecInfo() + "\n";
    s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
    s += "LOGNAME=" + LOGNAME + "\n";
    s += envline("HOST");
    s += envline("USER");
    s += envline("HADOOP_USER");
    //s += envline("PWD"); // =/home/crawler/hadoop/trunk
    s += "last Hadoop input: |" + mapredKey_ + "|\n";
    if (outThread_ != null) {
      s += "last tool output: |" + outThread_.answer + "|\n";
    }
    s += "Date: " + new Date() + "\n";
    // s += envline("HADOOP_HOME");
    // s += envline("REMOTE_HOST");
    return s;
  }

  String envline(String var) {
    return var + "=" + StreamUtil.env().get(var) + "\n";
  }

  String numRecInfo() {
    long elapsed = (System.currentTimeMillis() - startTime_) / 1000;
    long total = numRecRead_ + numRecWritten_ + numRecSkipped_;
    return "R/W/S=" + numRecRead_ + "/" + numRecWritten_ + "/" + numRecSkipped_ + " in:"
        + safeDiv(numRecRead_, elapsed) + " [rec/s]" + " out:" + safeDiv(numRecWritten_, elapsed)
        + " [rec/s]";
  }

  String safeDiv(long n, long d) {
    return (d == 0) ? "NA" : "" + n / d + "=" + n + "/" + d;
  }

  String logFailure(Exception e) {
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    e.printStackTrace(pw);
    String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
    logprintln(msg);
    return msg;
  }

  /**
   * Write a writable value to the output stream using UTF-8 encoding
   * @param value output value
   * @throws IOException
   */
  void write(Writable value) throws IOException {
    byte[] bval;
    int valSize;
    if (value instanceof BytesWritable) {
      BytesWritable val = (BytesWritable) value;
      bval = val.get();
      valSize = val.getSize();
    } else if (value instanceof Text) {
      Text val = (Text) value;
      bval = val.getBytes();
      valSize = val.getLength();
    } else {
      String sval = value.toString();
      bval = sval.getBytes("UTF-8");
      valSize = bval.length;
    }
    clientOut_.write(bval, 0, valSize);
  }

  long startTime_;
  long numRecRead_ = 0;
  long numRecWritten_ = 0;
  long numRecSkipped_ = 0;
  long nextRecReadLog_ = 1;

  long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;

  int keyCols_;
  final static int ALL_COLS = Integer.MAX_VALUE;

  long reporterOutDelay_ = 10*1000L;
  long reporterErrDelay_ = 10*1000L;
  long joinDelay_;
  JobConf job_;
  FileSystem fs_;
  FileSystem sideFs_;

  // generic MapRed parameters passed on by hadoopStreaming
  int reportPortPlusOne_;

  boolean doPipe_;
  boolean debug_;
  boolean debugFailEarly_;
  boolean debugFailDuring_;
  boolean debugFailLate_;

  Process sim;                  // the user's subprocess
  MROutputThread outThread_;
  String jobLog_;
  MRErrorThread errThread_;
  DataOutputStream clientOut_;  // stdin of the subprocess
  DataInputStream clientErr_;   // stderr of the subprocess
  DataInputStream clientIn_;    // stdout of the subprocess

  // set in PipeMapper/PipeReducer subclasses
  String mapredKey_;
  int numExceptions_;
  StreamUtil.TaskId taskId_;

  boolean optUseKey_ = true;

  private boolean optSideEffect_;
  private URI sideEffectURI_;
  private Path sideEffectPathFinal_;
  private boolean useSingleSideOutputURI_;
  private String sideOutputURI_;
  private OutputStream sideEffectOut_;

  String LOGNAME;
  PrintStream log_;
}
