⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 crawlcontroller.java

📁 最强的爬虫工程
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
            try {                this.bdbEnvironment.sync();                this.bdbEnvironment.close();            } catch (DatabaseException e) {                e.printStackTrace();            }            this.bdbEnvironment = null;        }        this.bigmaps = null;        if (this.toePool != null) {            this.toePool.cleanup();            // I played with launching a thread here to do cleanup of the            // ToePool ThreadGroup (making sure the cleanup thread was not            // in the ToePool ThreadGroup).  Did this because ToePools seemed            // to be sticking around holding references to CrawlController at            // least.  Need to spend more time looking to see that this is            // still the case even after adding the above toePool#cleanup call.        }        this.toePool = null;        LOGGER.fine("Finished crawl.");    }        synchronized void completePause() {        // Send a notifyAll. At least checkpointing thread may be waiting on a        // complete pause.        notifyAll();        sendCrawlStateChangeEvent(PAUSED, CrawlJob.STATUS_PAUSED);    }    private boolean shouldContinueCrawling() {        if (frontier.isEmpty()) {            this.sExit = CrawlJob.STATUS_FINISHED;            return false;        }        if (maxBytes > 0 && frontier.totalBytesWritten() >= maxBytes) {            // Hit the max byte download limit!            sExit = CrawlJob.STATUS_FINISHED_DATA_LIMIT;            return false;        } else if (maxDocument > 0                && frontier.succeededFetchCount() >= maxDocument) {            // Hit the max document download limit!            this.sExit = CrawlJob.STATUS_FINISHED_DOCUMENT_LIMIT;            return false;        } else if (maxTime > 0 &&                statistics.crawlDuration() >= maxTime * 1000) {            // Hit the max byte download limit!            this.sExit = CrawlJob.STATUS_FINISHED_TIME_LIMIT;            return false;        }        return state == RUNNING;    }    /**     * Request a checkpoint.     * Sets a checkpointing thread running.     * @throws IllegalStateException Thrown if crawl is not in paused state     * (Crawl must be first paused before checkpointing).     */    public synchronized void requestCrawlCheckpoint()    throws IllegalStateException {        if (this.checkpointer == null) {            return;        }        if (this.checkpointer.isCheckpointing()) {            throw new IllegalStateException("Checkpoint already running.");        }        this.checkpointer.checkpoint();    }           /**     * @return True if checkpointing.     */    public boolean isCheckpointing() {        return this.state == CHECKPOINTING;    }        /**     * Run checkpointing.     * CrawlController takes care of managing the checkpointing/serializing     * of bdb, the StatisticsTracker, and the CheckpointContext.  Other     * modules that want to revive themselves on checkpoint recovery need to     * save state during their {@link CrawlStatusListener#crawlCheckpoint(File)}     * invocation and then in their #initialize if a module,     * or in their #initialTask if a processor, check with the CrawlController     * if its checkpoint recovery. If it is, read in their old state from the     * pointed to  checkpoint directory.     * <p>Default access only to be called by Checkpointer.     * @throws Exception     */    void checkpoint()    throws Exception {        // Tell registered listeners to checkpoint.        sendCheckpointEvent(this.checkpointer.            getCheckpointInProgressDirectory());                // Rotate off crawler logs.        LOGGER.fine("Rotating log files.");        rotateLogFiles(CURRENT_LOG_SUFFIX + "." +            this.checkpointer.getNextCheckpointName());        // Sync the BigMap contents to bdb, if their bdb bigmaps.        LOGGER.fine("BigMaps.");        checkpointBigMaps(this.checkpointer.getCheckpointInProgressDirectory());        // Note, on deserialization, the super CrawlType#parent        // needs to be restored. Parent is '/crawl-order/loggers'.        // The settings handler for this module also needs to be        // restored. Both of these fields are private in the        // super class. Adding the restored ST to crawl order should take        // care of this.        // Checkpoint bdb environment.        LOGGER.fine("Bdb environment.");        checkpointBdb(this.checkpointer.getCheckpointInProgressDirectory());        // Make copy of order, seeds, and settings.        LOGGER.fine("Copying settings.");        copySettings(this.checkpointer.getCheckpointInProgressDirectory());        // Checkpoint this crawlcontroller.        CheckpointUtils.writeObjectToFile(this,            this.checkpointer.getCheckpointInProgressDirectory());    }        /**     * Copy off the settings.     * @param checkpointDir Directory to write checkpoint to.     * @throws IOException      */    protected void copySettings(final File checkpointDir) throws IOException {        final List files = this.settingsHandler.getListOfAllFiles();        boolean copiedSettingsDir = false;        final File settingsDir = new File(this.disk, "settings");        for (final Iterator i = files.iterator(); i.hasNext();) {            File f = new File((String)i.next());            if (f.getAbsolutePath().startsWith(settingsDir.getAbsolutePath())) {                if (copiedSettingsDir) {                    // Skip.  We've already copied this member of the                    // settings directory.                    continue;                }                // Copy 'settings' dir all in one lump, not a file at a time.                copiedSettingsDir = true;                FileUtils.copyFiles(settingsDir,                    new File(checkpointDir, settingsDir.getName()));                continue;            }            FileUtils.copyFiles(f, f.isDirectory()? checkpointDir:                new File(checkpointDir, f.getName()));        }    }        /**     * Checkpoint bdb.     * I used do a call to log cleaning as suggested in je-2.0 javadoc but takes     * way too much time (20minutes for a crawl of 1million items). Assume     * cleaner is keeping up. Below was log cleaning loop .     * <pre>int totalCleaned = 0;     * for (int cleaned = 0; (cleaned = this.bdbEnvironment.cleanLog()) != 0;     *  totalCleaned += cleaned) {     *      LOGGER.fine("Cleaned " + cleaned + " log files.");     * }     * </pre>     * <p>I also used to do a sync. But, from Mark Hayes, sync and checkpoint     * are effectively same thing only sync is not configurable.  He suggests     * doing one or the other:     * <p>MS: Reading code, Environment.sync() is a checkpoint.  Looks like     * I don't need to call a checkpoint after calling a sync?     * <p>MH: Right, they're almost the same thing -- just do one or the other,     * not both.  With the new API, you'll need to do a checkpoint not a     * sync, because the sync() method has no config parameter.  Don't worry     * -- it's fine to do a checkpoint even though you're not using.     * @param checkpointDir Directory to write checkpoint to.     * @throws DatabaseException      * @throws IOException      * @throws RuntimeException Thrown if failed setup of new bdb environment.     */    protected void checkpointBdb(File checkpointDir)    throws DatabaseException, IOException, RuntimeException {        EnvironmentConfig envConfig = this.bdbEnvironment.getConfig();        final List bkgrdThreads = Arrays.asList(new String []            {"je.env.runCheckpointer", "je.env.runCleaner",                "je.env.runINCompressor"});        try {            // Disable background threads            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "false");            // Do a force checkpoint.  Thats what a sync does (i.e. doSync).            CheckpointConfig chkptConfig = new CheckpointConfig();            chkptConfig.setForce(true);                        // Mark Hayes of sleepycat says:            // "The default for this property is false, which gives the current            // behavior (allow deltas).  If this property is true, deltas are            // prohibited -- full versions of internal nodes are always logged            // during the checkpoint. When a full version of an internal node            // is logged during a checkpoint, recovery does not need to process            // it at all.  It is only fetched if needed by the application,            // during normal DB operations after recovery. When a delta of an            // internal node is logged during a checkpoint, recovery must            // process it by fetching the full version of the node from earlier            // in the log, and then applying the delta to it.  This can be            // pretty slow, since it is potentially a large amount of            // random I/O."            chkptConfig.setMinimizeRecoveryTime(true);            this.bdbEnvironment.checkpoint(chkptConfig);            LOGGER.fine("Finished bdb checkpoint.");                        // From the sleepycat folks: A trick for flipping db logs.            EnvironmentImpl envImpl =                 DbInternal.envGetEnvironmentImpl(this.bdbEnvironment);            long firstFileInNextSet =                DbLsn.getFileNumber(envImpl.forceLogFileFlip());            // So the last file in the checkpoint is firstFileInNextSet - 1.            // Write manifest of all log files into the bdb directory.            final String lastBdbCheckpointLog =                getBdbLogFileName(firstFileInNextSet - 1);            processBdbLogs(checkpointDir, lastBdbCheckpointLog);            LOGGER.fine("Finished processing bdb log files.");        } finally {            // Restore background threads.            setBdbjeBkgrdThreads(envConfig, bkgrdThreads, "true");        }    }        protected void processBdbLogs(final File checkpointDir,            final String lastBdbCheckpointLog) throws IOException {        File bdbDir = CheckpointUtils.getBdbSubDirectory(checkpointDir);        if (!bdbDir.exists()) {            bdbDir.mkdir();        }        PrintWriter pw = new PrintWriter(new FileOutputStream(new File(             checkpointDir, "bdbje-logs-manifest.txt")));        try {            // Don't copy any beyond the last bdb log file (bdbje can keep            // writing logs after checkpoint).            boolean pastLastLogFile = false;            Set srcFilenames = null;            final boolean copyFiles = getCheckpointCopyBdbjeLogs();            do {                FilenameFilter filter = CheckpointUtils.getJeLogsFilter();                srcFilenames =                    new HashSet(Arrays.asList(getStateDisk().list(filter)));                List tgtFilenames = Arrays.asList(bdbDir.list(filter));                if (tgtFilenames != null && tgtFilenames.size() > 0) {                    srcFilenames.removeAll(tgtFilenames);                }                if (srcFilenames.size() > 0) {                    // Sort files.                    srcFilenames = new TreeSet(srcFilenames);                    int count = 0;                    for (final Iterator i = srcFilenames.iterator();                            i.hasNext() && !pastLastLogFile;) {                        String name = (String) i.next();                        if (copyFiles) {                            FileUtils.copyFiles(new File(getStateDisk(), name),                                new File(bdbDir, name));                        }                        pw.println(name);                        if (name.equals(lastBdbCheckpointLog)) {                            // We're done.                            pastLastLogFile = true;                        }                        count++;                    }                    if (LOGGER.isLoggable(Level.FINE)) {                        LOGGER.fine("Copied " + count);                    }                }            } while (!pastLastLogFile && srcFilenames != null &&                srcFilenames.size() > 0);        } finally {            pw.close();        }    }     protected String getBdbLogFileName(final long index) {        String lastBdbLogFileHex = Long.toHexString(index);        StringBuffer buffer = new StringBuffer();        for (int i = 0; i < (8 - lastBdbLogFileHex.length()); i++) {            buffer.append('0');        }        buffer.append(lastBdbLogFileHex);        buffer.append(".jdb");        return buffer.toString();    }        protected void setBdbjeBkgrdThreads(final EnvironmentConfig config,            final List threads, final String setting) {        for (final Iterator i = threads.iterator(); i.hasNext();) {            config.setConfigParam((String)i.next(), setting);        }    }        /**     * Get recover checkpoint.     * Returns null if we're NOT in recover mode.     * Looks at ATTR_RECOVER_PATH and if its a directory, assumes checkpoint     * recover. If checkpoint mode, returns Checkpoint instance if     * checkpoint was VALID (else null).     * @return Checkpoint instance if we're in recover checkpoint     * mode and the pointed-to checkpoint was valid.     * @see #isCheckpointRecover()     */    public synchronized Checkpoint getCheckpointRecover() {        if (this.checkpointRecover != null) {            return this.checkpointRecover;        }        return getCheckpointRecover(this.order);    }        public static Checkpoint getCheckpointRecover(final CrawlOrder order) {        String path = (String)order.getUncheckedAttribute(null,            CrawlOrder.ATTR_RECOVER_PATH);        if (path == null || path.length() <= 0) {            return null;        }        File rp = new File(path);        // Assume if path is to a directory, its a checkpoint recover.        Checkpoint result = null;        if (rp.exists() && rp.isDirectory()) {            Checkpoint cp = new Checkpoint(rp);            if (cp.isValid()) {                // if valid, set as result.                result = cp;            }        }        return result;    }        public static boolean isCheckpointRecover(final CrawlOrder order) {        return getCheckpointRecover(order) != null;    }        /**     * @return True if we're in checkpoint recover mode. Call     * {@link #getCheckpointRecover()} to get at Checkpoint instance     * that has info on checkpoint directory being recovered from.     */    public boolean isCheckpointRecover() {        return this.checkpointRecover != null;    }    /**     * Operator requested for crawl to stop.     */    public synchronized void requestCrawlStop() {        requestCrawlStop(CrawlJob.STATUS_ABORTED);    }        /**     * Operator requested for crawl to stop.     * @param message      */    public synchronized void requestCrawlStop(String message) {        if (state == STOPPING || state == FINISHED) {            return;        }        if (message == null) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -