⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 compounddirspec.java

📁 hadoop:Nutch集群平台
💻 JAVA
字号:
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.File;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;

/** Parses a -input &lt;spec&gt; that determines the DFS paths that will
 be accessed by a MergedInputFormat.<br>
 CompoundDirSpec.getPaths() is a 2-D ragged array of DFS paths.<br>
 One of the paths is the <b>primary</b> and can contain a globbing pattern
 to match multiple files.<br>
 The other paths are <b>secondary</b> and must indicate either a directory or a single file.
 During execution secondary files are computed to be the secondary path
 plus the primary non-qualified filename.
 Example:
 <tt>
 -input "/filter/colsx NULL | +/batch1/colsx/* /batch1/colsy/"
 -input "/filter/colsx NULL | +/batch2/colsx/* /batch2/colsy/"
 </tt>
 Files and contents:<tt>
    /filter/colsx/part-00000:
    /batch1/colsx/part-00000:
    /batch1/colsy/part-00000:
    /batch2/colsx/part-00000:
    /batch2/colsy/part-00000:
 </tt>
 Mapper input:<tt>
 </tt>
 Side-effect outputs with Identity "mapper":<tt>
 </tt>
 @author Michel Tourn
 */
class CompoundDirSpec {

  // Keep the Usage messages and docs in sync!

  /** Prefix that marks an input spec for expansion by {@link #expandGlobInputSpec}. */
  public final static String MERGEGLOB_PREFIX = "||";
  /** Separator between the merged streams (rows) of a spec. */
  public final static String MERGE_SEP = "|";
  /** Separator between the columns (paths) of one merged stream. */
  public final static String COL_SEP = " ";
  /** Prefix that marks the primary path of a spec. */
  public final static String PRIMARY_PREFIX = "+";

  /**
   * Stores the raw spec and parses it immediately.
   *
   * @param argSpec the spec string as given on the command line
   * @param isInputSpec true for a -input spec, false for a -output spec
   * @throws IllegalStateException if the spec is null or malformed (see {@link #parse()})
   */
  CompoundDirSpec(String argSpec, boolean isInputSpec) {
    argSpec_ = argSpec;
    isInputSpec_ = isInputSpec;
    direction_ = isInputSpec_ ? "input" : "output";
    parse();
  }

  /**
   * Splits the raw spec into rows (on {@link #MERGE_SEP}) and columns (on {@link #COL_SEP}),
   * fills the path table and determines the primary cell.
   *
   * @throws IllegalStateException if the spec is null, has no path, is an output spec with
   *         more than one row, or is an input spec without exactly one identifiable primary
   */
  public void parse() throws IllegalStateException {
    if (argSpec_ == null) {
      // Fail with a readable message instead of a NullPointerException inside split().
      throw new IllegalStateException("A -" + direction_ + " spec must not be null");
    }

    // Start from a clean slate so that a repeated parse() does not mistake the primary
    // found by the previous run for a duplicate.
    prow_ = NA;
    pcol_ = NA;

    String[] mergerSpecs = argSpec_.split(StreamUtil.regexpEscape(MERGE_SEP));
    int msup = mergerSpecs.length;
    paths_ = new String[msup][];

    if (msup == 0) {
      throw new IllegalStateException("A -" + direction_ + " spec needs at least one path");
    }
    if (!isInputSpec_ && msup > 1) {
      throw new IllegalStateException("A -output spec cannot use merged streams ('" + MERGE_SEP
          + "' delimiter)");
    }

    int numPrimary = 0;
    for (int m = 0; m < msup; m++) {
      String merged = mergerSpecs[m].trim();
      String[] colSpecs = merged.split(StreamUtil.regexpEscape(COL_SEP));
      int csup = colSpecs.length;
      if (csup == 0) {
        throw new IllegalStateException("A -input spec needs at least one path spec per |<column>|");
      }
      paths_[m] = new String[csup];
      for (int c = 0; c < csup; c++) {
        String spec = colSpecs[c];
        if (spec.startsWith(PRIMARY_PREFIX)) {
          // for (!isInputSpec_) the tuple paths should be symmetric.
          // but we still allow user to specify one in case setOutputDir makes a difference
          numPrimary++;
          if (prow_ == NA) {
            spec = spec.substring(PRIMARY_PREFIX.length());
            prow_ = m;
            pcol_ = c;
          }
          // A surplus primary keeps its prefix so that it stands out in the error table.
        }
        paths_[m][c] = spec;
      }
    }

    // Duplicates are reported only now, once every row of paths_ is populated: the error
    // message is built by toTableString(), which walks the complete table.
    if (numPrimary > 1) {
      throwBadNumPrimaryInputSpecs();
    }

    if (prow_ == NA) {
      if (!isInputSpec_) {
        // pick an 'arbitrary' one -- the tuple paths should be symmetric.
        prow_ = 0;
        pcol_ = 0;
      } else if (msup == 1 && paths_[0].length == 1) {
        // pick the only one available. That's also bw-compatible syntax
        prow_ = 0;
        pcol_ = 0;
      } else {
        throwBadNumPrimaryInputSpecs();
      }
    }
  }

  /**
   * Always throws: reports that the spec does not have exactly one primary path.
   * The message includes the current path table as rendered by {@link #toTableString()}.
   *
   * @throws IllegalStateException always
   */
  void throwBadNumPrimaryInputSpecs() throws IllegalStateException {
    String msg = "A compound -input spec needs exactly one primary path prefixed with "
        + PRIMARY_PREFIX;
    msg += ":\n";
    msg += toTableString();
    throw new IllegalStateException(msg);
  }

  // TBD need to decide early whether they are dirs or files or globs?
  /**
   * Placeholder for path validation; currently performs no checks.
   *
   * @param fs the file system the paths would be validated against (unused for now)
   */
  public void validatePaths(FileSystem fs) {
    // Intentionally empty until the TBD above is resolved.
  }

  /** @return the row index of the primary path */
  public int primaryRow() {
    return prow_;
  }

  /** @return the column index of the primary path */
  public int primaryCol() {
    return pcol_;
  }

  /** @return the primary path, without its {@link #PRIMARY_PREFIX} */
  public String primarySpec() {
    return paths_[prow_][pcol_];
  }

  /*
   Example input spec in table form:
   <1 +[/input/part-00]
   <2  [/input/part-01]
   <3  [/input/part-02]
   Example output spec in table form:
   +[/my.output]
   */
  /**
   * Renders the path table as text, one row per line, with cells padded to a common width.
   * The primary cell is marked with {@link #PRIMARY_PREFIX}; channel tags ("&lt;n" per row
   * for input specs, "&gt;n" per column for output specs) are added when there is more than
   * one row, respectively column.
   *
   * @return the formatted table
   */
  public String toTableString() {
    StringBuffer buf = new StringBuffer();
    int maxWid = 0;
    // Pass 1 measures the widest cell, pass 2 emits the cells padded to that width.
    for (int pass = 1; pass <= 2; pass++) {
      int rsup = paths_.length;
      for (int r = 0; r < rsup; r++) {
        int csup = paths_[r].length;
        for (int c = 0; c < csup; c++) {
          String cell = "[" + paths_[r][c] + "]";
          if (r == prow_ && c == pcol_) {
            cell = PRIMARY_PREFIX + cell;
          } else {
            // Pad non-primary cells so the brackets line up with the primary one.
            cell = StreamUtil.rjustify(cell, cell.length() + PRIMARY_PREFIX.length());
          }
          if (isInputSpec_) {
            // channels are for tagged input streams: r-based
            if (rsup > 1) {
              String channel = "<" + (r + 1);
              cell = channel + " " + cell;
            }
          } else {
            // channels are for columns (multiple files) c-based
            if (csup > 1) {
              String channel = ">" + (c + 1);
              cell = channel + " " + cell;
            }
          }
          if (pass == 2) {
            cell = StreamUtil.ljustify(cell, maxWid);
            buf.append(cell);
            buf.append(" ");
          } else {
            if (cell.length() > maxWid) {
              maxWid = cell.length();
            }
          }
        }
        if (pass == 2) {
          buf.append("\n");
        }
      }
    }
    return buf.toString();
  }

  /**
   * Returns the internal path table (not a copy).
   *
   * @return the ragged rows-by-columns array of paths
   * @see #primaryRow
   * @see #primaryCol
   */
  public String[][] getPaths() {
    return paths_;
  }

  // ==== Static helpers that depend on a JobConf. ====

  // Unlike CompoundDirSpec.parse() which is reexecuted at Task runtime,
  // this is expanded once in advance and relies on client-side DFS access.
  // Main reason is we need to choose a primary input file at submission time.
  /**
   * Trims the spec and, if it starts with {@link #MERGEGLOB_PREFIX}, strips that prefix.
   * No actual glob expansion is performed yet (see the TODO in the body).
   *
   * @param inputSpec the raw -input spec
   * @param job the job configuration (currently unused)
   * @return the trimmed spec, without the merge-glob prefix if it had one
   */
  public static String expandGlobInputSpec(String inputSpec, JobConf job) {
    inputSpec = inputSpec.trim();
    if (!inputSpec.startsWith(MERGEGLOB_PREFIX)) {
      return inputSpec;
    }
    inputSpec = inputSpec.substring(MERGEGLOB_PREFIX.length());
    // TODO use upcoming DFSShell wildcarding code..
    return inputSpec;
  }

  // find the -input statement that contains the job's split
  // TODO test with globbing / directory /single file
  /**
   * Looks through the job's "stream.inputspecs.N" entries (N below "stream.numinputspecs")
   * for the first spec whose primary path matches the given one.
   *
   * @param primary the primary path to look for
   * @param job the job configuration holding the input specs
   * @return the matching spec, or null if none matches
   * @throws IllegalStateException if one of the stored specs is missing or malformed
   */
  public static CompoundDirSpec findInputSpecForPrimary(String primary, JobConf job) {
    int num = job.getInt("stream.numinputspecs", -1);
    for (int s = 0; s < num; s++) {
      String specStr = job.get("stream.inputspecs." + s);
      CompoundDirSpec spec = new CompoundDirSpec(specStr, true);
      if (pathsMatch(spec.primarySpec(), primary, job)) {
        return spec;
      }
    }
    return null;
  }

  // There can be only one output spec but this provides some server-side validation
  /**
   * Parses the job's "stream.outputspec" entry and returns it.
   *
   * @param primary the primary output path, compared against the spec's primary path
   * @param job the job configuration holding the output spec
   * @return the parsed output spec
   * @throws IllegalStateException if the stored spec is missing or malformed
   */
  public static CompoundDirSpec findOutputSpecForPrimary(String primary, JobConf job) {
    String specStr = job.get("stream.outputspec");
    CompoundDirSpec spec = new CompoundDirSpec(specStr, false);
    // NOTE(review): the comparison is still evaluated, but a mismatch is not reported;
    // the spec is returned either way. Confirm whether a mismatch should be an error.
    pathsMatch(spec.primarySpec(), primary, job);
    return spec;
  }

  /**
   * Compares two paths for equality. When "fs.default.name" is "local", both are first
   * passed through StreamUtil.safeGetCanonicalPath.
   *
   * @param s1 the first path
   * @param s2 the second path
   * @param job the job configuration used to detect the local file system
   * @return true if the (possibly canonicalized) paths are equal
   */
  static boolean pathsMatch(String s1, String s2, JobConf job) {
    boolean isLocalFS = job.get("fs.default.name", "").equals("local");
    if (isLocalFS) {
      s1 = StreamUtil.safeGetCanonicalPath(new File(s1));
      s2 = StreamUtil.safeGetCanonicalPath(new File(s2));
    }
    return (s1.equals(s2));
  }

  /** Marks a primary row/column that has not been determined. */
  final static int NA = -1;

  String argSpec_;
  boolean isInputSpec_;
  String direction_;
  String[][] paths_;
  int prow_ = NA;
  int pcol_ = NA;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -