⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fetcher.java

📁 nutch0.8源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
      } finally {        synchronized (Fetcher.this) {activeThreads--;} // count threads      }    }    private void logError(UTF8 url, String message) {      if (LOG.isInfoEnabled()) {        LOG.info("fetch of " + url + " failed with: " + message);      }      synchronized (Fetcher.this) {               // record failure        errors++;      }    }    private ParseStatus output(UTF8 key, CrawlDatum datum,                        Content content, int status) {      datum.setStatus(status);      datum.setFetchTime(System.currentTimeMillis());      if (content == null) {        String url = key.toString();        content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);      }      Metadata metadata = content.getMetadata();      // add segment to metadata      metadata.set(SEGMENT_NAME_KEY, segmentName);      // add score to content metadata so that ParseSegment can pick it up.      try {        scfilters.passScoreBeforeParsing(key, datum, content);      } catch (Exception e) {        if (LOG.isWarnEnabled()) {          e.printStackTrace(LogUtil.getWarnStream(LOG));          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");        }      }      Parse parse = null;      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {        ParseStatus parseStatus;        try {          parse = this.parseUtil.parse(content);          parseStatus = parse.getData().getStatus();        } catch (Exception e) {          parseStatus = new ParseStatus(e);        }        if (!parseStatus.isSuccess()) {          if (LOG.isWarnEnabled()) {            LOG.warn("Error parsing: " + key + ": " + parseStatus);          }          parse = parseStatus.getEmptyParse(getConf());        }        // Calculate page signature. For non-parsing fetchers this will        // be done in ParseSegment        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);        metadata.set(SIGNATURE_KEY, StringUtil.toHexString(signature));        datum.setSignature(signature);        // Ensure segment name and score are in parseData metadata        parse.getData().getContentMeta().set(SEGMENT_NAME_KEY, segmentName);        parse.getData().getContentMeta().set(SIGNATURE_KEY, StringUtil.toHexString(signature));        try {          scfilters.passScoreAfterParsing(key, content, parse);        } catch (Exception e) {          if (LOG.isWarnEnabled()) {            e.printStackTrace(LogUtil.getWarnStream(LOG));            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");          }        }              }      try {        output.collect          (key,           new FetcherOutput(datum,                             storingContent ? content : null,                             parse != null ? new ParseImpl(parse) : null));      } catch (IOException e) {        if (LOG.isFatalEnabled()) {          e.printStackTrace(LogUtil.getFatalStream(LOG));          LOG.fatal("fetcher caught:"+e.toString());        }      }      if (parse != null) return parse.getData().getStatus();      else return null;    }      }  public Fetcher() { super(null); }  public Fetcher(Configuration conf) { super(conf); }  private synchronized void updateStatus(int bytesInPage) throws IOException {    pages++;    bytes += bytesInPage;  }  private void reportStatus() throws IOException {    String status;    synchronized (this) {      long elapsed = (System.currentTimeMillis() - start)/1000;      status =         pages+" pages, "+errors+" errors, "        + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "        + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";    }    reporter.setStatus(status);  }  public void configure(JobConf job) {    setConf(job);    this.segmentName = job.get(SEGMENT_NAME_KEY);    this.storingContent = isStoringContent(job);    this.parsing = isParsing(job);//    if (job.getBoolean("fetcher.verbose", false)) {//      LOG.setLevel(Level.FINE);//    }  }  public void close() {}  public static boolean isParsing(Configuration conf) {    return conf.getBoolean("fetcher.parse", true);  }  public static boolean isStoringContent(Configuration conf) {    return conf.getBoolean("fetcher.store.content", true);  }  public void run(RecordReader input, OutputCollector output,                  Reporter reporter) throws IOException {    this.input = input;    this.output = output;    this.reporter = reporter;    this.maxRedirect = getConf().getInt("http.redirect.max", 3);        int threadCount = getConf().getInt("fetcher.threads.fetch", 10);    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }    for (int i = 0; i < threadCount; i++) {       // spawn threads      new FetcherThread(getConf()).start();    }    // select a timeout that avoids a task timeout    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;    do {                                          // wait for threads to exit      try {        Thread.sleep(1000);      } catch (InterruptedException e) {}      reportStatus();      // some requests seem to hang, despite all intentions      synchronized (this) {        if ((System.currentTimeMillis() - lastRequestStart) > timeout) {          if (LOG.isWarnEnabled()) {            LOG.warn("Aborting with "+activeThreads+" hung threads.");          }          return;        }      }    } while (activeThreads > 0);      }  public void fetch(Path segment, int threads)    throws IOException {    if (LOG.isInfoEnabled()) {      LOG.info("Fetcher: starting");      LOG.info("Fetcher: segment: " + segment);    }    JobConf job = new NutchJob(getConf());    job.setJobName("fetch " + segment);    job.setInt("fetcher.threads.fetch", threads);    job.set(SEGMENT_NAME_KEY, segment.getName());    // for politeness, don't permit parallel execution of a single task    job.setSpeculativeExecution(false);    job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));    job.setInputFormat(InputFormat.class);    job.setInputKeyClass(UTF8.class);    job.setInputValueClass(CrawlDatum.class);    job.setMapRunnerClass(Fetcher.class);    job.setOutputPath(segment);    job.setOutputFormat(FetcherOutputFormat.class);    job.setOutputKeyClass(UTF8.class);    job.setOutputValueClass(FetcherOutput.class);    JobClient.runJob(job);    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }  }  /** Run the fetcher. */  public static void main(String[] args) throws Exception {    String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";    if (args.length < 1) {      System.err.println(usage);      System.exit(-1);    }          Path segment = new Path(args[0]);    Configuration conf = NutchConfiguration.create();    int threads = conf.getInt("fetcher.threads.fetch", 10);    boolean parsing = true;    for (int i = 1; i < args.length; i++) {       // parse command line      if (args[i].equals("-threads")) {           // found -threads option        threads =  Integer.parseInt(args[++i]);      } else if (args[i].equals("-noParsing")) parsing = false;    }    conf.setInt("fetcher.threads.fetch", threads);    if (!parsing) {      conf.setBoolean("fetcher.parse", parsing);    }    Fetcher fetcher = new Fetcher(conf);          // make a Fetcher        fetcher.fetch(segment, threads);              // run the Fetcher  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -