📄 crawlcontroller.java
字号:
try { this.bdbEnvironment.sync(); this.bdbEnvironment.close(); } catch (DatabaseException e) { e.printStackTrace(); } this.bdbEnvironment = null; } this.bigmaps = null; if (this.toePool != null) { this.toePool.cleanup(); // I played with launching a thread here to do cleanup of the // ToePool ThreadGroup (making sure the cleanup thread was not // in the ToePool ThreadGroup). Did this because ToePools seemed // to be sticking around holding references to CrawlController at // least. Need to spend more time looking to see that this is // still the case even after adding the above toePool#cleanup call. } this.toePool = null; LOGGER.fine("Finished crawl."); } synchronized void completePause() { // Send a notifyAll. At least checkpointing thread may be waiting on a // complete pause. notifyAll(); sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED); } private boolean shouldContinueCrawling() { if (frontier.isEmpty()) { this.sExit = CrawlJob.STATUS_FINISHED; return false; } if (maxBytes > 0 && frontier.totalBytesWritten() >= maxBytes) { // Hit the max byte download limit! sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT; return false; } else if (maxDocument > 0 && frontier.succeededFetchCount() >= maxDocument) { // Hit the max document download limit! this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT; return false; } else if (maxTime > 0 && statistics.crawlDuration() >= maxTime * 1000) { // Hit the max byte download limit! this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT; return false; } return state == RUNNING; } /** * Request a checkpoint. * Sets a checkpointing thread running. * @throws IllegalStateException Thrown if crawl is not in paused state * (Crawl must be first paused before checkpointing). */ public synchronized void requestCrawlCheckpoint() throws IllegalStateException { if (this.checkpointer == null) { return; } if (this.checkpointer.isCheckpointing()) { throw new IllegalStateException("Checkpoint already running."); } this.checkpointer.checkpoint(); } /** * @return True if checkpointing. */ public boolean isCheckpointing() { return this.state == CHECKPOINTING; } /** * Run checkpointing. * CrawlController takes care of managing the checkpointing/serializing * of bdb, the StatisticsTracker, and the CheckpointContext. Other * modules that want to revive themselves on checkpoint recovery need to * save state during their {@link CrawlStatusListener#crawlCheckpoint(File)} * invocation and then in their #initialize if a module, * or in their #initialTask if a processor, check with the CrawlController * if its checkpoint recovery. If it is, read in their old state from the * pointed to checkpoint directory. * <p>Default access only to be called by Checkpointer. * @throws Exception */ void checkpoint() throws Exception { // Tell registered listeners to checkpoint. sendCheckpointEvent(this.checkpointer. getCheckpointInProgressDirectory()); // Rotate off crawler logs. LOGGER.fine("Rotating log files."); rotateLogFiles(CURRENT_LOG_SUFFIX + "." + this.checkpointer.getNextCheckpointName()); // Sync the BigMap contents to bdb, if their bdb bigmaps. LOGGER.fine("BigMaps."); checkpointBigMaps(this.checkpointer.getCheckpointInProgressDirectory()); // Note, on deserialization, the super CrawlType#parent // needs to be restored. Parent is '/crawl-order/loggers'. // The settings handler for this module also needs to be // restored. Both of these fields are private in the // super class. Adding the restored ST to crawl order should take // care of this. // Checkpoint bdb environment. LOGGER.fine("Bdb environment."); checkpointBdb(this.checkpointer.getCheckpointInProgressDirectory()); // Make copy of order, seeds, and settings. LOGGER.fine("Copying settings."); copySettings(this.checkpointer.getCheckpointInProgressDirectory()); // Checkpoint this crawlcontroller. CheckpointUtils.writeObjectToFile(this, this.checkpointer.getCheckpointInProgressDirectory()); } /** * Copy off the settings. * @param checkpointDir Directory to write checkpoint to. * @throws IOException */ protected void copySettings(final File checkpointDir) throws IOException { final List files = this.settingsHandler.getListOfAllFiles(); boolean copiedSettingsDir = false; final File settingsDir = new File(this.disk, "settings"); for (final Iterator i = files.iterator(); i.hasNext();) { File f = new File((String)i.next()); if (f.getAbsolutePath().startsWith(settingsDir.getAbsolutePath())) { if (copiedSettingsDir) { // Skip. We've already copied this member of the // settings directory. continue; } // Copy 'settings' dir all in one lump, not a file at a time. copiedSettingsDir = true; FileUtils.copyFiles(settingsDir, new File(checkpointDir, settingsDir.getName())); continue; } FileUtils.copyFiles(f, f.isDirectory()? checkpointDir: new File(checkpointDir, f.getName())); } } /** * Checkpoint bdb. * I used do a call to log cleaning as suggested in je-2.0 javadoc but takes * way too much time (20minutes for a crawl of 1million items). Assume * cleaner is keeping up. Below was log cleaning loop . * <pre>int totalCleaned = 0; * for (int cleaned = 0; (cleaned = this.bdbEnvironment.cleanLog()) != 0; * totalCleaned += cleaned) { * LOGGER.fine("Cleaned " + cleaned + " log files."); * } * </pre> * <p>I also used to do a sync. But, from Mark Hayes, sync and checkpoint * are effectively same thing only sync is not configurable. He suggests * doing one or the other: * <p>MS: Reading code, Environment.sync() is a checkpoint. Looks like * I don't need to call a checkpoint after calling a sync? * <p>MH: Right, they're almost the same thing -- just do one or the other, * not both. With the new API, you'll need to do a checkpoint not a * sync, because the sync() method has no config parameter. Don't worry * -- it's fine to do a checkpoint even though you're not using. * @param checkpointDir Directory to write checkpoint to. * @throws DatabaseException * @throws IOException * @throws RuntimeException Thrown if failed setup of new bdb environment. */ protected void checkpointBdb(File checkpointDir) throws DatabaseException, IOException, RuntimeException { EnvironmentConfig envConfig = this.bdbEnvironment.getConfig(); final List bkgrdThreads = Arrays.asList(new String [] {"je.env.runCheckpointer", "je.env.runCleaner", "je.env.runINCompressor"}); try { // Disable background threads setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false"); // Do a force checkpoint. Thats what a sync does (i.e. doSync). CheckpointConfig chkptConfig = new CheckpointConfig(); chkptConfig.setForce(true); // Mark Hayes of sleepycat says: // "The default for this property is false, which gives the current // behavior (allow deltas). If this property is true, deltas are // prohibited -- full versions of internal nodes are always logged // during the checkpoint. When a full version of an internal node // is logged during a checkpoint, recovery does not need to process // it at all. It is only fetched if needed by the application, // during normal DB operations after recovery. When a delta of an // internal node is logged during a checkpoint, recovery must // process it by fetching the full version of the node from earlier // in the log, and then applying the delta to it. This can be // pretty slow, since it is potentially a large amount of // random I/O." chkptConfig.setMinimizeRecoveryTime(true); this.bdbEnvironment.checkpoint(chkptConfig); LOGGER.fine("Finished bdb checkpoint."); // From the sleepycat folks: A trick for flipping db logs. EnvironmentImpl envImpl = DbInternal.envGetEnvironmentImpl(this.bdbEnvironment); long firstFileInNextSet = DbLsn.getFileNumber(envImpl.forceLogFileFlip()); // So the last file in the checkpoint is firstFileInNextSet - 1. // Write manifest of all log files into the bdb directory. final String lastBdbCheckpointLog = getBdbLogFileName(firstFileInNextSet - 1); processBdbLogs(checkpointDir, lastBdbCheckpointLog); LOGGER.fine("Finished processing bdb log files."); } finally { // Restore background threads. setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true"); } } protected void processBdbLogs(final File checkpointDir, final String lastBdbCheckpointLog) throws IOException { File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir); if (!bdbDir.exists()) { bdbDir.mkdir(); } PrintWriter pw = new PrintWriter(new FileOutputStream(new File( checkpointDir, "bdbje-logs-manifest.txt"))); try { // Don't copy any beyond the last bdb log file (bdbje can keep // writing logs after checkpoint). boolean pastLastLogFile = false; Set srcFilenames = null; final boolean copyFiles = getCheckpointCopyBdbjeLogs(); do { FilenameFilter filter = CheckpointUtils.getJeLogsFilter(); srcFilenames = new HashSet(Arrays.asList(getStateDisk().list(filter))); List tgtFilenames = Arrays.asList(bdbDir.list(filter)); if (tgtFilenames != null && tgtFilenames.size() > 0) { srcFilenames.removeAll(tgtFilenames); } if (srcFilenames.size() > 0) { // Sort files. srcFilenames = new TreeSet(srcFilenames); int count = 0; for (final Iterator i = srcFilenames.iterator(); i.hasNext() && !pastLastLogFile;) { String name = (String) i.next(); if (copyFiles) { FileUtils.copyFiles(new File(getStateDisk(), name), new File(bdbDir, name)); } pw.println(name); if (name.equals(lastBdbCheckpointLog)) { // We're done. pastLastLogFile = true; } count++; } if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Copied " + count); } } } while (!pastLastLogFile && srcFilenames != null && srcFilenames.size() > 0); } finally { pw.close(); } } protected String getBdbLogFileName(final long index) { String lastBdbLogFileHex = Long.toHexString(index); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) { buffer.append('0'); } buffer.append(lastBdbLogFileHex); buffer.append(".jdb"); return buffer.toString(); } protected void setBdbjeBkgrdThreads(final EnvironmentConfig config, final List threads, final String setting) { for (final Iterator i = threads.iterator(); i.hasNext();) { config.setConfigParam((String)i.next(), setting); } } /** * Get recover checkpoint. * Returns null if we're NOT in recover mode. * Looks at ATTR_RECOVER_PATH and if its a directory, assumes checkpoint * recover. If checkpoint mode, returns Checkpoint instance if * checkpoint was VALID (else null). * @return Checkpoint instance if we're in recover checkpoint * mode and the pointed-to checkpoint was valid. * @see #isCheckpointRecover() */ public synchronized Checkpoint getCheckpointRecover() { if (this.checkpointRecover != null) { return this.checkpointRecover; } return getCheckpointRecover(this.order); } public static Checkpoint getCheckpointRecover(final CrawlOrder order) { String path = (String)order.getUncheckedAttribute(null, CrawlOrder.ATTR_RECOVER_PATH); if (path == null || path.length() <= 0) { return null; } File rp = new File(path); // Assume if path is to a directory, its a checkpoint recover. Checkpoint result = null; if (rp.exists() && rp.isDirectory()) { Checkpoint cp = new Checkpoint(rp); if (cp.isValid()) { // if valid, set as result. result = cp; } } return result; } public static boolean isCheckpointRecover(final CrawlOrder order) { return getCheckpointRecover(order) != null; } /** * @return True if we're in checkpoint recover mode. Call * {@link #getCheckpointRecover()} to get at Checkpoint instance * that has info on checkpoint directory being recovered from. */ public boolean isCheckpointRecover() { return this.checkpointRecover != null; } /** * Operator requested for crawl to stop. */ public synchronized void requestCrawlStop() { requestCrawlStop(CrawlJob.STATUS_ABORTED); } /** * Operator requested for crawl to stop. * @param message */ public synchronized void requestCrawlStop(String message) { if (state == STOPPING || state == FINISHED) { return; } if (message == null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -