📄 toethread.java
字号:
/* Copyright (C) 2003 Internet Archive. * * This file is part of the Heritrix web crawler (crawler.archive.org). * * Heritrix is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * any later version. * * Heritrix is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser Public License for more details. * * You should have received a copy of the GNU Lesser Public License * along with Heritrix; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * ToeThread.java * Created on May 14, 2003 * * $Header$ */package org.archive.crawler.framework;import java.io.PrintWriter;import java.util.HashMap;import java.util.logging.Level;import java.util.logging.Logger;import org.archive.crawler.datamodel.CoreAttributeConstants;import org.archive.crawler.datamodel.CrawlOrder;import org.archive.crawler.datamodel.CrawlURI;import org.archive.crawler.datamodel.FetchStatusCodes;import org.archive.crawler.datamodel.InstancePerThread;import org.archive.crawler.framework.exceptions.EndedException;import org.archive.util.ArchiveUtils;import org.archive.util.DevUtils;import org.archive.util.HttpRecorder;import org.archive.util.HttpRecorderMarker;import org.archive.util.ProgressStatisticsReporter;import org.archive.util.Reporter;import com.sleepycat.util.RuntimeExceptionWrapper;/** * One "worker thread"; asks for CrawlURIs, processes them, * repeats unless told otherwise. * * @author Gordon Mohr */public class ToeThread extends Threadimplements CoreAttributeConstants, FetchStatusCodes, HttpRecorderMarker,Reporter, ProgressStatisticsReporter { private static final String STEP_NASCENT = "NASCENT"; private static final String STEP_ABOUT_TO_GET_URI = "ABOUT_TO_GET_URI"; private static final String STEP_FINISHED = "FINISHED"; private static final String STEP_ABOUT_TO_BEGIN_CHAIN = "ABOUT_TO_BEGIN_CHAIN"; private static final String STEP_ABOUT_TO_BEGIN_PROCESSOR = "ABOUT_TO_BEGIN_PROCESSOR"; private static final String STEP_DONE_WITH_PROCESSORS = "DONE_WITH_PROCESSORS"; private static final String STEP_HANDLING_RUNTIME_EXCEPTION = "HANDLING_RUNTIME_EXCEPTION"; private static final String STEP_ABOUT_TO_RETURN_URI = "ABOUT_TO_RETURN_URI"; private static final String STEP_FINISHING_PROCESS = "FINISHING_PROCESS"; private static Logger logger = Logger.getLogger("org.archive.crawler.framework.ToeThread"); private CrawlController controller; private int serialNumber; /** * Each ToeThead has an instance of HttpRecord that gets used * over and over by each request. * * @see org.archive.util.HttpRecorderMarker */ private HttpRecorder httpRecorder = null; private HashMap<String,Processor> localProcessors = new HashMap<String,Processor>(); private String currentProcessorName = ""; private String coreName; private CrawlURI currentCuri; private long lastStartTime; private long lastFinishTime; // activity monitoring, debugging, and problem detection private String step = STEP_NASCENT; private long atStepSince; // default priority; may not be meaningful in recent JVMs private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2; // indicator that a thread is now surplus based on current desired // count; it should wrap up cleanly private volatile boolean shouldRetire = false; /** * Create a ToeThread * * @param g ToeThreadGroup * @param sn serial number */ public ToeThread(ToePool g, int sn) { // TODO: add crawl name? super(g,"ToeThread #" + sn); coreName="ToeThread #" + sn + ": "; controller = g.getController(); serialNumber = sn; setPriority(DEFAULT_PRIORITY); int outBufferSize = ((Integer) controller .getOrder() .getUncheckedAttribute(null,CrawlOrder.ATTR_RECORDER_OUT_BUFFER)) .intValue(); int inBufferSize = ((Integer) controller .getOrder() .getUncheckedAttribute(null, CrawlOrder.ATTR_RECORDER_IN_BUFFER)) .intValue(); httpRecorder = new HttpRecorder(controller.getScratchDisk(), "tt" + sn + "http", outBufferSize, inBufferSize); lastFinishTime = System.currentTimeMillis(); } /** (non-Javadoc) * @see java.lang.Thread#run() */ public void run() { String name = controller.getOrder().getCrawlOrderName(); logger.fine(getName()+" started for order '"+name+"'"); try { while ( true ) { // TODO check for thread-abort? or is waiting for interrupt enough? continueCheck(); setStep(STEP_ABOUT_TO_GET_URI); CrawlURI curi = controller.getFrontier().next(); synchronized(this) { continueCheck(); setCurrentCuri(curi); } processCrawlUri(); setStep(STEP_ABOUT_TO_RETURN_URI); continueCheck(); synchronized(this) { controller.getFrontier().finished(currentCuri); setCurrentCuri(null); } setStep(STEP_FINISHING_PROCESS); lastFinishTime = System.currentTimeMillis(); controller.releaseContinuePermission(); if(shouldRetire) { break; // from while(true) } } } catch (EndedException e) { // crawl ended (or thread was retired), so allow thread to end } catch (Exception e) { // everything else (including interruption) logger.log(Level.SEVERE,"Fatal exception in "+getName(),e); } catch (OutOfMemoryError err) { seriousError(err); } finally { controller.releaseContinuePermission(); } setCurrentCuri(null); // Do cleanup so that objects can be GC. this.httpRecorder.closeRecorders(); this.httpRecorder = null; localProcessors = null; logger.fine(getName()+" finished for order '"+name+"'"); setStep(STEP_FINISHED); controller.toeEnded(); controller = null; } /** * Set currentCuri, updating thread name as appropriate * @param curi */ private void setCurrentCuri(CrawlURI curi) { if(curi==null) { setName(coreName); } else { setName(coreName+curi); } currentCuri = curi; } /** * @param s */ private void setStep(String s) { step=s; atStepSince = System.currentTimeMillis(); } private void seriousError(Error err) { // try to prevent timeslicing until we have a chance to deal with OOM // TODO: recognize that new JVM priority indifference may make this // priority-jumbling pointless setPriority(DEFAULT_PRIORITY+1); if (controller!=null) { // hold all ToeThreads from proceeding to next processor controller.singleThreadMode(); // TODO: consider if SoftReferences would be a better way to // engineer a soft-landing for low-memory conditions controller.freeReserveMemory(); controller.requestCrawlPause(); if (controller.getFrontier().getFrontierJournal() != null) { controller.getFrontier().getFrontierJournal().seriousError( getName() + err.getMessage()); } } // OutOfMemory etc. String extraInfo = DevUtils.extraInfo(); System.err.println("<<<"); System.err.println(ArchiveUtils.getLog17Date()); System.err.println(err); System.err.println(extraInfo); err.printStackTrace(System.err); if (controller!=null) { PrintWriter pw = new PrintWriter(System.err); controller.getToePool().compactReportTo(pw); pw.flush(); } System.err.println(">>>");// DevUtils.sigquitSelf(); String context = "unknown"; if(currentCuri!=null) { // update fetch-status, saving original as annotation currentCuri.addAnnotation("err="+err.getClass().getName()); currentCuri.addAnnotation("os"+currentCuri.getFetchStatus()); currentCuri.setFetchStatus(S_SERIOUS_ERROR); context = currentCuri.singleLineReport() + " in " + currentProcessorName; } String message = "Serious error occured trying " + "to process '" + context + "'\n" + extraInfo; logger.log(Level.SEVERE, message.toString(), err); setPriority(DEFAULT_PRIORITY); } /** * Perform checks as to whether normal execution should proceed. * * If an external interrupt is detected, throw an interrupted exception. * Used before anything that should not be attempted by a 'zombie' thread * that the Frontier/Crawl has given up on. * * Otherwise, if the controller's memoryGate has been closed, * hold until it is opened. (Provides a better chance of * being able to complete some tasks after an OutOfMemoryError.) * * @throws InterruptedException */ private void continueCheck() throws InterruptedException { if(Thread.interrupted()) { throw new InterruptedException("die request detected"); } controller.acquireContinuePermission(); } /** * Pass the CrawlURI to all appropriate processors * * @throws InterruptedException */ private void processCrawlUri() throws InterruptedException { currentCuri.setThreadNumber(this.serialNumber); currentCuri.setNextProcessorChain(controller.getFirstProcessorChain()); lastStartTime = System.currentTimeMillis();// System.out.println(currentCuri); try { while (currentCuri.nextProcessorChain() != null) { setStep(STEP_ABOUT_TO_BEGIN_CHAIN); // Starting on a new processor chain. currentCuri.setNextProcessor(currentCuri.nextProcessorChain().getFirstProcessor()); currentCuri.setNextProcessorChain(currentCuri.nextProcessorChain().getNextProcessorChain()); while (currentCuri.nextProcessor() != null) { setStep(STEP_ABOUT_TO_BEGIN_PROCESSOR); Processor currentProcessor = getProcessor(currentCuri.nextProcessor()); currentProcessorName = currentProcessor.getName(); continueCheck();// long memBefore = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024; currentProcessor.process(currentCuri);// long memAfter = (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())/1024;// System.out.println((memAfter-memBefore)+"K in "+currentProcessorName); } } setStep(STEP_DONE_WITH_PROCESSORS); currentProcessorName = ""; } catch (RuntimeExceptionWrapper e) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -