📄 AdaptiveRevisitFrontier.java
/**
 * Canonicalize the passed uuri. It would be sweeter if this canonicalize
 * function was encapsulated by that which it canonicalizes, but because
 * settings change with context -- i.e. there may be overrides in operation
 * for a particular URI -- it's not so easy; each CandidateURI would need a
 * reference to the settings system, which is awkward to pass in.
 *
 * @param uuri Candidate URI to canonicalize.
 * @return Canonicalized version of the passed <code>uuri</code>.
 */
protected String canonicalize(UURI uuri) {
    return Canonicalizer.canonicalize(uuri, this.controller.getOrder());
}

/**
 * Canonicalize the passed CandidateURI. This method differs from
 * {@link #canonicalize(UURI)} in that it takes the CandidateURI's context
 * into account, possibly overriding the canonicalization effect if it
 * could make us miss content. If canonicalization produces a URL that
 * was 'alreadyseen', but the entry in the 'alreadyseen' database did
 * nothing but redirect to the current URL, we won't get the current URL;
 * we'll think we've already seen it. Examples would be archive.org
 * redirecting to www.archive.org or the inverse, www.netarkivet.net
 * redirecting to netarkivet.net (assuming the stripWWW rule is enabled).
 * <p>Note, this method under some circumstances sets the forceFetch flag.
 *
 * @param cauri CandidateURI to examine.
 * @return Canonicalized <code>cauri</code>.
 */
protected String canonicalize(CandidateURI cauri) {
    String canon = canonicalize(cauri.getUURI());
    if (cauri.isLocation()) {
        // If the via is not the same as where we're being redirected (i.e.
        // we're not being redirected back to the same page), AND the
        // canonicalization of the via equals that of the current cauri,
        // THEN force fetch (so there is no chance of our not crawling the
        // content because the alreadyseen check thinks it has seen the
        // URL before).
        // An example of a URL that redirects to itself is:
        // http://bridalelegance.com/images/buttons3/tuxedos-off.gif.
        // An example of a URL whose canonicalization equals its via's
        // canonicalization, and where we want to fetch content at the
        // redirection (i.e. need to set forceFetch), is netarkivet.dk.
        if (!cauri.toString().equals(cauri.getVia().toString())
                && canonicalize(cauri.getVia()).equals(canon)) {
            cauri.setForceFetch(true);
        }
    }
    return canon;
}

/**
 * Schedule the passed CandidateURI, wrapping it in a CrawlURI if necessary.
 *
 * @param caUri The URI to schedule.
 */
protected void innerSchedule(CandidateURI caUri) {
    CrawlURI curi;
    if (caUri instanceof CrawlURI) {
        curi = (CrawlURI) caUri;
    } else {
        curi = CrawlURI.from(caUri, System.currentTimeMillis());
        // New CrawlURIs get 'current time' as the time of next processing.
        curi.putLong(A_TIME_OF_NEXT_PROCESSING, System.currentTimeMillis());
    }

    if (curi.getClassKey() == null) {
        curi.setClassKey(getClassKey(curi));
    }

    if (curi.isSeed() && curi.getVia() != null
            && curi.flattenVia().length() > 0) {
        // The only way a seed can have a non-empty via is if it is the
        // result of a seed redirect. Add it to the seeds list.
        //
        // This is a feature: it handles the case where a seed gets
        // immediately redirected to another page. What we're doing is
        // treating the immediate redirect target as a seed.
        this.controller.getScope().addSeed(curi);
        // And it needs rapid scheduling.
        curi.setSchedulingDirective(CandidateURI.MEDIUM);
    }

    // Optionally give embeds up to MEDIUM priority.
    int prefHops = ((Integer) getUncheckedAttribute(curi,
            ATTR_PREFERENCE_EMBED_HOPS)).intValue();
    boolean prefEmbed = false;
    if (prefHops > 0) {
        int embedHops = curi.getTransHops();
        if (embedHops > 0 && embedHops <= prefHops
                && curi.getSchedulingDirective() == CandidateURI.NORMAL) {
            // The number of embed hops falls within the preferenced range,
            // and the URI is not already MEDIUM -- so promote it.
            curi.setSchedulingDirective(CandidateURI.MEDIUM);
            prefEmbed = true;
        }
    }

    // Finally, allow curi to be fetched right now
    // (while not overriding overdue items).
    curi.putLong(A_TIME_OF_NEXT_PROCESSING, System.currentTimeMillis());

    try {
        logger.finest("scheduling " + curi.toString());
        AdaptiveRevisitHostQueue hq = getHQ(curi);
        hq.add(curi, prefEmbed);
    } catch (IOException e) {
        // TODO: Handle IOExceptions
        e.printStackTrace();
    }
}

/**
 * Get the AdaptiveRevisitHostQueue for the given CrawlURI, creating
 * it if necessary.
 *
 * @param curi CrawlURI for which to get a queue
 * @return AdaptiveRevisitHostQueue for the given CrawlURI
 * @throws IOException
 */
protected AdaptiveRevisitHostQueue getHQ(CrawlURI curi) throws IOException {
    AdaptiveRevisitHostQueue hq = hostQueues.getHQ(curi.getClassKey());
    if (hq == null) {
        // Need to create it.
        int valence = DEFAULT_HOST_VALENCE.intValue();
        try {
            valence = ((Integer) getAttribute(curi,
                    ATTR_HOST_VALENCE)).intValue();
        } catch (AttributeNotFoundException e2) {
            logger.severe("Unable to load valence.");
        }
        hq = hostQueues.createHQ(curi.getClassKey(), valence);
    }
    return hq;
}

protected void batchSchedule(CandidateURI caUri) {
    threadWaiting.getQueue().enqueue(caUri);
}

protected void batchFlush() {
    innerBatchFlush();
}

private void innerBatchFlush() {
    Queue q = threadWaiting.getQueue();
    while (!q.isEmpty()) {
        CandidateURI caUri = (CandidateURI) q.dequeue();
        if (alreadyIncluded != null) {
            String canon = canonicalize(caUri);
            logger.finest("Canonicalization of " + caUri + " is " + canon);
            if (caUri.forceFetch()) {
                alreadyIncluded.addForce(canon, caUri);
            } else {
                alreadyIncluded.add(canon, caUri);
            }
        } else {
            innerSchedule(caUri);
        }
    }
}

/**
 * @param curi CrawlURI to look up a server for.
 * @return the CrawlServer to be associated with this CrawlURI
 */
protected CrawlServer getServer(CrawlURI curi) {
    return this.controller.getServerCache().getServerFor(curi);
}

/* (non-Javadoc)
 * @see org.archive.crawler.framework.Frontier#next()
 */
public synchronized CrawlURI next()
        throws InterruptedException, EndedException {
    controller.checkFinish();

    while (shouldPause) {
        controller.toePaused();
        wait();
    }

    if (shouldTerminate) {
        throw new EndedException("terminated");
    }

    AdaptiveRevisitHostQueue hq = hostQueues.getTopHQ();

    while (hq.getState() != AdaptiveRevisitHostQueue.HQSTATE_READY) {
        // We don't have a ready queue; wait until the top one
        // becomes available.
        long waitTime = hq.getNextReadyTime() - System.currentTimeMillis();
        if (waitTime > 0) {
            wait(waitTime);
        }
        // The top HQ may have changed, so get it again.
        hq = hostQueues.getTopHQ();
    }

    if (shouldTerminate) {
        // May have been terminated while the thread was waiting for IO.
        throw new EndedException("terminated");
    }

    try {
        CrawlURI curi = hq.next();
        // Populate CURI with 'transient' variables such as server.
logger.fine("Issuing " + curi.toString()); long temp = curi.getLong(A_TIME_OF_NEXT_PROCESSING); long currT = System.currentTimeMillis(); long overdue = (currT-temp); if(logger.isLoggable(Level.FINER)){ String waitI = "not set"; if(curi.containsKey(A_WAIT_INTERVAL)){ waitI = ArchiveUtils.formatMillisecondsToConventional( curi.getLong(A_WAIT_INTERVAL)); } logger.finer("Wait interval: " + waitI + ", Time of next proc: " + temp + ", Current time: " + currT + ", Overdue by: " + overdue + "ms"); } if(overdue < 0){ // This should never happen. logger.severe("Time overdue for " + curi.toString() + "is negative (" + overdue + ")!"); } curi.putLong(A_FETCH_OVERDUE,overdue); return curi; } catch (IOException e) { // TODO: Need to handle this in an intelligent manner. // Is probably fatal? e.printStackTrace(); } return null; } /* (non-Javadoc) * @see org.archive.crawler.framework.Frontier#isEmpty() */ public boolean isEmpty() { // Technically, the Frontier should never become empty since URIs are // only discarded under exceptional circumstances. return hostQueues.getSize() == 0; } /* (non-Javadoc) * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI) */ public void schedule(CandidateURI caURI) { batchSchedule(caURI); } /* (non-Javadoc) * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI) */ public synchronized void finished(CrawlURI curi) { logger.fine(curi.toString()+ " " + CrawlURI.fetchStatusCodesToString(curi.getFetchStatus())); curi.incrementFetchAttempts(); logLocalizedErrors(curi); innerFinished(curi); } protected synchronized void innerFinished(CrawlURI curi) { try { innerBatchFlush(); if (curi.isSuccess()) { successDisposition(curi); } else if (needsPromptRetry(curi)) { // Consider statuses which allow nearly-immediate retry // (like deferred to allow precondition to be fetched) reschedule(curi,false); } else if (needsRetrying(curi)) { // Consider errors which can be retried reschedule(curi,true); controller.fireCrawledURINeedRetryEvent(curi); } else if(isDisregarded(curi)) { // Check for codes that mean that while the crawler did // manage to get it it must be disregarded for any reason. disregardDisposition(curi); } else { // In that case FAILURE, note & log failureDisposition(curi); } // New items might be available, let waiting threads know // More then one queue might have become available due to // scheduling of items outside the parent URIs host, so we // wake all waiting threads. notifyAll(); } catch (RuntimeException e) { curi.setFetchStatus(S_RUNTIME_EXCEPTION); // store exception temporarily for logging logger.warning("RTE in innerFinished() " + e.getMessage()); e.printStackTrace(); curi.putObject(A_RUNTIME_EXCEPTION, e); failureDisposition(curi); } catch (AttributeNotFoundException e) { logger.severe(e.getMessage()); } } /** * Take note of any processor-local errors that have * been entered into the CrawlURI. * @param curi CrawlURI with errors. */ private void logLocalizedErrors(CrawlURI curi) { if(curi.containsKey(A_LOCALIZED_ERRORS)) { List localErrors = (List)curi.getObject(A_LOCALIZED_ERRORS); Iterator iter = localErrors.iterator(); while(iter.hasNext()) { Object array[] = {curi, iter.next()}; controller.localErrors.log(Level.WARNING,