📄 workqueuefrontier.java
字号:
* * @throws FatalConfigurationException */ private void initCostPolicy() throws FatalConfigurationException { try { costAssignmentPolicy = (CostAssignmentPolicy) Class.forName( (String) getUncheckedAttribute(null, ATTR_COST_POLICY)) .newInstance(); } catch (Exception e) { e.printStackTrace(); throw new FatalConfigurationException(e.getMessage()); } } /* (non-Javadoc) * @see org.archive.crawler.frontier.AbstractFrontier#crawlEnded(java.lang.String) */ public void crawlEnded(String sExitMessage) { // Cleanup. CrawlJobs persist after crawl has finished so undo any // references. if (this.alreadyIncluded != null) { this.alreadyIncluded.close(); this.alreadyIncluded = null; } this.queueAssignmentPolicy = null; try { closeQueue(); } catch (IOException e) { // FIXME exception handling e.printStackTrace(); } this.wakeTimer.cancel(); this.allQueues.clear(); this.allQueues = null; this.inProcessQueues = null; this.readyClassQueues = null; this.snoozedClassQueues = null; this.inactiveQueues = null; this.retiredQueues = null; this.costAssignmentPolicy = null; // Clearing controller is a problem. We get NPEs in #preNext. super.crawlEnded(sExitMessage); this.controller = null; } /** * Create a UriUniqFilter that will serve as record * of already seen URIs. * * @return A UURISet that will serve as a record of already seen URIs * @throws IOException */ protected abstract UriUniqFilter createAlreadyIncluded() throws IOException; /** * Arrange for the given CandidateURI to be visited, if it is not * already scheduled/completed. * * @see org.archive.crawler.framework.Frontier#schedule(org.archive.crawler.datamodel.CandidateURI) */ public void schedule(CandidateURI caUri) { // Canonicalization may set forceFetch flag. See // #canonicalization(CandidateURI) javadoc for circumstance. String canon = canonicalize(caUri); if (caUri.forceFetch()) { alreadyIncluded.addForce(canon, caUri); } else { alreadyIncluded.add(canon, caUri); } } /** * Accept the given CandidateURI for scheduling, as it has * passed the alreadyIncluded filter. * * Choose a per-classKey queue and enqueue it. If this * item has made an unready queue ready, place that * queue on the readyClassQueues queue. * @param caUri CandidateURI. */ public void receive(CandidateURI caUri) { CrawlURI curi = asCrawlUri(caUri); applySpecialHandling(curi); sendToQueue(curi); // Update recovery log. doJournalAdded(curi); } /* (non-Javadoc) * @see org.archive.crawler.frontier.AbstractFrontier#asCrawlUri(org.archive.crawler.datamodel.CandidateURI) */ protected CrawlURI asCrawlUri(CandidateURI caUri) { CrawlURI curi = super.asCrawlUri(caUri); // force cost to be calculated, pre-insert getCost(curi); return curi; } /** * Send a CrawlURI to the appropriate subqueue. * * @param curi */ protected void sendToQueue(CrawlURI curi) { WorkQueue wq = getQueueFor(curi); synchronized (wq) { wq.enqueue(this, curi); if(!wq.isRetired()) { incrementQueuedUriCount(); } if(!wq.isHeld()) { wq.setHeld(); if(holdQueues() && readyClassQueues.size()>=targetSizeForReadyQueues()) { deactivateQueue(wq); } else { replenishSessionBalance(wq); readyQueue(wq); } } WorkQueue laq = longestActiveQueue; if(!wq.isRetired()&&((laq==null) || wq.getCount() > laq.getCount())) { longestActiveQueue = wq; } } } /** * Whether queues should start inactive (only becoming active when needed * to keep the crawler busy), or if queues should start out ready. * * @return true if new queues should held inactive */ private boolean holdQueues() { return ((Boolean) getUncheckedAttribute(null, ATTR_HOLD_QUEUES)) .booleanValue(); } /** * Put the given queue on the readyClassQueues queue * @param wq */ private void readyQueue(WorkQueue wq) { try { wq.setActive(this, true); readyClassQueues.put(wq.getClassKey()); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("unable to ready queue "+wq); // propagate interrupt up throw new RuntimeException(e); } } /** * Put the given queue on the inactiveQueues queue * @param wq */ private void deactivateQueue(WorkQueue wq) { try { wq.setSessionBalance(0); // zero out session balance inactiveQueues.put(wq.getClassKey()); wq.setActive(this, false); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("unable to deactivate queue "+wq); // propagate interrupt up throw new RuntimeException(e); } } /** * Put the given queue on the retiredQueues queue * @param wq */ private void retireQueue(WorkQueue wq) { try { retiredQueues.put(wq.getClassKey()); decrementQueuedCount(wq.getCount()); wq.setRetired(true); wq.setActive(this, false); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("unable to retire queue "+wq); // propagate interrupt up throw new RuntimeException(e); } } /** * Accomodate any changes in settings. * * @see org.archive.crawler.framework.Frontier#kickUpdate() */ public void kickUpdate() { super.kickUpdate(); int target = (Integer)getUncheckedAttribute(null, ATTR_TARGET_READY_QUEUES_BACKLOG); if (target < 1) { target = 1; } this.targetSizeForReadyQueues = target; try { initCostPolicy(); } catch (FatalConfigurationException fce) { throw new RuntimeException(fce); } // The rules for a 'retired' queue may have changed; so, // unretire all queues to 'inactive'. If they still qualify // as retired/overbudget next time they come up, they'll // be re-retired; if not, they'll get a chance to become // active under the new rules. Object key = this.retiredQueues.poll(); while (key != null) { WorkQueue q = (WorkQueue)this.allQueues.get(key); if(q != null) { unretireQueue(q); } key = this.retiredQueues.poll(); } } /** * Restore a retired queue to the 'inactive' state. * * @param q */ private void unretireQueue(WorkQueue q) { deactivateQueue(q); q.setRetired(false); incrementQueuedUriCount(q.getCount()); } /** * Return the work queue for the given CrawlURI's classKey. URIs * are ordered and politeness-delayed within their 'class'. * If the requested queue is not found, a new instance is created. * * @param curi CrawlURI to base queue on * @return the found or created ClassKeyQueue */ protected abstract WorkQueue getQueueFor(CrawlURI curi); /** * Return the work queue for the given classKey, or null * if no such queue exists. * * @param classKey key to look for * @return the found WorkQueue */ protected abstract WorkQueue getQueueFor(String classKey); /** * Return the next CrawlURI to be processed (and presumably * visited/fetched) by a a worker thread. * * Relies on the readyClassQueues having been loaded with * any work queues that are eligible to provide a URI. * * @return next CrawlURI to be processed. Or null if none is available. * * @see org.archive.crawler.framework.Frontier#next() */ public CrawlURI next() throws InterruptedException, EndedException { while (true) { // loop left only by explicit return or exception long now = System.currentTimeMillis(); // Do common checks for pause, terminate, bandwidth-hold preNext(now); synchronized(readyClassQueues) { int activationsNeeded = targetSizeForReadyQueues() - readyClassQueues.size(); while(activationsNeeded > 0 && !inactiveQueues.isEmpty()) { activateInactiveQueue(); activationsNeeded--; } } WorkQueue readyQ = null; Object key = readyClassQueues.poll(DEFAULT_WAIT,TimeUnit.MILLISECONDS); if (key != null) { readyQ = (WorkQueue)this.allQueues.get(key); } if (readyQ != null) { while(true) { // loop left by explicit return or break on empty CrawlURI curi = null; synchronized(readyQ) { curi = readyQ.peek(this); if (curi != null) { // check if curi belongs in different queue String currentQueueKey = getClassKey(curi); if (currentQueueKey.equals(curi.getClassKey())) { // curi was in right queue, emit noteAboutToEmit(curi, readyQ); inProcessQueues.add(readyQ); return curi; } // URI's assigned queue has changed since it // was queued (eg because its IP has become // known). Requeue to new queue. curi.setClassKey(currentQueueKey); readyQ.dequeue(this); decrementQueuedCount(1); curi.setHolderKey(null); // curi will be requeued to true queue after lock // on readyQ is released, to prevent deadlock } else { // readyQ is empty and ready: it's exhausted // release held status, allowing any subsequent // enqueues to again put queue in ready readyQ.clearHeld(); break; } } if(curi!=null) { // complete the requeuing begun earlier sendToQueue(curi); } } } else { // ReadyQ key wasn't in all queues: unexpected if (key != null) { logger.severe("Key "+ key + " in readyClassQueues but not allQueues"); } } if(shouldTerminate) { // skip subsequent steps if already on last legs throw new EndedException("shouldTerminate is true"); } if(inProcessQueues.size()==0) { // Nothing was ready or in progress or imminent to wake; ensure
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -