📄 pipemapred.java
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.streaming;import java.io.*;import java.net.Socket;import java.net.URI;import java.nio.channels.*;import java.nio.charset.CharacterCodingException;import java.io.IOException;import java.util.Date;import java.util.Map;import java.util.Iterator;import java.util.Arrays;import java.util.ArrayList;import java.util.Properties;import java.util.regex.*;import org.apache.commons.logging.*;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.util.StringUtils;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.fs.LocalFileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FSDataOutputStream;/** Shared functionality for PipeMapper, PipeReducer. * @author Michel Tourn */public abstract class PipeMapRed { protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName()); /** The command to be spawned as a subprocess. * Mapper/Reducer operations will delegate to it */ abstract String getPipeCommand(JobConf job); /* */ abstract String getKeyColPropName(); /** Write output as side-effect files rather than as map outputs. This is useful to do "Map" tasks rather than "MapReduce" tasks. */ boolean getUseSideEffect() { return false; } abstract boolean getDoPipe(); /** * @returns how many TABS before the end of the key part * usually: 1 or "ALL" * used for tool output of both Map and Reduce * configured via tool's argv: splitKeyVal=ALL or 1.. * although it is interpreted here, not by tool */ int getKeyColsFromPipeCommand(String cmd) { String key = getKeyColPropName(); Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*"); Matcher match = kcPat.matcher(cmd); String kc; if (!match.matches()) { kc = null; } else { kc = match.group(1); } int cols; if (kc == null) { // default value is 1 and the Stream applications could instead // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL cols = 1; } else if (kc.equals("ALL")) { cols = ALL_COLS; } else { try { cols = Integer.parseInt(kc); } catch (NumberFormatException nf) { cols = Integer.MAX_VALUE; } } System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd); return cols; } final static int OUTSIDE = 1; final static int SINGLEQ = 2; final static int DOUBLEQ = 3; static String[] splitArgs(String args) { ArrayList argList = new ArrayList(); char[] ch = args.toCharArray(); int clen = ch.length; int state = OUTSIDE; int argstart = 0; for (int c = 0; c <= clen; c++) { boolean last = (c == clen); int lastState = state; boolean endToken = false; if (!last) { if (ch[c] == '\'') { if (state == OUTSIDE) { state = SINGLEQ; } else if (state == SINGLEQ) { state = OUTSIDE; } endToken = (state != lastState); } else if (ch[c] == '"') { if (state == OUTSIDE) { state = DOUBLEQ; } else if (state == DOUBLEQ) { state = OUTSIDE; } endToken = (state != lastState); } else if (ch[c] == ' ') { if (state == OUTSIDE) { endToken = true; } } } if (last || endToken) { if (c == argstart) { // unquoted space } else { String a; a = args.substring(argstart, c); argList.add(a); } argstart = c + 1; lastState = state; } } return (String[]) argList.toArray(new String[0]); } OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException { final String SOCKET = "socket"; if (uri.getScheme().equals(SOCKET)) { if (!allowSocket) { throw new IOException(SOCKET + " not allowed on outputstream " + uri); } final Socket sock = new Socket(uri.getHost(), uri.getPort()); OutputStream out = new FilterOutputStream(sock.getOutputStream()) { public void close() throws IOException { sock.close(); super.close(); } }; return out; } else { // a FSDataOutputStreamm, localFS or HDFS. // localFS file may be set up as a FIFO. return sideFs_.create(new Path(uri.getSchemeSpecificPart())); } } String getSideEffectFileName() { FileSplit split = StreamUtil.getCurrentSplit(job_); String leaf = split.getPath().getName(); if (split.getStart() == 0) { return leaf; } else { return new FileSplit(new Path(leaf), split.getStart(), split.getLength()).toString(); } } String makeUniqueFileSuffix() { return "." + System.currentTimeMillis() + "." + job_.get("mapred.task.id"); } public void configure(JobConf job) { try { String argv = getPipeCommand(job); keyCols_ = getKeyColsFromPipeCommand(argv); debug_ = (job.get("stream.debug") != null); if (debug_) { System.out.println("PipeMapRed: stream.debug=true"); } joinDelay_ = job.getLong("stream.joindelay.milli", 0); job_ = job; fs_ = FileSystem.get(job_); if (job_.getBoolean("stream.sideoutput.localfs", false)) { //sideFs_ = new LocalFileSystem(job_); sideFs_ = FileSystem.getNamed("local", job_); } else { sideFs_ = fs_; } if (debug_) { System.out.println("kind :" + this.getClass()); System.out.println("split :" + StreamUtil.getCurrentSplit(job_)); System.out.println("fs :" + fs_.toString()); System.out.println("sideFs :" + sideFs_.toString()); } doPipe_ = getDoPipe(); if (!doPipe_) return; setStreamJobDetails(job); setStreamProperties(); if (debugFailEarly_) { throw new RuntimeException("debugFailEarly_"); } String[] argvSplit = splitArgs(argv); String prog = argvSplit[0]; String userdir = System.getProperty("user.dir"); if (new File(prog).isAbsolute()) { // we don't own it. Hope it is executable } else { new MustangFile(prog).setExecutable(true, true); } if (job_.getInputValueClass().equals(BytesWritable.class)) { // TODO expose as separate config: // job or semistandard inputformat property optUseKey_ = false; } optSideEffect_ = getUseSideEffect(); if (optSideEffect_) { // during work: use a completely unique filename to avoid HDFS namespace conflicts // after work: rename to a filename that depends only on the workload (the FileSplit) // it's a friendly name and in case of reexecution it will clobber. // reexecution can be due to: other job, failed task and speculative task // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: // client has renamed outputPath and saved the argv's original output path as: if (useSingleSideOutputURI_) { sideEffectURI_ = new URI(sideOutputURI_); sideEffectPathFinal_ = null; // in-place, no renaming to final } else { String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale sideEffectPathFinal_ = new Path(sideOutputPath, fileName); sideEffectURI_ = new URI(sideEffectPathFinal_ + makeUniqueFileSuffix()); // implicit dfs: } // apply default scheme if(sideEffectURI_.getScheme() == null) { sideEffectURI_ = new URI("file", sideEffectURI_.getSchemeSpecificPart(), null); } boolean allowSocket = useSingleSideOutputURI_; sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket); } // argvSplit[0]: // An absolute path should be a preexisting valid path on all TaskTrackers // A relative path should match in the unjarred Job data // In this case, force an absolute path to make sure exec finds it. argvSplit[0] = new File(argvSplit[0]).getAbsolutePath(); logprintln("PipeMapRed exec " + Arrays.asList(argvSplit)); logprintln("sideEffectURI_=" + sideEffectURI_); Environment childEnv = (Environment) StreamUtil.env().clone(); addJobConfToEnvironment(job_, childEnv); addEnvironment(childEnv, job_.get("stream.addenvironment")); sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray()); /* // This way required jdk1.5 Builder processBuilder = new ProcessBuilder(argvSplit); Map<String, String> env = processBuilder.environment(); addEnvironment(env, job_.get("stream.addenvironment")); sim = processBuilder.start(); */ clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream())); clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream())); clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream())); startTime_ = System.currentTimeMillis(); } catch (Exception e) { logStackTrace(e); } } void setStreamJobDetails(JobConf job) { jobLog_ = job.get("stream.jobLog_"); String s = job.get("stream.minRecWrittenToEnableSkip_"); if (s != null) { minRecWrittenToEnableSkip_ = Long.parseLong(s); logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_); } taskId_ = StreamUtil.getTaskInfo(job_); debugFailEarly_ = isDebugFail("early"); debugFailDuring_ = isDebugFail("during"); debugFailLate_ = isDebugFail("late"); sideOutputURI_ = job_.get("stream.sideoutput.uri"); useSingleSideOutputURI_ = (sideOutputURI_ != null); } boolean isDebugFail(String kind) { String execidlist = job_.get("stream.debugfail.reexec." + kind); if (execidlist == null) { return false; } String[] e = execidlist.split(","); for (int i = 0; i < e.length; i++) { int ei = Integer.parseInt(e[i]); if (taskId_.execid == ei) { return true; } } return false; } void setStreamProperties() { String s = System.getProperty("stream.port"); if (s != null) { reportPortPlusOne_ = Integer.parseInt(s); } } void logStackTrace(Exception e) { if (e == null) return; e.printStackTrace(); if (log_ != null) { e.printStackTrace(log_); } } void logprintln(String s) { if (log_ != null) { log_.println(s); } else { LOG.info(s); // or LOG.info() } } void logflush() { if (log_ != null) { log_.flush(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -