⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workqueuefrontier.java

📁 Heritrix是一个开源,可扩展的web爬虫项目。Heritrix设计成严格按照robots.txt文件的排除指示和META robots标签。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/* $Id: WorkQueueFrontier.java 5046 2007-04-10 01:40:08Z gojomo $ * Created on Sep 24, 2004 * *  Copyright (C) 2004 Internet Archive. * * This file is part of the Heritrix web crawler (crawler.archive.org). * * Heritrix is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * any later version. * * Heritrix is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU Lesser Public License for more details. * * You should have received a copy of the GNU Lesser Public License * along with Heritrix; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA *  */package org.archive.crawler.frontier;import java.io.IOException;import java.io.PrintWriter;import java.io.Serializable;import java.util.ArrayList;import java.util.Collection;import java.util.Collections;import java.util.Date;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.SortedSet;import java.util.Timer;import java.util.TimerTask;import java.util.TreeSet;import java.util.logging.Level;import java.util.logging.Logger;import org.apache.commons.collections.Bag;import org.apache.commons.collections.BagUtils;import org.apache.commons.collections.bag.HashBag;import org.archive.crawler.datamodel.CandidateURI;import org.archive.crawler.datamodel.CoreAttributeConstants;import org.archive.crawler.datamodel.CrawlURI;import org.archive.crawler.datamodel.FetchStatusCodes;import org.archive.crawler.datamodel.UriUniqFilter;import org.archive.crawler.datamodel.UriUniqFilter.HasUriReceiver;import org.archive.crawler.framework.CrawlController;import org.archive.crawler.framework.Frontier;import org.archive.crawler.framework.exceptions.EndedException;import org.archive.crawler.framework.exceptions.FatalConfigurationException;import org.archive.crawler.settings.SimpleType;import org.archive.crawler.settings.Type;import org.archive.net.UURI;import org.archive.util.ArchiveUtils;import com.sleepycat.collections.StoredIterator;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;/** * A common Frontier base using several queues to hold pending URIs.  *  * Uses in-memory map of all known 'queues' inside a single database. * Round-robins between all queues. * * @author Gordon Mohr * @author Christian Kohlschuetter */public abstract class WorkQueueFrontier extends AbstractFrontierimplements FetchStatusCodes, CoreAttributeConstants, HasUriReceiver,        Serializable {	private static final long serialVersionUID = 570384305871965843L;	    public class WakeTask extends TimerTask {        @Override        public void run() {            synchronized(snoozedClassQueues) {                if(this!=nextWake) {                    // an intervening waketask was made                    return;                }                wakeQueues();            }        }    }    /** truncate reporting of queues at some large but not unbounded number */    private static final int REPORT_MAX_QUEUES = 2000;        /**     * If we know that only a small amount of queues is held in memory,     * we can avoid using a disk-based BigMap.     * This only works efficiently if the WorkQueue does not hold its     * entries in memory as well.     */     private static final int MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY = 3000;    /**     * When a snooze target for a queue is longer than this amount, and      * there are already ready queues, deactivate rather than snooze      * the current queue -- so other more responsive sites get a chance     * in active rotation. (As a result, queue's next try may be much     * further in the future than the snooze target delay.)     */    public final static String ATTR_SNOOZE_DEACTIVATE_MS =        "snooze-deactivate-ms";    public static Long DEFAULT_SNOOZE_DEACTIVATE_MS = new Long(5*60*1000); // 5 minutes        private static final Logger logger =        Logger.getLogger(WorkQueueFrontier.class.getName());        /** whether to hold queues INACTIVE until needed for throughput */    public final static String ATTR_HOLD_QUEUES = "hold-queues";    protected final static Boolean DEFAULT_HOLD_QUEUES = new Boolean(true);     /** amount to replenish budget on each activation (duty cycle) */    public final static String ATTR_BALANCE_REPLENISH_AMOUNT =        "balance-replenish-amount";    protected final static Integer DEFAULT_BALANCE_REPLENISH_AMOUNT =        new Integer(3000);        /** whether to hold queues INACTIVE until needed for throughput */    public final static String ATTR_ERROR_PENALTY_AMOUNT =        "error-penalty-amount";    protected final static Integer DEFAULT_ERROR_PENALTY_AMOUNT =        new Integer(100);    /** total expenditure to allow a queue before 'retiring' it  */    public final static String ATTR_QUEUE_TOTAL_BUDGET = "queue-total-budget";    protected final static Long DEFAULT_QUEUE_TOTAL_BUDGET = new Long(-1);    /** cost assignment policy to use (by class name) */    public final static String ATTR_COST_POLICY = "cost-policy";    protected final static String DEFAULT_COST_POLICY =        UnitCostAssignmentPolicy.class.getName();    /** target size of ready queues backlog */    public final static String ATTR_TARGET_READY_QUEUES_BACKLOG =        "target-ready-backlog";    protected final static Integer DEFAULT_TARGET_READY_QUEUES_BACKLOG =        new Integer(50);        /** those UURIs which are already in-process (or processed), and     thus should not be rescheduled */    protected transient UriUniqFilter alreadyIncluded;    /** All known queues.     */    protected transient Map<String,WorkQueue> allQueues = null;     // of classKey -> ClassKeyQueue    /**     * All per-class queues whose first item may be handed out.     * Linked-list of keys for the queues.     */    protected BlockingQueue<String> readyClassQueues =    	new LinkedBlockingQueue<String>();        /** Target (minimum) size to keep readyClassQueues */    protected int targetSizeForReadyQueues;        /**      * All 'inactive' queues, not yet in active rotation.     * Linked-list of keys for the queues.     */    protected BlockingQueue<String> inactiveQueues =    	new LinkedBlockingQueue<String>();    /**     * 'retired' queues, no longer considered for activation.     * Linked-list of keys for queues.     */    protected BlockingQueue<String> retiredQueues =    	new LinkedBlockingQueue<String>();        /** all per-class queues from whom a URI is outstanding */    protected Bag inProcessQueues =         BagUtils.synchronizedBag(new HashBag()); // of ClassKeyQueue        /**     * All per-class queues held in snoozed state, sorted by wake time.     */    protected SortedSet<WorkQueue> snoozedClassQueues =        Collections.synchronizedSortedSet(new TreeSet<WorkQueue>());        /** Timer for tasks which wake head item of snoozedClassQueues */    protected transient Timer wakeTimer;        /** Task for next wake */     protected transient WakeTask nextWake;         protected WorkQueue longestActiveQueue = null;        /** how long to wait for a ready queue when there's nothing snoozed */    private static final long DEFAULT_WAIT = 1000; // 1 second    /** a policy for assigning 'cost' values to CrawlURIs */    private transient CostAssignmentPolicy costAssignmentPolicy;        /** all policies available to be chosen */    String[] AVAILABLE_COST_POLICIES = new String[] {            ZeroCostAssignmentPolicy.class.getName(),            UnitCostAssignmentPolicy.class.getName(),            WagCostAssignmentPolicy.class.getName(),            AntiCalendarCostAssignmentPolicy.class.getName()};    /**     * Create the CommonFrontier     *      * @param name     * @param description     */    public WorkQueueFrontier(String name, String description) {        // The 'name' of all frontiers should be the same (URIFrontier.ATTR_NAME)        // therefore we'll ignore the supplied parameter.        super(Frontier.ATTR_NAME, description);        Type t = addElementToDefinition(new SimpleType(ATTR_HOLD_QUEUES,            "Whether to hold newly-created per-host URI work" +            " queues until needed to stay busy. If false (default)," +            " all queues may contribute URIs for crawling at all" +            " times. If true, queues begin (and collect URIs) in" +            " an 'inactive' state, and only when the Frontier needs" +            " another queue to keep all ToeThreads busy will new" +            " queues be activated.", DEFAULT_HOLD_QUEUES));        t.setExpertSetting(true);        t.setOverrideable(false);        t = addElementToDefinition(new SimpleType(ATTR_BALANCE_REPLENISH_AMOUNT,            "Amount to replenish a queue's activity balance when it becomes " +            "active. Larger amounts mean more URIs will be tried from the " +            "queue before it is deactivated in favor of waiting queues. " +            "Default is 3000", DEFAULT_BALANCE_REPLENISH_AMOUNT));        t.setExpertSetting(true);        t.setOverrideable(true);        t = addElementToDefinition(new SimpleType(ATTR_ERROR_PENALTY_AMOUNT,                "Amount to additionally penalize a queue when one of" +                "its URIs fails completely. Accelerates deactivation or " +                "full retirement of problem queues and unresponsive sites. " +                "Default is 100", DEFAULT_ERROR_PENALTY_AMOUNT));        t.setExpertSetting(true);        t.setOverrideable(true);        t = addElementToDefinition(new SimpleType(ATTR_QUEUE_TOTAL_BUDGET,            "Total activity expenditure allowable to a single queue; queues " +            "over this expenditure will be 'retired' and crawled no more. " +            "Default of -1 means no ceiling on activity expenditures is " +            "enforced.", DEFAULT_QUEUE_TOTAL_BUDGET));        t.setExpertSetting(true);        t.setOverrideable(true);        t = addElementToDefinition(new SimpleType(ATTR_COST_POLICY,                "Policy for calculating the cost of each URI attempted. " +                "The default UnitCostAssignmentPolicy considers the cost of " +                "each URI to be '1'.", DEFAULT_COST_POLICY, AVAILABLE_COST_POLICIES));        t.setExpertSetting(true);                t = addElementToDefinition(new SimpleType(ATTR_SNOOZE_DEACTIVATE_MS,                "Threshold above which any 'snooze' delay will cause the " +                "affected queue to go inactive, allowing other queues a " +                "chance to rotate into active state. Typically set to be " +                "longer than the politeness pauses between successful " +                "fetches, but shorter than the connection-failed " +                "'retry-delay-seconds'. (Default is 5 minutes.)",                 DEFAULT_SNOOZE_DEACTIVATE_MS));        t.setExpertSetting(true);        t.setOverrideable(false);        t = addElementToDefinition(new SimpleType(ATTR_TARGET_READY_QUEUES_BACKLOG,                "Target size for backlog of ready queues. This many queues " +                "will be brought into 'ready' state even if a thread is " +                "not waiting. Only has effect if 'hold-queues' is true. " +                "Default is 50.", DEFAULT_TARGET_READY_QUEUES_BACKLOG));        t.setExpertSetting(true);        t.setOverrideable(false);    }    /**     * Initializes the Frontier, given the supplied CrawlController.     *     * @see org.archive.crawler.framework.Frontier#initialize(org.archive.crawler.framework.CrawlController)     */    public void initialize(CrawlController c)            throws FatalConfigurationException, IOException {        // Call the super method. It sets up frontier journalling.        super.initialize(c);        this.controller = c;                this.targetSizeForReadyQueues = (Integer)getUncheckedAttribute(null,            ATTR_TARGET_READY_QUEUES_BACKLOG);        if (this.targetSizeForReadyQueues < 1) {            this.targetSizeForReadyQueues = 1;        }        this.wakeTimer = new Timer("waker for " + c.toString());                try {            if (workQueueDataOnDisk()                    && queueAssignmentPolicy.maximumNumberOfKeys() >= 0                    && queueAssignmentPolicy.maximumNumberOfKeys() <=                         MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {                this.allQueues = Collections.synchronizedMap(                        new HashMap<String,WorkQueue>());            } else {                this.allQueues = c.getBigMap("allqueues",                        String.class, WorkQueue.class);                if (logger.isLoggable(Level.FINE)) {                    Iterator i = this.allQueues.keySet().iterator();                    try {                        for (; i.hasNext();) {                            logger.fine((String) i.next());                        }                    } finally {                        StoredIterator.close(i);                    }                }            }            this.alreadyIncluded = createAlreadyIncluded();            initQueue();        } catch (IOException e) {            e.printStackTrace();            throw (FatalConfigurationException)                new FatalConfigurationException(e.getMessage()).initCause(e);        } catch (Exception e) {            e.printStackTrace();            throw (FatalConfigurationException)                new FatalConfigurationException(e.getMessage()).initCause(e);        }                initCostPolicy();                loadSeeds();    }        /**     * Set (or reset after configuration change) the cost policy in effect.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -