📄 adaptiverevisithostqueue.java
字号:
/* AdaptiveRevisitHostQueue
 *
 * Created on Sep 13, 2004
 *
 * Copyright (C) 2004 Kristinn Sigurðsson.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.archive.crawler.frontier;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlSubstats;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier.FrontierGroup;
import org.archive.util.ArchiveUtils;

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.ClassCatalog;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.serial.TupleSerialKeyCreator;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;

/**
 * A priority based queue of CrawlURIs. Each queue should represent
 * one host (although this is not enforced in this class). Items are ordered
 * by the scheduling directive and time of next processing (in that order)
 * and also indexed by the URI.
 * <p>
 * The HQ does no calculations on the 'time of next processing.' It always
 * relies on values already set on the CrawlURI.
 * <p>
 * Note: Class is not 'thread safe.' In multi threaded environment the caller
 * must ensure that two threads do not make overlapping calls.
 * <p>
 * Any BDB DatabaseException will be converted to an IOException by public
 * methods. This includes preserving the original stacktrace, in favor of the
 * one created for the IOException, so that the true source of the exception
 * is not lost.
 *
 * @author Kristinn Sigurdsson
 */
public class AdaptiveRevisitHostQueue
implements AdaptiveRevisitAttributeConstants, FrontierGroup {

    // TODO: Need to be able to remove URIs, both by name and reg.expr.

    // Constants declarations

    /** HQ contains no queued CrawlURIs elements. This state only occurs after
     *  queue creation before the first add. After the first item is added the
     *  state can never become empty again. */
    public static final int HQSTATE_EMPTY = 0;
    /** HQ has a CrawlURI ready for processing */
    public static final int HQSTATE_READY = 1;
    /** HQ has maximum number of CrawlURI currently being processed. This number
     *  is either equal to the 'valence' (maximum number of simultaneous
     *  connections to a host) or (if smaller) the total number of CrawlURIs
     *  in the HQ. */
    public static final int HQSTATE_BUSY = 2;
    /** HQ is in a suspended state until it can be woken back up */
    public static final int HQSTATE_SNOOZED = 3;

    // Internal class variables

    /** Name of the host that this AdaptiveRevisitHostQueue represents */
    final String hostName;

    /** Last known state of HQ -- ALL methods should use getState() to read
     *  this value, never read it directly. */
    int state;

    /** Time (in milliseconds) when the HQ will next be ready to issue a URI
     *  for processing. When setting this value, methods should use the
     *  setter method {@link #setNextReadyTime(long) setNextReadyTime()} */
    long nextReadyTime;

    /** Time (in milliseconds) when each URI 'slot' becomes available again.<p>
     *  Any positive value larger than the current time signifies a taken slot
     *  where the URI has completed processing but the politeness wait has not
     *  ended. <p>
     *  A zero or positive value smaller than the current time in milliseconds
     *  signifies an empty slot.<p>
     *  Any negative value signifies a slot for a URI that is being processed.
     *  <p>
     *  Methods should never write directly to this, rather use the
     *  {@link #updateWakeUpTimeSlot(long) updateWakeUpTimeSlot()} and
     *  {@link #useWakeUpTimeSlot() useWakeUpTimeSlot()} methods as needed. */
    long[] wakeUpTime;

    /** Number of simultaneous connections permitted to this host. I.e. this
     *  many URIs can be issued before state of HQ becomes busy until one of
     *  them is returned via the update method. */
    int valence;

    /** Size of queue. That is, the number of CrawlURIs that have been added to
     *  it, including any that are currently being processed. */
    long size;

    /** Number of URIs belonging to this queue that are being processed at the
     *  moment. This number will always be in the range of 0 - valence */
    long inProcessing;

    /** The AdaptiveRevisitHostQueueList that contains this class. This
     *  reference is maintained to inform the owning class of changes to the
     *  sort order value. Value may be null, in which case no notices are
     *  made. */
    private AdaptiveRevisitQueueList owner;

    /** Logger */
    private static final Logger logger =
        Logger.getLogger(AdaptiveRevisitHostQueue.class.getName());

    protected CrawlSubstats substats = new CrawlSubstats();

    // Berkeley DB - All class member variables related to BDB JE

    // Databases

    /** Database containing the URI priority queue, indexed by the
     *  URI string. */
    protected Database primaryUriDB;
    /** Secondary index into {@link #primaryUriDB the primary DB}, URIs indexed
     *  by the time when they can next be processed again. */
    protected SecondaryDatabase secondaryUriDB;
    /** A database containing those URIs that are currently being processed. */
    protected Database processingUriDB;

    // Serialization support

    /** For BDB serialization of objects */
    protected StoredClassCatalog classCatalog;
    /** A binding for the serialization of the primary key (URI string) */
    protected EntryBinding primaryKeyBinding;
    /** A binding for the CrawlURIARWrapper object */
    protected EntryBinding crawlURIBinding;

    // Cursors into databases

    /**
     * Constructor
     *
     * @param hostName Name of the host this queue represents. This name must
     *                 be unique for all HQs in the same Environment.
     * @param env Berkeley DB Environment. All BDB databases created will use
     *            it.
     * @param catalog Db for bdb class serialization.
     * @param valence The total number of simultaneous URIs that the HQ can
     *                issue for processing. Once this many URIs have been
     *                issued for processing, the HQ will go into
     *                {@link #HQSTATE_BUSY busy} state until at least one of
     *                the URI is
     *                {@link #update(CrawlURI, boolean, long) updated}.
     *                Value should be larger than zero. Zero and negative
     *                values will be treated same as 1.
     *
     * @throws IOException if an error occurs opening/creating the
     *         database
     */
    public AdaptiveRevisitHostQueue(String hostName, Environment env,
            StoredClassCatalog catalog, int valence) throws IOException {
        try{
            // Clamp valence to a minimum of 1, as documented above.
            if(valence < 1) {
                this.valence = 1;
            } else {
                this.valence = valence;
            }
            wakeUpTime = new long[valence];
            for(int i = 0 ; i < valence ; i++){
                wakeUpTime[i]=0; // 0 means open slot.
            }
            inProcessing = 0;

            this.hostName = hostName;

            state = HQSTATE_EMPTY; //HQ is initially empty.
            nextReadyTime = Long.MAX_VALUE; //Empty and busy HQ get this value.

            // Set up the primary URI database, it is indexed by URI names
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(false);
            dbConfig.setAllowCreate(true);
            primaryUriDB = env.openDatabase(null, hostName, dbConfig);

            this.classCatalog = catalog;

            // Set up a DB for storing URIs being processed
            DatabaseConfig dbConfig2 = new DatabaseConfig();
            dbConfig2.setTransactional(false);
            dbConfig2.setAllowCreate(true);
            processingUriDB = env.openDatabase(null,
                    hostName + "/processing", dbConfig2);

            // Create a primitive binding for the primary key (URI string)
            primaryKeyBinding = TupleBinding.getPrimitiveBinding(String.class);
            // Create a serial binding for the CrawlURI object
            crawlURIBinding = new SerialBinding(classCatalog, CrawlURI.class);

            // Open a secondary database to allow accessing the primary
            // database by the secondary key value (time of next processing).
            SecondaryConfig secConfig = new SecondaryConfig();
            secConfig.setAllowCreate(true);
            secConfig.setSortedDuplicates(true);
            secConfig.setKeyCreator(
                    new OrderOfProcessingKeyCreator(classCatalog,CrawlURI.class));
            secondaryUriDB = env.openSecondaryDatabase(null,
                    hostName+"/timeOfProcessing", primaryUriDB, secConfig);

            // Check if we are opening an existing DB...
            // NOTE(review): countCrawlURIs(), peek() and flushProcessingURIs()
            // are defined outside this extract — behavior described here is
            // taken from the surrounding comments, not verified code.
            size = countCrawlURIs();
            if (size > 0) {
                // If size > 0 then we just opened an existing DB.
                // Set nextReadyTime;
                nextReadyTime = peek().getLong(
                        A_TIME_OF_NEXT_PROCESSING);
                // Move any items in processingUriDB into the primaryUriDB,
                // ensure that they wind up on top!
                flushProcessingURIs();
                state = HQSTATE_READY;
            }
        } catch (DatabaseException e) {
            // Blanket catch all DBExceptions and convert to IOExceptions.
            // The original DB stack trace is copied onto the IOException so
            // the true source of the failure is not lost.
            IOException e2 = new IOException(e.getMessage());
            e2.setStackTrace(e.getStackTrace());
            throw e2;
        }
    }

    /**
     * Returns the HQ's name
     * @return the HQ's name
     */
    public String getHostName() {
        return hostName;
    }

    /**
     * Add a CrawlURI to this host queue.
     * <p>
     * Calls can optionally choose to have the time of next processing value
     * override existing values for the URI if the existing values are 'later'
     * than the new ones.
     *
     * @param curi The CrawlURI to add.
     * @param overrideSetTimeOnDups If true then the time of next processing for
     *                              the supplied URI will override any
     *                              existing time for it already stored in the
     *                              HQ. If false, then no changes will be made
     *                              to any existing values of the URI. Note:
     *                              Will never override with a later time.
     * @throws IOException When an error occurs accessing the database
     */
    public void add(CrawlURI curi, boolean overrideSetTimeOnDups)
            throws IOException{
        if(logger.isLoggable(Level.FINER)){
            logger.finer("Adding " + curi.toString());
        }
        try{
            // NOTE(review): inProcessing(String) and strictAdd(CrawlURI,
            // boolean) are defined outside this extract.
            if(inProcessing(curi.toString())){
                // If it is currently being processed, then it is already been
                // added and we sure as heck can't fetch it any sooner!
                return;
            }
            OperationStatus opStatus = strictAdd(curi,false);
            long curiProcessingTime = curi.getLong(
                    A_TIME_OF_NEXT_PROCESSING);
            if (opStatus == OperationStatus.KEYEXIST){
                // Override an existing URI
                // We need to extract the old CrawlURI (it contains vital
                // NOTE(review): source chunk is truncated at this point — the
                // remainder of add() and the rest of the class are not visible
                // in this extract.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -