📄 abstractfrontier.java
字号:
} catch (AttributeNotFoundException e) { logger.log(Level.SEVERE, "Failed to get logs directory", e); } if (logsDisk != null) { String logsPath = logsDisk.getAbsolutePath() + File.separatorChar; if (((Boolean)getUncheckedAttribute(null, ATTR_RECOVERY_ENABLED)) .booleanValue()) { this.recover = new RecoveryJournal(logsPath, FrontierJournal.LOGNAME_RECOVER); } } try { final Class qapClass = Class.forName((String)getUncheckedAttribute( null, ATTR_QUEUE_ASSIGNMENT_POLICY)); queueAssignmentPolicy = (QueueAssignmentPolicy)qapClass.newInstance(); } catch (Exception e) { logger.log(Level.SEVERE, "Bad queue assignment policy class", e); throw new FatalConfigurationException(e.getMessage()); } } synchronized public void terminate() { shouldTerminate = true; if (this.recover != null) { this.recover.close(); this.recover = null; } unpause(); } protected void doJournalFinishedSuccess(CrawlURI c) { if (this.recover != null) { this.recover.finishedSuccess(c); } } protected void doJournalAdded(CrawlURI c) { if (this.recover != null) { this.recover.added(c); } } protected void doJournalRescheduled(CrawlURI c) { if (this.recover != null) { this.recover.rescheduled(c); } } protected void doJournalFinishedFailure(CrawlURI c) { if (this.recover != null) { this.recover.finishedFailure(c); } } protected void doJournalEmitted(CrawlURI c) { if (this.recover != null) { this.recover.emitted(c); } } /** * Frontier is empty only if all queues are empty and no URIs are in-process * * @return True if queues are empty. */ public synchronized boolean isEmpty() { return queuedUriCount == 0; } /** * Increment the running count of queued URIs. Synchronized because * operations on longs are not atomic. */ protected synchronized void incrementQueuedUriCount() { queuedUriCount++; } /** * Increment the running count of queued URIs. Synchronized because * operations on longs are not atomic. * * @param increment * amount to increment the queued count */ protected synchronized void incrementQueuedUriCount(long increment) { queuedUriCount += increment; } /** * Note that a number of queued Uris have been deleted. * * @param numberOfDeletes */ protected synchronized void decrementQueuedCount(long numberOfDeletes) { queuedUriCount -= numberOfDeletes; } /** * (non-Javadoc) * * @see org.archive.crawler.framework.Frontier#queuedUriCount() */ public long queuedUriCount() { return queuedUriCount; } /** * (non-Javadoc) * * @see org.archive.crawler.framework.Frontier#finishedUriCount() */ public long finishedUriCount() { return succeededFetchCount + failedFetchCount + disregardedUriCount; } /** * Increment the running count of successfully fetched URIs. Synchronized * because operations on longs are not atomic. */ protected synchronized void incrementSucceededFetchCount() { succeededFetchCount++; } /** * (non-Javadoc) * * @see org.archive.crawler.framework.Frontier#succeededFetchCount() */ public long succeededFetchCount() { return succeededFetchCount; } /** * Increment the running count of failed URIs. Synchronized because * operations on longs are not atomic. */ protected synchronized void incrementFailedFetchCount() { failedFetchCount++; } /** * (non-Javadoc) * * @see org.archive.crawler.framework.Frontier#failedFetchCount() */ public long failedFetchCount() { return failedFetchCount; } /** * Increment the running count of disregarded URIs. Synchronized because * operations on longs are not atomic. */ protected synchronized void incrementDisregardedUriCount() { disregardedUriCount++; } public long disregardedUriCount() { return disregardedUriCount; } /** @deprecated misnomer; use StatisticsTracking figures instead */ public long totalBytesWritten() { return totalProcessedBytes; } /** * Load up the seeds. * * This method is called on initialize and inside in the crawlcontroller * when it wants to force reloading of configuration. * * @see org.archive.crawler.framework.CrawlController#kickUpdate() */ public void loadSeeds() { Writer ignoredWriter = new StringWriter(); logger.info("beginning"); // Get the seeds to refresh. Iterator iter = this.controller.getScope().seedsIterator(ignoredWriter); int count = 0; while (iter.hasNext()) { UURI u = (UURI)iter.next(); CandidateURI caUri = CandidateURI.createSeedCandidateURI(u); caUri.setSchedulingDirective(CandidateURI.MEDIUM); if (((Boolean)getUncheckedAttribute(null, ATTR_SOURCE_TAG_SEEDS)) .booleanValue()) { caUri.putString(CoreAttributeConstants.A_SOURCE_TAG,caUri.toString()); caUri.makeHeritable(CoreAttributeConstants.A_SOURCE_TAG); } schedule(caUri); count++; if(count%1000==0) { logger.info(count+" seeds"); } } // save ignored items (if any) where they can be consulted later saveIgnoredItems(ignoredWriter.toString(), controller.getDisk()); logger.info("finished"); } /** * Dump ignored seed items (if any) to disk; delete file otherwise. * Static to allow non-derived sibling classes (frontiers not yet * subclassed here) to reuse. * * @param ignoredItems * @param dir */ public static void saveIgnoredItems(String ignoredItems, File dir) { File ignoredFile = new File(dir, IGNORED_SEEDS_FILENAME); if(ignoredItems==null | ignoredItems.length()>0) { try { BufferedWriter bw = new BufferedWriter(new FileWriter(ignoredFile)); bw.write(ignoredItems); bw.close(); } catch (IOException e) { // TODO make an alert? e.printStackTrace(); } } else { // delete any older file (if any) ignoredFile.delete(); } } protected CrawlURI asCrawlUri(CandidateURI caUri) { CrawlURI curi; if (caUri instanceof CrawlURI) { curi = (CrawlURI)caUri; } else { curi = CrawlURI.from(caUri, nextOrdinal.getAndIncrement()); } curi.setClassKey(getClassKey(curi)); return curi; } /** * @param now * @throws InterruptedException * @throws EndedException */ protected synchronized void preNext(long now) throws InterruptedException, EndedException { if (this.controller == null) { return; } // Check completion conditions if (this.controller.atFinish()) { if (((Boolean)getUncheckedAttribute(null, ATTR_PAUSE_AT_FINISH)) .booleanValue()) { this.controller.requestCrawlPause(); } else { this.controller.beginCrawlStop(); } } // enforce operator pause if (shouldPause) { while (shouldPause) { this.controller.toePaused(); wait(); } // exitted pause; possibly finish regardless of pause-at-finish if (controller != null && controller.atFinish()) { this.controller.beginCrawlStop(); } } // enforce operator terminate or thread retirement if (shouldTerminate || ((ToeThread)Thread.currentThread()).shouldRetire()) { throw new EndedException("terminated"); } enforceBandwidthThrottle(now); } /** * Perform any special handling of the CrawlURI, such as promoting its URI * to seed-status, or preferencing it because it is an embed. * * @param curi */ protected void applySpecialHandling(CrawlURI curi) { if (curi.isSeed() && curi.getVia() != null && curi.flattenVia().length() > 0) { // The only way a seed can have a non-empty via is if it is the // result of a seed redirect. Add it to the seeds list. // // This is a feature. This is handling for case where a seed // gets immediately redirected to another page. What we're doing is // treating the immediate redirect target as a seed. this.controller.getScope().addSeed(curi); // And it needs rapid scheduling. if (curi.getSchedulingDirective() == CandidateURI.NORMAL) curi.setSchedulingDirective(CandidateURI.MEDIUM); } // optionally preferencing embeds up to MEDIUM int prefHops = ((Integer)getUncheckedAttribute(curi, ATTR_PREFERENCE_EMBED_HOPS)).intValue(); if (prefHops > 0) { int embedHops = curi.getTransHops(); if (embedHops > 0 && embedHops <= prefHops && curi.getSchedulingDirective() == CandidateURI.NORMAL) { // number of embed hops falls within the preferenced range, and // uri is not already MEDIUM -- so promote it curi.setSchedulingDirective(CandidateURI.MEDIUM); } } } /** * Perform fixups on a CrawlURI about to be returned via next(). * * @param curi * CrawlURI about to be returned by next() * @param q * the queue from which the CrawlURI came */ protected void noteAboutToEmit(CrawlURI curi, WorkQueue q) { curi.setHolder(q); // if (curi.getServer() == null) { // // TODO: perhaps short-circuit the emit here, // // because URI will be rejected as unfetchable // } doJournalEmitted(curi); } /** * @param curi * @return the CrawlServer to be associated with this CrawlURI */ protected CrawlServer getServer(CrawlURI curi) { return this.controller.getServerCache().getServerFor(curi); } /** * Return a suitable value to wait before retrying the given URI. * * @param curi * CrawlURI to be retried * @return millisecond delay before retry */ protected long retryDelayFor(CrawlURI curi) { int status = curi.getFetchStatus(); return (status == S_CONNECT_FAILED || status == S_CONNECT_LOST || status == S_DOMAIN_UNRESOLVABLE)? ((Long)getUncheckedAttribute(curi, ATTR_RETRY_DELAY)).longValue(): 0; // no delay for most } /** * Update any scheduling structures with the new information in this * CrawlURI. Chiefly means make necessary arrangements for no other URIs at * the same host to be visited within the appropriate politeness window. *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -