📄 workqueuefrontier.java
字号:
/* $Id: WorkQueueFrontier.java 5046 2007-04-10 01:40:08Z gojomo $ * Created on Sep 24, 2004 * * Copyright (C) 2004 Internet Archive. * * This file is part of the Heritrix web crawler (crawler.archive.org). * * Heritrix is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * any later version. * * Heritrix is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser Public License for more details. * * You should have received a copy of the GNU Lesser Public License * along with Heritrix; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */package org.archive.crawler.frontier;import java.io.IOException;import java.io.PrintWriter;import java.io.Serializable;import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.Date;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.SortedSet;import java.util.Timer;import java.util.TimerTask;import java.util.TreeSet;import java.util.logging.Level;import java.util.logging.Logger;import org.apache.commons.collections.Bag;import org.apache.commons.collections.BagUtils;import org.apache.commons.collections.bag.HashBag;import org.archive.crawler.datamodel.CandidateURI;import org.archive.crawler.datamodel.CoreAttributeConstants;import org.archive.crawler.datamodel.CrawlURI;import org.archive.crawler.datamodel.FetchStatusCodes;import org.archive.crawler.datamodel.UriUniqFilter;import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;import org.archive.crawler.framework.CrawlController;import org.archive.crawler.framework.Frontier;import org.archive.crawler.framework.exceptions.EndedException;import org.archive.crawler.framework.exceptions.FatalConfigurationException;import org.archive.crawler.settings.SimpleType;import org.archive.crawler.settings.Type;import org.archive.net.UURI;import org.archive.util.ArchiveUtils;import com.sleepycat.collections.StoredIterator;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;/** * A common Frontier base using several queues to hold pending URIs. * * Uses in-memory map of all known 'queues' inside a single database. * Round-robins between all queues. * * @author Gordon Mohr * @author Christian Kohlschuetter */public abstract class WorkQueueFrontier extends AbstractFrontierimplements FetchStatusCodes, CoreAttributeConstants, HasUriReceiver, Serializable { private static final long serialVersionUID = 570384305871965843L; public class WakeTask extends TimerTask { @Override public void run() { synchronized(snoozedClassQueues) { if(this!=nextWake) { // an intervening waketask was made return; } wakeQueues(); } } } /** truncate reporting of queues at some large but not unbounded number */ private static final int REPORT_MAX_QUEUES = 2000; /** * If we know that only a small amount of queues is held in memory, * we can avoid using a disk-based BigMap. * This only works efficiently if the WorkQueue does not hold its * entries in memory as well. */ private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000; /** * When a snooze target for a queue is longer than this amount, and * there are already ready queues, deactivate rather than snooze * the current queue -- so other more responsive sites get a chance * in active rotation. (As a result, queue's next try may be much * further in the future than the snooze target delay.) */ public final static String ATTR_SNOOZE_DEACTIVATE_MS = "snooze-deactivate-ms"; public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long(5*60*1000); // 5 minutes private static final Logger logger = Logger.getLogger(WorkQueueFrontier.class.getName()); /** whether to hold queues INACTIVE until needed for throughput */ public final static String ATTR_HOLD_QUEUES = "hold-queues"; protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean(true); /** amount to replenish budget on each activation (duty cycle) */ public final static String ATTR_BALANCE_REPLENISH_AMOUNT = "balance-replenish-amount"; protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT = new Integer(3000); /** whether to hold queues INACTIVE until needed for throughput */ public final static String ATTR_ERROR_PENALTY_AMOUNT = "error-penalty-amount"; protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT = new Integer(100); /** total expenditure to allow a queue before 'retiring' it */ public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget"; protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long(-1); /** cost assignment policy to use (by class name) */ public final static String ATTR_COST_POLICY = "cost-policy"; protected final static String DEFAULT_COST_POLICY = UnitCostAssignmentPolicy.class.getName(); /** target size of ready queues backlog */ public final static String ATTR_TARGET_READY_QUEUES_BACKLOG = "target-ready-backlog"; protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG = new Integer(50); /** those UURIs which are already in-process (or processed), and thus should not be rescheduled */ protected transient UriUniqFilter alreadyIncluded; /** All known queues. */ protected transient Map<String,WorkQueue> allQueues = null; // of classKey -> ClassKeyQueue /** * All per-class queues whose first item may be handed out. * Linked-list of keys for the queues. */ protected BlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<String>(); /** Target (minimum) size to keep readyClassQueues */ protected int targetSizeForReadyQueues; /** * All 'inactive' queues, not yet in active rotation. * Linked-list of keys for the queues. */ protected BlockingQueue<String> inactiveQueues = new LinkedBlockingQueue<String>(); /** * 'retired' queues, no longer considered for activation. * Linked-list of keys for queues. */ protected BlockingQueue<String> retiredQueues = new LinkedBlockingQueue<String>(); /** all per-class queues from whom a URI is outstanding */ protected Bag inProcessQueues = BagUtils.synchronizedBag(new HashBag()); // of ClassKeyQueue /** * All per-class queues held in snoozed state, sorted by wake time. */ protected SortedSet<WorkQueue> snoozedClassQueues = Collections.synchronizedSortedSet(new TreeSet<WorkQueue>()); /** Timer for tasks which wake head item of snoozedClassQueues */ protected transient Timer wakeTimer; /** Task for next wake */ protected transient WakeTask nextWake; protected WorkQueue longestActiveQueue = null; /** how long to wait for a ready queue when there's nothing snoozed */ private static final long DEFAULT_WAIT = 1000; // 1 second /** a policy for assigning 'cost' values to CrawlURIs */ private transient CostAssignmentPolicy costAssignmentPolicy; /** all policies available to be chosen */ String[] AVAILABLE_COST_POLICIES = new String[] { ZeroCostAssignmentPolicy.class.getName(), UnitCostAssignmentPolicy.class.getName(), WagCostAssignmentPolicy.class.getName(), AntiCalendarCostAssignmentPolicy.class.getName()}; /** * Create the CommonFrontier * * @param name * @param description */ public WorkQueueFrontier(String name, String description) { // The 'name' of all frontiers should be the same (URIFrontier.ATTR_NAME) // therefore we'll ignore the supplied parameter. super(Frontier.ATTR_NAME, description); Type t = addElementToDefinition(new SimpleType(ATTR_HOLD_QUEUES, "Whether to hold newly-created per-host URI work" + " queues until needed to stay busy. If false (default)," + " all queues may contribute URIs for crawling at all" + " times. If true, queues begin (and collect URIs) in" + " an 'inactive' state, and only when the Frontier needs" + " another queue to keep all ToeThreads busy will new" + " queues be activated.", DEFAULT_HOLD_QUEUES)); t.setExpertSetting(true); t.setOverrideable(false); t = addElementToDefinition(new SimpleType(ATTR_BALANCE_REPLENISH_AMOUNT, "Amount to replenish a queue's activity balance when it becomes " + "active. Larger amounts mean more URIs will be tried from the " + "queue before it is deactivated in favor of waiting queues. " + "Default is 3000", DEFAULT_BALANCE_REPLENISH_AMOUNT)); t.setExpertSetting(true); t.setOverrideable(true); t = addElementToDefinition(new SimpleType(ATTR_ERROR_PENALTY_AMOUNT, "Amount to additionally penalize a queue when one of" + "its URIs fails completely. Accelerates deactivation or " + "full retirement of problem queues and unresponsive sites. " + "Default is 100", DEFAULT_ERROR_PENALTY_AMOUNT)); t.setExpertSetting(true); t.setOverrideable(true); t = addElementToDefinition(new SimpleType(ATTR_QUEUE_TOTAL_BUDGET, "Total activity expenditure allowable to a single queue; queues " + "over this expenditure will be 'retired' and crawled no more. " + "Default of -1 means no ceiling on activity expenditures is " + "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET)); t.setExpertSetting(true); t.setOverrideable(true); t = addElementToDefinition(new SimpleType(ATTR_COST_POLICY, "Policy for calculating the cost of each URI attempted. " + "The default UnitCostAssignmentPolicy considers the cost of " + "each URI to be '1'.", DEFAULT_COST_POLICY, AVAILABLE_COST_POLICIES)); t.setExpertSetting(true); t = addElementToDefinition(new SimpleType(ATTR_SNOOZE_DEACTIVATE_MS, "Threshold above which any 'snooze' delay will cause the " + "affected queue to go inactive, allowing other queues a " + "chance to rotate into active state. Typically set to be " + "longer than the politeness pauses between successful " + "fetches, but shorter than the connection-failed " + "'retry-delay-seconds'. (Default is 5 minutes.)", DEFAULT_SNOOZE_DEACTIVATE_MS)); t.setExpertSetting(true); t.setOverrideable(false); t = addElementToDefinition(new SimpleType(ATTR_TARGET_READY_QUEUES_BACKLOG, "Target size for backlog of ready queues. This many queues " + "will be brought into 'ready' state even if a thread is " + "not waiting. Only has effect if 'hold-queues' is true. " + "Default is 50.", DEFAULT_TARGET_READY_QUEUES_BACKLOG)); t.setExpertSetting(true); t.setOverrideable(false); } /** * Initializes the Frontier, given the supplied CrawlController. * * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController) */ public void initialize(CrawlController c) throws FatalConfigurationException, IOException { // Call the super method. It sets up frontier journalling. super.initialize(c); this.controller = c; this.targetSizeForReadyQueues = (Integer)getUncheckedAttribute(null, ATTR_TARGET_READY_QUEUES_BACKLOG); if (this.targetSizeForReadyQueues < 1) { this.targetSizeForReadyQueues = 1; } this.wakeTimer = new Timer("waker for " + c.toString()); try { if (workQueueDataOnDisk() && queueAssignmentPolicy.maximumNumberOfKeys() >= 0 && queueAssignmentPolicy.maximumNumberOfKeys() <= MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) { this.allQueues = Collections.synchronizedMap( new HashMap<String,WorkQueue>()); } else { this.allQueues = c.getBigMap("allqueues", String.class, WorkQueue.class); if (logger.isLoggable(Level.FINE)) { Iterator i = this.allQueues.keySet().iterator(); try { for (; i.hasNext();) { logger.fine((String) i.next()); } } finally { StoredIterator.close(i); } } } this.alreadyIncluded = createAlreadyIncluded(); initQueue(); } catch (IOException e) { e.printStackTrace(); throw (FatalConfigurationException) new FatalConfigurationException(e.getMessage()).initCause(e); } catch (Exception e) { e.printStackTrace(); throw (FatalConfigurationException) new FatalConfigurationException(e.getMessage()).initCause(e); } initCostPolicy(); loadSeeds(); } /** * Set (or reset after configuration change) the cost policy in effect.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -