
📄 CrawlDbReader.java

📁 Nutch 0.8 source code
💻 Java
📖 Page 1 of 2
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.crawl;

import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.TreeMap;

// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

/**
 * Read utility for the CrawlDB.
 *
 * @author Andrzej Bialecki
 */
public class CrawlDbReader implements Closeable {

  public static final Log LOG = LogFactory.getLog(CrawlDbReader.class);

  private MapFile.Reader[] readers = null;

  private void openReaders(String crawlDb, Configuration config) throws IOException {
    if (readers != null) return;
    FileSystem fs = FileSystem.get(config);
    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb, CrawlDatum.DB_DIR_NAME), config);
  }

  private void closeReaders() {
    if (readers == null) return;
    for (int i = 0; i < readers.length; i++) {
      try {
        readers[i].close();
      } catch (Exception e) {
        // ignore: best-effort cleanup
      }
    }
  }

  /**
   * Emits counting records for each CrawlDatum: "T" (total count),
   * "status <n>" and "retry <n>" histogram buckets, and "s" carrying the
   * score scaled by 1000 so it can be aggregated as a long.
   */
  public static class CrawlDbStatMapper implements Mapper {
    LongWritable COUNT_1 = new LongWritable(1);
    public void configure(JobConf job) {}
    public void close() {}
    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
            throws IOException {
      CrawlDatum cd = (CrawlDatum) value;
      output.collect(new UTF8("T"), COUNT_1);
      output.collect(new UTF8("status " + cd.getStatus()), COUNT_1);
      output.collect(new UTF8("retry " + cd.getRetriesSinceFetch()), COUNT_1);
      output.collect(new UTF8("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
    }
  }

  /**
   * Pre-aggregates the mapper output: plain counters are summed, while the
   * scaled scores under "s" are folded into min ("scn"), max ("scx") and
   * total ("sct") records.
   */
  public static class CrawlDbStatCombiner implements Reducer {
    LongWritable val = new LongWritable();

    public CrawlDbStatCombiner() { }
    public void configure(JobConf job) { }
    public void close() {}
    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
        throws IOException {
      val.set(0L);
      String k = ((UTF8) key).toString();
      if (!k.equals("s")) {
        while (values.hasNext()) {
          LongWritable cnt = (LongWritable) values.next();
          val.set(val.get() + cnt.get());
        }
        output.collect(key, val);
      } else {
        long total = 0;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        while (values.hasNext()) {
          LongWritable cnt = (LongWritable) values.next();
          if (cnt.get() < min) min = cnt.get();
          if (cnt.get() > max) max = cnt.get();
          total += cnt.get();
        }
        output.collect(new UTF8("scn"), new LongWritable(min));
        output.collect(new UTF8("scx"), new LongWritable(max));
        output.collect(new UTF8("sct"), new LongWritable(total));
      }
    }
  }

  /**
   * Final aggregation: sums the counters and merges the per-combiner score
   * minima ("scn"), maxima ("scx") and totals ("sct").
   */
  public static class CrawlDbStatReducer implements Reducer {
    public void configure(JobConf job) {}
    public void close() {}
    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
            throws IOException {
      String k = ((UTF8) key).toString();
      if (k.equals("T")) {
        // sum all values for this key
        long sum = 0;
        while (values.hasNext()) {
          sum += ((LongWritable) values.next()).get();
        }
        // output sum
        output.collect(key, new LongWritable(sum));
      } else if (k.startsWith("status") || k.startsWith("retry")) {
        LongWritable cnt = new LongWritable();
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          cnt.set(cnt.get() + val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("scx")) {
        // keep the global maximum of the scaled scores
        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          if (cnt.get() < val.get()) cnt.set(val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("scn")) {
        // keep the global minimum of the scaled scores
        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          if (cnt.get() > val.get()) cnt.set(val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("sct")) {
        // sum the score totals
        LongWritable cnt = new LongWritable();
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          cnt.set(cnt.get() + val.get());
        }
        output.collect(key, cnt);
      }
    }
  }

  /** Passes every (key, value) pair through unchanged. */
  public static class CrawlDbDumpReducer implements Reducer {

    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
            throws IOException {
      while (values.hasNext()) {
        output.collect(key, (Writable) values.next());
      }
    }

    public void configure(JobConf job) {}
    public void close() {}
  }

  /**
   * Inverts (url, datum) into (-score, url) so that the ascending float sort
   * of the shuffle yields the highest-scoring URLs first; records below the
   * optional minimum score are skipped.
   */
  public static class CrawlDbTopNMapper implements Mapper {
    private static final FloatWritable fw = new FloatWritable();
    private float min = 0.0f;

    public void configure(JobConf job) {
      long lmin = job.getLong("CrawlDbReader.topN.min", 0);
      if (lmin != 0) {
        min = (float) lmin / 1000000.0f;
      }
    }
    public void close() {}
    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter)
            throws IOException {
      CrawlDatum datum = (CrawlDatum) value;
      if (datum.getScore() < min) return; // don't collect low-scoring records
      fw.set(-datum.getScore()); // reverse sorting order
      output.collect(fw, key); // invert mapping: score -> url
    }
  }

  /** Restores the score's sign and emits at most topN records per reduce task. */
  public static class CrawlDbTopNReducer implements Reducer {
    private long topN;
    private long count = 0L;

    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
            throws IOException {
      while (values.hasNext() && count < topN) {
        FloatWritable fw = (FloatWritable) key;
        fw.set(-fw.get());
        output.collect(fw, (Writable) values.next());
        count++;
      }
    }

    public void configure(JobConf job) {
      topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
    }

    public void close() {}
  }

  public void close() {
    closeReaders();
  }

  // (listing continues on page 2)
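
The rest of the class, the job drivers and main(), is on page 2 of this listing. This class backs Nutch's bin/nutch readdb command; in the 0.8 line the accepted options are along these lines: bin/nutch readdb <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>).

Below is a minimal sketch of how the three stat classes above would be wired into a single job, in the style of the page-2 driver (processStatJob). The temporary output path and the old Hadoop "mapred" JobConf calls are assumptions based on the Nutch 0.8-era API, not part of this page:

  // Sketch only: mirrors what the real driver on page 2 is expected to do.
  public void runStatsSketch(String crawlDb, Configuration config) throws IOException {
    JobConf job = new NutchJob(config);
    job.setJobName("stats " + crawlDb);

    // read the CrawlDatum records stored under the crawldb's data directory
    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(CrawlDbStatMapper.class);
    job.setCombinerClass(CrawlDbStatCombiner.class);
    job.setReducerClass(CrawlDbStatReducer.class);

    // hypothetical temporary location for the aggregated counters
    job.setOutputPath(new Path(crawlDb, "stat_tmp" + System.currentTimeMillis()));
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(LongWritable.class);

    JobClient.runJob(job);
    // a real driver would then read the counters back, un-scale the score
    // longs (divide by 1000) and print the totals and min/max/avg scores
  }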
