📄 fetcher.java
字号:
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.fetcher;

import java.io.IOException;

// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;

import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.SignatureFactory;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.net.*;
import org.apache.nutch.protocol.*;
import org.apache.nutch.parse.*;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.*;

/** The fetcher. Most of the work is done by plugins. */
public class Fetcher extends Configured implements MapRunnable {

  public static final Log LOG = LogFactory.getLog(Fetcher.class);

  // NOTE(review): presumably metadata/configuration key names; the code that
  // reads or writes them is not visible in this part of the file - confirm.
  public static final String SIGNATURE_KEY = "nutch.content.digest";
  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
  public static final String SCORE_KEY = "nutch.crawl.score";

  /** Input format that yields exactly one split per input file. */
  public static class InputFormat extends SequenceFileInputFormat {
    /**
     * Don't split inputs, to keep things polite.
     *
     * Builds one {@link FileSplit} per path returned by
     * <code>listPaths(fs, job)</code>, each starting at offset 0 and spanning
     * the file's full length.
     *
     * @param fs      file system used to look up each file's length
     * @param job     job configuration passed to <code>listPaths</code>
     * @param nSplits requested number of splits; not used by this method
     * @return one split per listed file, in listing order
     * @throws IOException if listing the paths or reading a length fails
     */
    public FileSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)
      throws IOException {
      Path[] files = listPaths(fs, job);
      FileSplit[] splits = new FileSplit[files.length];
      for (int i = 0; i < files.length; i++) {
        // whole file: offset 0 up to its full length
        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]));
      }
      return splits;
    }
  }

  private RecordReader input;       // every FetcherThread pulls its next entry from here
  private OutputCollector output;
  private Reporter reporter;

  private String segmentName;
  private int activeThreads;        // incremented under the Fetcher lock when a thread starts running
  private int maxRedirect;          // bound compared against redirectCount in FetcherThread.run()

  private long start = System.currentTimeMillis(); // start time of fetcher run
  private long lastRequestStart = start;           // refreshed under the Fetcher lock per entry picked up

  private long bytes;                             // total bytes fetched
  private int pages;                              // total pages fetched
  private int errors;                             // total pages errored

  private boolean storingContent;
  private boolean parsing;

  /**
   * Worker thread: repeatedly reads a (url, datum) entry from the enclosing
   * Fetcher's input, fetches it through the protocol obtained from the
   * ProtocolFactory, and hands the outcome to <code>output(...)</code>.
   */
  private class FetcherThread extends Thread {
    private Configuration conf;
    private URLFilters urlFilters;
    private ScoringFilters scfilters;
    private ParseUtil parseUtil;
    private UrlNormalizer normalizer;
    private ProtocolFactory protocolFactory;

    /**
     * Creates a daemon thread named "FetcherThread" and builds its helper
     * objects (URL filters, scoring filters, parse util, protocol factory,
     * URL normalizer) from the given configuration.
     *
     * @param conf configuration passed to every helper's constructor
     */
    public FetcherThread(Configuration conf) {
      this.setDaemon(true);                       // don't hang JVM on exit
      this.setName("FetcherThread");              // use an informative name
      this.conf = conf;
      this.urlFilters = new URLFilters(conf);
      this.scfilters = new ScoringFilters(conf);
      this.parseUtil = new ParseUtil(conf);
      this.protocolFactory = new ProtocolFactory(conf);
      this.normalizer = new UrlNormalizerFactory(conf).getNormalizer();
    }

    /**
     * Main loop of the thread. For each entry read from the input:
     * <ul>
     * <li>fetch the url and dispatch on the protocol status code;</li>
     * <li>follow content-level and protocol-level redirects while fewer than
     *     <code>maxRedirect</code> have been followed;</li>
     * <li>record the outcome through <code>output(...)</code>.</li>
     * </ul>
     * The loop ends at end of input or on an IOException while reading it.
     * Any Throwable raised while handling a single url is logged and that url
     * is recorded with STATUS_FETCH_RETRY; the loop then continues.
     *
     * NOTE(review): whatever follows the outer catch block (for example
     * decrementing activeThreads) is not visible here - confirm.
     */
    public void run() {
      synchronized (Fetcher.this) {activeThreads++;} // count threads

      try {
        // key and datum are reused for every entry read from the input
        UTF8 key = new UTF8();
        CrawlDatum datum = new CrawlDatum();

        while (true) {
          // TODO : NUTCH-258 ...
          // If something bad happened, then exit
          // if (conf.getBoolean("fetcher.exit", false)) {
          //   break;
          // }

          try {                                   // get next entry from input
            if (!input.next(key, datum)) {
              break;                              // at eof, exit
            }
          } catch (IOException e) {
            // a read failure ends this thread's loop
            if (LOG.isFatalEnabled()) {
              e.printStackTrace(LogUtil.getFatalStream(LOG));
              LOG.fatal("fetcher caught:"+e.toString());
            }
            break;
          }

          synchronized (Fetcher.this) {
            lastRequestStart = System.currentTimeMillis();
          }

          // url may be changed through redirects.
          // It is a copy so that 'key' keeps the original value.
          UTF8 url = new UTF8();
          url.set(key);
          try {
            if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); }

            // fetch the page
            boolean redirecting;
            int redirectCount = 0;
            do {
              if (LOG.isDebugEnabled()) {
                LOG.debug("redirectCount=" + redirectCount);
              }
              redirecting = false;
              Protocol protocol = this.protocolFactory.getProtocol(url.toString());
              // This local hides the Fetcher.output field inside the loop body.
              // The output(...) calls below are method invocations, not uses of it.
              ProtocolOutput output = protocol.getProtocolOutput(url, datum);
              ProtocolStatus status = output.getStatus();
              Content content = output.getContent();
              ParseStatus pstatus = null;

              switch(status.getCode()) {

              case ProtocolStatus.SUCCESS:        // got a page
                // the page is recorded as a success before any content redirect is examined
                pstatus = output(url, datum, content, CrawlDatum.STATUS_FETCH_SUCCESS);
                // NOTE(review): presumably updates the bytes/pages counters - updateStatus is not visible here
                updateStatus(content.getContent().length);
                if (pstatus != null && pstatus.isSuccess() &&
                    pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                  // content-level redirect: the target url is carried in the parse status message
                  String newUrl = pstatus.getMessage();
                  newUrl = normalizer.normalize(newUrl);
                  newUrl = this.urlFilters.filter(newUrl);
                  // follow only if the filter kept the url and it differs from the current one
                  if (newUrl != null && !newUrl.equals(url.toString())) {
                    url = new UTF8(newUrl);
                    redirecting = true;
                    redirectCount++;
                    if (LOG.isDebugEnabled()) {
                      LOG.debug(" - content redirect to " + url);
                    }
                  } else if (LOG.isDebugEnabled()) {
                    LOG.debug(" - content redirect skipped: " +
                              (newUrl != null ? "to same url" : "filtered"));
                  }
                }
                break;

              case ProtocolStatus.MOVED:         // redirect
              case ProtocolStatus.TEMP_MOVED:
                // protocol-level redirect: the target url is carried in the protocol status message
                // NOTE(review): this branch never calls output(...); when the redirect
                // is skipped no record is written for the url here - confirm intended.
                String newUrl = status.getMessage();
                newUrl = normalizer.normalize(newUrl);
                newUrl = this.urlFilters.filter(newUrl);
                if (newUrl != null && !newUrl.equals(url.toString())) {
                  url = new UTF8(newUrl);
                  redirecting = true;
                  redirectCount++;
                  if (LOG.isDebugEnabled()) {
                    LOG.debug(" - protocol redirect to " + url);
                  }
                } else if (LOG.isDebugEnabled()) {
                  LOG.debug(" - protocol redirect skipped: " +
                            (newUrl != null ? "to same url" : "filtered"));
                }
                break;

              // failures - increase the retry counter
              case ProtocolStatus.EXCEPTION:
                logError(url, status.getMessage());
              /* FALLTHROUGH */
              case ProtocolStatus.RETRY:          // retry
                datum.setRetriesSinceFetch(datum.getRetriesSinceFetch()+1);
              /* FALLTHROUGH */
              // intermittent blocking - retry without increasing the counter
              case ProtocolStatus.WOULDBLOCK:
              case ProtocolStatus.BLOCKED:
                output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
                break;

              // permanent failures
              // NOTE(review): NOTMODIFIED is recorded as STATUS_FETCH_GONE below - confirm intended
              case ProtocolStatus.GONE:           // gone
              case ProtocolStatus.NOTFOUND:
              case ProtocolStatus.ACCESS_DENIED:
              case ProtocolStatus.ROBOTS_DENIED:
              case ProtocolStatus.NOTMODIFIED:
                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
                break;

              default:
                // unrecognized status codes are recorded as gone
                if (LOG.isWarnEnabled()) {
                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                }
                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
              }

              // Redirect limit reached: 'url' already holds the redirect target,
              // which is recorded as gone; the loop condition below then ends the loop.
              if (redirecting && redirectCount >= maxRedirect) {
                if (LOG.isInfoEnabled()) {
                  LOG.info(" - redirect count exceeded " + url);
                }
                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
              }

            } while (redirecting && (redirectCount < maxRedirect));


          } catch (Throwable t) {                 // unexpected exception
            // log, mark the current url for retry, and carry on with the next entry
            logError(url, t.toString());
            output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
          }
        }

      } catch (Throwable e) {
        // anything that escapes the per-entry handling is logged at fatal level
        if (LOG.isFatalEnabled()) {
          e.printStackTrace(LogUtil.getFatalStream(LOG));
          LOG.fatal("fetcher caught:"+e.toString());
        }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -