📄 crawlcontroller.java
字号:
// checkpointRecover instance and then put into place the old bdb // log files. If any of the log files already exist in target state // diretory, WE DO NOT OVERWRITE (Makes for faster recovery). // CrawlController checkpoint recovery code manages restoration of // the old StatisticsTracker, any BigMaps used by the Crawler and // the moving of bdb log files into place only. Other objects // interested in recovery need to ask if // CrawlController#isCheckpointRecover is set to figure if in // recovery and then take appropriate recovery action // (These objects can call CrawlController#getCheckpointRecover // to get the directory that might hold files/objects dropped // checkpointing). Such objects will need to use a technique other // than object serialization restoring settings because they'll // have already been constructed when comes time for object to ask // if its to recover itself. See ARCWriterProcessor for example. onFailMessage = "Unable to test/run checkpoint recover"; this.checkpointRecover = getCheckpointRecover(); if (this.checkpointRecover == null) { this.checkpointer = new Checkpointer(this, this.checkpointsDisk); } else { setupCheckpointRecover(); } onFailMessage = "Unable to setup bdb environment."; setupBdb(); onFailMessage = "Unable to setup statistics"; setupStatTracking(); onFailMessage = "Unable to setup crawl modules"; setupCrawlModules(); } catch (Exception e) { String tmp = "On crawl: " + settingsHandler.getSettingsObject(null).getName() + " " + onFailMessage; LOGGER.log(Level.SEVERE, tmp, e); throw new InitializationException(tmp, e); } // force creation of DNS Cache now -- avoids CacheCleaner in toe-threads group dns.getRecords("localhost", Type.A, DClass.IN); setupToePool(); setThresholds(); reserveMemory = new LinkedList(); for(int i = 1; i < RESERVE_BLOCKS; i++) { reserveMemory.add(new char[RESERVE_BLOCK_SIZE]); } } /** * Does setup of checkpoint recover. * Copies bdb log files into state dir. * @throws IOException */ protected void setupCheckpointRecover() throws IOException { long started = System.currentTimeMillis();; if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Starting recovery setup -- copying into place " + "bdbje log files -- for checkpoint named " + this.checkpointRecover.getDisplayName()); } // Mark context we're in a recovery. this.checkpointer.recover(this); this.progressStats.info("CHECKPOINT RECOVER " + this.checkpointRecover.getDisplayName()); // Copy the bdb log files to the state dir so we don't damage // old checkpoint. If thousands of log files, can take // tens of minutes (1000 logs takes ~5 minutes to java copy, // dependent upon hardware). If log file already exists over in the // target state directory, we do not overwrite -- we assume the log // file in the target same as one we'd copy from the checkpoint dir. File bdbSubDir = CheckpointUtils. getBdbSubDirectory(this.checkpointRecover.getDirectory()); FileUtils.copyFiles(bdbSubDir, CheckpointUtils.getJeLogsFilter(), getStateDisk(), true, false); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Finished recovery setup for checkpoint named " + this.checkpointRecover.getDisplayName() + " in " + (System.currentTimeMillis() - started) + "ms."); } } protected boolean getCheckpointCopyBdbjeLogs() { return ((Boolean)this.order.getUncheckedAttribute(null, CrawlOrder.ATTR_CHECKPOINT_COPY_BDBJE_LOGS)).booleanValue(); } private void setupBdb() throws FatalConfigurationException, AttributeNotFoundException { EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); int bdbCachePercent = ((Integer)this.order. getAttribute(null, CrawlOrder.ATTR_BDB_CACHE_PERCENT)).intValue(); if(bdbCachePercent > 0) { // Operator has expressed a preference; override BDB default or // je.properties value envConfig.setCachePercent(bdbCachePercent); } envConfig.setLockTimeout(5000000); // 5 seconds if (LOGGER.isLoggable(Level.FINEST)) { envConfig.setConfigParam("java.util.logging.level", "SEVERE"); envConfig.setConfigParam("java.util.logging.level.evictor", "SEVERE"); envConfig.setConfigParam("java.util.logging.ConsoleHandler.on", "true"); } if (!getCheckpointCopyBdbjeLogs()) { // If we are not copying files on checkpoint, then set bdbje to not // remove its log files so that its possible to later assemble // (manually) all needed to run a recovery using mix of current // bdbje logs and those its marked for deletion. envConfig.setConfigParam("je.cleaner.expunge", "false"); } try { this.bdbEnvironment = new Environment(getStateDisk(), envConfig); if (LOGGER.isLoggable(Level.FINE)) { // Write out the bdb configuration. envConfig = bdbEnvironment.getConfig(); LOGGER.fine("BdbConfiguration: Cache percentage " + envConfig.getCachePercent() + ", cache size " + envConfig.getCacheSize()); } // Open the class catalog database. Create it if it does not // already exist. DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); this.classCatalogDB = this.bdbEnvironment. openDatabase(null, "classes", dbConfig); this.classCatalog = new StoredClassCatalog(classCatalogDB); } catch (DatabaseException e) { e.printStackTrace(); throw new FatalConfigurationException(e.getMessage()); } } public Environment getBdbEnvironment() { return this.bdbEnvironment; } public StoredClassCatalog getClassCatalog() { return this.classCatalog; } /** * Register for CrawlStatus events. * * @param cl a class implementing the CrawlStatusListener interface * * @see CrawlStatusListener */ public void addCrawlStatusListener(CrawlStatusListener cl) { synchronized (this.registeredCrawlStatusListeners) { this.registeredCrawlStatusListeners.add(cl); } } /** * Register for CrawlURIDisposition events. * * @param cl a class implementing the CrawlURIDispostionListener interface * * @see CrawlURIDispositionListener */ public void addCrawlURIDispositionListener(CrawlURIDispositionListener cl) { registeredCrawlURIDispositionListener = null; if (registeredCrawlURIDispositionListeners == null) { // First listener; registeredCrawlURIDispositionListener = cl; //Only used for the first one while it is the only one. registeredCrawlURIDispositionListeners = new ArrayList(1); //We expect it to be very small. } registeredCrawlURIDispositionListeners.add(cl); } /** * Allows an external class to raise a CrawlURIDispostion * crawledURISuccessful event that will be broadcast to all listeners that * have registered with the CrawlController. * * @param curi - The CrawlURI that will be sent with the event notification. * * @see CrawlURIDispositionListener#crawledURISuccessful(CrawlURI) */ public void fireCrawledURISuccessfulEvent(CrawlURI curi) { if (registeredCrawlURIDispositionListener != null) { // Then we'll just use that. registeredCrawlURIDispositionListener.crawledURISuccessful(curi); } else { // Go through the list. if (registeredCrawlURIDispositionListeners != null && registeredCrawlURIDispositionListeners.size() > 0) { Iterator it = registeredCrawlURIDispositionListeners.iterator(); while (it.hasNext()) { ( (CrawlURIDispositionListener) it .next()) .crawledURISuccessful( curi); } } } } /** * Allows an external class to raise a CrawlURIDispostion * crawledURINeedRetry event that will be broadcast to all listeners that * have registered with the CrawlController. * * @param curi - The CrawlURI that will be sent with the event notification. * * @see CrawlURIDispositionListener#crawledURINeedRetry(CrawlURI) */ public void fireCrawledURINeedRetryEvent(CrawlURI curi) { if (registeredCrawlURIDispositionListener != null) { // Then we'll just use that. registeredCrawlURIDispositionListener.crawledURINeedRetry(curi); return; } // Go through the list. if (registeredCrawlURIDispositionListeners != null && registeredCrawlURIDispositionListeners.size() > 0) { for (Iterator i = registeredCrawlURIDispositionListeners.iterator(); i.hasNext();) { ((CrawlURIDispositionListener)i.next()).crawledURINeedRetry(curi); } } } /** * Allows an external class to raise a CrawlURIDispostion * crawledURIDisregard event that will be broadcast to all listeners that * have registered with the CrawlController. * * @param curi - * The CrawlURI that will be sent with the event notification. * * @see CrawlURIDispositionListener#crawledURIDisregard(CrawlURI) */ public void fireCrawledURIDisregardEvent(CrawlURI curi) { if (registeredCrawlURIDispositionListener != null) { // Then we'll just use that. registeredCrawlURIDispositionListener.crawledURIDisregard(curi); } else { // Go through the list. if (registeredCrawlURIDispositionListeners != null && registeredCrawlURIDispositionListeners.size() > 0) { Iterator it = registeredCrawlURIDispositionListeners.iterator(); while (it.hasNext()) { ( (CrawlURIDispositionListener) it .next()) .crawledURIDisregard( curi); } } } } /** * Allows an external class to raise a CrawlURIDispostion crawledURIFailure event * that will be broadcast to all listeners that have registered with the CrawlController. * * @param curi - The CrawlURI that will be sent with the event notification. * * @see CrawlURIDispositionListener#crawledURIFailure(CrawlURI) */ public void fireCrawledURIFailureEvent(CrawlURI curi) { if (registeredCrawlURIDispositionListener != null) { // Then we'll just use that. registeredCrawlURIDispositionListener.crawledURIFailure(curi); } else { // Go through the list. if (registeredCrawlURIDispositionListeners != null && registeredCrawlURIDispositionListeners.size() > 0) { Iterator it = registeredCrawlURIDispositionListeners.iterator(); while (it.hasNext()) { ((CrawlURIDispositionListener)it.next()) .crawledURIFailure(curi); } } } } private void setupCrawlModules() throws FatalConfigurationException, AttributeNotFoundException, MBeanException, ReflectionException { if (scope == null) { scope = (CrawlScope) order.getAttribute(CrawlScope.ATTR_NAME); scope.initialize(this); } try { this.serverCache = new ServerCache(this); } catch (Exception e) { throw new FatalConfigurationException("Unable to" + " initialize frontier (Failed setup of ServerCache) " + e); } if (this.frontier == null) { this.frontier = (Frontier)order.getAttribute(Frontier.ATTR_NAME); try { frontier.initialize(this); frontier.pause(); // Pause until begun // Run recovery if recoverPath points to a file (If it points // to a directory, its a checkpoint recovery). // TODO: make recover path relative to job root dir. if (!isCheckpointRecover()) { runFrontierRecover((String)order. getAttribute(CrawlOrder.ATTR_RECOVER_PATH)); } } catch (IOException e) { throw new FatalConfigurationException( "unable to initialize frontier: " + e); } } // Setup processors if (processorChains == null) { processorChains = new ProcessorChainList(order); } } protected void runFrontierRecover(String recoverPath) throws AttributeNotFoundException, MBeanException, ReflectionException, FatalConfigurationException { if (recoverPath == null || recoverPath.length() <= 0) { return; } File f = new File(recoverPath); if (!f.exists()) { LOGGER.severe("Recover file does not exist " + recoverPath); return; } if (!f.isFile()) { // Its a directory if supposed to be doing a checkpoint recover. return; } boolean retainFailures = ((Boolean)order. getAttribute(CrawlOrder.ATTR_RECOVER_RETAIN_FAILURES)).booleanValue(); try { frontier.importRecoverLog(recoverPath, retainFailures); } catch (IOException e) { e.printStackTrace(); throw (FatalConfigurationException) new FatalConfigurationException( "Recover.log " + recoverPath + " problem: " + e).initCause(e); } } private void setupDisk() throws AttributeNotFoundException { String diskPath = (String) order.getAttribute(null, CrawlOrder.ATTR_DISK_PATH); this.disk = getSettingsHandler().
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -