📄 pipemapred.java
字号:
void addJobConfToEnvironment(JobConf conf, Properties env) { if (debug_) { logprintln("addJobConfToEnvironment: begin"); } Iterator it = conf.entries(); while (it.hasNext()) { Map.Entry en = (Map.Entry) it.next(); String name = (String) en.getKey(); //String value = (String)en.getValue(); // does not apply variable expansion String value = conf.get(name); // does variable expansion name = safeEnvVarName(name); envPut(env, name, value); } if (debug_) { logprintln("addJobConfToEnvironment: end"); } } String safeEnvVarName(String var) { StringBuffer safe = new StringBuffer(); int len = var.length(); for (int i = 0; i < len; i++) { char c = var.charAt(i); char s; if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) { s = c; } else { s = '_'; } safe.append(s); } return safe.toString(); } void addEnvironment(Properties env, String nameVals) { // encoding "a=b c=d" from StreamJob if (nameVals == null) return; String[] nv = nameVals.split(" "); for (int i = 0; i < nv.length; i++) { String[] pair = nv[i].split("=", 2); if (pair.length != 2) { logprintln("Skip ev entry:" + nv[i]); } else { envPut(env, pair[0], pair[1]); } } } void envPut(Properties env, String name, String value) { if (debug_) { logprintln("Add ev entry:" + name + "=" + value); } env.put(name, value); } /** .. and if successful: delete the task log */ void appendLogToJobLog(String status) { if (jobLog_ == null) { return; // not using a common joblog } if (log_ != null) { StreamUtil.exec("/bin/rm " + LOGNAME, log_); } // TODO socket-based aggregator (in JobTrackerInfoServer) } void startOutputThreads(OutputCollector output, Reporter reporter) { outThread_ = new MROutputThread(output, reporter); outThread_.start(); errThread_ = new MRErrorThread(reporter); errThread_.start(); } void waitOutputThreads() { try { sim.waitFor(); if (outThread_ != null) { outThread_.join(joinDelay_); } if (errThread_ != null) { errThread_.join(joinDelay_); } } catch (InterruptedException e) { //ignore } } /** * Split a line into key and value. Assume the delimitor is a tab. * @param line: a byte array of line containing UTF-8 bytes * @param key: key of a record * @param val: value of a record * @throws IOException */ void splitKeyVal(byte[] line, Text key, Text val) throws IOException { int pos = -1; if (keyCols_ != ALL_COLS) { pos = UTF8ByteArrayUtils.findTab(line); } try { if (pos == -1) { key.set(line); val.set(""); } else { UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos); } } catch (CharacterCodingException e) { LOG.warn(StringUtils.stringifyException(e)); } } class MROutputThread extends Thread { MROutputThread(OutputCollector output, Reporter reporter) { setDaemon(true); this.output = output; this.reporter = reporter; } public void run() { try { Text key = new Text(); Text val = new Text(); // 3/4 Tool to Hadoop while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) { // 4/4 Hadoop out if (optSideEffect_) { sideEffectOut_.write(answer); sideEffectOut_.write('\n'); } else { splitKeyVal(answer, key, val); output.collect(key, val); } numRecWritten_++; long now = System.currentTimeMillis(); if (now-lastStdoutReport > reporterOutDelay_) { lastStdoutReport = now; String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_; reporter.setStatus(hline); logprintln(hline); logflush(); } } } catch (IOException io) { io.printStackTrace(log_); } logprintln("MROutputThread done"); } OutputCollector output; Reporter reporter; byte[] answer; long lastStdoutReport = 0; } class MRErrorThread extends Thread { public MRErrorThread(Reporter reporter) { this.reporter = reporter; setDaemon(true); } public void run() { byte[] line; try { long num = 0; while ((line = UTF8ByteArrayUtils.readLine((InputStream) clientErr_)) != null) { num++; String lineStr = new String(line, "UTF-8"); logprintln(lineStr); long now = System.currentTimeMillis(); if (num < 20 || (now-lastStderrReport > reporterErrDelay_)) { lastStderrReport = now; String hline = "MRErr: " + lineStr; System.err.println(hline); reporter.setStatus(hline); } } } catch (IOException io) { logStackTrace(io); } } long lastStderrReport = 0; Reporter reporter; } public void mapRedFinished() { logprintln("mapRedFinished"); try { if (!doPipe_) return; try { if (clientOut_ != null) { clientOut_.close(); } } catch (IOException io) { } waitOutputThreads(); try { if (optSideEffect_) { logprintln("closing " + sideEffectURI_); if (sideEffectOut_ != null) sideEffectOut_.close(); logprintln("closed " + sideEffectURI_); if (sideEffectURI_.getScheme().equals("file")) { logprintln("size " + new File(sideEffectURI_).length()); } if (useSingleSideOutputURI_) { // With sideEffectPath_ we wrote in-place. // Possibly a named pipe set up by user or a socket. } else { boolean del = sideFs_.delete(sideEffectPathFinal_); logprintln("deleted (" + del + ") " + sideEffectPathFinal_); sideFs_.rename(new Path(sideEffectURI_.getSchemeSpecificPart()), sideEffectPathFinal_); logprintln("renamed " + sideEffectPathFinal_); } } } catch (IOException io) { io.printStackTrace(); } if (sim != null) sim.destroy(); } catch (RuntimeException e) { logStackTrace(e); throw e; } if (debugFailLate_) { throw new RuntimeException("debugFailLate_"); } } void maybeLogRecord() { if (numRecRead_ >= nextRecReadLog_) { String info = numRecInfo(); logprintln(info); logflush(); System.err.println(info); //nextRecReadLog_ *= 10; nextRecReadLog_ += 100; } } public String getContext() { String s = numRecInfo() + "\n"; s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " "; s += "LOGNAME=" + LOGNAME + "\n"; s += envline("HOST"); s += envline("USER"); s += envline("HADOOP_USER"); //s += envline("PWD"); // =/home/crawler/hadoop/trunk s += "last Hadoop input: |" + mapredKey_ + "|\n"; if (outThread_ != null) { s += "last tool output: |" + outThread_.answer + "|\n"; } s += "Date: " + new Date() + "\n"; // s += envline("HADOOP_HOME"); // s += envline("REMOTE_HOST"); return s; } String envline(String var) { return var + "=" + StreamUtil.env().get(var) + "\n"; } String numRecInfo() { long elapsed = (System.currentTimeMillis() - startTime_) / 1000; long total = numRecRead_ + numRecWritten_ + numRecSkipped_; return "R/W/S=" + numRecRead_ + "/" + numRecWritten_ + "/" + numRecSkipped_ + " in:" + safeDiv(numRecRead_, elapsed) + " [rec/s]" + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]"; } String safeDiv(long n, long d) { return (d == 0) ? "NA" : "" + n / d + "=" + n + "/" + d; } String logFailure(Exception e) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); e.printStackTrace(pw); String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n"; logprintln(msg); return msg; } /** * Write a writable value to the output stream using UTF-8 encoding * @param value output value * @throws IOException */ void write(Writable value) throws IOException { byte[] bval; int valSize; if (value instanceof BytesWritable) { BytesWritable val = (BytesWritable) value; bval = val.get(); valSize = val.getSize(); } else if (value instanceof Text) { Text val = (Text) value; bval = val.getBytes(); valSize = val.getLength(); } else { String sval = value.toString(); bval = sval.getBytes("UTF-8"); valSize = bval.length; } clientOut_.write(bval, 0, valSize); } long startTime_; long numRecRead_ = 0; long numRecWritten_ = 0; long numRecSkipped_ = 0; long nextRecReadLog_ = 1; long minRecWrittenToEnableSkip_ = Long.MAX_VALUE; int keyCols_; final static int ALL_COLS = Integer.MAX_VALUE; long reporterOutDelay_ = 10*1000L; long reporterErrDelay_ = 10*1000L; long joinDelay_; JobConf job_; FileSystem fs_; FileSystem sideFs_; // generic MapRed parameters passed on by hadoopStreaming int reportPortPlusOne_; boolean doPipe_; boolean debug_; boolean debugFailEarly_; boolean debugFailDuring_; boolean debugFailLate_; Process sim; MROutputThread outThread_; String jobLog_; MRErrorThread errThread_; DataOutputStream clientOut_; DataInputStream clientErr_; DataInputStream clientIn_; // set in PipeMapper/PipeReducer subclasses String mapredKey_; int numExceptions_; StreamUtil.TaskId taskId_; boolean optUseKey_ = true; private boolean optSideEffect_; private URI sideEffectURI_; private Path sideEffectPathFinal_; private boolean useSingleSideOutputURI_; private String sideOutputURI_; private OutputStream sideEffectOut_; String LOGNAME; PrintStream log_;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -