⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 segmentmerger.java

📁 nutch0.8源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2006 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.nutch.segment;import java.io.IOException;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.*;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.PathFilter;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.Progressable;import org.apache.nutch.crawl.CrawlDatum;import org.apache.nutch.crawl.Generator;import org.apache.nutch.fetcher.Fetcher;import org.apache.nutch.metadata.Metadata;import org.apache.nutch.net.URLFilters;import org.apache.nutch.parse.ParseData;import org.apache.nutch.parse.ParseText;import org.apache.nutch.protocol.Content;import org.apache.nutch.util.NutchConfiguration;/** * This tool takes several segments and merges their data together. Only the * latest versions of data is retained. * <p> * Optionally, you can apply current URLFilters to remove prohibited URL-s. * </p> * <p> * Also, it's possible to slice the resulting segment into chunks of fixed size. * </p> * <h3>Important Notes</h3> * <h4>Which parts are merged?</h4> * <p>It doesn't make sense to merge data from segments, which are at different stages * of processing (e.g. one unfetched segment, one fetched but not parsed, and * one fetched and parsed). Therefore, prior to merging, the tool will determine * the lowest common set of input data, and only this data will be merged. * This may have some unintended consequences: * e.g. if majority of input segments are fetched and parsed, but one of them is unfetched, * the tool will fall back to just merging fetchlists, and it will skip all other data * from all segments.</p> * <h4>Merging fetchlists</h4> * <p>Merging segments, which contain just fetchlists (i.e. prior to fetching) * is not recommended, because this tool (unlike the {@link org.apache.nutch.crawl.Generator} * doesn't ensure that fetchlist parts for each map task are disjoint.</p> * <p> * <h4>Duplicate content</h4> * Merging segments removes older content whenever possible (see below). However, * this is NOT the same as de-duplication, which in addition removes identical * content found at different URL-s. In other words, running DeleteDuplicates is * still necessary. * </p> * <p>For some types of data (especially ParseText) it's not possible to determine * which version is really older. Therefore the tool always uses segment names as * timestamps, for all types of input data. Segment names are compared in forward lexicographic * order (0-9a-zA-Z), and data from segments with "higher" names will prevail. * It follows then that it is extremely important that segments be named in an * increasing lexicographic order as their creation time increases.</p> * <p> * <h4>Merging and indexes</h4> * Merged segment gets a different name. Since Indexer embeds segment names in * indexes, any indexes originally created for the input segments will NOT work with the * merged segment. Newly created merged segment(s) need to be indexed afresh. * This tool doesn't use existing indexes in any way, so if * you plan to merge segments you don't have to index them prior to merging. *  *  * @author Andrzej Bialecki */public class SegmentMerger extends Configured implements Mapper, Reducer {  private static final Log LOG = LogFactory.getLog(SegmentMerger.class);  private static final UTF8 SEGMENT_PART_KEY = new UTF8("_PaRt_");  private static final UTF8 SEGMENT_NAME_KEY = new UTF8("_NaMe_");  private static final String nameMarker = SEGMENT_NAME_KEY.toString();  private static final UTF8 SEGMENT_SLICE_KEY = new UTF8("_SlIcE_");  private static final String sliceMarker = SEGMENT_SLICE_KEY.toString();  private URLFilters filters = null;  private long sliceSize = -1;  private long curCount = 0;    /**   * Wraps inputs in an {@link ObjectWritable}, to permit merging different   * types in reduce.   */  public static class ObjectInputFormat extends SequenceFileInputFormat {    public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)            throws IOException {      reporter.setStatus(split.toString());      // find part name      String dir = split.getPath().toString().replace('\\', '/');      int idx = dir.lastIndexOf("/part-");      if (idx == -1) {        throw new IOException("Cannot determine segment part: " + dir);      }      dir = dir.substring(0, idx);      idx = dir.lastIndexOf('/');      if (idx == -1) {        throw new IOException("Cannot determine segment part: " + dir);      }      final String part = dir.substring(idx + 1);      // find segment name      dir = dir.substring(0, idx);      idx = dir.lastIndexOf('/');      if (idx == -1) {        throw new IOException("Cannot determine segment name: " + dir);      }      final String segment = dir.substring(idx + 1);      return new SequenceFileRecordReader(job, split) {        public synchronized boolean next(Writable key, Writable value) throws IOException {          ObjectWritable wrapper = (ObjectWritable) value;          try {            wrapper.set(getValueClass().newInstance());          } catch (Exception e) {            throw new IOException(e.toString());          }          boolean res = super.next(key, (Writable) wrapper.get());          Object o = wrapper.get();          if (o instanceof CrawlDatum) {            // record which part of segment this comes from            ((CrawlDatum)o).getMetaData().put(SEGMENT_PART_KEY, new UTF8(part));            ((CrawlDatum)o).getMetaData().put(SEGMENT_NAME_KEY, new UTF8(segment));          } else if (o instanceof Content) {            if (((Content)o).getMetadata() == null) {              ((Content)o).setMetadata(new Metadata());            }            ((Content)o).getMetadata().set(SEGMENT_NAME_KEY.toString(), segment);          } else if (o instanceof ParseData) {            if (((ParseData)o).getParseMeta() == null) {              ((ParseData)o).setParseMeta(new Metadata());            }            ((ParseData)o).getParseMeta().set(SEGMENT_NAME_KEY.toString(), segment);          } else if (o instanceof ParseText) {            String text = ((ParseText)o).getText();            o = new ParseText(SEGMENT_NAME_KEY.toString() +                    segment + SEGMENT_NAME_KEY.toString() + text);            wrapper.set(o);          } else {            throw new IOException("Unknown value type: " + o.getClass().getName() + "(" + o + ")");          }          return res;        }      };    }  }  public static class SegmentOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase {    private static final String DEFAULT_SLICE = "default";        public RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException {      return new RecordWriter() {        MapFile.Writer c_out = null;        MapFile.Writer f_out = null;        MapFile.Writer pd_out = null;        MapFile.Writer pt_out = null;        SequenceFile.Writer g_out = null;        SequenceFile.Writer p_out = null;        HashMap sliceWriters = new HashMap();        String segmentName = job.get("segment.merger.segmentName");                public void write(WritableComparable key, Writable value) throws IOException {          // unwrap          Writable o = (Writable)((ObjectWritable)value).get();          String slice = null;          if (o instanceof CrawlDatum) {            // check which output dir it should go into            UTF8 part = (UTF8)((CrawlDatum)o).getMetaData().get(SEGMENT_PART_KEY);            ((CrawlDatum)o).getMetaData().remove(SEGMENT_PART_KEY);            ((CrawlDatum)o).getMetaData().remove(SEGMENT_NAME_KEY);            if (part == null)              throw new IOException("Null segment part, key=" + key);            UTF8 uSlice = (UTF8)((CrawlDatum)o).getMetaData().get(SEGMENT_SLICE_KEY);            ((CrawlDatum)o).getMetaData().remove(SEGMENT_SLICE_KEY);            if (uSlice != null) slice = uSlice.toString();            String partString = part.toString();            if (partString.equals(CrawlDatum.GENERATE_DIR_NAME)) {              g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);              g_out.append(key, o);            } else if (partString.equals(CrawlDatum.FETCH_DIR_NAME)) {              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class);              f_out.append(key, o);            } else if (partString.equals(CrawlDatum.PARSE_DIR_NAME)) {              p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);              p_out.append(key, o);            } else {              throw new IOException("Cannot determine segment part: " + partString);            }          } else if (o instanceof Content) {            slice = ((Content)o).getMetadata().get(sliceMarker);            ((Content)o).getMetadata().remove(sliceMarker);            ((Content)o).getMetadata().remove(nameMarker);            // update the segment name inside metadata            if (slice == null) {              ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName);            } else {              ((Content)o).getMetadata().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);            }            c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);            c_out.append(key, o);          } else if (o instanceof ParseData) {            slice = ((ParseData)o).getParseMeta().get(sliceMarker);            ((ParseData)o).getParseMeta().remove(sliceMarker);            ((ParseData)o).getParseMeta().remove(nameMarker);            // update the segment name inside contentMeta - required by Indexer            if (slice == null) {              ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName);            } else {              ((ParseData)o).getContentMeta().set(Fetcher.SEGMENT_NAME_KEY, segmentName + "-" + slice);            }            pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);            pd_out.append(key, o);          } else if (o instanceof ParseText) {            String text = ((ParseText)o).getText();            if (text != null) {              // get slice name, and remove it from the text              if (text.startsWith(sliceMarker)) {                int idx = text.indexOf(sliceMarker, sliceMarker.length());                if (idx != -1) {                  slice = text.substring(sliceMarker.length(), idx);                  text = text.substring(idx + sliceMarker.length());                }              }              // get segment name, and remove it from the text              if (text.startsWith(nameMarker)) {                int idx = text.indexOf(nameMarker, nameMarker.length());                if (idx != -1) {                  text = text.substring(idx + nameMarker.length());                }              }              o = new ParseText(text);            }            pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);            pt_out.append(key, o);          }        }                // lazily create SequenceFile-s.        private SequenceFile.Writer ensureSequenceFile(String slice, String dirName) throws IOException {          if (slice == null) slice = DEFAULT_SLICE;          SequenceFile.Writer res = (SequenceFile.Writer)sliceWriters.get(slice + dirName);          if (res != null) return res;          Path wname;          if (slice == DEFAULT_SLICE) {            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);          } else {            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);          }          res = new SequenceFile.Writer(fs, wname, UTF8.class, CrawlDatum.class);          sliceWriters.put(slice + dirName, res);          return res;        }        // lazily create MapFile-s.        private MapFile.Writer ensureMapFile(String slice, String dirName, Class clazz) throws IOException {          if (slice == null) slice = DEFAULT_SLICE;          MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName);          if (res != null) return res;          Path wname;          if (slice == DEFAULT_SLICE) {            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName), dirName), name);          } else {            wname = new Path(new Path(new Path(job.getOutputPath(), segmentName + "-" + slice), dirName), name);          }          res = new MapFile.Writer(fs, wname.toString(), UTF8.class, clazz);          sliceWriters.put(slice + dirName, res);          return res;        }        public void close(Reporter reporter) throws IOException {          Iterator it = sliceWriters.values().iterator();          while (it.hasNext()) {            Object o = it.next();            if (o instanceof SequenceFile.Writer) {              ((SequenceFile.Writer)o).close();            } else {              ((MapFile.Writer)o).close();            }          }        }      };    }  }  public SegmentMerger() {    super(null);  }    public SegmentMerger(Configuration conf) {    super(conf);  }    public void setConf(Configuration conf) {    super.setConf(conf);    if (conf == null) return;    if (conf.getBoolean("segment.merger.filter", false))      filters = new URLFilters(conf);    sliceSize = conf.getLong("segment.merger.slice", -1);    if ((sliceSize > 0) && (LOG.isInfoEnabled())) {      LOG.info("Slice size: " + sliceSize + " URLs.");    }  }  public void close() throws IOException {  }  public void configure(JobConf conf) {    setConf(conf);    if (sliceSize > 0) {      sliceSize = sliceSize / conf.getNumReduceTasks();    }  }    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {    if (filters != null) {      try {        if (filters.filter(((UTF8)key).toString()) == null) {          return;        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -