📄 workqueuefrontier.java
字号:
// any piled-up pending-scheduled URIs are considered this.alreadyIncluded.requestFlush(); } } } private int targetSizeForReadyQueues() { return targetSizeForReadyQueues; } /** * Return the 'cost' of a CrawlURI (how much of its associated * queue's budget it depletes upon attempted processing) * * @param curi * @return the associated cost */ private int getCost(CrawlURI curi) { int cost = curi.getHolderCost(); if (cost == CrawlURI.UNCALCULATED) { cost = costAssignmentPolicy.costOf(curi); curi.setHolderCost(cost); } return cost; } /** * Activate an inactive queue, if any are available. */ private void activateInactiveQueue() { Object key = this.inactiveQueues.poll(); if (key == null) { return; } WorkQueue candidateQ = (WorkQueue)this.allQueues.get(key); if(candidateQ != null) { synchronized(candidateQ) { replenishSessionBalance(candidateQ); if(candidateQ.isOverBudget()){ // if still over-budget after an activation & replenishing, // retire retireQueue(candidateQ); return; } long now = System.currentTimeMillis(); long delay_ms = candidateQ.getWakeTime() - now; if(delay_ms>0) { // queue still due for snoozing snoozeQueue(candidateQ,now,delay_ms); return; } candidateQ.setWakeTime(0); // clear obsolete wake time, if any readyQueue(candidateQ); if (logger.isLoggable(Level.FINE)) { logger.fine("ACTIVATED queue: " + candidateQ.getClassKey()); } } } } /** * Replenish the budget of the given queue by the appropriate amount. 
* * @param queue queue to replenish */ private void replenishSessionBalance(WorkQueue queue) { // get a CrawlURI for override context purposes CrawlURI contextUri = queue.peek(this); // TODO: consider confusing cross-effects of this and IP-based politeness queue.setSessionBalance(((Integer) getUncheckedAttribute(contextUri, ATTR_BALANCE_REPLENISH_AMOUNT)).intValue()); // reset total budget (it may have changed) // TODO: is this the best way to be sensitive to potential mid-crawl changes long totalBudget = ((Long)getUncheckedAttribute(contextUri,ATTR_QUEUE_TOTAL_BUDGET)).longValue(); queue.setTotalBudget(totalBudget); queue.unpeek(); // don't insist on that URI being next released } /** * Enqueue the given queue to either readyClassQueues or inactiveQueues, * as appropriate. * * @param wq */ private void reenqueueQueue(WorkQueue wq) { if(wq.isOverBudget()) { // if still over budget, deactivate if (logger.isLoggable(Level.FINE)) { logger.fine("DEACTIVATED queue: " + wq.getClassKey()); } deactivateQueue(wq); } else { readyQueue(wq); } } /** * Wake any queues sitting in the snoozed queue whose time has come. */ void wakeQueues() { synchronized (snoozedClassQueues) { long now = System.currentTimeMillis(); long nextWakeDelay = 0; int wokenQueuesCount = 0; while (true) { if (snoozedClassQueues.isEmpty()) { return; } WorkQueue peek = (WorkQueue) snoozedClassQueues.first(); nextWakeDelay = peek.getWakeTime() - now; if (nextWakeDelay <= 0) { snoozedClassQueues.remove(peek); peek.setWakeTime(0); reenqueueQueue(peek); wokenQueuesCount++; } else { break; } } this.nextWake = new WakeTask(); this.wakeTimer.schedule(nextWake,nextWakeDelay); } } /** * Note that the previously emitted CrawlURI has completed * its processing (for now). * * The CrawlURI may be scheduled to retry, if appropriate, * and other related URIs may become eligible for release * via the next next() call, as a result of finished(). 
*
     * Three outcomes are handled in order: a retire directive on the
     * CrawlURI (its queue is retired), a retryable result (the CrawlURI
     * stays atop its queue, which is snoozed or re-enqueued), and final
     * disposition as success, disregard or failure (the CrawlURI is
     * dequeued and the queue is snoozed or re-enqueued).
     *
     * (non-Javadoc)
     * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)
     */
    public void finished(CrawlURI curi) {
        long now = System.currentTimeMillis();

        curi.incrementFetchAttempts();
        logLocalizedErrors(curi);
        WorkQueue wq = (WorkQueue) curi.getHolder();
        // the finished URI is expected to still be the one peeked from its queue
        assert (wq.peek(this) == curi) : "unexpected peek " + wq;
        inProcessQueues.remove(wq, 1);

        if(includesRetireDirective(curi)) {
            // CrawlURI is marked to trigger retirement of its queue
            curi.processingCleanup();
            wq.unpeek();
            wq.update(this, curi); // rewrite any changes
            retireQueue(wq);
            return;
        }

        if (needsRetrying(curi)) {
            // Consider errors which can be retried, leaving uri atop queue
            if(curi.getFetchStatus()!=S_DEFERRED) {
                wq.expend(getCost(curi)); // all retries but DEFERRED cost
            }
            long delay_sec = retryDelayFor(curi);
            curi.processingCleanup(); // lose state that shouldn't burden retry
            synchronized(wq) {
                wq.unpeek();
                // TODO: consider if this should happen automatically inside unpeek()
                wq.update(this, curi); // rewrite any changes
                if (delay_sec > 0) {
                    // retry delay is in seconds; snoozeQueue takes milliseconds
                    long delay_ms = delay_sec * 1000;
                    snoozeQueue(wq, now, delay_ms);
                } else {
                    reenqueueQueue(wq);
                }
            }
            // Let everyone interested know that it will be retried.
            controller.fireCrawledURINeedRetryEvent(curi);
            doJournalRescheduled(curi);
            return;
        }

        // Curi will definitely be disposed of without retry, so remove from queue
        wq.dequeue(this);
        decrementQueuedCount(1);
        log(curi);

        if (curi.isSuccess()) {
            totalProcessedBytes += curi.getRecordedSize();
            incrementSucceededFetchCount();
            // Let everyone know in case they want to do something before we strip the curi.
            controller.fireCrawledURISuccessfulEvent(curi);
            doJournalFinishedSuccess(curi);
            wq.expend(getCost(curi)); // successes cost
        } else if (isDisregarded(curi)) {
            // Check for codes that mean that while we the crawler did
            // manage to schedule it, it must be disregarded for some reason.
            incrementDisregardedUriCount();
            // Let interested listeners know of disregard disposition.
            controller.fireCrawledURIDisregardEvent(curi);
            // if exception, also send to crawlErrors
            if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
                Object[] array = { curi };
                controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
                        .toString(), array);
            }
            // TODO: consider reinstating forget-uri
            // note: unlike the other two branches, nothing is expended here
        } else {
            // In that case FAILURE, note & log

            //Let interested listeners know of failed disposition.
            this.controller.fireCrawledURIFailureEvent(curi);
            // if exception, also send to crawlErrors
            if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {
                Object[] array = { curi };
                this.controller.runtimeErrors.log(Level.WARNING, curi.getUURI()
                        .toString(), array);
            }
            incrementFailedFetchCount();
            // let queue note error
            wq.noteError(((Integer) getUncheckedAttribute(curi,
                    ATTR_ERROR_PENALTY_AMOUNT)).intValue());
            doJournalFinishedFailure(curi);
            wq.expend(getCost(curi)); // failures cost
        }

        // decide whether the queue must pause before supplying its next URI
        long delay_ms = politenessDelayFor(curi);
        synchronized(wq) {
            if (delay_ms > 0) {
                snoozeQueue(wq,now,delay_ms);
            } else {
                reenqueueQueue(wq);
            }
        }

        curi.stripToMinimal();
        curi.processingCleanup();
    }

    /**
     * Report whether the given CrawlURI asks for its queue to be retired.
     *
     * @param curi CrawlURI to inspect
     * @return true when curi contains the A_FORCE_RETIRE key and the value
     * stored under it, cast to Boolean, is true
     */
    private boolean includesRetireDirective(CrawlURI curi) {
        return curi.containsKey(A_FORCE_RETIRE)
                && (Boolean)curi.getObject(A_FORCE_RETIRE);
    }

    /**
     * Place the given queue into 'snoozed' state, ineligible to
     * supply any URIs for crawling, for the given amount of time.
     *
     * The queue's wake time is always set to now + delay_ms. If the delay
     * exceeds the ATTR_SNOOZE_DEACTIVATE_MS setting while inactiveQueues is
     * non-empty, the queue is deactivated rather than snoozed. Otherwise it
     * is added to snoozedClassQueues, and a new WakeTask is scheduled when
     * it has become the first entry there.
     *
     * @param wq queue to snooze
     * @param now time now in ms
     * @param delay_ms time to snooze in ms
     */
    private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {
        long nextTime = now + delay_ms;
        wq.setWakeTime(nextTime);
        // looked up with a null context URI
        long snoozeToInactiveDelayMs = ((Long)getUncheckedAttribute(null,
                ATTR_SNOOZE_DEACTIVATE_MS)).longValue();
        if (delay_ms > snoozeToInactiveDelayMs && !inactiveQueues.isEmpty()) {
            deactivateQueue(wq);
        } else {
            synchronized(snoozedClassQueues) {
                snoozedClassQueues.add(wq);
                if(wq == snoozedClassQueues.first()) {
                    // this queue is now at the front: schedule a wake-up for it
                    this.nextWake = new WakeTask();
                    this.wakeTimer.schedule(nextWake, delay_ms);
                }
            }
        }
    }

    /**
     * Forget the given CrawlURI.
This allows a new instance * to be created in the future, if it is reencountered under * different circumstances. * * @param curi The CrawlURI to forget */ protected void forget(CrawlURI curi) { logger.finer("Forgetting " + curi); alreadyIncluded.forget(canonicalize(curi.getUURI()), curi); } /** (non-Javadoc) * @see org.archive.crawler.framework.Frontier#discoveredUriCount() */ public long discoveredUriCount() { return (this.alreadyIncluded != null)? this.alreadyIncluded.count(): 0; } /** * @param match String to match. * @return Number of items deleted. */ public long deleteURIs(String match) { long count = 0; // TODO: DANGER/ values() may not work right from CachedBdbMap Iterator iter = allQueues.keySet().iterator(); while(iter.hasNext()) { WorkQueue wq = getQueueFor(((String)iter.next())); wq.unpeek(); count += wq.deleteMatching(this, match); } decrementQueuedCount(count); return count; } // // Reporter implementation // public static String STANDARD_REPORT = "standard"; public static String ALL_NONEMPTY = "nonempty"; public static String ALL_QUEUES = "all"; protected static String[] REPORTS = {STANDARD_REPORT,ALL_NONEMPTY,ALL_QUEUES}; public String[] getReports() { return REPORTS; } /** * @param w Where to write to. */ public void singleLineReportTo(PrintWriter w) { if (this.allQueues == null) { return; } int allCount = allQueues.size(); int inProcessCount = inProcessQueues.uniqueSet().size(); int readyCount = readyClassQueues.size(); int snoozedCount = snoozedClassQueues.size(); int activeCount = inProcessCount + readyCount + snoozedCount; int inactiveCount = inactiveQueues.size(); int retiredCount = retiredQueues.size(); int exhaustedCount = allCount - activeCount - inactiveCount - retiredCount; w.print(allCount); w.print(" queues: "); w.print(activeCount); w.print(" active ("); w.print(inProcessCount); w.print(" in-process; ");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -