⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pipemapred.java

📁 hadoop:Nutch集群平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.hadoop.streaming;import java.io.*;import java.net.Socket;import java.net.URI;import java.nio.channels.*;import java.nio.charset.CharacterCodingException;import java.io.IOException;import java.util.Date;import java.util.Map;import java.util.Iterator;import java.util.Arrays;import java.util.ArrayList;import java.util.Properties;import java.util.regex.*;import org.apache.commons.logging.*;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.util.StringUtils;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.fs.LocalFileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FSDataOutputStream;/** Shared functionality for PipeMapper, PipeReducer. *  @author Michel Tourn */public abstract class PipeMapRed {  protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());  /** The command to be spawned as a subprocess.   * Mapper/Reducer operations will delegate to it   */  abstract String getPipeCommand(JobConf job);  /*   */  abstract String getKeyColPropName();  /** Write output as side-effect files rather than as map outputs.   This is useful to do "Map" tasks rather than "MapReduce" tasks. */  boolean getUseSideEffect() {    return false;  }  abstract boolean getDoPipe();  /**   * @returns how many TABS before the end of the key part   * usually: 1 or "ALL"   * used for tool output of both Map and Reduce   * configured via tool's argv: splitKeyVal=ALL or 1..   * although it is interpreted here, not by tool   */  int getKeyColsFromPipeCommand(String cmd) {    String key = getKeyColPropName();    Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");    Matcher match = kcPat.matcher(cmd);    String kc;    if (!match.matches()) {      kc = null;    } else {      kc = match.group(1);    }    int cols;    if (kc == null) {      // default value is 1 and the Stream applications could instead      // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL      cols = 1;    } else if (kc.equals("ALL")) {      cols = ALL_COLS;    } else {      try {        cols = Integer.parseInt(kc);      } catch (NumberFormatException nf) {        cols = Integer.MAX_VALUE;      }    }    System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);    return cols;  }  final static int OUTSIDE = 1;  final static int SINGLEQ = 2;  final static int DOUBLEQ = 3;  static String[] splitArgs(String args) {    ArrayList argList = new ArrayList();    char[] ch = args.toCharArray();    int clen = ch.length;    int state = OUTSIDE;    int argstart = 0;    for (int c = 0; c <= clen; c++) {      boolean last = (c == clen);      int lastState = state;      boolean endToken = false;      if (!last) {        if (ch[c] == '\'') {          if (state == OUTSIDE) {            state = SINGLEQ;          } else if (state == SINGLEQ) {            state = OUTSIDE;          }          endToken = (state != lastState);        } else if (ch[c] == '"') {          if (state == OUTSIDE) {            state = DOUBLEQ;          } else if (state == DOUBLEQ) {            state = OUTSIDE;          }          endToken = (state != lastState);        } else if (ch[c] == ' ') {          if (state == OUTSIDE) {            endToken = true;          }        }      }      if (last || endToken) {        if (c == argstart) {          // unquoted space        } else {          String a;          a = args.substring(argstart, c);          argList.add(a);        }        argstart = c + 1;        lastState = state;      }    }    return (String[]) argList.toArray(new String[0]);  }  OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException {    final String SOCKET = "socket";    if (uri.getScheme().equals(SOCKET)) {      if (!allowSocket) {        throw new IOException(SOCKET + " not allowed on outputstream " + uri);      }      final Socket sock = new Socket(uri.getHost(), uri.getPort());      OutputStream out = new FilterOutputStream(sock.getOutputStream()) {        public void close() throws IOException {          sock.close();          super.close();        }      };      return out;    } else {      // a FSDataOutputStreamm, localFS or HDFS.      // localFS file may be set up as a FIFO.      return sideFs_.create(new Path(uri.getSchemeSpecificPart()));    }  }  String getSideEffectFileName() {    FileSplit split = StreamUtil.getCurrentSplit(job_);    String leaf = split.getPath().getName();    if (split.getStart() == 0) {      return leaf;    } else {      return new FileSplit(new Path(leaf), split.getStart(), split.getLength()).toString();    }  }  String makeUniqueFileSuffix() {    return "." + System.currentTimeMillis() + "." + job_.get("mapred.task.id");  }  public void configure(JobConf job) {    try {      String argv = getPipeCommand(job);      keyCols_ = getKeyColsFromPipeCommand(argv);      debug_ = (job.get("stream.debug") != null);      if (debug_) {        System.out.println("PipeMapRed: stream.debug=true");      }      joinDelay_ = job.getLong("stream.joindelay.milli", 0);      job_ = job;      fs_ = FileSystem.get(job_);      if (job_.getBoolean("stream.sideoutput.localfs", false)) {        //sideFs_ = new LocalFileSystem(job_);        sideFs_ = FileSystem.getNamed("local", job_);      } else {        sideFs_ = fs_;      }      if (debug_) {        System.out.println("kind   :" + this.getClass());        System.out.println("split  :" + StreamUtil.getCurrentSplit(job_));        System.out.println("fs     :" + fs_.toString());        System.out.println("sideFs :" + sideFs_.toString());      }      doPipe_ = getDoPipe();      if (!doPipe_) return;      setStreamJobDetails(job);      setStreamProperties();      if (debugFailEarly_) {        throw new RuntimeException("debugFailEarly_");      }      String[] argvSplit = splitArgs(argv);      String prog = argvSplit[0];      String userdir = System.getProperty("user.dir");      if (new File(prog).isAbsolute()) {        // we don't own it. Hope it is executable      } else {        new MustangFile(prog).setExecutable(true, true);      }      if (job_.getInputValueClass().equals(BytesWritable.class)) {        // TODO expose as separate config:        // job or semistandard inputformat property        optUseKey_ = false;      }      optSideEffect_ = getUseSideEffect();      if (optSideEffect_) {        // during work: use a completely unique filename to avoid HDFS namespace conflicts        // after work: rename to a filename that depends only on the workload (the FileSplit)        //   it's a friendly name and in case of reexecution it will clobber.         // reexecution can be due to: other job, failed task and speculative task        // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then:         // client has renamed outputPath and saved the argv's original output path as:        if (useSingleSideOutputURI_) {          sideEffectURI_ = new URI(sideOutputURI_);          sideEffectPathFinal_ = null; // in-place, no renaming to final        } else {          String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath()           String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale          sideEffectPathFinal_ = new Path(sideOutputPath, fileName);          sideEffectURI_ = new URI(sideEffectPathFinal_ + makeUniqueFileSuffix()); // implicit dfs:         }        // apply default scheme        if(sideEffectURI_.getScheme() == null) {          sideEffectURI_ = new URI("file", sideEffectURI_.getSchemeSpecificPart(), null);        }        boolean allowSocket = useSingleSideOutputURI_;        sideEffectOut_ = getURIOutputStream(sideEffectURI_, allowSocket);      }      // argvSplit[0]:      // An absolute path should be a preexisting valid path on all TaskTrackers      // A  relative path should match in the unjarred Job data      // In this case, force an absolute path to make sure exec finds it.      argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();      logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));      logprintln("sideEffectURI_=" + sideEffectURI_);      Environment childEnv = (Environment) StreamUtil.env().clone();      addJobConfToEnvironment(job_, childEnv);      addEnvironment(childEnv, job_.get("stream.addenvironment"));      sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());      /* // This way required jdk1.5       Builder processBuilder = new ProcessBuilder(argvSplit);       Map<String, String> env = processBuilder.environment();       addEnvironment(env, job_.get("stream.addenvironment"));       sim = processBuilder.start();       */      clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));      clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));      clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));      startTime_ = System.currentTimeMillis();    } catch (Exception e) {      logStackTrace(e);    }  }  void setStreamJobDetails(JobConf job) {    jobLog_ = job.get("stream.jobLog_");    String s = job.get("stream.minRecWrittenToEnableSkip_");    if (s != null) {      minRecWrittenToEnableSkip_ = Long.parseLong(s);      logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);    }    taskId_ = StreamUtil.getTaskInfo(job_);    debugFailEarly_ = isDebugFail("early");    debugFailDuring_ = isDebugFail("during");    debugFailLate_ = isDebugFail("late");    sideOutputURI_ = job_.get("stream.sideoutput.uri");    useSingleSideOutputURI_ = (sideOutputURI_ != null);  }  boolean isDebugFail(String kind) {    String execidlist = job_.get("stream.debugfail.reexec." + kind);    if (execidlist == null) {      return false;    }    String[] e = execidlist.split(",");    for (int i = 0; i < e.length; i++) {      int ei = Integer.parseInt(e[i]);      if (taskId_.execid == ei) {        return true;      }    }    return false;  }  void setStreamProperties() {    String s = System.getProperty("stream.port");    if (s != null) {      reportPortPlusOne_ = Integer.parseInt(s);    }  }  void logStackTrace(Exception e) {    if (e == null) return;    e.printStackTrace();    if (log_ != null) {      e.printStackTrace(log_);    }  }  void logprintln(String s) {    if (log_ != null) {      log_.println(s);    } else {      LOG.info(s); // or LOG.info()    }  }  void logflush() {    if (log_ != null) {      log_.flush();    }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -