
📄 DeleteDuplicates.java

📁 Nutch 0.8 source
💻 Java

DeleteDuplicates is the MapReduce job that removes duplicate documents from a set of Lucene indexes produced by the Nutch indexer. Documents count as duplicates when they share the same MD5 content hash; URL-based deduplication (phase 2 of the algorithm described below) is not yet implemented in this version.
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.indexer;

import java.io.*;
import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;

import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.document.Document;

/******************************************************************
 * Deletes duplicate documents in a set of Lucene indexes.
 * Duplicates have either the same contents (via MD5 hash) or the same URL.
 ******************************************************************/
public class DeleteDuplicates extends Configured
  implements Mapper, Reducer, OutputFormat {
  private static final Log LOG = LogFactory.getLog(DeleteDuplicates.class);

//   Algorithm:
//
//   1. map indexes -> <<md5, score, urlLen>, <index,doc>>
//      partition by md5
//      reduce, deleting all but largest score w/ shortest url
//
//   2. map indexes -> <<url, fetchdate>, <index,doc>>
//      partition by url
//      reduce, deleting all but most recent.
//
//   Part 2 is not yet implemented, but the Indexer currently only indexes one
//   URL per page, so this is not a critical problem.
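//   A worked illustration (not in the original source; the values are made
//   up): suppose three documents share the same MD5 hash. Phase 1 sorts
//   their keys as
//
//       <md5, score=2.0, urlLen=20>   kept    (largest score wins)
//       <md5, score=1.0, urlLen=10>   deleted
//       <md5, score=1.0, urlLen=15>   deleted (same score, longer url)
//
//   because HashScore.compareTo orders larger scores first and, on ties,
//   shorter URLs first. HashReducer below keeps the first document of each
//   hash group and emits every later <index,doc> for deletion in phase 2.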
  public static class IndexDoc implements WritableComparable {
    private UTF8 index;                           // the segment index
    private int doc;                              // within the index

    public void write(DataOutput out) throws IOException {
      index.write(out);
      out.writeInt(doc);
    }

    public void readFields(DataInput in) throws IOException {
      if (index == null) {
        index = new UTF8();
      }
      index.readFields(in);
      this.doc = in.readInt();
    }

    public int compareTo(Object o) {
      IndexDoc that = (IndexDoc)o;
      int indexCompare = this.index.compareTo(that.index);
      if (indexCompare != 0) {                    // prefer later indexes
        return indexCompare;
      } else {
        return this.doc - that.doc;               // prefer later docs
      }
    }

    public boolean equals(Object o) {
      IndexDoc that = (IndexDoc)o;
      return this.index.equals(that.index) && this.doc == that.doc;
    }
  }

  public static class HashScore implements WritableComparable {
    private MD5Hash hash;
    private float score;
    private int urlLen;

    public void write(DataOutput out) throws IOException {
      hash.write(out);
      out.writeFloat(score);
      out.writeInt(urlLen);
    }

    public void readFields(DataInput in) throws IOException {
      if (hash == null) {
        hash = new MD5Hash();
      }
      hash.readFields(in);
      score = in.readFloat();
      urlLen = in.readInt();
    }

    public int compareTo(Object o) {
      HashScore that = (HashScore)o;
      if (!this.hash.equals(that.hash)) {         // order first by hash
        return this.hash.compareTo(that.hash);
      } else if (this.score != that.score) {      // prefer larger scores
        return this.score < that.score ? 1 : -1;
      } else {                                    // prefer shorter urls
        return this.urlLen - that.urlLen;
      }
    }

    public boolean equals(Object o) {
      HashScore that = (HashScore)o;
      return this.hash.equals(that.hash)
        && this.score == that.score
        && this.urlLen == that.urlLen;
    }
  }

  public static class InputFormat extends InputFormatBase {
    private static final long INDEX_LENGTH = Integer.MAX_VALUE;

    /** Return each index as a split. */
    public FileSplit[] getSplits(FileSystem fs, JobConf job,
                                 int numSplits)
      throws IOException {
      Path[] files = listPaths(fs, job);
      FileSplit[] splits = new FileSplit[files.length];
      for (int i = 0; i < files.length; i++) {
        splits[i] = new FileSplit(files[i], 0, INDEX_LENGTH);
      }
      return splits;
    }

    /** Return a reader that iterates over the documents of one index. */
    public RecordReader getRecordReader(final FileSystem fs,
                                        final FileSplit split,
                                        final JobConf job,
                                        Reporter reporter) throws IOException {
      final UTF8 index = new UTF8(split.getPath().toString());
      reporter.setStatus(index.toString());
      return new RecordReader() {
          private IndexReader indexReader =
            IndexReader.open(new FsDirectory(fs, split.getPath(), false, job));

          { indexReader.undeleteAll(); }

          private final int maxDoc = indexReader.maxDoc();
          private int doc;

          public boolean next(Writable key, Writable value)
            throws IOException {
            if (doc >= maxDoc)
              return false;

            Document document = indexReader.document(doc);

            // fill in key
            if (key instanceof UTF8) {
              ((UTF8)key).set(document.get("url"));
            } else {
              HashScore hashScore = (HashScore)key;
              if (hashScore.hash == null) {
                hashScore.hash = new MD5Hash();
              }
              hashScore.hash.setDigest(document.get("digest"));
              hashScore.score = Float.parseFloat(document.get("boost"));
              hashScore.urlLen = document.get("url").length();
            }

            // fill in value
            IndexDoc indexDoc = (IndexDoc)value;
            if (indexDoc.index == null) {
              indexDoc.index = new UTF8();
            }
            indexDoc.index.set(index);
            indexDoc.doc = doc;

            doc++;
            return true;
          }

          public long getPos() throws IOException {
            return maxDoc == 0 ? 0 : (doc * INDEX_LENGTH) / maxDoc;
          }

          public void close() throws IOException {
            indexReader.close();
          }
        };
    }
  }

  public static class HashPartitioner implements Partitioner {
    public void configure(JobConf job) {}
    public void close() {}
    public int getPartition(WritableComparable key, Writable value,
                            int numReduceTasks) {
      int hashCode = ((HashScore)key).hash.hashCode();
      return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
    }
  }

  public static class HashReducer implements Reducer {
    private MD5Hash prevHash = new MD5Hash();
    public void configure(JobConf job) {}
    public void close() {}
    public void reduce(WritableComparable key, Iterator values,
                       OutputCollector output, Reporter reporter)
      throws IOException {
      MD5Hash hash = ((HashScore)key).hash;
      while (values.hasNext()) {
        Writable value = (Writable)values.next();
        if (hash.equals(prevHash)) {              // collect all but first
          output.collect(key, value);
        } else {
          prevHash.set(hash);
        }
      }
    }
  }

  private FileSystem fs;

  public DeleteDuplicates() { super(null); }

  public DeleteDuplicates(Configuration conf) { super(conf); }

  public void configure(JobConf job) {
    setConf(job);
    try {
      fs = FileSystem.get(job);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public void close() {}

  /** Map [*,IndexDoc] pairs to [index,doc] pairs. */
  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter)
    throws IOException {
    IndexDoc indexDoc = (IndexDoc)value;
    output.collect(indexDoc.index, new IntWritable(indexDoc.doc));
  }

  /** Delete docs named in values from index named in key. */
  public void reduce(WritableComparable key, Iterator values,
                     OutputCollector output, Reporter reporter)
    throws IOException {
    Path index = new Path(key.toString());
    IndexReader reader =
      IndexReader.open(new FsDirectory(fs, index, false, getConf()));
    try {
      while (values.hasNext()) {
        reader.deleteDocument(((IntWritable)values.next()).get());
      }
    } finally {
      reader.close();
    }
  }

  /** Write nothing. */
  public RecordWriter getRecordWriter(final FileSystem fs,
                                      final JobConf job,
                                      final String name,
                                      final Progressable progress) throws IOException {
    return new RecordWriter() {
        public void write(WritableComparable key, Writable value)
          throws IOException {
          throw new UnsupportedOperationException();
        }
        public void close(Reporter reporter) throws IOException {}
      };
  }

  public void checkOutputSpecs(FileSystem fs, JobConf job) {}

  public void dedup(Path[] indexDirs)
    throws IOException {

    if (LOG.isInfoEnabled()) { LOG.info("Dedup: starting"); }

    Path hashDir =
      new Path("dedup-hash-" +
               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf job = new NutchJob(getConf());

    for (int i = 0; i < indexDirs.length; i++) {
      if (LOG.isInfoEnabled()) {
        LOG.info("Dedup: adding indexes in: " + indexDirs[i]);
      }
      job.addInputPath(indexDirs[i]);
    }
    job.setJobName("dedup phase 1");

    job.setInputKeyClass(HashScore.class);
    job.setInputValueClass(IndexDoc.class);
    job.setInputFormat(InputFormat.class);
    job.setBoolean("mapred.speculative.execution", false);

    job.setPartitionerClass(HashPartitioner.class);
    job.setReducerClass(HashReducer.class);

    job.setOutputPath(hashDir);
    job.setOutputKeyClass(HashScore.class);
    job.setOutputValueClass(IndexDoc.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient.runJob(job);

    job = new NutchJob(getConf());
    job.setJobName("dedup phase 2");

    job.addInputPath(hashDir);
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(HashScore.class);
    job.setInputValueClass(IndexDoc.class);
    job.setInt("io.file.buffer.size", 4096);

    job.setMapperClass(DeleteDuplicates.class);
    job.setReducerClass(DeleteDuplicates.class);
    job.setOutputFormat(DeleteDuplicates.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(IntWritable.class);

    JobClient.runJob(job);

    new JobClient(getConf()).getFs().delete(hashDir);

    if (LOG.isInfoEnabled()) { LOG.info("Dedup: done"); }
  }

  public static void main(String[] args) throws Exception {
    DeleteDuplicates dedup = new DeleteDuplicates(NutchConfiguration.create());

    if (args.length < 1) {
      System.err.println("Usage: <indexes> ...");
      return;
    }

    Path[] indexes = new Path[args.length];
    for (int i = 0; i < args.length; i++) {
      indexes[i] = new Path(args[i]);
    }

    dedup.dedup(indexes);
  }
}
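For orientation, here is a minimal sketch of driving this class programmatically. It only restates what main above already does, under assumed paths: crawl/indexes is a hypothetical directory holding the per-part Lucene indexes written by the Nutch indexer, and DedupDriver is an illustrative name, not part of Nutch.

import org.apache.hadoop.fs.Path;
import org.apache.nutch.indexer.DeleteDuplicates;
import org.apache.nutch.util.NutchConfiguration;

public class DedupDriver {
  public static void main(String[] args) throws Exception {
    // Hypothetical location: a directory of per-part Lucene indexes
    // produced by the Nutch indexer (e.g. crawl/indexes/part-00000, ...).
    Path[] indexes = { new Path("crawl/indexes") };

    // Phase 1 groups documents by MD5 content hash and writes the
    // <index,doc> of every duplicate (all but the best-scoring copy) to a
    // temporary directory; phase 2 reads that output and deletes the listed
    // documents from each index in place.
    DeleteDuplicates dedup = new DeleteDuplicates(NutchConfiguration.create());
    dedup.dedup(indexes);
  }
}

In a Nutch 0.8 checkout the same entry point is reachable from the command line, where the nutch wrapper script maps the dedup command to this class, e.g. bin/nutch dedup crawl/indexes.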
