⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fetcher.java

📁 nutch0.8源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.nutch.fetcher;import java.io.IOException;// Commons Logging importsimport org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.io.*;import org.apache.hadoop.fs.*;import org.apache.hadoop.conf.*;import org.apache.hadoop.mapred.*;import org.apache.nutch.crawl.CrawlDatum;import org.apache.nutch.crawl.SignatureFactory;import org.apache.nutch.metadata.Metadata;import org.apache.nutch.net.*;import org.apache.nutch.protocol.*;import org.apache.nutch.parse.*;import org.apache.nutch.scoring.ScoringFilters;import org.apache.nutch.util.*;/** The fetcher. Most of the work is done by plugins. */public class Fetcher extends Configured implements MapRunnable {   public static final Log LOG = LogFactory.getLog(Fetcher.class);    public static final String SIGNATURE_KEY = "nutch.content.digest";  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";  public static final String SCORE_KEY = "nutch.crawl.score";  public static class InputFormat extends SequenceFileInputFormat {    /** Don't split inputs, to keep things polite. */    public FileSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)      throws IOException {      Path[] files = listPaths(fs, job);      FileSplit[] splits = new FileSplit[files.length];      for (int i = 0; i < files.length; i++) {        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]));      }      return splits;    }  }  private RecordReader input;  private OutputCollector output;  private Reporter reporter;  private String segmentName;  private int activeThreads;  private int maxRedirect;  private long start = System.currentTimeMillis(); // start time of fetcher run  private long lastRequestStart = start;  private long bytes;                             // total bytes fetched  private int pages;                              // total pages fetched  private int errors;                             // total pages errored  private boolean storingContent;  private boolean parsing;  private class FetcherThread extends Thread {    private Configuration conf;    private URLFilters urlFilters;    private ScoringFilters scfilters;    private ParseUtil parseUtil;    private UrlNormalizer normalizer;    private ProtocolFactory protocolFactory;    public FetcherThread(Configuration conf) {      this.setDaemon(true);                       // don't hang JVM on exit      this.setName("FetcherThread");              // use an informative name      this.conf = conf;      this.urlFilters = new URLFilters(conf);      this.scfilters = new ScoringFilters(conf);      this.parseUtil = new ParseUtil(conf);      this.protocolFactory = new ProtocolFactory(conf);      this.normalizer = new UrlNormalizerFactory(conf).getNormalizer();    }    public void run() {      synchronized (Fetcher.this) {activeThreads++;} // count threads            try {        UTF8 key = new UTF8();        CrawlDatum datum = new CrawlDatum();                while (true) {          // TODO : NUTCH-258 ...          // If something bad happened, then exit          // if (conf.getBoolean("fetcher.exit", false)) {          //   break;          // ]                    try {                                   // get next entry from input            if (!input.next(key, datum)) {              break;                              // at eof, exit            }          } catch (IOException e) {            if (LOG.isFatalEnabled()) {              e.printStackTrace(LogUtil.getFatalStream(LOG));              LOG.fatal("fetcher caught:"+e.toString());            }            break;          }          synchronized (Fetcher.this) {            lastRequestStart = System.currentTimeMillis();          }          // url may be changed through redirects.          UTF8 url = new UTF8();          url.set(key);          try {            if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); }            // fetch the page            boolean redirecting;            int redirectCount = 0;            do {              if (LOG.isDebugEnabled()) {                LOG.debug("redirectCount=" + redirectCount);              }              redirecting = false;              Protocol protocol = this.protocolFactory.getProtocol(url.toString());              ProtocolOutput output = protocol.getProtocolOutput(url, datum);              ProtocolStatus status = output.getStatus();              Content content = output.getContent();              ParseStatus pstatus = null;              switch(status.getCode()) {              case ProtocolStatus.SUCCESS:        // got a page                pstatus = output(url, datum, content, CrawlDatum.STATUS_FETCH_SUCCESS);                updateStatus(content.getContent().length);                if (pstatus != null && pstatus.isSuccess() &&                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {                  String newUrl = pstatus.getMessage();                  newUrl = normalizer.normalize(newUrl);                  newUrl = this.urlFilters.filter(newUrl);                  if (newUrl != null && !newUrl.equals(url.toString())) {                    url = new UTF8(newUrl);                    redirecting = true;                    redirectCount++;                    if (LOG.isDebugEnabled()) {                      LOG.debug(" - content redirect to " + url);                    }                  } else if (LOG.isDebugEnabled()) {                    LOG.debug(" - content redirect skipped: " +                             (newUrl != null ? "to same url" : "filtered"));                  }                }                break;              case ProtocolStatus.MOVED:         // redirect              case ProtocolStatus.TEMP_MOVED:                String newUrl = status.getMessage();                newUrl = normalizer.normalize(newUrl);                newUrl = this.urlFilters.filter(newUrl);                if (newUrl != null && !newUrl.equals(url.toString())) {                  url = new UTF8(newUrl);                  redirecting = true;                  redirectCount++;                  if (LOG.isDebugEnabled()) {                    LOG.debug(" - protocol redirect to " + url);                  }                } else if (LOG.isDebugEnabled()) {                  LOG.debug(" - protocol redirect skipped: " +                           (newUrl != null ? "to same url" : "filtered"));                }                break;              // failures - increase the retry counter              case ProtocolStatus.EXCEPTION:                logError(url, status.getMessage());              /* FALLTHROUGH */              case ProtocolStatus.RETRY:          // retry                datum.setRetriesSinceFetch(datum.getRetriesSinceFetch()+1);              /* FALLTHROUGH */              // intermittent blocking - retry without increasing the counter              case ProtocolStatus.WOULDBLOCK:              case ProtocolStatus.BLOCKED:                output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);                break;                              // permanent failures              case ProtocolStatus.GONE:           // gone              case ProtocolStatus.NOTFOUND:              case ProtocolStatus.ACCESS_DENIED:              case ProtocolStatus.ROBOTS_DENIED:              case ProtocolStatus.NOTMODIFIED:                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);                break;              default:                if (LOG.isWarnEnabled()) {                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());                }                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);              }              if (redirecting && redirectCount >= maxRedirect) {                if (LOG.isInfoEnabled()) {                  LOG.info(" - redirect count exceeded " + url);                }                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);              }            } while (redirecting && (redirectCount < maxRedirect));                      } catch (Throwable t) {                 // unexpected exception            logError(url, t.toString());            output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);                      }        }      } catch (Throwable e) {        if (LOG.isFatalEnabled()) {          e.printStackTrace(LogUtil.getFatalStream(LOG));          LOG.fatal("fetcher caught:"+e.toString());        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -