📄 RecoveryJournal.java
字号:
/*
 * RecoveryJournal
 *
 * $Id: RecoveryJournal.java,v 1.32 2006/05/16 01:09:47 stack-sf Exp $
 *
 * Created on Jul 20, 2004
 *
 * Copyright (C) 2004 Internet Archive.
 *
 * This file is part of the Heritrix web crawler (crawler.archive.org).
 *
 * Heritrix is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser Public License as published by
 * the Free Software Foundation; either version 2.1 of the License, or
 * any later version.
 *
 * Heritrix is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser Public License for more details.
 *
 * You should have received a copy of the GNU Lesser Public License
 * along with Heritrix; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.archive.crawler.frontier;

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
import it.unimi.dsi.mg4j.util.MutableString;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.httpclient.URIException;
import org.archive.crawler.datamodel.CandidateURI;
import org.archive.crawler.datamodel.CrawlURI;
import org.archive.crawler.framework.Frontier;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;
import org.archive.util.ArchiveUtils;

import java.util.concurrent.CountDownLatch;

/**
 * Helper class for managing a simple Frontier change-events journal which is
 * useful for recovering from crawl problems.
 *
 * By replaying the journal into a new Frontier, its state (at least with
 * respect to URIs alreadyIncluded and in pending queues) will match that of the
 * original Frontier, allowing a pseudo-resume of a previous crawl, at least as
 * far as URI visitation/coverage is concerned.
 *
 * @author gojomo
 */
public class RecoveryJournal
implements FrontierJournal {
    private static final Logger LOGGER = Logger.getLogger(
            RecoveryJournal.class.getName());

    // One-line record-type prefixes written at the start of each journal line.
    public final static String F_ADD = "F+ ";        // URI added to frontier
    public final static String F_EMIT = "Fe ";       // URI emitted for fetching
    public final static String F_RESCHEDULE = "Fr "; // URI rescheduled
    public final static String F_SUCCESS = "Fs ";    // URI finished successfully
    public final static String F_FAILURE = "Ff ";    // URI finished in failure
    public final static String LOG_ERROR = "E ";
    public final static String LOG_TIMESTAMP = "T ";

    // NOTE(review): used as a constant by noteLine() but declared as a public
    // non-static instance field; should presumably be 'static final' — cannot
    // change here without touching code.
    public final int TIMESTAMP_INTERVAL = 10000; // timestamp every this many lines

    // show recovery progress every this many lines
    private final static int PROGRESS_INTERVAL = 1000000;

    // once this many URIs are queued during recovery, allow
    // crawl to begin, while enqueuing of other URIs from log
    // continues in background
    private static final long ENOUGH_TO_START_CRAWLING = 100000;

    /**
     * Stream on which we record frontier events.
     */
    private Writer out = null;

    // Count of journal lines written so far; drives periodic timestamping.
    private long lines = 0;

    public static final String GZIP_SUFFIX = ".gz";

    /**
     * File we're writing recovery to.
     * Keep a reference in case we want to rotate it off.
     */
    private File gzipFile = null;

    /**
     * Allocate a buffer for accumulating lines to write and reuse it.
     */
    // Initial capacity is a rough estimate of a typical F+ line:
    // prefix + uri + ' ' + pathFromSeed + ' ' + via.
    private MutableString accumulatingBuffer = new MutableString(
        1 + F_ADD.length() +
        128 /*curi.toString().length()*/ +
        1 +
        8 /*curi.getPathFromSeed().length()*/ +
        1 +
        128 /*curi.flattenVia().length()*/);

    /**
     * Create a new recovery journal at the given location
     *
     * @param path Directory to make the recovery journal in.
     * @param filename Name to use for recovery journal file.
     * @throws IOException
     */
    public RecoveryJournal(String path, String filename)
    throws IOException {
        this.gzipFile = new File(path, filename + GZIP_SUFFIX);
        this.out = initialize(gzipFile);
    }

    /**
     * Open a gzipped, buffered writer onto the given file.
     *
     * @param f file to write the journal to (already carries GZIP_SUFFIX)
     * @return writer wrapping file -> buffer -> gzip -> chars
     * @throws FileNotFoundException if the file cannot be created
     * @throws IOException on other stream-setup failure
     */
    private Writer initialize (final File f)
    throws FileNotFoundException, IOException {
        // NOTE(review): OutputStreamWriter uses the platform default charset
        // here — journal portability depends on it; confirm intended.
        return new OutputStreamWriter(new GZIPOutputStream(
            new FastBufferedOutputStream(new FileOutputStream(f))));
    }

    /**
     * Record that a URI was added to the frontier: writes an F+ line of
     * "F+ <uri> <pathFromSeed> <via>". Synchronized because it reuses the
     * shared accumulatingBuffer.
     *
     * @param curi URI added
     */
    public synchronized void added(CrawlURI curi) {
        accumulatingBuffer.length(0); // reset reusable buffer
        this.accumulatingBuffer.append(F_ADD).
            append(curi.toString()).
            append(" ").
            append(curi.getPathFromSeed()).
            append(" ").
            append(curi.flattenVia());
        writeLine(accumulatingBuffer);
    }

    /** Record a successful completion (Fs line) for the given URI. */
    public void finishedSuccess(CrawlURI curi) {
        finishedSuccess(curi.toString());
    }

    /** Record a successful completion (Fs line) for the given URI. */
    public void finishedSuccess(UURI uuri) {
        finishedSuccess(uuri.toString());
    }

    /** Record a successful completion (Fs line) for the given URI string. */
    protected void finishedSuccess(String uuri) {
        writeLine(F_SUCCESS, uuri);
    }

    /** Record that a URI was emitted for processing (Fe line). */
    public void emitted(CrawlURI curi) {
        writeLine(F_EMIT, curi.toString());
    }

    /** Record a permanent failure (Ff line) for the given URI. */
    public void finishedFailure(CrawlURI curi) {
        finishedFailure(curi.toString());
    }

    /** Record a permanent failure (Ff line) for the given URI. */
    public void finishedFailure(UURI uuri) {
        finishedFailure(uuri.toString());
    }

    /** Record a permanent failure (Ff line) for the given URI string. */
    public void finishedFailure(String u) {
        writeLine(F_FAILURE, u);
    }

    /** Record that a URI was rescheduled for retry (Fr line). */
    public void rescheduled(CrawlURI curi) {
        writeLine(F_RESCHEDULE, curi.toString());
    }

    // Write one journal line (newline first, then content).
    // NOTE(review): IOExceptions are swallowed with printStackTrace() in all
    // three writeLine overloads — a disk-full condition silently disables
    // recovery; consider logging via LOGGER and/or surfacing the error.
    private synchronized void writeLine(String string) {
        try {
            this.out.write("\n");
            this.out.write(string);
            noteLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Write one journal line composed of a record-type prefix and a payload.
    private synchronized void writeLine(String s1, String s2) {
        try {
            this.out.write("\n");
            this.out.write(s1);
            this.out.write(s2);
            noteLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Write one journal line from the reusable MutableString buffer.
    // Unlike the other overloads, this one tolerates a null 'out'
    // (journal closed or never opened) by silently dropping the line.
    private synchronized void writeLine(MutableString mstring) {
        if (this.out == null) {
            return;
        }
        try {
            this.out.write("\n");
            mstring.write(out);
            noteLine();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Count the line just written and, every TIMESTAMP_INTERVAL lines,
     * append a "T <timestamp>" marker line.
     *
     * @throws IOException
     */
    private void noteLine() throws IOException {
        lines++;
        if(lines % TIMESTAMP_INTERVAL == 0) {
            out.write("\n");
            out.write(LOG_TIMESTAMP);
            out.write(ArchiveUtils.getLog14Date());
        }
    }

    /**
     * Utility method for scanning a recovery journal and applying it to
     * a Frontier.
     *
     * @param source Recover log path.
     * @param frontier Frontier reference.
     * @param retainFailures
     * @throws IOException
     *
     * @see org.archive.crawler.framework.Frontier#importRecoverLog(String, boolean)
     */
    public static void importRecoverLog(final File source,
            final Frontier frontier, final boolean retainFailures)
    throws IOException {
        if (source == null) {
            throw new IllegalArgumentException("Passed source file is null.");
        }
        LOGGER.info("recovering frontier completion state from "+source);

        // first, fill alreadyIncluded with successes (and possibly failures),
        // and count the total lines
        final int lines =
            importCompletionInfoFromLog(source, frontier, retainFailures);

        LOGGER.info("finished completion state; recovering queues from " +
            source);

        // now, re-add anything that was in old frontier and not already
        // registered as finished. Do this in a separate thread that signals
        // this thread once ENOUGH_TO_START_CRAWLING URIs have been queued.
        final CountDownLatch recoveredEnough = new CountDownLatch(1);
        new Thread(new Runnable() {
            public void run() {
                importQueuesFromLog(source, frontier, lines, recoveredEnough);
            }
        }, "queuesRecoveryThread").start();

        try {
            // wait until at least ENOUGH_TO_START_CRAWLING URIs queued
            // NOTE(review): catch below should re-interrupt
            // (Thread.currentThread().interrupt()) rather than just log.
            recoveredEnough.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            // NOTE(review): source chunk is truncated here mid-catch; the
            // remainder of this method and of the class is not visible in
            // this extract.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -