📄 CrawlDbReader.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.crawl;

import java.io.IOException;
import java.util.Iterator;
import java.util.Random;
import java.util.TreeMap;

// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

/**
 * Read utility for the CrawlDB.
 *
 * @author Andrzej Bialecki
 */
public class CrawlDbReader implements Closeable {

  public static final Log LOG = LogFactory.getLog(CrawlDbReader.class);

  private MapFile.Reader[] readers = null;

  private void openReaders(String crawlDb, Configuration config) throws IOException {
    if (readers != null) return;
    FileSystem fs = FileSystem.get(config);
    readers = MapFileOutputFormat.getReaders(fs,
        new Path(crawlDb, CrawlDatum.DB_DIR_NAME), config);
  }

  private void closeReaders() {
    if (readers == null) return;
    for (int i = 0; i < readers.length; i++) {
      try {
        readers[i].close();
      } catch (Exception e) {
        // best-effort close; ignore failures
      }
    }
  }

  /**
   * Emits counting records for each CrawlDatum: a grand total ("T"),
   * per-status and per-retry counts, and the score scaled by 1000 ("s")
   * so it can be aggregated as a long.
   */
  public static class CrawlDbStatMapper implements Mapper {
    LongWritable COUNT_1 = new LongWritable(1);

    public void configure(JobConf job) {}
    public void close() {}

    public void map(WritableComparable key, Writable value,
        OutputCollector output, Reporter reporter) throws IOException {
      CrawlDatum cd = (CrawlDatum) value;
      output.collect(new UTF8("T"), COUNT_1);
      output.collect(new UTF8("status " + cd.getStatus()), COUNT_1);
      output.collect(new UTF8("retry " + cd.getRetriesSinceFetch()), COUNT_1);
      output.collect(new UTF8("s"), new LongWritable((long) (cd.getScore() * 1000.0)));
    }
  }

  /**
   * Sums plain counts; for the raw score key "s" it instead emits the
   * minimum ("scn"), maximum ("scx") and total ("sct") of the scores seen,
   * so the reducer only ever deals with aggregated score keys.
   */
  public static class CrawlDbStatCombiner implements Reducer {
    LongWritable val = new LongWritable();

    public CrawlDbStatCombiner() { }
    public void configure(JobConf job) { }
    public void close() {}

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output, Reporter reporter) throws IOException {
      val.set(0L);
      String k = ((UTF8) key).toString();
      if (!k.equals("s")) {
        while (values.hasNext()) {
          LongWritable cnt = (LongWritable) values.next();
          val.set(val.get() + cnt.get());
        }
        output.collect(key, val);
      } else {
        long total = 0;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        while (values.hasNext()) {
          LongWritable cnt = (LongWritable) values.next();
          if (cnt.get() < min) min = cnt.get();
          if (cnt.get() > max) max = cnt.get();
          total += cnt.get();
        }
        output.collect(new UTF8("scn"), new LongWritable(min));
        output.collect(new UTF8("scx"), new LongWritable(max));
        output.collect(new UTF8("sct"), new LongWritable(total));
      }
    }
  }

  /** Merges the partial aggregates produced by the combiners. */
  public static class CrawlDbStatReducer implements Reducer {
    public void configure(JobConf job) {}
    public void close() {}

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output, Reporter reporter) throws IOException {
      String k = ((UTF8) key).toString();
      if (k.equals("T")) {
        // sum all values for this key
        long sum = 0;
        while (values.hasNext()) {
          sum += ((LongWritable) values.next()).get();
        }
        // output sum
        output.collect(key, new LongWritable(sum));
      } else if (k.startsWith("status") || k.startsWith("retry")) {
        LongWritable cnt = new LongWritable();
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          cnt.set(cnt.get() + val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("scx")) {
        // global maximum score
        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          if (cnt.get() < val.get()) cnt.set(val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("scn")) {
        // global minimum score
        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          if (cnt.get() > val.get()) cnt.set(val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("sct")) {
        // total of all scores
        LongWritable cnt = new LongWritable();
        while (values.hasNext()) {
          LongWritable val = (LongWritable) values.next();
          cnt.set(cnt.get() + val.get());
        }
        output.collect(key, cnt);
      }
    }
  }

  /** Passes every <url, datum> pair through unchanged. */
  public static class CrawlDbDumpReducer implements Reducer {

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output, Reporter reporter) throws IOException {
      while (values.hasNext()) {
        output.collect(key, (Writable) values.next());
      }
    }

    public void configure(JobConf job) {}
    public void close() {}
  }

  /**
   * Inverts <url, datum> into <-score, url> so that the shuffle's ascending
   * float sort delivers URLs in descending score order.
   */
  public static class CrawlDbTopNMapper implements Mapper {
    private static final FloatWritable fw = new FloatWritable();
    private float min = 0.0f;

    public void configure(JobConf job) {
      long lmin = job.getLong("CrawlDbReader.topN.min", 0);
      if (lmin != 0) {
        min = (float) lmin / 1000000.0f;
      }
    }

    public void close() {}

    public void map(WritableComparable key, Writable value,
        OutputCollector output, Reporter reporter) throws IOException {
      CrawlDatum datum = (CrawlDatum) value;
      if (datum.getScore() < min) return; // don't collect low-scoring records
      fw.set(-datum.getScore()); // reverse sorting order
      output.collect(fw, key); // invert mapping: score -> url
    }
  }

  /** Restores the original sign of the score and keeps only the first topN records. */
  public static class CrawlDbTopNReducer implements Reducer {
    private long topN;
    private long count = 0L;

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output, Reporter reporter) throws IOException {
      // negate once per key; negating inside the loop would flip the sign
      // back and forth when several URLs share the same score
      FloatWritable fw = (FloatWritable) key;
      fw.set(-fw.get());
      while (values.hasNext() && count < topN) {
        output.collect(fw, (Writable) values.next());
        count++;
      }
    }

    public void configure(JobConf job) {
      // each reduce task keeps its share of the global topN
      topN = job.getLong("CrawlDbReader.topN", 100) / job.getNumReduceTasks();
    }

    public void close() {}
  }

  public void close() {
    closeReaders();
  }
}
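The listing ends before the job-driver methods that put these classes to work. As a rough illustration, the sketch below wires the three stat classes into a job using the same old org.apache.hadoop.mapred API this file imports; the driver class name CrawlDbStatDriver, the argument layout, and the output handling are assumptions made for the example, not part of Nutch.

// Hypothetical stand-alone driver (not part of the file above): a minimal
// sketch of how the stat classes could be submitted as a MapReduce job.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDbReader;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

public class CrawlDbStatDriver {

  // args[0] = crawldb directory, args[1] = output directory (both assumed)
  public static void main(String[] args) throws Exception {
    Configuration config = NutchConfiguration.create();
    JobConf job = new NutchJob(config);
    job.setJobName("stats " + args[0]);

    // The CrawlDB is a directory of MapFiles of <UTF8 url, CrawlDatum>,
    // which SequenceFileInputFormat can read directly.
    job.addInputPath(new Path(args[0], CrawlDatum.DB_DIR_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(CrawlDbReader.CrawlDbStatMapper.class);
    job.setCombinerClass(CrawlDbReader.CrawlDbStatCombiner.class);
    job.setReducerClass(CrawlDbReader.CrawlDbStatReducer.class);

    // Results come out as <UTF8 stat-name, LongWritable value> pairs, e.g.
    // "T" (total URLs), "status N", "scn"/"scx"/"sct" (min/max/total score x1000).
    job.setOutputPath(new Path(args[1]));
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(LongWritable.class);

    JobClient.runJob(job);
  }
}

Note that the combiner is load-bearing in this design: it is what turns the raw score key "s" into the "scn"/"scx"/"sct" keys that CrawlDbStatReducer understands, so the stat job depends on the combiner actually running. That held on the early Hadoop releases this code targeted, even though the general MapReduce contract treats combiners as optional.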