📄 workqueue.java
字号:
package org.archive.crawler.frontier;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.crawler.datamodel.CrawlSubstats;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier;
import org.archive.util.ArchiveUtils;
import org.archive.util.Reporter;

/**
 * A queue of related URIs waiting to be visited. The URIs of one queue
 * are grouped by a classKey (typically "hostname:port" or similar).
 *
 * @author gojomo
 * @author Christian Kohlschuetter
 */
public abstract class WorkQueue
        implements Frontier.FrontierGroup, Comparable, Serializable, Reporter {

    private static final Logger logger =
            Logger.getLogger(WorkQueue.class.getName());

    /** The classKey; assigned once in the constructor. */
    protected final String classKey;

    /** Active flag; a freshly created queue starts out with it set. */
    private boolean active = true;

    /** Total number of stored items. */
    private long count = 0;

    /** Total number of items ever enqueued. */
    private long enqueueCount = 0;

    /** Whether queue is already in lifecycle stage. */
    private boolean isHeld = false;

    /** Time to wake, if snoozed. */
    private long wakeTime = 0;

    /** Running 'budget' indicating whether queue should stay active. */
    private int sessionBalance = 0;

    /** Cost of the last item to be charged against queue. */
    private int lastCost = 0;

    /**
     * Total number of items charged against queue; with totalExpenditure
     * can be used to calculate 'average cost'.
*/ private long costCount = 0; /** Running tally of total expenditures on this queue */ private long totalExpenditure = 0; /** Total to spend on this queue over its lifetime */ private long totalBudget = 0; /** The next item to be returned */ private CrawlURI peekItem = null; /** Last URI enqueued */ private String lastQueued; /** Last URI peeked */ private String lastPeeked; /** time of last dequeue (disposition of some URI) **/ private long lastDequeueTime; /** count of errors encountered */ private long errorCount = 0; /** Substats for all CrawlURIs in this group */ protected CrawlSubstats substats = new CrawlSubstats(); private boolean retired; public WorkQueue(final String pClassKey) { this.classKey = pClassKey; } /** * Delete URIs matching the given pattern from this queue. * @param frontier * @param match * @return count of deleted URIs */ public long deleteMatching(final WorkQueueFrontier frontier, String match) { try { final long deleteCount = deleteMatchingFromQueue(frontier, match); this.count -= deleteCount; return deleteCount; } catch (IOException e) { //FIXME better exception handling e.printStackTrace(); throw new RuntimeException(e); } } /** * Add the given CrawlURI, noting its addition in running count. (It * should not already be present.) * * @param frontier Work queues manager. * @param curi CrawlURI to insert. */ public synchronized void enqueue(final WorkQueueFrontier frontier, CrawlURI curi) { try { insert(frontier, curi, false); } catch (IOException e) { //FIXME better exception handling e.printStackTrace(); throw new RuntimeException(e); } count++; enqueueCount++; } /** * Return the topmost queue item -- and remember it, * such that even later higher-priority inserts don't * change it. 
* * TODO: evaluate if this is really necessary * @param frontier Work queues manager * * @return topmost queue item, or null */ public CrawlURI peek(final WorkQueueFrontier frontier) { if(peekItem == null && count > 0) { try { peekItem = peekItem(frontier); } catch (IOException e) { //FIXME better exception handling logger.log(Level.SEVERE,"peek failure",e); e.printStackTrace(); // throw new RuntimeException(e); } if(peekItem != null) { lastPeeked = peekItem.toString(); } } return peekItem; } /** * Remove the peekItem from the queue and adjusts the count. * * @param frontier Work queues manager. */ public synchronized void dequeue(final WorkQueueFrontier frontier) { try { deleteItem(frontier, peekItem); } catch (IOException e) { //FIXME better exception handling e.printStackTrace(); throw new RuntimeException(e); } unpeek(); count--; lastDequeueTime = System.currentTimeMillis(); } /** * Set the session 'activity budget balance' to the given value * * @param balance to use */ public void setSessionBalance(int balance) { this.sessionBalance = balance; } /** * Return current session 'activity budget balance' * * @return session balance */ public int getSessionBalance() { return this.sessionBalance; } /** * Set the total expenditure level allowable before queue is * considered inherently 'over-budget'. * * @param budget */ public void setTotalBudget(long budget) { this.totalBudget = budget; } /** * Check whether queue has temporarily or permanently exceeded * its budget. 
* * @return true if queue is over its set budget(s) */ public boolean isOverBudget() { // check whether running balance is depleted // or totalExpenditure exceeds totalBudget return this.sessionBalance <= 0 || (this.totalBudget >= 0 && this.totalExpenditure > this.totalBudget); } /** * Return the tally of all expenditures on this queue * * @return total amount expended on this queue */ public long getTotalExpenditure() { return totalExpenditure; } /** * Increase the internal running budget to be used before * deactivating the queue * * @param amount amount to increment * @return updated budget value */ public int incrementSessionBalance(int amount) { this.sessionBalance = this.sessionBalance + amount; return this.sessionBalance; } /** * Decrease the internal running budget by the given amount. * @param amount tp decrement * @return updated budget value */ public int expend(int amount) { this.sessionBalance = this.sessionBalance - amount; this.totalExpenditure = this.totalExpenditure + amount; this.lastCost = amount; this.costCount++; return this.sessionBalance; } /** * A URI should not have been charged against queue (eg * it was disregarded); return the amount expended * @param amount to return * @return updated budget value */ public int refund(int amount) { this.sessionBalance = this.sessionBalance + amount; this.totalExpenditure = this.totalExpenditure - amount; this.costCount--; return this.sessionBalance; } /** * Note an error and assess an extra penalty. * @param penalty additional amount to deduct */ public void noteError(int penalty) { this.sessionBalance = this.sessionBalance - penalty; this.totalExpenditure = this.totalExpenditure + penalty; errorCount++; } /** * @param l */ public void setWakeTime(long l) { wakeTime = l; } /** * @return wakeTime */ public long getWakeTime() { return wakeTime; } /** * @return classKey, the 'identifier', for this queue. 
*/
    public String getClassKey() {
        return classKey;
    }

    /**
     * Reset the held flag: isHeld becomes false.
     */
    public void clearHeld() {
        this.isHeld = false;
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -