📄 bdbmultipleworkqueues.java
字号:
/* BdbMultipleWorkQueues * * Created on Dec 24, 2004 * * Copyright (C) 2004 Internet Archive. * * This file is part of the Heritrix web crawler (crawler.archive.org). * * Heritrix is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * any later version. * * Heritrix is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser Public License for more details. * * You should have received a copy of the GNU Lesser Public License * along with Heritrix; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */package org.archive.crawler.frontier;import java.io.UnsupportedEncodingException;import java.math.BigInteger;import java.util.ArrayList;import java.util.List;import java.util.logging.Level;import java.util.logging.Logger;import java.util.regex.Pattern;import org.archive.crawler.datamodel.CrawlURI;import org.archive.crawler.framework.FrontierMarker;import org.archive.util.ArchiveUtils;import com.sleepycat.bind.serial.StoredClassCatalog;import com.sleepycat.je.Cursor;import com.sleepycat.je.Database;import com.sleepycat.je.DatabaseConfig;import com.sleepycat.je.DatabaseEntry;import com.sleepycat.je.DatabaseException;import com.sleepycat.je.DatabaseNotFoundException;import com.sleepycat.je.Environment;import com.sleepycat.je.OperationStatus;import com.sleepycat.util.RuntimeExceptionWrapper;/** * A BerkeleyDB-database-backed structure for holding ordered * groupings of CrawlURIs. Reading the groupings from specific * per-grouping (per-classKey/per-Host) starting points allows * this to act as a collection of independent queues. * * <p>For how the bdb keys are made, see {@link #calculateInsertKey(CrawlURI)}. * * <p>TODO: refactor, improve naming. * * @author gojomo */public class BdbMultipleWorkQueues { private static final long serialVersionUID = ArchiveUtils .classnameBasedUID(BdbMultipleWorkQueues.class, 1); private static final Logger LOGGER = Logger.getLogger(BdbMultipleWorkQueues.class.getName()); /** Database holding all pending URIs, grouped in virtual queues */ private Database pendingUrisDB = null; /** Supporting bdb serialization of CrawlURIs */ private RecyclingSerialBinding crawlUriBinding; /** * Create the multi queue in the given environment. * * @param env bdb environment to use * @param classCatalog Class catalog to use. * @param recycle True if we are to reuse db content if any. * @throws DatabaseException */ public BdbMultipleWorkQueues(Environment env, StoredClassCatalog classCatalog, final boolean recycle) throws DatabaseException { // Open the database. Create it if it does not already exist. DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); if (!recycle) { try { env.truncateDatabase(null, "pending", false); } catch (DatabaseNotFoundException e) { // Ignored } } // Make database deferred write: URLs that are added then removed // before a page-out is required need never cause disk IO. dbConfig.setDeferredWrite(true); this.pendingUrisDB = env.openDatabase(null, "pending", dbConfig); crawlUriBinding = new RecyclingSerialBinding(classCatalog, CrawlURI.class); } /** * Delete all CrawlURIs matching the given expression. * * @param match * @param queue * @param headKey * @return count of deleted items * @throws DatabaseException * @throws DatabaseException */ public long deleteMatchingFromQueue(String match, String queue, DatabaseEntry headKey) throws DatabaseException { long deletedCount = 0; Pattern pattern = Pattern.compile(match); DatabaseEntry key = headKey; DatabaseEntry value = new DatabaseEntry(); Cursor cursor = null; try { cursor = pendingUrisDB.openCursor(null, null); OperationStatus result = cursor.getSearchKeyRange(headKey, value, null); while (result == OperationStatus.SUCCESS) { if(value.getData().length>0) { CrawlURI curi = (CrawlURI) crawlUriBinding .entryToObject(value); if (!curi.getClassKey().equals(queue)) { // rolled into next queue; finished with this queue break; } if (pattern.matcher(curi.toString()).matches()) { cursor.delete(); deletedCount++; } } result = cursor.getNext(key, value, null); } } finally { if (cursor != null) { cursor.close(); } } return deletedCount; } /** * @param m marker * @param maxMatches * @return list of matches starting from marker position * @throws DatabaseException */ public List getFrom(FrontierMarker m, int maxMatches) throws DatabaseException { int matches = 0; int tries = 0; ArrayList<CrawlURI> results = new ArrayList<CrawlURI>(maxMatches); BdbFrontierMarker marker = (BdbFrontierMarker) m; DatabaseEntry key = marker.getStartKey(); DatabaseEntry value = new DatabaseEntry(); if (key != null) { Cursor cursor = null; OperationStatus result = null; try { cursor = pendingUrisDB.openCursor(null,null); result = cursor.getSearchKey(key, value, null); while(matches<maxMatches && result == OperationStatus.SUCCESS) { if(value.getData().length>0) { CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value); if(marker.accepts(curi)) { results.add(curi); matches++; } tries++; } result = cursor.getNext(key,value,null); } } finally { if (cursor !=null) { cursor.close(); } } if(result != OperationStatus.SUCCESS) { // end of scan marker.setStartKey(null); } } return results; } /** * Get a marker for beginning a scan over all contents * * @param regexpr * @return a marker pointing to the first item */ public FrontierMarker getInitialMarker(String regexpr) { try { return new BdbFrontierMarker(getFirstKey(), regexpr); } catch (DatabaseException e) { e.printStackTrace(); return null; } } /** * @return the key to the first item in the database * @throws DatabaseException */ protected DatabaseEntry getFirstKey() throws DatabaseException { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); Cursor cursor = pendingUrisDB.openCursor(null,null); OperationStatus status = cursor.getNext(key,value,null); cursor.close(); if(status == OperationStatus.SUCCESS) { return key; } return null; } /** * Get the next nearest item after the given key. Relies on * external discipline -- we'll look at the queues count of how many * items it has -- to avoid asking for something from a * range where there are no associated items -- * otherwise could get first item of next 'queue' by mistake. * * <p>TODO: hold within a queue's range * * @param headKey Key prefix that demarks the beginning of the range * in <code>pendingUrisDB</code> we're interested in. * @return CrawlURI. * @throws DatabaseException */ public CrawlURI get(DatabaseEntry headKey) throws DatabaseException { DatabaseEntry result = new DatabaseEntry(); // From Linda Lee of sleepycat: // "You want to check the status returned from Cursor.getSearchKeyRange // to make sure that you have OperationStatus.SUCCESS. In that case, // you have found a valid data record, and result.getData() // (called by internally by the binding code, in this case) will be // non-null. The other possible status return is // OperationStatus.NOTFOUND, in which case no data record matched // the criteria. " OperationStatus status = getNextNearestItem(headKey, result); CrawlURI retVal = null; if (status != OperationStatus.SUCCESS) { LOGGER.severe("See '1219854 NPE je-2.0 " + "entryToObject...'. OperationStatus " + " was not SUCCESS: " + status + ", headKey " + BdbWorkQueue.getPrefixClassKey(headKey.getData())); return null; } try { retVal = (CrawlURI)crawlUriBinding.entryToObject(result); } catch (RuntimeExceptionWrapper rw) { LOGGER.log( Level.SEVERE, "expected object missing in queue " + BdbWorkQueue.getPrefixClassKey(headKey.getData()), rw); return null; } retVal.setHolderKey(headKey); return retVal; } protected OperationStatus getNextNearestItem(DatabaseEntry headKey, DatabaseEntry result) throws DatabaseException { Cursor cursor = null; OperationStatus status; try {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -