⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 segmentmerger.java

📁 nutch0.8源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      } catch (Exception e) {        if (LOG.isWarnEnabled()) {          LOG.warn("Cannot filter key " + key + ": " + e.getMessage());        }      }    }    output.collect(key, value);  }  /**   * NOTE: in selecting the latest version we rely exclusively on the segment   * name (not all segment data contain time information). Therefore it is extremely   * important that segments be named in an increasing lexicographic order as   * their creation time increases.   */  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {    CrawlDatum lastG = null;    CrawlDatum lastF = null;    CrawlDatum lastSig = null;    Content lastC = null;    ParseData lastPD = null;    ParseText lastPT = null;    String lastGname = null;    String lastFname = null;    String lastSigname = null;    String lastCname = null;    String lastPDname = null;    String lastPTname = null;    TreeMap linked = new TreeMap();    while (values.hasNext()) {      ObjectWritable wrapper = (ObjectWritable)values.next();      Object o = wrapper.get();      if (o instanceof CrawlDatum) {        CrawlDatum val = (CrawlDatum)o;        // check which output dir it belongs to        UTF8 part = (UTF8)val.getMetaData().get(SEGMENT_PART_KEY);        if (part == null)          throw new IOException("Null segment part, key=" + key);        UTF8 uName = (UTF8)val.getMetaData().get(SEGMENT_NAME_KEY);        if (uName == null)          throw new IOException("Null segment name, key=" + key);        String name = uName.toString();        String partString = part.toString();        if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {          if (lastG == null) {            lastG = val;            lastGname = name;          } else {            // take newer            if (lastGname.compareTo(name) < 0) {              lastG = val;              lastGname = name;            }          }        } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {          if (lastF == null) {            lastF = val;            lastFname = name;          } else {            // take newer            if (lastFname.compareTo(name) < 0) {              lastF = val;              lastFname = name;            }          }        } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {          if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) {            if (lastSig == null) {              lastSig = val;              lastSigname = name;            } else {              // take newer              if (lastSigname.compareTo(name) < 0) {                lastSig = val;                lastSigname = name;              }            }            continue;          }          // collect all LINKED values from the latest segment          ArrayList segLinked = (ArrayList)linked.get(name);          if (segLinked == null) {            segLinked = new ArrayList();            linked.put(name, segLinked);          }          segLinked.add(val);        } else {          throw new IOException("Cannot determine segment part: " + partString);        }      } else if (o instanceof Content) {        String name = ((Content)o).getMetadata().get(SEGMENT_NAME_KEY.toString());        if (lastC == null) {          lastC = (Content)o;          lastCname = name;        } else {          if (lastCname.compareTo(name) < 0) {            lastC = (Content)o;            lastCname = name;          }        }      } else if (o instanceof ParseData) {        String name = ((ParseData)o).getParseMeta().get(SEGMENT_NAME_KEY.toString());        if (lastPD == null) {          lastPD = (ParseData)o;          lastPDname = name;        } else {          if (lastPDname.compareTo(name) < 0) {            lastPD = (ParseData)o;            lastPDname = name;          }        }      } else if (o instanceof ParseText) {        String text = ((ParseText)o).getText();        String name = null;        int idx = text.indexOf(nameMarker, nameMarker.length());        if (idx != -1) {          name = text.substring(nameMarker.length(), idx);        } else {          throw new IOException("Missing segment name marker in ParseText, key " + key + ": " + text);        }        if (lastPT == null) {          lastPT = (ParseText)o;          lastPTname = name;        } else {          if (lastPTname.compareTo(name) < 0) {            lastPT = (ParseText)o;            lastPTname = name;          }        }      }    }    curCount++;    UTF8 sliceName = null;    ObjectWritable wrapper = new ObjectWritable();    if (sliceSize > 0) {      sliceName = new UTF8(String.valueOf(curCount / sliceSize));    }    // now output the latest values    if (lastG != null) {      if (sliceName != null) {        lastG.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);      }      wrapper.set(lastG);      output.collect(key, wrapper);    }    if (lastF != null) {      if (sliceName != null) {        lastF.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);      }      wrapper.set(lastF);      output.collect(key, wrapper);    }    if (lastSig != null) {      if (sliceName != null) {        lastSig.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);      }      wrapper.set(lastSig);      output.collect(key, wrapper);    }    if (lastC != null) {      if (sliceName != null) {        lastC.getMetadata().set(sliceMarker, sliceName.toString());      }      wrapper.set(lastC);      output.collect(key, wrapper);    }    if (lastPD != null) {      if (sliceName != null) {        lastPD.getParseMeta().set(sliceMarker, sliceName.toString());      }      wrapper.set(lastPD);      output.collect(key, wrapper);    }    if (lastPT != null) {      if (sliceName != null) {        lastPT = new ParseText(sliceMarker + sliceName + sliceMarker                + lastPT.getText());      }      wrapper.set(lastPT);      output.collect(key, wrapper);    }    if (linked.size() > 0) {      String name = (String)linked.lastKey();      ArrayList segLinked = (ArrayList)linked.get(name);      for (int i = 0; i < segLinked.size(); i++) {        CrawlDatum link = (CrawlDatum)segLinked.get(i);        if (sliceName != null) {          link.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);        }        wrapper.set(link);        output.collect(key, wrapper);      }    }  }  public void merge(Path out, Path[] segs, boolean filter, long slice) throws Exception {    String segmentName = Generator.generateSegmentName();    if (LOG.isInfoEnabled()) {      LOG.info("Merging " + segs.length + " segments to " + out + "/" + segmentName);    }    JobConf job = new JobConf(getConf());    job.setJobName("mergesegs " + out + "/" + segmentName);    job.setBoolean("segment.merger.filter", filter);    job.setLong("segment.merger.slice", slice);    job.set("segment.merger.segmentName", segmentName);    FileSystem fs = FileSystem.get(getConf());    // prepare the minimal common set of input dirs    boolean g = true;    boolean f = true;    boolean p = true;    boolean c = true;    boolean pd = true;    boolean pt = true;    for (int i = 0; i < segs.length; i++) {      if (!fs.exists(segs[i])) {        if (LOG.isWarnEnabled()) {          LOG.warn("Input dir " + segs[i] + " doesn't exist, skipping.");        }        segs[i] = null;        continue;      }      if (LOG.isInfoEnabled()) {        LOG.info("SegmentMerger:   adding " + segs[i]);      }      Path cDir = new Path(segs[i], Content.DIR_NAME);      Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);      Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);      Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);      Path pdDir = new Path(segs[i], ParseData.DIR_NAME);      Path ptDir = new Path(segs[i], ParseText.DIR_NAME);      c = c && fs.exists(cDir);      g = g && fs.exists(gDir);      f = f && fs.exists(fDir);      p = p && fs.exists(pDir);      pd = pd && fs.exists(pdDir);      pt = pt && fs.exists(ptDir);    }    StringBuffer sb = new StringBuffer();    if (c) sb.append(" " + Content.DIR_NAME);    if (g) sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);    if (f) sb.append(" " + CrawlDatum.FETCH_DIR_NAME);    if (p) sb.append(" " + CrawlDatum.PARSE_DIR_NAME);    if (pd) sb.append(" " + ParseData.DIR_NAME);    if (pt) sb.append(" " + ParseText.DIR_NAME);    if (LOG.isInfoEnabled()) {      LOG.info("SegmentMerger: using segment data from:" + sb.toString());    }    for (int i = 0; i < segs.length; i++) {      if (segs[i] == null) continue;      if (g) {        Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);        job.addInputPath(gDir);      }      if (c) {        Path cDir = new Path(segs[i], Content.DIR_NAME);        job.addInputPath(cDir);      }      if (f) {        Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);        job.addInputPath(fDir);      }      if (p) {        Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);        job.addInputPath(pDir);      }      if (pd) {        Path pdDir = new Path(segs[i], ParseData.DIR_NAME);        job.addInputPath(pdDir);      }      if (pt) {        Path ptDir = new Path(segs[i], ParseText.DIR_NAME);        job.addInputPath(ptDir);      }    }    job.setInputFormat(ObjectInputFormat.class);    job.setInputKeyClass(UTF8.class);    job.setInputValueClass(ObjectWritable.class);    job.setMapperClass(SegmentMerger.class);    job.setReducerClass(SegmentMerger.class);    job.setOutputPath(out);    job.setOutputKeyClass(UTF8.class);    job.setOutputValueClass(ObjectWritable.class);    job.setOutputFormat(SegmentOutputFormat.class);        setConf(job);        JobClient.runJob(job);  }  /**   * @param args   */  public static void main(String[] args) throws Exception {    if (args.length < 2) {      System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]");      System.err.println("\toutput_dir\tname of the parent dir for output segment slice(s)");      System.err.println("\t-dir segments\tparent dir containing several segments");      System.err.println("\tseg1 seg2 ...\tlist of segment dirs");      System.err.println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters");      System.err.println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs");      return;    }    Configuration conf = NutchConfiguration.create();    final FileSystem fs = FileSystem.get(conf);    Path out = new Path(args[0]);    ArrayList segs = new ArrayList();    long sliceSize = 0;    boolean filter = false;    for (int i = 1; i < args.length; i++) {      if (args[i].equals("-dir")) {        Path[] files = fs.listPaths(new Path(args[++i]), new PathFilter() {          public boolean accept(Path f) {            try {              if (fs.isDirectory(f)) return true;            } catch (IOException e) {}            ;            return false;          }        });        for (int j = 0; j < files.length; j++)          segs.add(files[j]);      } else if (args[i].equals("-filter")) {        filter = true;      } else if (args[i].equals("-slice")) {        sliceSize = Long.parseLong(args[++i]);      } else {        segs.add(new Path(args[i]));      }    }    if (segs.size() == 0) {      System.err.println("ERROR: No input segments.");      return;    }    SegmentMerger merger = new SegmentMerger(conf);    merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, sliceSize);  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -