SegmentMerger.java
/**
 * Copyright 2006 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.segment;

import java.io.IOException;
import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.fetcher.Fetcher;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.NutchConfiguration;

/**
 * This tool takes several segments and merges their data together. Only the
 * latest versions of data are retained.
 * <p>
 * Optionally, you can apply current URLFilters to remove prohibited URL-s.
 * </p>
 * <p>
 * Also, it's possible to slice the resulting segment into chunks of fixed size.
 * </p>
 * <h3>Important Notes</h3>
 * <h4>Which parts are merged?</h4>
 * <p>It doesn't make sense to merge data from segments that are at different stages
 * of processing (e.g. one unfetched segment, one fetched but not parsed, and
 * one fetched and parsed). Therefore, prior to merging, the tool will determine
 * the lowest common set of input data, and only this data will be merged.
 * This may have some unintended consequences:
 * e.g. if the majority of input segments are fetched and parsed, but one of them is unfetched,
 * the tool will fall back to just merging fetchlists, and it will skip all other data
 * from all segments.</p>
 * <h4>Merging fetchlists</h4>
 * <p>Merging segments that contain just fetchlists (i.e. prior to fetching)
 * is not recommended, because this tool (unlike {@link org.apache.nutch.crawl.Generator})
 * doesn't ensure that fetchlist parts for each map task are disjoint.</p>
 * <p>
 * <h4>Duplicate content</h4>
 * Merging segments removes older content whenever possible (see below). However,
 * this is NOT the same as de-duplication, which in addition removes identical
 * content found at different URL-s. In other words, running DeleteDuplicates is
 * still necessary.
 * </p>
 * <p>For some types of data (especially ParseText) it's not possible to determine
 * which version is really older. Therefore the tool always uses segment names as
 * timestamps, for all types of input data. Segment names are compared in forward lexicographic
 * order (0-9a-zA-Z), and data from segments with "higher" names will prevail.
 * It follows that it is extremely important that segments be named in
 * increasing lexicographic order as their creation time increases.</p>
 * <p>
 * <h4>Merging and indexes</h4>
 * The merged segment gets a different name. Since the Indexer embeds segment names in
 * indexes, any indexes originally created for the input segments will NOT work with the
 * merged segment. Newly created merged segment(s) need to be indexed afresh.
 * This tool doesn't use existing indexes in any way, so if
 * you plan to merge segments you don't have to index them prior to merging.
 *
 * @author Andrzej Bialecki
 */
public class SegmentMerger extends Configured implements Mapper, Reducer {
  private static final Log LOG = LogFactory.getLog(SegmentMerger.class);

  // Temporary markers used to tag each value with its source segment part, segment
  // name and slice; SegmentOutputFormat strips them again when writing the output.
  private static final UTF8 SEGMENT_PART_KEY = new UTF8("_PaRt_");
  private static final UTF8 SEGMENT_NAME_KEY = new UTF8("_NaMe_");
  private static final String nameMarker = SEGMENT_NAME_KEY.toString();
  private static final UTF8 SEGMENT_SLICE_KEY = new UTF8("_SlIcE_");
  private static final String sliceMarker = SEGMENT_SLICE_KEY.toString();

  private URLFilters filters = null;
  private long sliceSize = -1;
  private long curCount = 0;

  /**
   * Wraps inputs in an {@link ObjectWritable}, to permit merging different
   * types in reduce.
   */
  public static class ObjectInputFormat extends SequenceFileInputFormat {
    public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job,
            Reporter reporter) throws IOException {
      reporter.setStatus(split.toString());

      // find part name
      String dir = split.getPath().toString().replace('\\', '/');
      int idx = dir.lastIndexOf("/part-");
      if (idx == -1) {
        throw new IOException("Cannot determine segment part: " + dir);
      }
      dir = dir.substring(0, idx);
      idx = dir.lastIndexOf('/');
      if (idx == -1) {
        throw new IOException("Cannot determine segment part: " + dir);
      }
      final String part = dir.substring(idx + 1);

      // find segment name
      dir = dir.substring(0, idx);
      idx = dir.lastIndexOf('/');
      if (idx == -1) {
        throw new IOException("Cannot determine segment name: " + dir);
      }
      final String segment = dir.substring(idx + 1);

      return new SequenceFileRecordReader(job, split) {
        public synchronized boolean next(Writable key, Writable value) throws IOException {
          ObjectWritable wrapper = (ObjectWritable) value;
          try {
            wrapper.set(getValueClass().newInstance());
          } catch (Exception e) {
            throw new IOException(e.toString());
          }
          boolean res = super.next(key, (Writable) wrapper.get());
          Object o = wrapper.get();
          if (o instanceof CrawlDatum) {
            // record which part of the segment this comes from
            ((CrawlDatum) o).getMetaData().put(SEGMENT_PART_KEY, new UTF8(part));
            ((CrawlDatum) o).getMetaData().put(SEGMENT_NAME_KEY, new UTF8(segment));
          } else if (o instanceof Content) {
            if (((Content) o).getMetadata() == null) {
              ((Content) o).setMetadata(new Metadata());
            }
            ((Content) o).getMetadata().set(SEGMENT_NAME_KEY.toString(), segment);
          } else if (o instanceof ParseData) {
            if (((ParseData) o).getParseMeta() == null) {
              ((ParseData) o).setParseMeta(new Metadata());
            }
            ((ParseData) o).getParseMeta().set(SEGMENT_NAME_KEY.toString(), segment);
          } else if (o instanceof ParseText) {
            // ParseText carries no metadata, so the segment name is prefixed into the
            // text itself and stripped again by SegmentOutputFormat.
            String text = ((ParseText) o).getText();
            o = new ParseText(SEGMENT_NAME_KEY.toString() + segment + SEGMENT_NAME_KEY.toString() + text);
            wrapper.set(o);
          } else {
            throw new IOException("Unknown value type: " + o.getClass().getName() + "(" + o + ")");
          }
          return res;
        }
      };
    }
  }

  public static class SegmentOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase {
    private static final String DEFAULT_SLICE = "default";

    public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name,
            final Progressable progress) throws IOException {
      return new RecordWriter() {
        MapFile.Writer c_out = null;
        MapFile.Writer f_out = null;
        MapFile.Writer pd_out = null;
        MapFile.Writer pt_out = null;
        SequenceFile.Writer g_out = null;
        SequenceFile.Writer p_out = null;
        HashMap sliceWriters = new HashMap();
        String segmentName = job.get("segment.merger.segmentName");

        public void write(WritableComparable key, Writable value) throws IOException {
          // unwrap
          Writable o = (Writable) ((ObjectWritable) value).get();
          String slice = null;
          if (o instanceof CrawlDatum) {
            // check which output dir it should go into
            UTF8 part = (UTF8) ((CrawlDatum) o).getMetaData().get(SEGMENT_PART_KEY);
            ((CrawlDatum) o).getMetaData().remove(SEGMENT_PART_KEY);
            ((CrawlDatum) o).getMetaData().remove(SEGMENT_NAME_KEY);
            if (part == null) throw new IOException("Null segment part, key=" + key);
            UTF8 uSlice = (UTF8) ((CrawlDatum) o).getMetaData().get(SEGMENT_SLICE_KEY);
            ((CrawlDatum) o).getMetaData().remove(SEGMENT_SLICE_KEY);
            if (uSlice != null) slice = uSlice.toString();
            String partString = part.toString();
            if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {
              g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
              g_out.append(key, o);
            } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {
              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class);
              f_out.append(key, o);
            } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {
              p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
              p_out.append(key, o);
            } else {
              throw new IOException("Cannot determine segment part: " + partString);
            }
          } else if (o instanceof Content) {
            slice = ((Content) o).getMetadata().get(sliceMarker);
            ((Content) o).getMetadata().remove(sliceMarker);
            ((Content) o).getMetadata().remove(nameMarker);
            // update the segment name inside metadata
            if (slice == null) {
              ((Content) o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName);
            } else {
              ((Content) o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
            }
            c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
            c_out.append(key, o);
          } else if (o instanceof ParseData) {
            slice = ((ParseData) o).getParseMeta().get(sliceMarker);
            ((ParseData) o).getParseMeta().remove(sliceMarker);
            ((ParseData) o).getParseMeta().remove(nameMarker);
            // update the segment name inside contentMeta - required by Indexer
            if (slice == null) {
              ((ParseData) o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName);
            } else {
              ((ParseData) o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);
            }
            pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
            pd_out.append(key, o);
          } else if (o instanceof ParseText) {
            String text = ((ParseText) o).getText();
            if (text != null) {
              // get slice name, and remove it from the text
              if (text.startsWith(sliceMarker)) {
                int idx = text.indexOf(sliceMarker, sliceMarker.length());
                if (idx != -1) {
                  slice = text.substring(sliceMarker.length(), idx);
                  text = text.substring(idx + sliceMarker.length());
                }
              }
              // get segment name, and remove it from the text
              if (text.startsWith(nameMarker)) {
                int idx = text.indexOf(nameMarker, nameMarker.length());
                if (idx != -1) {
                  text = text.substring(idx + nameMarker.length());
                }
              }
              o = new ParseText(text);
            }
            pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
            pt_out.append(key, o);
          }
        }

        // lazily create SequenceFile-s.
        private SequenceFile.Writer ensureSequenceFile(String slice, String dirName) throws IOException {
          if (slice == null) slice = DEFAULT_SLICE;
          SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters.get(slice + dirName);
          if (res != null) return res;
          Path wname;
          if (slice == DEFAULT_SLICE) {
            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);
          } else {
            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
          }
          res = new SequenceFile.Writer(fs, wname, UTF8.class, CrawlDatum.class);
          sliceWriters.put(slice + dirName, res);
          return res;
        }

        // lazily create MapFile-s.
        private MapFile.Writer ensureMapFile(String slice, String dirName, Class clazz) throws IOException {
          if (slice == null) slice = DEFAULT_SLICE;
          MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice + dirName);
          if (res != null) return res;
          Path wname;
          if (slice == DEFAULT_SLICE) {
            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);
          } else {
            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);
          }
          res = new MapFile.Writer(fs, wname.toString(), UTF8.class, clazz);
          sliceWriters.put(slice + dirName, res);
          return res;
        }

        public void close(Reporter reporter) throws IOException {
          // close every per-slice writer that was lazily created
          Iterator it = sliceWriters.values().iterator();
          while (it.hasNext()) {
            Object o = it.next();
            if (o instanceof SequenceFile.Writer) {
              ((SequenceFile.Writer) o).close();
            } else {
              ((MapFile.Writer) o).close();
            }
          }
        }
      };
    }
  }

  public SegmentMerger() {
    super(null);
  }

  public SegmentMerger(Configuration conf) {
    super(conf);
  }

  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) return;
    if (conf.getBoolean("segment.merger.filter", false))
      filters = new URLFilters(conf);
    sliceSize = conf.getLong("segment.merger.slice", -1);
    if ((sliceSize > 0) && (LOG.isInfoEnabled())) {
      LOG.info("Slice size: " + sliceSize + " URLs.");
    }
  }

  public void close() throws IOException {
  }

  public void configure(JobConf conf) {
    setConf(conf);
    if (sliceSize > 0) {
      // each reduce task handles its share of the total slice size
      sliceSize = sliceSize / conf.getNumReduceTasks();
    }
  }

  public void map(WritableComparable key, Writable value,
          OutputCollector output, Reporter reporter) throws IOException {
    if (filters != null) {
      try {
        if (filters.filter(((UTF8) key).toString()) == null) {
          return;
        }
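The listing above breaks off inside map(), so the driver code that actually launches the merge job is not shown. As a minimal usage sketch only, the snippet below wires up the two configuration keys that setConf() reads ("segment.merger.filter" and "segment.merger.slice"); the class name, the slice value and the commented-out merge call are illustrative assumptions, not part of the source above.

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.segment.SegmentMerger;
import org.apache.nutch.util.NutchConfiguration;

// Hypothetical helper class, not part of Nutch.
public class SegmentMergerUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();

    // These are the keys SegmentMerger.setConf() reads in the listing above.
    conf.setBoolean("segment.merger.filter", true);  // apply current URLFilters while merging
    conf.setLong("segment.merger.slice", 50000L);    // example value: slice output into ~50k-URL chunks

    SegmentMerger merger = new SegmentMerger(conf);
    // The merge/driver entry point is not part of the truncated listing above,
    // so no call is made here; see the full SegmentMerger source for the real driver.
  }
}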
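The slice-aware writers (ensureSequenceFile()/ensureMapFile() above) derive the output location purely from the job output path, the merged segment name and the optional slice suffix. The sketch below mirrors that Path construction to show the resulting directory layout; the concrete paths, segment name and the "parse_text" sub-directory are example values chosen for illustration.

import org.apache.hadoop.fs.Path;

// Illustrates the layout produced by the slice-aware writers above.
public class SliceLayoutSketch {
  public static void main(String[] args) {
    Path outputPath = new Path("crawl/merged");   // job output path (example)
    String segmentName = "20060601120000";        // merged segment name (example)
    String dirName = "parse_text";                // e.g. ParseText.DIR_NAME
    String name = "part-00000";                   // per-reducer file name

    // default slice: <output>/<segmentName>/<dirName>/<name>
    Path defaultSlice = new Path(new Path(new Path(outputPath, segmentName), dirName), name);
    // named slice:   <output>/<segmentName>-<slice>/<dirName>/<name>
    Path namedSlice = new Path(new Path(new Path(outputPath, segmentName + "-" + "00001"), dirName), name);

    System.out.println(defaultSlice);  // crawl/merged/20060601120000/parse_text/part-00000
    System.out.println(namedSlice);    // crawl/merged/20060601120000-00001/parse_text/part-00000
  }
}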