⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 workqueuefrontier.java

📁 Heritrix是一个开源,可扩展的web爬虫项目。Heritrix设计成严格按照robots.txt文件的排除指示和META robots标签。
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                // any piled-up pending-scheduled URIs are considered                this.alreadyIncluded.requestFlush();            }            }    }    private int targetSizeForReadyQueues() {        return targetSizeForReadyQueues;    }    /**     * Return the 'cost' of a CrawlURI (how much of its associated     * queue's budget it depletes upon attempted processing)     *      * @param curi     * @return the associated cost     */    private int getCost(CrawlURI curi) {        int cost = curi.getHolderCost();        if (cost == CrawlURI.UNCALCULATED) {            cost = costAssignmentPolicy.costOf(curi);            curi.setHolderCost(cost);        }        return cost;    }        /**     * Activate an inactive queue, if any are available.      */    private void activateInactiveQueue() {        Object key = this.inactiveQueues.poll();        if (key == null) {            return;        }        WorkQueue candidateQ = (WorkQueue)this.allQueues.get(key);        if(candidateQ != null) {            synchronized(candidateQ) {                replenishSessionBalance(candidateQ);                if(candidateQ.isOverBudget()){                    // if still over-budget after an activation & replenishing,                    // retire                    retireQueue(candidateQ);                    return;                }                 long now = System.currentTimeMillis();                long delay_ms = candidateQ.getWakeTime() - now;                if(delay_ms>0) {                    // queue still due for snoozing                    snoozeQueue(candidateQ,now,delay_ms);                    return;                }                candidateQ.setWakeTime(0); // clear obsolete wake time, if any                readyQueue(candidateQ);                if (logger.isLoggable(Level.FINE)) {                    logger.fine("ACTIVATED queue: " +                        candidateQ.getClassKey());                                   }            }        }    }    /**     * Replenish 
the budget of the given queue by the appropriate amount.     *      * @param queue queue to replenish     */    private void replenishSessionBalance(WorkQueue queue) {        // get a CrawlURI for override context purposes        CrawlURI contextUri = queue.peek(this);         // TODO: consider confusing cross-effects of this and IP-based politeness        queue.setSessionBalance(((Integer) getUncheckedAttribute(contextUri,                ATTR_BALANCE_REPLENISH_AMOUNT)).intValue());        // reset total budget (it may have changed)        // TODO: is this the best way to be sensitive to potential mid-crawl changes        long totalBudget = ((Long)getUncheckedAttribute(contextUri,ATTR_QUEUE_TOTAL_BUDGET)).longValue();        queue.setTotalBudget(totalBudget);        queue.unpeek(); // don't insist on that URI being next released    }    /**     * Enqueue the given queue to either readyClassQueues or inactiveQueues,     * as appropriate.     *      * @param wq     */    private void reenqueueQueue(WorkQueue wq) {        if(wq.isOverBudget()) {            // if still over budget, deactivate            if (logger.isLoggable(Level.FINE)) {                logger.fine("DEACTIVATED queue: " +                    wq.getClassKey());            }            deactivateQueue(wq);        } else {            readyQueue(wq);        }    }        /**     * Wake any queues sitting in the snoozed queue whose time has come.     
*/    void wakeQueues() {        synchronized (snoozedClassQueues) {            long now = System.currentTimeMillis();            long nextWakeDelay = 0;            int wokenQueuesCount = 0;            while (true) {                if (snoozedClassQueues.isEmpty()) {                    return;                }                WorkQueue peek = (WorkQueue) snoozedClassQueues.first();                nextWakeDelay = peek.getWakeTime() - now;                if (nextWakeDelay <= 0) {                    snoozedClassQueues.remove(peek);                    peek.setWakeTime(0);                    reenqueueQueue(peek);                    wokenQueuesCount++;                } else {                    break;                }            }            this.nextWake = new WakeTask();            this.wakeTimer.schedule(nextWake,nextWakeDelay);        }    }    /**     * Note that the previously emitted CrawlURI has completed     * its processing (for now).     *     * The CrawlURI may be scheduled to retry, if appropriate,     * and other related URIs may become eligible for release     * via the next next() call, as a result of finished().     
*     *  (non-Javadoc)     * @see org.archive.crawler.framework.Frontier#finished(org.archive.crawler.datamodel.CrawlURI)     */    public void finished(CrawlURI curi) {        long now = System.currentTimeMillis();        curi.incrementFetchAttempts();        logLocalizedErrors(curi);        WorkQueue wq = (WorkQueue) curi.getHolder();        assert (wq.peek(this) == curi) : "unexpected peek " + wq;        inProcessQueues.remove(wq, 1);        if(includesRetireDirective(curi)) {            // CrawlURI is marked to trigger retirement of its queue            curi.processingCleanup();            wq.unpeek();            wq.update(this, curi); // rewrite any changes            retireQueue(wq);            return;        }                if (needsRetrying(curi)) {            // Consider errors which can be retried, leaving uri atop queue            if(curi.getFetchStatus()!=S_DEFERRED) {                wq.expend(getCost(curi)); // all retries but DEFERRED cost            }            long delay_sec = retryDelayFor(curi);            curi.processingCleanup(); // lose state that shouldn't burden retry            synchronized(wq) {                wq.unpeek();                // TODO: consider if this should happen automatically inside unpeek()                wq.update(this, curi); // rewrite any changes                if (delay_sec > 0) {                    long delay_ms = delay_sec * 1000;                    snoozeQueue(wq, now, delay_ms);                } else {                    reenqueueQueue(wq);                }            }            // Let everyone interested know that it will be retried.            
controller.fireCrawledURINeedRetryEvent(curi);            doJournalRescheduled(curi);            return;        }        // Curi will definitely be disposed of without retry, so remove from queue        wq.dequeue(this);        decrementQueuedCount(1);        log(curi);        if (curi.isSuccess()) {            totalProcessedBytes += curi.getRecordedSize();            incrementSucceededFetchCount();            // Let everyone know in case they want to do something before we strip the curi.            controller.fireCrawledURISuccessfulEvent(curi);            doJournalFinishedSuccess(curi);            wq.expend(getCost(curi)); // successes cost        } else if (isDisregarded(curi)) {            // Check for codes that mean that while we the crawler did            // manage to schedule it, it must be disregarded for some reason.            incrementDisregardedUriCount();            // Let interested listeners know of disregard disposition.            controller.fireCrawledURIDisregardEvent(curi);            // if exception, also send to crawlErrors            if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {                Object[] array = { curi };                controller.runtimeErrors.log(Level.WARNING, curi.getUURI()                        .toString(), array);            }            // TODO: consider reinstating forget-uri        } else {            // In that case FAILURE, note & log            //Let interested listeners know of failed disposition.            
this.controller.fireCrawledURIFailureEvent(curi);            // if exception, also send to crawlErrors            if (curi.getFetchStatus() == S_RUNTIME_EXCEPTION) {                Object[] array = { curi };                this.controller.runtimeErrors.log(Level.WARNING, curi.getUURI()                        .toString(), array);            }            incrementFailedFetchCount();            // let queue note error            wq.noteError(((Integer) getUncheckedAttribute(curi,                    ATTR_ERROR_PENALTY_AMOUNT)).intValue());             doJournalFinishedFailure(curi);            wq.expend(getCost(curi)); // failures cost        }        long delay_ms = politenessDelayFor(curi);        synchronized(wq) {            if (delay_ms > 0) {                snoozeQueue(wq,now,delay_ms);            } else {                reenqueueQueue(wq);            }        }        curi.stripToMinimal();        curi.processingCleanup();    }    private boolean includesRetireDirective(CrawlURI curi) {        return curi.containsKey(A_FORCE_RETIRE) && (Boolean)curi.getObject(A_FORCE_RETIRE);    }    /**     * Place the given queue into 'snoozed' state, ineligible to     * supply any URIs for crawling, for the given amount of time.      
*      * @param wq queue to snooze      * @param now time now in ms      * @param delay_ms time to snooze in ms     */    private void snoozeQueue(WorkQueue wq, long now, long delay_ms) {        long nextTime = now + delay_ms;        wq.setWakeTime(nextTime);        long snoozeToInactiveDelayMs = ((Long)getUncheckedAttribute(null,                ATTR_SNOOZE_DEACTIVATE_MS)).longValue();        if (delay_ms > snoozeToInactiveDelayMs && !inactiveQueues.isEmpty()) {            deactivateQueue(wq);        } else {            synchronized(snoozedClassQueues) {                snoozedClassQueues.add(wq);                if(wq == snoozedClassQueues.first()) {                    this.nextWake = new WakeTask();                    this.wakeTimer.schedule(nextWake, delay_ms);                }            }        }    }    /**     * Forget the given CrawlURI. This allows a new instance     * to be created in the future, if it is reencountered under     * different circumstances.     *     * @param curi The CrawlURI to forget     */    protected void forget(CrawlURI curi) {        logger.finer("Forgetting " + curi);        alreadyIncluded.forget(canonicalize(curi.getUURI()), curi);    }    /**  (non-Javadoc)     * @see org.archive.crawler.framework.Frontier#discoveredUriCount()     */    public long discoveredUriCount() {        return (this.alreadyIncluded != null)? this.alreadyIncluded.count(): 0;    }    /**     * @param match String to  match.     * @return Number of items deleted.     
*/
    public long deleteURIs(String match) {
        long count = 0;
        // TODO: DANGER/ values() may not work right from CachedBdbMap
        Iterator iter = allQueues.keySet().iterator();
        while (iter.hasNext()) {
            WorkQueue wq = getQueueFor(((String) iter.next()));
            wq.unpeek();
            count += wq.deleteMatching(this, match);
        }
        // keep the frontier-wide queued count in step with what was removed
        decrementQueuedCount(count);
        return count;
    }

    //
    // Reporter implementation
    //

    /** Name of the "standard" report. */
    public static final String STANDARD_REPORT = "standard";
    /** Name of the "nonempty" report. */
    public static final String ALL_NONEMPTY = "nonempty";
    /** Name of the "all" report. */
    public static final String ALL_QUEUES = "all";
    /** Names of all reports this frontier offers, as returned by getReports(). */
    protected static String[] REPORTS = {STANDARD_REPORT,ALL_NONEMPTY,ALL_QUEUES};

    /**
     * Returns the names of the supported reports.
     *
     * @return the shared {@code REPORTS} array itself (not a copy)
     */
    public String[] getReports() {
        return REPORTS;
    }

    /**
     * @param w Where to write to.
     */
    public void singleLineReportTo(PrintWriter w) {
        if (this.allQueues == null) {
            return;
        }
        int allCount = allQueues.size();
        int inProcessCount = inProcessQueues.uniqueSet().size();
        int readyCount = readyClassQueues.size();
        int snoozedCount = snoozedClassQueues.size();
        int activeCount = inProcessCount + readyCount + snoozedCount;
        int inactiveCount = inactiveQueues.size();
        int retiredCount = retiredQueues.size();
        int exhaustedCount =
            allCount - activeCount - inactiveCount - retiredCount;
        w.print(allCount);
        w.print(" queues: ");
        w.print(activeCount);
        w.print(" active (");
        w.print(inProcessCount);
        w.print(" in-process; ");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -