📄 SegmentMerger.java
      } catch (Exception e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Cannot filter key " + key + ": " + e.getMessage());
        }
      }
    }
    output.collect(key, value);
  }

  /**
   * NOTE: in selecting the latest version we rely exclusively on the segment
   * name (not all segment data contain time information). Therefore it is extremely
   * important that segments be named in an increasing lexicographic order as
   * their creation time increases.
   */
  public void reduce(WritableComparable key, Iterator values,
          OutputCollector output, Reporter reporter) throws IOException {
    CrawlDatum lastG = null;
    CrawlDatum lastF = null;
    CrawlDatum lastSig = null;
    Content lastC = null;
    ParseData lastPD = null;
    ParseText lastPT = null;
    String lastGname = null;
    String lastFname = null;
    String lastSigname = null;
    String lastCname = null;
    String lastPDname = null;
    String lastPTname = null;
    TreeMap linked = new TreeMap();
    while (values.hasNext()) {
      ObjectWritable wrapper = (ObjectWritable) values.next();
      Object o = wrapper.get();
      if (o instanceof CrawlDatum) {
        CrawlDatum val = (CrawlDatum) o;
        // check which output dir it belongs to
        UTF8 part = (UTF8) val.getMetaData().get(SEGMENT_PART_KEY);
        if (part == null) throw new IOException("Null segment part, key=" + key);
        UTF8 uName = (UTF8) val.getMetaData().get(SEGMENT_NAME_KEY);
        if (uName == null) throw new IOException("Null segment name, key=" + key);
        String name = uName.toString();
        String partString = part.toString();
        if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {
          if (lastG == null) {
            lastG = val;
            lastGname = name;
          } else {
            // take newer
            if (lastGname.compareTo(name) < 0) {
              lastG = val;
              lastGname = name;
            }
          }
        } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {
          if (lastF == null) {
            lastF = val;
            lastFname = name;
          } else {
            // take newer
            if (lastFname.compareTo(name) < 0) {
              lastF = val;
              lastFname = name;
            }
          }
        } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {
          if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
            if (lastSig == null) {
              lastSig = val;
              lastSigname = name;
            } else {
              // take newer
              if (lastSigname.compareTo(name) < 0) {
                lastSig = val;
                lastSigname = name;
              }
            }
            continue;
          }
          // collect all LINKED values from the latest segment
          ArrayList segLinked = (ArrayList) linked.get(name);
          if (segLinked == null) {
            segLinked = new ArrayList();
            linked.put(name, segLinked);
          }
          segLinked.add(val);
        } else {
          throw new IOException("Cannot determine segment part: " + partString);
        }
      } else if (o instanceof Content) {
        String name = ((Content) o).getMetadata().get(SEGMENT_NAME_KEY.toString());
        if (lastC == null) {
          lastC = (Content) o;
          lastCname = name;
        } else {
          if (lastCname.compareTo(name) < 0) {
            lastC = (Content) o;
            lastCname = name;
          }
        }
      } else if (o instanceof ParseData) {
        String name = ((ParseData) o).getParseMeta().get(SEGMENT_NAME_KEY.toString());
        if (lastPD == null) {
          lastPD = (ParseData) o;
          lastPDname = name;
        } else {
          if (lastPDname.compareTo(name) < 0) {
            lastPD = (ParseData) o;
            lastPDname = name;
          }
        }
      } else if (o instanceof ParseText) {
        String text = ((ParseText) o).getText();
        String name = null;
        int idx = text.indexOf(nameMarker, nameMarker.length());
        if (idx != -1) {
          name = text.substring(nameMarker.length(), idx);
        } else {
          throw new IOException("Missing segment name marker in ParseText, key " + key + ": " + text);
        }
        if (lastPT == null) {
          lastPT = (ParseText) o;
          lastPTname = name;
        } else {
          if (lastPTname.compareTo(name) < 0) {
            lastPT = (ParseText) o;
            lastPTname = name;
          }
        }
      }
    }
    curCount++;
    UTF8 sliceName = null;
    ObjectWritable wrapper = new ObjectWritable();
    if (sliceSize > 0) {
      sliceName = new UTF8(String.valueOf(curCount / sliceSize));
    }
    // now output the latest values
    if (lastG != null) {
      if (sliceName != null) {
        lastG.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
      }
      wrapper.set(lastG);
      output.collect(key, wrapper);
    }
    if (lastF != null) {
      if (sliceName != null) {
        lastF.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
      }
      wrapper.set(lastF);
      output.collect(key, wrapper);
    }
    if (lastSig != null) {
      if (sliceName != null) {
        lastSig.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
      }
      wrapper.set(lastSig);
      output.collect(key, wrapper);
    }
    if (lastC != null) {
      if (sliceName != null) {
        lastC.getMetadata().set(sliceMarker, sliceName.toString());
      }
      wrapper.set(lastC);
      output.collect(key, wrapper);
    }
    if (lastPD != null) {
      if (sliceName != null) {
        lastPD.getParseMeta().set(sliceMarker, sliceName.toString());
      }
      wrapper.set(lastPD);
      output.collect(key, wrapper);
    }
    if (lastPT != null) {
      if (sliceName != null) {
        lastPT = new ParseText(sliceMarker + sliceName + sliceMarker + lastPT.getText());
      }
      wrapper.set(lastPT);
      output.collect(key, wrapper);
    }
    if (linked.size() > 0) {
      String name = (String) linked.lastKey();
      ArrayList segLinked = (ArrayList) linked.get(name);
      for (int i = 0; i < segLinked.size(); i++) {
        CrawlDatum link = (CrawlDatum) segLinked.get(i);
        if (sliceName != null) {
          link.getMetaData().put(SEGMENT_SLICE_KEY, sliceName);
        }
        wrapper.set(link);
        output.collect(key, wrapper);
      }
    }
  }

  public void merge(Path out, Path[] segs, boolean filter, long slice) throws Exception {
    String segmentName = Generator.generateSegmentName();
    if (LOG.isInfoEnabled()) {
      LOG.info("Merging " + segs.length + " segments to " + out + "/" + segmentName);
    }
    JobConf job = new JobConf(getConf());
    job.setJobName("mergesegs " + out + "/" + segmentName);
    job.setBoolean("segment.merger.filter", filter);
    job.setLong("segment.merger.slice", slice);
    job.set("segment.merger.segmentName", segmentName);
    FileSystem fs = FileSystem.get(getConf());
    // prepare the minimal common set of input dirs
    boolean g = true;
    boolean f = true;
    boolean p = true;
    boolean c = true;
    boolean pd = true;
    boolean pt = true;
    for (int i = 0; i < segs.length; i++) {
      if (!fs.exists(segs[i])) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Input dir " + segs[i] + " doesn't exist, skipping.");
        }
        segs[i] = null;
        continue;
      }
      if (LOG.isInfoEnabled()) {
        LOG.info("SegmentMerger: adding " + segs[i]);
      }
      Path cDir = new Path(segs[i], Content.DIR_NAME);
      Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
      Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
      Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
      Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
      Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
      c = c && fs.exists(cDir);
      g = g && fs.exists(gDir);
      f = f && fs.exists(fDir);
      p = p && fs.exists(pDir);
      pd = pd && fs.exists(pdDir);
      pt = pt && fs.exists(ptDir);
    }
    StringBuffer sb = new StringBuffer();
    if (c) sb.append(" " + Content.DIR_NAME);
    if (g) sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
    if (f) sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
    if (p) sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
    if (pd) sb.append(" " + ParseData.DIR_NAME);
    if (pt) sb.append(" " + ParseText.DIR_NAME);
    if (LOG.isInfoEnabled()) {
      LOG.info("SegmentMerger: using segment data from:" + sb.toString());
    }
    for (int i = 0; i < segs.length; i++) {
      if (segs[i] == null) continue;
      if (g) {
        Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
        job.addInputPath(gDir);
      }
      if (c) {
        Path cDir = new Path(segs[i], Content.DIR_NAME);
        job.addInputPath(cDir);
      }
      if (f) {
        Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
        job.addInputPath(fDir);
      }
      if (p) {
        Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
        job.addInputPath(pDir);
      }
      if (pd) {
        Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
        job.addInputPath(pdDir);
      }
      if (pt) {
        Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
        job.addInputPath(ptDir);
      }
    }
    job.setInputFormat(ObjectInputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(ObjectWritable.class);
    job.setMapperClass(SegmentMerger.class);
    job.setReducerClass(SegmentMerger.class);
    job.setOutputPath(out);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(ObjectWritable.class);
    job.setOutputFormat(SegmentOutputFormat.class);
    setConf(job);
    JobClient.runJob(job);
  }

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]");
      System.err.println("\toutput_dir\tname of the parent dir for output segment slice(s)");
      System.err.println("\t-dir segments\tparent dir containing several segments");
      System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
      System.err.println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters");
      System.err.println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs");
      return;
    }
    Configuration conf = NutchConfiguration.create();
    final FileSystem fs = FileSystem.get(conf);
    Path out = new Path(args[0]);
    ArrayList segs = new ArrayList();
    long sliceSize = 0;
    boolean filter = false;
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-dir")) {
        Path[] files = fs.listPaths(new Path(args[++i]), new PathFilter() {
          public boolean accept(Path f) {
            try {
              if (fs.isDirectory(f)) return true;
            } catch (IOException e) {}
            return false;
          }
        });
        for (int j = 0; j < files.length; j++) segs.add(files[j]);
      } else if (args[i].equals("-filter")) {
        filter = true;
      } else if (args[i].equals("-slice")) {
        sliceSize = Long.parseLong(args[++i]);
      } else {
        segs.add(new Path(args[i]));
      }
    }
    if (segs.size() == 0) {
      System.err.println("ERROR: No input segments.");
      return;
    }
    SegmentMerger merger = new SegmentMerger(conf);
    merger.merge(out, (Path[]) segs.toArray(new Path[segs.size()]), filter, sliceSize);
  }
}
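
For reference, below is a minimal usage sketch of the merge() method defined above, assuming the same Nutch 0.8-era imports as the class itself. The output directory, segment paths, and slice size are illustrative placeholders, not values taken from this file; the call is roughly equivalent to the command-line form printed by main(): SegmentMerger output_dir seg1 seg2 -filter -slice 50000.

// Minimal sketch, assuming the API used above; all paths and the slice size are hypothetical.
Configuration conf = NutchConfiguration.create();
SegmentMerger merger = new SegmentMerger(conf);
merger.merge(new Path("crawl/merged_segments"),            // parent dir for the output slice(s)
             new Path[] {
               new Path("crawl/segments/20070301000000"),  // older segment
               new Path("crawl/segments/20070302000000")   // newer segment; its records win on conflicts
             },
             true,     // filter: drop URLs prohibited by the current URLFilters
             50000L);  // slice: create output segments containing 50000 URLs each

Because the reduce step selects the newest record purely by comparing segment names lexicographically, the later-named segment supplies the surviving CrawlDatum, Content, ParseData, and ParseText for any URL present in both inputs.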