📄 workqueuefrontier.java
字号:
w.print(readyCount); w.print(" ready; "); w.print(snoozedCount); w.print(" snoozed); "); w.print(inactiveCount); w.print(" inactive; "); w.print(retiredCount); w.print(" retired; "); w.print(exhaustedCount); w.print(" exhausted"); w.flush(); } /* (non-Javadoc) * @see org.archive.util.Reporter#singleLineLegend() */ public String singleLineLegend() { return "total active in-process ready snoozed inactive retired exhausted"; } /** * This method compiles a human readable report on the status of the frontier * at the time of the call. * @param name Name of report. * @param writer Where to write to. */ public synchronized void reportTo(String name, PrintWriter writer) { if(ALL_NONEMPTY.equals(name)) { allNonemptyReportTo(writer); return; } if(ALL_QUEUES.equals(name)) { allQueuesReportTo(writer); return; } if(name!=null && !STANDARD_REPORT.equals(name)) { writer.print(name); writer.print(" unavailable; standard report:\n"); } standardReportTo(writer); } /** Compact report of all nonempty queues (one queue per line) * * @param writer */ private void allNonemptyReportTo(PrintWriter writer) { ArrayList<WorkQueue> inProcessQueuesCopy; synchronized(this.inProcessQueues) { // grab a copy that will be stable against mods for report duration @SuppressWarnings("unchecked") Collection<WorkQueue> inProcess = this.inProcessQueues; inProcessQueuesCopy = new ArrayList<WorkQueue>(inProcess); } writer.print("\n -----===== IN-PROCESS QUEUES =====-----\n"); queueSingleLinesTo(writer, inProcessQueuesCopy.iterator()); writer.print("\n -----===== READY QUEUES =====-----\n"); queueSingleLinesTo(writer, this.readyClassQueues.iterator()); writer.print("\n -----===== SNOOZED QUEUES =====-----\n"); queueSingleLinesTo(writer, this.snoozedClassQueues.iterator()); writer.print("\n -----===== INACTIVE QUEUES =====-----\n"); queueSingleLinesTo(writer, this.inactiveQueues.iterator()); writer.print("\n -----===== RETIRED QUEUES =====-----\n"); queueSingleLinesTo(writer, this.retiredQueues.iterator()); } /** Compact report of all nonempty queues (one queue per line) * * @param writer */ private void allQueuesReportTo(PrintWriter writer) { queueSingleLinesTo(writer, allQueues.keySet().iterator()); } /** * Writer the single-line reports of all queues in the * iterator to the writer * * @param writer to receive report * @param iterator over queues of interest. */ private void queueSingleLinesTo(PrintWriter writer, Iterator iterator) { Object obj; WorkQueue q; boolean legendWritten = false; while( iterator.hasNext()) { obj = iterator.next(); if (obj == null) { continue; } q = (obj instanceof WorkQueue)? (WorkQueue)obj: (WorkQueue)this.allQueues.get(obj); if(q == null) { writer.print(" ERROR: "+obj); } if(!legendWritten) { writer.println(q.singleLineLegend()); legendWritten = true; } q.singleLineReportTo(writer); } } /** * @param w Writer to print to. */ private void standardReportTo(PrintWriter w) { int allCount = allQueues.size(); int inProcessCount = inProcessQueues.uniqueSet().size(); int readyCount = readyClassQueues.size(); int snoozedCount = snoozedClassQueues.size(); int activeCount = inProcessCount + readyCount + snoozedCount; int inactiveCount = inactiveQueues.size(); int retiredCount = retiredQueues.size(); int exhaustedCount = allCount - activeCount - inactiveCount - retiredCount; w.print("Frontier report - "); w.print(ArchiveUtils.get12DigitDate()); w.print("\n"); w.print(" Job being crawled: "); w.print(controller.getOrder().getCrawlOrderName()); w.print("\n"); w.print("\n -----===== STATS =====-----\n"); w.print(" Discovered: "); w.print(Long.toString(discoveredUriCount())); w.print("\n"); w.print(" Queued: "); w.print(Long.toString(queuedUriCount())); w.print("\n"); w.print(" Finished: "); w.print(Long.toString(finishedUriCount())); w.print("\n"); w.print(" Successfully: "); w.print(Long.toString(succeededFetchCount())); w.print("\n"); w.print(" Failed: "); w.print(Long.toString(failedFetchCount())); w.print("\n"); w.print(" Disregarded: "); w.print(Long.toString(disregardedUriCount())); w.print("\n"); w.print("\n -----===== QUEUES =====-----\n"); w.print(" Already included size: "); w.print(Long.toString(alreadyIncluded.count())); w.print("\n"); w.print(" pending: "); w.print(Long.toString(alreadyIncluded.pending())); w.print("\n"); w.print("\n All class queues map size: "); w.print(Long.toString(allCount)); w.print("\n"); w.print( " Active queues: "); w.print(activeCount); w.print("\n"); w.print(" In-process: "); w.print(inProcessCount); w.print("\n"); w.print(" Ready: "); w.print(readyCount); w.print("\n"); w.print(" Snoozed: "); w.print(snoozedCount); w.print("\n"); w.print(" Inactive queues: "); w.print(inactiveCount); w.print("\n"); w.print(" Retired queues: "); w.print(retiredCount); w.print("\n"); w.print(" Exhausted queues: "); w.print(exhaustedCount); w.print("\n"); w.print("\n -----===== IN-PROCESS QUEUES =====-----\n"); @SuppressWarnings("unchecked") Collection<WorkQueue> inProcess = inProcessQueues; ArrayList<WorkQueue> copy = extractSome(inProcess, REPORT_MAX_QUEUES); appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES); w.print("\n -----===== READY QUEUES =====-----\n"); appendQueueReports(w, this.readyClassQueues.iterator(), this.readyClassQueues.size(), REPORT_MAX_QUEUES); w.print("\n -----===== SNOOZED QUEUES =====-----\n"); copy = extractSome(snoozedClassQueues, REPORT_MAX_QUEUES); appendQueueReports(w, copy.iterator(), copy.size(), REPORT_MAX_QUEUES); WorkQueue longest = longestActiveQueue; if (longest != null) { w.print("\n -----===== LONGEST QUEUE =====-----\n"); longest.reportTo(w); } w.print("\n -----===== INACTIVE QUEUES =====-----\n"); appendQueueReports(w, this.inactiveQueues.iterator(), this.inactiveQueues.size(), REPORT_MAX_QUEUES); w.print("\n -----===== RETIRED QUEUES =====-----\n"); appendQueueReports(w, this.retiredQueues.iterator(), this.retiredQueues.size(), REPORT_MAX_QUEUES); w.flush(); } /** * Extract some of the elements in the given collection to an * ArrayList. This method synchronizes on the given collection's * monitor. The returned list will never contain more than the * specified maximum number of elements. * * @param c the collection whose elements to extract * @param max the maximum number of elements to extract * @return the extraction */ private static <T> ArrayList<T> extractSome(Collection<T> c, int max) { // Try to guess a sane initial capacity for ArrayList // Hopefully given collection won't grow more than 10 items // between now and the synchronized block... int initial = Math.min(c.size() + 10, max); int count = 0; ArrayList<T> list = new ArrayList<T>(initial); synchronized (c) { Iterator<T> iter = c.iterator(); while (iter.hasNext() && (count < max)) { list.add(iter.next()); count++; } } return list; } /** * Append queue report to general Frontier report. * @param w StringBuffer to append to. * @param iterator An iterator over * @param total * @param max */ protected void appendQueueReports(PrintWriter w, Iterator iterator, int total, int max) { Object obj; WorkQueue q; for(int count = 0; iterator.hasNext() && (count < max); count++) { obj = iterator.next(); if (obj == null) { continue; } q = (obj instanceof WorkQueue)? (WorkQueue)obj: (WorkQueue)this.allQueues.get(obj); if(q == null) { w.print("WARNING: No report for queue "+obj); } q.reportTo(w); } if(total > max) { w.print("...and " + (total - max) + " more.\n"); } } /** * Force logging, etc. of operator- deleted CrawlURIs * * @see org.archive.crawler.framework.Frontier#deleted(org.archive.crawler.datamodel.CrawlURI) */ public synchronized void deleted(CrawlURI curi) { //treat as disregarded controller.fireCrawledURIDisregardEvent(curi); log(curi); incrementDisregardedUriCount(); curi.stripToMinimal(); curi.processingCleanup(); } public void considerIncluded(UURI u) { this.alreadyIncluded.note(canonicalize(u)); CrawlURI temp = new CrawlURI(u); temp.setClassKey(getClassKey(temp)); getQueueFor(temp).expend(getCost(temp)); } protected abstract void initQueue() throws IOException; protected abstract void closeQueue() throws IOException; /** * Returns <code>true</code> if the WorkQueue implementation of this * Frontier stores its workload on disk instead of relying * on serialization mechanisms. * * @return a constant boolean value for this class/instance */ protected abstract boolean workQueueDataOnDisk(); public FrontierGroup getGroup(CrawlURI curi) { return getQueueFor(curi); } public long averageDepth() { int inProcessCount = inProcessQueues.uniqueSet().size(); int readyCount = readyClassQueues.size(); int snoozedCount = snoozedClassQueues.size(); int activeCount = inProcessCount + readyCount + snoozedCount; int inactiveCount = inactiveQueues.size(); int totalQueueCount = (activeCount+inactiveCount); return (totalQueueCount == 0) ? 0 : queuedUriCount / totalQueueCount; } public float congestionRatio() { int inProcessCount = inProcessQueues.uniqueSet().size(); int readyCount = readyClassQueues.size(); int snoozedCount = snoozedClassQueues.size(); int activeCount = inProcessCount + readyCount + snoozedCount; int inactiveCount = inactiveQueues.size(); return (float)(activeCount + inactiveCount) / (inProcessCount + snoozedCount); } public long deepestUri() { return longestActiveQueue==null ? -1 : longestActiveQueue.getCount(); } /* (non-Javadoc) * @see org.archive.crawler.framework.Frontier#isEmpty() */ public synchronized boolean isEmpty() { return queuedUriCount == 0 && alreadyIncluded.pending() == 0; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -