📄 workqueue.java
字号:
/** * Whether the queue is already in a lifecycle stage -- * such as ready, in-progress, snoozed -- and thus should * not be redundantly inserted to readyClassQueues * * @return isHeld */ public boolean isHeld() { return isHeld; } /** * Set isHeld to true */ public void setHeld() { isHeld = true; } /** * Forgive the peek, allowing a subsequent peek to * return a different item. * */ public void unpeek() { peekItem = null; } public final int compareTo(Object obj) { if(this == obj) { return 0; // for exact identity only } WorkQueue other = (WorkQueue) obj; if(getWakeTime() > other.getWakeTime()) { return 1; } if(getWakeTime() < other.getWakeTime()) { return -1; } // at this point, the ordering is arbitrary, but still // must be consistent/stable over time return this.classKey.compareTo(other.getClassKey()); } /** * Update the given CrawlURI, which should already be present. (This * is not checked.) Equivalent to an enqueue without affecting the count. * * @param frontier Work queues manager. * @param curi CrawlURI to update. */ public void update(final WorkQueueFrontier frontier, CrawlURI curi) { try { insert(frontier, curi, true); } catch (IOException e) { //FIXME better exception handling e.printStackTrace(); throw new RuntimeException(e); } } /** * @return Returns the count. */ public synchronized long getCount() { return this.count; } /** * Insert the given curi, whether it is already present or not. * @param frontier WorkQueueFrontier. * @param curi CrawlURI to insert. * @throws IOException */ private void insert(final WorkQueueFrontier frontier, CrawlURI curi, boolean overwriteIfPresent) throws IOException { insertItem(frontier, curi, overwriteIfPresent); lastQueued = curi.toString(); } /** * Insert the given curi, whether it is already present or not. * Hook for subclasses. * * @param frontier WorkQueueFrontier. * @param curi CrawlURI to insert. * @throws IOException if there was a problem while inserting the item */ protected abstract void insertItem(final WorkQueueFrontier frontier, CrawlURI curi, boolean expectedPresent) throws IOException; /** * Delete URIs matching the given pattern from this queue. * @param frontier WorkQueues manager. * @param match the pattern to match * @return count of deleted URIs * @throws IOException if there was a problem while deleting */ protected abstract long deleteMatchingFromQueue( final WorkQueueFrontier frontier, final String match) throws IOException; /** * Removes the given item from the queue. * * This is only used to remove the first item in the queue, * so it is not necessary to implement a random-access queue. * * @param frontier Work queues manager. * @throws IOException if there was a problem while deleting the item */ protected abstract void deleteItem(final WorkQueueFrontier frontier, final CrawlURI item) throws IOException; /** * Returns first item from queue (does not delete) * * @return The peeked item, or null * @throws IOException if there was a problem while peeking */ protected abstract CrawlURI peekItem(final WorkQueueFrontier frontier) throws IOException; /** * Suspends this WorkQueue. Closes all connections to resources etc. * * @param frontier * @throws IOException */ protected void suspend(final WorkQueueFrontier frontier) throws IOException { } /** * Resumes this WorkQueue. Eventually opens connections to resources etc. * * @param frontier * @throws IOException */ protected void resume(final WorkQueueFrontier frontier) throws IOException { } public void setActive(final WorkQueueFrontier frontier, final boolean b) { if(active != b) { active = b; try { if(active) { resume(frontier); } else { suspend(frontier); } } catch (IOException e) { //FIXME better exception handling e.printStackTrace(); throw new RuntimeException(e); } } } // // Reporter // /* (non-Javadoc) * @see org.archive.util.Reporter#getReports() */ public String[] getReports() { return new String[] {}; } /* (non-Javadoc) * @see org.archive.util.Reporter#reportTo(java.io.Writer) */ public void reportTo(PrintWriter writer) { reportTo(null,writer); } /* (non-Javadoc) * @see org.archive.util.Reporter#singleLineReportTo(java.io.Writer) */ public void singleLineReportTo(PrintWriter writer) { // queue name writer.print(classKey); writer.print(" "); // count of items writer.print(Long.toString(count)); writer.print(" "); // enqueue count writer.print(Long.toString(enqueueCount)); writer.print(" "); writer.print(sessionBalance); writer.print(" "); writer.print(lastCost); writer.print("("); writer.print(ArchiveUtils.doubleToString( ((double) totalExpenditure / costCount), 1)); writer.print(")"); writer.print(" "); // last dequeue time, if any, or '-' if (lastDequeueTime != 0) { writer.print(ArchiveUtils.getLog17Date(lastDequeueTime)); } else { writer.print("-"); } writer.print(" "); // wake time if snoozed, or '-' if (wakeTime != 0) { writer.print(ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis())); } else { writer.print("-"); } writer.print(" "); writer.print(Long.toString(totalExpenditure)); writer.print("/"); writer.print(Long.toString(totalBudget)); writer.print(" "); writer.print(Long.toString(errorCount)); writer.print(" "); writer.print(lastPeeked); writer.print(" "); writer.print(lastQueued); writer.print("\n"); } /* (non-Javadoc) * @see org.archive.util.Reporter#singleLineLegend() */ public String singleLineLegend() { return "queue currentSize totalEnqueues sessionBalance lastCost " + "(averageCost) lastDequeueTime wakeTime " + "totalSpend/totalBudget errorCount lastPeekUri lastQueuedUri"; } /* (non-Javadoc) * @see org.archive.util.Reporter#singleLineReport() */ public String singleLineReport() { return ArchiveUtils.singleLineReport(this); } /** * @param writer * @throws IOException */ public void reportTo(String name, PrintWriter writer) { // name is ignored: only one kind of report for now writer.print("Queue "); writer.print(classKey); writer.print("\n"); writer.print(" "); writer.print(Long.toString(count)); writer.print(" items"); if (wakeTime != 0) { writer.print("\n wakes in: "+ArchiveUtils.formatMillisecondsToConventional(wakeTime - System.currentTimeMillis())); } writer.print("\n last enqueued: "); writer.print(lastQueued); writer.print("\n last peeked: "); writer.print(lastPeeked); writer.print("\n"); writer.print(" total expended: "); writer.print(Long.toString(totalExpenditure)); writer.print(" (total budget: "); writer.print(Long.toString(totalBudget)); writer.print(")\n"); writer.print(" active balance: "); writer.print(sessionBalance); writer.print("\n last(avg) cost: "); writer.print(lastCost); writer.print("("); writer.print(ArchiveUtils.doubleToString( ((double) totalExpenditure / costCount), 1)); writer.print(")\n\n"); } public CrawlSubstats getSubstats() { return substats; } /** * Set the retired status of this queue. * * @param b new value for retired status */ public void setRetired(boolean b) { this.retired = b; } public boolean isRetired() { return retired; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -