
📄 workqueuefrontier.java

Heritrix is an open-source, extensible web crawler project. It is designed to strictly honor the exclusion directives in robots.txt files and META robots tags.
Language: Java
Page 1 of 4
     *
     * @throws FatalConfigurationException
     */
    private void initCostPolicy() throws FatalConfigurationException {
        try {
            costAssignmentPolicy = (CostAssignmentPolicy) Class.forName(
                    (String) getUncheckedAttribute(null, ATTR_COST_POLICY))
                    .newInstance();
        } catch (Exception e) {
            e.printStackTrace();
            throw new FatalConfigurationException(e.getMessage());
        }
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.frontier.AbstractFrontier#crawlEnded(java.lang.String)
     */
    public void crawlEnded(String sExitMessage) {
        // Cleanup.  CrawlJobs persist after crawl has finished so undo any
        // references.
        if (this.alreadyIncluded != null) {
            this.alreadyIncluded.close();
            this.alreadyIncluded = null;
        }

        this.queueAssignmentPolicy = null;

        try {
            closeQueue();
        } catch (IOException e) {
            // FIXME exception handling
            e.printStackTrace();
        }
        this.wakeTimer.cancel();

        this.allQueues.clear();
        this.allQueues = null;
        this.inProcessQueues = null;
        this.readyClassQueues = null;
        this.snoozedClassQueues = null;
        this.inactiveQueues = null;
        this.retiredQueues = null;

        this.costAssignmentPolicy = null;

        // Clearing controller is a problem. We get NPEs in #preNext.
        super.crawlEnded(sExitMessage);
        this.controller = null;
    }

    /**
     * Create a UriUniqFilter that will serve as record
     * of already seen URIs.
     *
     * @return A UURISet that will serve as a record of already seen URIs
     * @throws IOException
     */
    protected abstract UriUniqFilter createAlreadyIncluded() throws IOException;

    /**
     * Arrange for the given CandidateURI to be visited, if it is not
     * already scheduled/completed.
     *
     * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI)
     */
    public void schedule(CandidateURI caUri) {
        // Canonicalization may set forceFetch flag.  See
        // #canonicalization(CandidateURI) javadoc for circumstance.
        String canon = canonicalize(caUri);
        if (caUri.forceFetch()) {
            alreadyIncluded.addForce(canon, caUri);
        } else {
            alreadyIncluded.add(canon, caUri);
        }
    }

    /**
     * Accept the given CandidateURI for scheduling, as it has
     * passed the alreadyIncluded filter.
     *
     * Choose a per-classKey queue and enqueue it. If this
     * item has made an unready queue ready, place that
     * queue on the readyClassQueues queue.
     * @param caUri CandidateURI.
     */
    public void receive(CandidateURI caUri) {
        CrawlURI curi = asCrawlUri(caUri);
        applySpecialHandling(curi);
        sendToQueue(curi);
        // Update recovery log.
        doJournalAdded(curi);
    }

    /* (non-Javadoc)
     * @see org.archive.crawler.frontier.AbstractFrontier#asCrawlUri(org.archive.crawler.datamodel.CandidateURI)
     */
    protected CrawlURI asCrawlUri(CandidateURI caUri) {
        CrawlURI curi = super.asCrawlUri(caUri);
        // force cost to be calculated, pre-insert
        getCost(curi);
        return curi;
    }

    /**
     * Send a CrawlURI to the appropriate subqueue.
     *
     * @param curi
     */
    protected void sendToQueue(CrawlURI curi) {
        WorkQueue wq = getQueueFor(curi);
        synchronized (wq) {
            wq.enqueue(this, curi);
            if (!wq.isRetired()) {
                incrementQueuedUriCount();
            }
            if (!wq.isHeld()) {
                wq.setHeld();
                if (holdQueues() && readyClassQueues.size() >= targetSizeForReadyQueues()) {
                    deactivateQueue(wq);
                } else {
                    replenishSessionBalance(wq);
                    readyQueue(wq);
                }
            }
            WorkQueue laq = longestActiveQueue;
            if (!wq.isRetired() && ((laq == null) || wq.getCount() > laq.getCount())) {
                longestActiveQueue = wq;
            }
        }
    }

    /**
     * Whether queues should start inactive (only becoming active when needed
     * to keep the crawler busy), or if queues should start out ready.
     *
     * @return true if new queues should be held inactive
     */
    private boolean holdQueues() {
        return ((Boolean) getUncheckedAttribute(null, ATTR_HOLD_QUEUES))
                .booleanValue();
    }

    /**
     * Put the given queue on the readyClassQueues queue
     * @param wq
     */
    private void readyQueue(WorkQueue wq) {
        try {
            wq.setActive(this, true);
            readyClassQueues.put(wq.getClassKey());
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("unable to ready queue "+wq);
            // propagate interrupt up
            throw new RuntimeException(e);
        }
    }

    /**
     * Put the given queue on the inactiveQueues queue
     * @param wq
     */
    private void deactivateQueue(WorkQueue wq) {
        try {
            wq.setSessionBalance(0); // zero out session balance
            inactiveQueues.put(wq.getClassKey());
            wq.setActive(this, false);
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("unable to deactivate queue "+wq);
            // propagate interrupt up
            throw new RuntimeException(e);
        }
    }

    /**
     * Put the given queue on the retiredQueues queue
     * @param wq
     */
    private void retireQueue(WorkQueue wq) {
        try {
            retiredQueues.put(wq.getClassKey());
            decrementQueuedCount(wq.getCount());
            wq.setRetired(true);
            wq.setActive(this, false);
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("unable to retire queue "+wq);
            // propagate interrupt up
            throw new RuntimeException(e);
        }
    }

    /**
     * Accommodate any changes in settings.
     *
     * @see org.archive.crawler.framework.Frontier#kickUpdate()
     */
    public void kickUpdate() {
        super.kickUpdate();
        int target = (Integer)getUncheckedAttribute(null,
                ATTR_TARGET_READY_QUEUES_BACKLOG);
        if (target < 1) {
            target = 1;
        }
        this.targetSizeForReadyQueues = target;
        try {
            initCostPolicy();
        } catch (FatalConfigurationException fce) {
            throw new RuntimeException(fce);
        }
        // The rules for a 'retired' queue may have changed; so,
        // unretire all queues to 'inactive'. If they still qualify
        // as retired/overbudget next time they come up, they'll
        // be re-retired; if not, they'll get a chance to become
        // active under the new rules.
        Object key = this.retiredQueues.poll();
        while (key != null) {
            WorkQueue q = (WorkQueue)this.allQueues.get(key);
            if (q != null) {
                unretireQueue(q);
            }
            key = this.retiredQueues.poll();
        }
    }

    /**
     * Restore a retired queue to the 'inactive' state.
     *
     * @param q
     */
    private void unretireQueue(WorkQueue q) {
        deactivateQueue(q);
        q.setRetired(false);
        incrementQueuedUriCount(q.getCount());
    }

    /**
     * Return the work queue for the given CrawlURI's classKey. URIs
     * are ordered and politeness-delayed within their 'class'.
     * If the requested queue is not found, a new instance is created.
     *
     * @param curi CrawlURI to base queue on
     * @return the found or created ClassKeyQueue
     */
    protected abstract WorkQueue getQueueFor(CrawlURI curi);

    /**
     * Return the work queue for the given classKey, or null
     * if no such queue exists.
     *
     * @param classKey key to look for
     * @return the found WorkQueue
     */
    protected abstract WorkQueue getQueueFor(String classKey);

    /**
     * Return the next CrawlURI to be processed (and presumably
     * visited/fetched) by a worker thread.
     *
     * Relies on the readyClassQueues having been loaded with
     * any work queues that are eligible to provide a URI.
     *
     * @return next CrawlURI to be processed. Or null if none is available.
     *
     * @see org.archive.crawler.framework.Frontier#next()
     */
    public CrawlURI next()
    throws InterruptedException, EndedException {
        while (true) { // loop left only by explicit return or exception
            long now = System.currentTimeMillis();

            // Do common checks for pause, terminate, bandwidth-hold
            preNext(now);

            synchronized (readyClassQueues) {
                int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size();
                while (activationsNeeded > 0 && !inactiveQueues.isEmpty()) {
                    activateInactiveQueue();
                    activationsNeeded--;
                }
            }

            WorkQueue readyQ = null;
            Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS);
            if (key != null) {
                readyQ = (WorkQueue)this.allQueues.get(key);
            }
            if (readyQ != null) {
                while (true) { // loop left by explicit return or break on empty
                    CrawlURI curi = null;
                    synchronized (readyQ) {
                        curi = readyQ.peek(this);
                        if (curi != null) {
                            // check if curi belongs in different queue
                            String currentQueueKey = getClassKey(curi);
                            if (currentQueueKey.equals(curi.getClassKey())) {
                                // curi was in right queue, emit
                                noteAboutToEmit(curi, readyQ);
                                inProcessQueues.add(readyQ);
                                return curi;
                            }
                            // URI's assigned queue has changed since it
                            // was queued (eg because its IP has become
                            // known). Requeue to new queue.
                            curi.setClassKey(currentQueueKey);
                            readyQ.dequeue(this);
                            decrementQueuedCount(1);
                            curi.setHolderKey(null);
                            // curi will be requeued to true queue after lock
                            //  on readyQ is released, to prevent deadlock
                        } else {
                            // readyQ is empty and ready: it's exhausted
                            // release held status, allowing any subsequent
                            // enqueues to again put queue in ready
                            readyQ.clearHeld();
                            break;
                        }
                    }
                    if (curi != null) {
                        // complete the requeuing begun earlier
                        sendToQueue(curi);
                    }
                }
            } else {
                // ReadyQ key wasn't in all queues: unexpected
                if (key != null) {
                    logger.severe("Key "+ key +
                        " in readyClassQueues but not allQueues");
                }
            }

            if (shouldTerminate) {
                // skip subsequent steps if already on last legs
                throw new EndedException("shouldTerminate is true");
            }

            if (inProcessQueues.size()==0) {
                // Nothing was ready or in progress or imminent to wake; ensure
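
The hold/ready/inactive bookkeeping in sendToQueue() and next() above can be hard to follow from this excerpt alone. The following is a minimal, single-threaded sketch of that idea, not Heritrix code: the class MiniFrontier, its fields, and the targetReady parameter are all hypothetical names invented for illustration. It assumes new queues are always held (as if holdQueues() returned true), and it puts a queue straight back on the ready list after handing out a URI, whereas the real frontier tracks in-process queues, session balances, and retirement.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical, simplified model of per-class-key work queues (not Heritrix API). */
class MiniFrontier {
    /** One FIFO of URIs per class key, e.g. per host. */
    private static final class WorkQueue {
        final String classKey;
        final Queue<String> uris = new ArrayDeque<>();
        boolean held; // true while the key sits on the ready or inactive list
        WorkQueue(String classKey) { this.classKey = classKey; }
    }

    private final Map<String, WorkQueue> allQueues = new HashMap<>();
    private final Queue<String> readyKeys = new ArrayDeque<>();
    private final Queue<String> inactiveKeys = new ArrayDeque<>();
    private final int targetReady;

    MiniFrontier(int targetReady) {
        // Same clamp as kickUpdate() applies to its target: never below 1.
        this.targetReady = Math.max(1, targetReady);
    }

    /** Enqueue a URI; a previously unheld queue becomes ready or inactive. */
    void schedule(String classKey, String uri) {
        WorkQueue wq = allQueues.computeIfAbsent(classKey, WorkQueue::new);
        wq.uris.add(uri);
        if (!wq.held) {
            wq.held = true;
            if (readyKeys.size() >= targetReady) {
                inactiveKeys.add(classKey); // enough ready queues already
            } else {
                readyKeys.add(classKey);
            }
        }
    }

    /** Return the next URI, or null when every queue is exhausted. */
    String next() {
        while (true) {
            // Top up the ready list from the inactive list, up to the target.
            while (readyKeys.size() < targetReady && !inactiveKeys.isEmpty()) {
                readyKeys.add(inactiveKeys.remove());
            }
            String key = readyKeys.poll();
            if (key == null) {
                return null;
            }
            WorkQueue wq = allQueues.get(key);
            String uri = wq.uris.poll();
            if (uri != null) {
                readyKeys.add(key); // simplification: re-ready immediately
                return uri;
            }
            wq.held = false; // exhausted: a later schedule() may re-ready it
        }
    }

    public static void main(String[] args) {
        MiniFrontier f = new MiniFrontier(1);
        f.schedule("a", "a1");
        f.schedule("a", "a2");
        f.schedule("b", "b1"); // lands on the inactive list
        for (String u = f.next(); u != null; u = f.next()) {
            System.out.println(u); // prints a1, a2, b1 on separate lines
        }
    }
}
```

With a target of one ready queue, queue "b" stays inactive until queue "a" is drained, which mirrors how the excerpt only activates inactive queues when the ready list falls below its target size.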
