⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 indexer.java

📁 nutch0.8源码
💻 JAVA
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.nutch.indexer;import java.io.*;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.io.*;import org.apache.nutch.fetcher.Fetcher;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.Progressable;import org.apache.nutch.parse.*;import org.apache.nutch.analysis.*;import org.apache.nutch.scoring.ScoringFilterException;import org.apache.nutch.scoring.ScoringFilters;import org.apache.nutch.util.LogUtil;import org.apache.nutch.util.NutchConfiguration;import org.apache.nutch.util.NutchJob;import org.apache.nutch.crawl.CrawlDatum;import org.apache.nutch.crawl.Inlinks;import org.apache.nutch.crawl.LinkDb;import org.apache.lucene.index.*;import org.apache.lucene.document.*;import org.apache.nutch.metadata.Metadata;/** Create indexes for segments. */public class Indexer extends Configured implements Reducer {    public static final String DONE_NAME = "index.done";  public static final Log LOG = LogFactory.getLog(Indexer.class);  /** Wraps inputs in an {@link ObjectWritable}, to permit merging different   * types in reduce. */  public static class InputFormat extends SequenceFileInputFormat {    public RecordReader getRecordReader(FileSystem fs, FileSplit split,                                        JobConf job, Reporter reporter)      throws IOException {      reporter.setStatus(split.toString());            return new SequenceFileRecordReader(job, split) {          public synchronized boolean next(Writable key, Writable value)            throws IOException {            ObjectWritable wrapper = (ObjectWritable)value;            try {              wrapper.set(getValueClass().newInstance());            } catch (Exception e) {              throw new IOException(e.toString());            }            return super.next(key, (Writable)wrapper.get());          }        };    }  }  /** Unwrap Lucene Documents created by reduce and add them to an index. */  public static class OutputFormat    extends org.apache.hadoop.mapred.OutputFormatBase {    public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,                                        String name, Progressable progress) throws IOException {      final Path perm = new Path(job.getOutputPath(), name);      final Path temp =        job.getLocalPath("index/_"+Integer.toString(new Random().nextInt()));      fs.delete(perm);                            // delete old, if any      final AnalyzerFactory factory = new AnalyzerFactory(job);      final IndexWriter writer =                  // build locally first        new IndexWriter(fs.startLocalOutput(perm, temp).toString(),                        new NutchDocumentAnalyzer(job), true);      writer.setMergeFactor(job.getInt("indexer.mergeFactor", 10));      writer.setMaxBufferedDocs(job.getInt("indexer.minMergeDocs", 100));      writer.setMaxMergeDocs(job.getInt("indexer.maxMergeDocs", Integer.MAX_VALUE));      writer.setTermIndexInterval        (job.getInt("indexer.termIndexInterval", 128));      writer.setMaxFieldLength(job.getInt("indexer.max.tokens", 10000));      writer.setInfoStream(LogUtil.getInfoStream(LOG));      writer.setUseCompoundFile(false);      writer.setSimilarity(new NutchSimilarity());      return new RecordWriter() {          boolean closed;          public void write(WritableComparable key, Writable value)            throws IOException {                  // unwrap & index doc            Document doc = (Document)((ObjectWritable)value).get();            NutchAnalyzer analyzer = factory.get(doc.get("lang"));            if (LOG.isInfoEnabled()) {              LOG.info(" Indexing [" + doc.getField("url").stringValue() + "]" +                       " with analyzer " + analyzer +                       " (" + doc.get("lang") + ")");            }            writer.addDocument(doc, analyzer);          }                    public void close(final Reporter reporter) throws IOException {            // spawn a thread to give progress heartbeats            Thread prog = new Thread() {                public void run() {                  while (!closed) {                    try {                      reporter.setStatus("closing");                      Thread.sleep(1000);                    } catch (InterruptedException e) { continue; }                      catch (Throwable e) { return; }                  }                }              };            try {              prog.start();              if (LOG.isInfoEnabled()) { LOG.info("Optimizing index."); }              // optimize & close index              writer.optimize();              writer.close();              fs.completeLocalOutput(perm, temp);   // copy to dfs              fs.createNewFile(new Path(perm, DONE_NAME));            } finally {              closed = true;            }          }        };    }  }  private IndexingFilters filters;  private ScoringFilters scfilters;  public Indexer() {    super(null);  }  /** Construct an Indexer. */  public Indexer(Configuration conf) {    super(conf);  }  public void configure(JobConf job) {    setConf(job);    this.filters = new IndexingFilters(getConf());    this.scfilters = new ScoringFilters(getConf());  }  public void close() {}  public void reduce(WritableComparable key, Iterator values,                     OutputCollector output, Reporter reporter)    throws IOException {    Inlinks inlinks = null;    CrawlDatum dbDatum = null;    CrawlDatum fetchDatum = null;    ParseData parseData = null;    ParseText parseText = null;    while (values.hasNext()) {      Object value = ((ObjectWritable)values.next()).get(); // unwrap      if (value instanceof Inlinks) {        inlinks = (Inlinks)value;      } else if (value instanceof CrawlDatum) {        CrawlDatum datum = (CrawlDatum)value;        switch (datum.getStatus()) {        case CrawlDatum.STATUS_DB_UNFETCHED:        case CrawlDatum.STATUS_DB_FETCHED:        case CrawlDatum.STATUS_DB_GONE:          dbDatum = datum;          break;        case CrawlDatum.STATUS_FETCH_SUCCESS:        case CrawlDatum.STATUS_FETCH_RETRY:        case CrawlDatum.STATUS_FETCH_GONE:          fetchDatum = datum;          break;        default:          throw new RuntimeException("Unexpected status: "+datum.getStatus());        }      } else if (value instanceof ParseData) {        parseData = (ParseData)value;      } else if (value instanceof ParseText) {        parseText = (ParseText)value;      } else if (LOG.isWarnEnabled()) {        LOG.warn("Unrecognized type: "+value.getClass());      }    }          if (fetchDatum == null || dbDatum == null        || parseText == null || parseData == null) {      return;                                     // only have inlinks    }    Document doc = new Document();    Metadata metadata = parseData.getContentMeta();    // add segment, used to map from merged index back to segment files    doc.add(new Field("segment", metadata.get(Fetcher.SEGMENT_NAME_KEY),            Field.Store.YES, Field.Index.NO));    // add digest, used by dedup    doc.add(new Field("digest", metadata.get(Fetcher.SIGNATURE_KEY),            Field.Store.YES, Field.Index.NO));//     if (LOG.isInfoEnabled()) {//       LOG.info("Url: "+key.toString());//       LOG.info("Title: "+parseData.getTitle());//       LOG.info(crawlDatum.toString());//       if (inlinks != null) {//         LOG.info(inlinks.toString());//       }//     }    Parse parse = new ParseImpl(parseText, parseData);    try {      // run indexing filters      doc = this.filters.filter(doc, parse, (UTF8)key, fetchDatum, inlinks);    } catch (IndexingException e) {      if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }      return;    }    float boost = 1.0f;    // run scoring filters    try {      boost = this.scfilters.indexerScore((UTF8)key, doc, dbDatum,              fetchDatum, parse, inlinks, boost);    } catch (ScoringFilterException e) {      if (LOG.isWarnEnabled()) {        LOG.warn("Error calculating score " + key + ": " + e);      }      return;    }    // apply boost to all indexed fields.    doc.setBoost(boost);    // store boost for use by explain and dedup    doc.add(new Field("boost", Float.toString(boost),            Field.Store.YES, Field.Index.NO));    output.collect(key, new ObjectWritable(doc));  }  public void index(Path indexDir, Path crawlDb, Path linkDb, Path[] segments)    throws IOException {    if (LOG.isInfoEnabled()) {      LOG.info("Indexer: starting");      LOG.info("Indexer: linkdb: " + linkDb);    }    JobConf job = new NutchJob(getConf());    job.setJobName("index " + indexDir);    for (int i = 0; i < segments.length; i++) {      if (LOG.isInfoEnabled()) {        LOG.info("Indexer: adding segment: " + segments[i]);      }      job.addInputPath(new Path(segments[i], CrawlDatum.FETCH_DIR_NAME));      job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));      job.addInputPath(new Path(segments[i], ParseText.DIR_NAME));    }    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));    job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME));    job.setInputFormat(InputFormat.class);    job.setInputKeyClass(UTF8.class);    job.setInputValueClass(ObjectWritable.class);    //job.setCombinerClass(Indexer.class);    job.setReducerClass(Indexer.class);    job.setOutputPath(indexDir);    job.setOutputFormat(OutputFormat.class);    job.setOutputKeyClass(UTF8.class);    job.setOutputValueClass(ObjectWritable.class);    JobClient.runJob(job);    if (LOG.isInfoEnabled()) { LOG.info("Indexer: done"); }  }  public static void main(String[] args) throws Exception {    Indexer indexer = new Indexer(NutchConfiguration.create());        if (args.length < 4) {      System.err.println("Usage: <index> <crawldb> <linkdb> <segment> ...");      return;    }        Path[] segments = new Path[args.length-3];    for (int i = 3; i < args.length; i++) {      segments[i-3] = new Path(args[i]);    }    indexer.index(new Path(args[0]), new Path(args[1]), new Path(args[2]),                  segments);  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -