📄 fetcher.java
字号:
} finally {
        synchronized (Fetcher.this) {activeThreads--;} // count threads
      }
    }

    /**
     * Logs a failed fetch at INFO level and increments the shared error
     * counter under the enclosing Fetcher's monitor.
     *
     * @param url     the url whose fetch failed
     * @param message description of the failure
     */
    private void logError(UTF8 url, String message) {
      if (LOG.isInfoEnabled()) {
        LOG.info("fetch of " + url + " failed with: " + message);
      }
      synchronized (Fetcher.this) {               // record failure
        errors++;
      }
    }

    /**
     * Stamps the datum with the given status and the current time, optionally
     * parses the content, and hands the result to the output collector.
     *
     * <p>When <code>content</code> is null an empty Content for the url is
     * substituted so that the segment name can still be recorded in its
     * metadata. Parsing only happens when parsing is enabled and the status
     * is <code>CrawlDatum.STATUS_FETCH_SUCCESS</code>; a failed parse is
     * replaced by the empty parse of its ParseStatus.
     *
     * @param key     url of the fetched page
     * @param datum   crawl datum to update and emit
     * @param content fetched content, or null if nothing was fetched
     * @param status  status code to store in the datum
     * @return the status of the parse, or null when no parse was attempted
     */
    private ParseStatus output(UTF8 key, CrawlDatum datum,
                               Content content, int status) {

      datum.setStatus(status);
      datum.setFetchTime(System.currentTimeMillis());

      if (content == null) {
        String url = key.toString();
        content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);
      }
      Metadata metadata = content.getMetadata();
      // add segment to metadata
      metadata.set(SEGMENT_NAME_KEY, segmentName);
      // add score to content metadata so that ParseSegment can pick it up.
      try {
        scfilters.passScoreBeforeParsing(key, datum, content);
      } catch (Exception e) {
        if (LOG.isWarnEnabled()) {
          e.printStackTrace(LogUtil.getWarnStream(LOG));
          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
        }
      }

      Parse parse = null;
      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
        ParseStatus parseStatus;
        try {
          parse = this.parseUtil.parse(content);
          parseStatus = parse.getData().getStatus();
        } catch (Exception e) {
          // any parser failure is converted into a failed ParseStatus
          parseStatus = new ParseStatus(e);
        }
        if (!parseStatus.isSuccess()) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Error parsing: " + key + ": " + parseStatus);
          }
          parse = parseStatus.getEmptyParse(getConf());
        }
        // Calculate page signature. For non-parsing fetchers this will
        // be done in ParseSegment
        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
        metadata.set(SIGNATURE_KEY, StringUtil.toHexString(signature));
        datum.setSignature(signature);
        // Ensure segment name and score are in parseData metadata
        parse.getData().getContentMeta().set(SEGMENT_NAME_KEY, segmentName);
        parse.getData().getContentMeta().set(SIGNATURE_KEY, StringUtil.toHexString(signature));
        try {
          scfilters.passScoreAfterParsing(key, content, parse);
        } catch (Exception e) {
          if (LOG.isWarnEnabled()) {
            e.printStackTrace(LogUtil.getWarnStream(LOG));
            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
          }
        }
      }

      try {
        output.collect
          (key,
           new FetcherOutput(datum,
                             storingContent ? content : null,
                             parse != null ? new ParseImpl(parse) : null));
      } catch (IOException e) {
        // a failed collect is logged and otherwise ignored
        if (LOG.isFatalEnabled()) {
          e.printStackTrace(LogUtil.getFatalStream(LOG));
          LOG.fatal("fetcher caught:"+e.toString());
        }
      }
      if (parse != null) return parse.getData().getStatus();
      else return null;
    }

  }

  /** Creates a Fetcher with no configuration. */
  public Fetcher() { super(null); }

  /**
   * Creates a Fetcher using the given configuration.
   *
   * @param conf configuration passed to the superclass
   */
  public Fetcher(Configuration conf) { super(conf); }

  /**
   * Adds one page and its size to the running totals; synchronized on this
   * Fetcher.
   *
   * @param bytesInPage number of bytes to add to the byte counter
   */
  private synchronized void updateStatus(int bytesInPage) throws IOException {
    pages++;
    bytes += bytesInPage;
  }

  /**
   * Builds a one-line progress summary (pages, errors, pages/s, kb/s) from
   * the counters and passes it to the reporter.
   */
  private void reportStatus() throws IOException {
    String status;
    synchronized (this) {
      // Clamp to one second: with elapsed == 0 the float divisions below
      // yield Infinity/NaN and the reported rates are meaningless.
      long elapsed = Math.max(1L, (System.currentTimeMillis() - start)/1000);
      status =
        pages+" pages, "+errors+" errors, "
        + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
        + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
    }
    reporter.setStatus(status);
  }

  /**
   * Stores the job as this object's configuration and reads the segment
   * name, the store-content flag and the parsing flag from it.
   *
   * @param job job configuration to read from
   */
  public void configure(JobConf job) {
    setConf(job);

    this.segmentName = job.get(SEGMENT_NAME_KEY);
    this.storingContent = isStoringContent(job);
    this.parsing = isParsing(job);

//    if (job.getBoolean("fetcher.verbose", false)) {
//      LOG.setLevel(Level.FINE);
//    }
  }

  /** Does nothing. */
  public void close() {}

  /**
   * Returns the value of <code>fetcher.parse</code>, defaulting to true.
   *
   * @param conf configuration to read
   */
  public static boolean isParsing(Configuration conf) {
    return conf.getBoolean("fetcher.parse", true);
  }

  /**
   * Returns the value of <code>fetcher.store.content</code>, defaulting to
   * true.
   *
   * @param conf configuration to read
   */
  public static boolean isStoringContent(Configuration conf) {
    return conf.getBoolean("fetcher.store.content", true);
  }

  /**
   * Starts <code>fetcher.threads.fetch</code> (default 10) FetcherThreads and
   * then polls once per second, reporting status, until no threads remain
   * active. Returns early with a warning when <code>lastRequestStart</code>
   * is older than half of <code>mapred.task.timeout</code> (default ten
   * minutes).
   *
   * @param input    record reader stored for use by the fetcher threads
   * @param output   collector stored for use by the fetcher threads
   * @param reporter reporter that receives the status line
   */
  public void run(RecordReader input, OutputCollector output,
                  Reporter reporter) throws IOException {

    this.input = input;
    this.output = output;
    this.reporter = reporter;

    this.maxRedirect = getConf().getInt("http.redirect.max", 3);

    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }

    for (int i = 0; i < threadCount; i++) {       // spawn threads
      new FetcherThread(getConf()).start();
    }

    // select a timeout that avoids a task timeout
    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;

    int remaining;                                // snapshot of activeThreads
    do {                                          // wait for threads to exit
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // ignored on purpose: the loop simply polls again
      }

      reportStatus();

      // some requests seem to hang, despite all intentions
      synchronized (this) {
        if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Aborting with "+activeThreads+" hung threads.");
          }
          return;
        }
        // Read the counter under the same monitor the fetcher threads hold
        // when they decrement it, so the loop condition sees a current value.
        remaining = activeThreads;
      }

    } while (remaining > 0);

  }

  /**
   * Configures and runs the fetch job for one segment: input is the
   * segment's <code>CrawlDatum.GENERATE_DIR_NAME</code> directory, output is
   * the segment itself, and this class is the map runner.
   *
   * @param segment path of the segment to fetch
   * @param threads value stored in <code>fetcher.threads.fetch</code>
   */
  public void fetch(Path segment, int threads)
    throws IOException {

    if (LOG.isInfoEnabled()) {
      LOG.info("Fetcher: starting");
      LOG.info("Fetcher: segment: " + segment);
    }

    JobConf job = new NutchJob(getConf());
    job.setJobName("fetch " + segment);

    job.setInt("fetcher.threads.fetch", threads);
    job.set(SEGMENT_NAME_KEY, segment.getName());

    // for politeness, don't permit parallel execution of a single task
    job.setSpeculativeExecution(false);

    job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
    job.setInputFormat(InputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(CrawlDatum.class);

    job.setMapRunnerClass(Fetcher.class);

    job.setOutputPath(segment);
    job.setOutputFormat(FetcherOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(FetcherOutput.class);

    JobClient.runJob(job);
    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
  }


  /**
   * Run the fetcher.
   *
   * <p>Arguments: <code>&lt;segment&gt; [-threads n] [-noParsing]</code>.
   * Prints the usage string and exits with -1 when the segment is missing or
   * when <code>-threads</code> is given without a value.
   */
  public static void main(String[] args) throws Exception {

    String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";

    if (args.length < 1) {
      System.err.println(usage);
      System.exit(-1);
    }

    Path segment = new Path(args[0]);

    Configuration conf = NutchConfiguration.create();

    int threads = conf.getInt("fetcher.threads.fetch", 10);
    boolean parsing = true;

    for (int i = 1; i < args.length; i++) {       // parse command line
      if (args[i].equals("-threads")) {           // found -threads option
        if (++i >= args.length) {                 // option value is missing
          System.err.println(usage);
          System.exit(-1);
        }
        threads = Integer.parseInt(args[i]);
      } else if (args[i].equals("-noParsing")) {
        parsing = false;
      }
    }

    conf.setInt("fetcher.threads.fetch", threads);
    if (!parsing) {
      conf.setBoolean("fetcher.parse", parsing);
    }
    Fetcher fetcher = new Fetcher(conf);          // make a Fetcher

    fetcher.fetch(segment, threads);              // run the Fetcher

  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -