📄 adaptiverevisithostqueue.java
字号:
} }

    /**
     * Same as {@link #getState() getState()} except this method returns a
     * human readable name for the state instead of its constant integer value.
     * <p>
     * Should only be used for reports, error messages and other strings
     * intended for human eyes.
     *
     * @return the human readable name of the current state, or "undefined"
     *         if the state is not one of the known HQSTATE_* constants
     */
    public String getStateByName() {
        switch (getState()) {
            case HQSTATE_BUSY :
                return "busy";
            case HQSTATE_EMPTY :
                return "empty";
            case HQSTATE_READY :
                return "ready";
            case HQSTATE_SNOOZED :
                return "snoozed";
        }
        // This should be impossible unless new states are added without
        // updating this method.
        return "undefined";
    }

    /**
     * Returns the size of the HQ. That is, the number of URIs queued,
     * including any that are currently being processed.
     *
     * @return the size of the HQ.
     */
    public long getSize() {
        return size;
    }

    /**
     * Sets the AdaptiveRevisitQueueList object that contains this HQ. That
     * object will be notified (via
     * {@link AdaptiveRevisitQueueList#reorder(AdaptiveRevisitHostQueue)
     * reorder()}) when the value used for sorting the list of HQs changes.
     *
     * @param owner the AdaptiveRevisitQueueList object that contains this HQ.
     */
    public void setOwner(AdaptiveRevisitQueueList owner) {
        this.owner = owner;
    }

    /**
     * Cleans up all open Berkeley Database objects owned by this HQ.
     * <p>
     * Does <I>not</I> close the Environment.
     * <p>
     * Every database is closed even if closing an earlier one fails, so a
     * single failure does not leak the remaining handles. The first failure
     * encountered is reported.
     *
     * @throws IOException if an error occurs closing a database object; the
     *         originating DatabaseException is attached as the cause
     */
    public void close() throws IOException {
        DatabaseException failure = null;
        // Original close order is kept: the secondary index first. Presumably
        // secondaryUriDB is a secondary of primaryUriDB (Berkeley DB JE wants
        // secondaries closed before their primary) - confirm against the
        // constructor.
        try {
            secondaryUriDB.close();
        } catch (DatabaseException e) {
            failure = e;
        }
        try {
            processingUriDB.close();
        } catch (DatabaseException e) {
            if (failure == null) {
                failure = e;
            }
        }
        try {
            primaryUriDB.close();
        } catch (DatabaseException e) {
            if (failure == null) {
                failure = e;
            }
        }
        if (failure != null) {
            // Blanket convert DBExceptions to IOExceptions, keeping the
            // original exception (and its stack trace) as the cause.
            IOException e2 = new IOException(failure.getMessage());
            e2.initCause(failure);
            throw e2;
        }
    }

    /**
     * If true then the HQ has no available slot for issuing URIs.
     * <p>
     * I.e. number of in processing URIs = valence.
     *
     * @return true if number of in processing URIs = valence
     */
    private boolean isBusy() {
        return inProcessing == valence;
    }

    /**
     * Overwrites ONE used (-1) value in wakeUpTime[] with the supplied value,
     * freeing the slot of a single URI that has finished processing. Then
     * calls {@link #reorder()}.
     *
     * @param newVal the time at which the freed slot becomes available
     */
    private void updateWakeUpTimeSlot(long newVal) {
        for (int i = 0; i < valence; i++) {
            if (wakeUpTime[i] == -1) {
                wakeUpTime[i] = newVal;
                // Free exactly one slot: any other -1 slots still belong to
                // URIs that are currently being processed.
                break;
            }
        }
        reorder();
    }

    /**
     * A new URI is being issued. Sets the wake-up time of an available slot
     * (one whose wake-up time is not -1 and is not in the future) to -1,
     * marking it as in use.
     *
     * @return true if a slot was successfully reserved. False otherwise.
     */
    private boolean useWakeUpTimeSlot() {
        for (int i = 0; i < valence; i++) {
            if (wakeUpTime[i] > -1 && wakeUpTime[i] <= System.currentTimeMillis()) {
                wakeUpTime[i] = -1;
                return true;
            }
        }
        // NOTE(review): reorder() is only invoked when no slot was reserved,
        // although reserving a slot is what changes the slot state. Confirm
        // the caller reorders after a successful reservation.
        reorder();
        return false;
    }

    /**
     * Returns the earliest time when a wake up slot will become available. If
     * one is already available then this time will be in the past.
     * <p>
     * If all slots are taken with URIs currently being processed (i.e. HQ
     * state is {@link #HQSTATE_BUSY busy}), then this will return
     * Long.MAX_VALUE.
     *
     * @return the earliest time when a wake up slot will become available
     */
    private long getEarliestWakeUpTimeSlot() {
        long earliest = Long.MAX_VALUE;
        for (int i = 0; i < valence; i++) {
            if (wakeUpTime[i] > -1 && wakeUpTime[i] < earliest) {
                earliest = wakeUpTime[i];
            }
        }
        return earliest;
    }

    /**
     * Returns a report detailing the status of this HQ.
     *
     * @param max Maximum number of URIs to show. 0 equals no limit.
     * @return a report detailing the status of this HQ, or a short error text
     *         if a database error occurs while compiling it.
     */
    public String report(int max) {
        try {
            StringBuffer ret = new StringBuffer(256);
            ret.append("AdaptiveRevisitHostQueue: " + hostName + "\n");
            ret.append("Size: " + size + "\n");
            ret.append("State: " + getStateByName() + "\n");
            if (getState() == HQSTATE_BUSY) {
                ret.append("Processing URIs: \n");
                Cursor processingCursor = processingUriDB.openCursor(null, null);
                // Always release the cursor, even if reporting fails midway.
                try {
                    reportURIs(ret, processingCursor, valence);
                } finally {
                    processingCursor.close();
                }
            } else {
                ret.append("Next ready: "
                        + ArchiveUtils.formatMillisecondsToConventional(
                                getNextReadyTime() - System.currentTimeMillis())
                        + "\n");
            }
            ret.append("Top URIs: \n");
            Cursor secondaryCursor = secondaryUriDB.openCursor(null, null);
            try {
                reportURIs(ret, secondaryCursor, max);
            } finally {
                secondaryCursor.close();
            }
            return ret.toString();
        } catch (DatabaseException e) {
            return "Exception occured compiling report:\n" + e.getMessage();
        }
    }

    /**
     * Adds a report of the first <code>max</code> URIs that the cursor points
     * to to the stringbuffer object. The cursor is not closed here; the
     * caller owns it.
     *
     * @param ret The stringbuffer to append to
     * @param cursor The cursor pointing at a URI database
     * @param max Maximum number of URIs to report on. 0 means no limit. If
     *            fewer URIs are in the database, all URIs are shown
     * @throws DatabaseException if an error occurs
     */
    private void reportURIs(StringBuffer ret, Cursor cursor, int max)
            throws DatabaseException {
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();
        OperationStatus opStatus =
                cursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT);
        if (max == 0) {
            // No limit on the number of values returned.
            max = Integer.MAX_VALUE;
        }
        int i = 0;
        while (i < max && opStatus == OperationStatus.SUCCESS) {
            CrawlURI tmp = (CrawlURI) crawlURIBinding.entryToObject(dataEntry);
            ret.append(" URI: " + tmp.toString() + "\n");
            // Unknown directives produce no "Sched. directive" line.
            switch (tmp.getSchedulingDirective()) {
                case CandidateURI.HIGHEST :
                    ret.append(" Sched. directive: HIGHEST\n");
                    break;
                case CandidateURI.HIGH :
                    ret.append(" Sched. directive: HIGH\n");
                    break;
                case CandidateURI.MEDIUM :
                    ret.append(" Sched. directive: MEDIUM\n");
                    break;
                case CandidateURI.NORMAL :
                    ret.append(" Sched. directive: NORMAL\n");
                    break;
            }
            ret.append(" Next processing: ");
            long nextProcessing = tmp.getLong(A_TIME_OF_NEXT_PROCESSING)
                    - System.currentTimeMillis();
            if (nextProcessing < 0) {
                // Past due: print the magnitude prefixed with "Overdue".
                ret.append("Overdue ");
                nextProcessing = nextProcessing * -1;
            }
            ret.append(ArchiveUtils.formatMillisecondsToConventional(
                    nextProcessing) + "\n");
            if (tmp.getFetchStatus() != 0) {
                ret.append(" Last fetch status: " + tmp.getFetchStatus() + "\n");
            }
            if (tmp.containsKey(A_WAIT_INTERVAL)) {
                ret.append(" Wait interval: "
                        + ArchiveUtils.formatMillisecondsToConventional(
                                tmp.getLong(A_WAIT_INTERVAL)) + "\n");
            }
            if (tmp.containsKey(A_NUMBER_OF_VISITS)) {
                ret.append(" Visits: " + tmp.getInt(A_NUMBER_OF_VISITS) + "\n");
            }
            if (tmp.containsKey(A_NUMBER_OF_VERSIONS)) {
                ret.append(" Versions: " + tmp.getInt(A_NUMBER_OF_VERSIONS) + "\n");
            }
            opStatus = cursor.getNext(keyEntry, dataEntry, LockMode.DEFAULT);
            i++;
        }
    }

    /**
     * Creates the secondary key for the secondary index.
     * <p>
     * The secondary index is the scheduling directive (first sorting) and
     * the time of next processing (sorted from earliest to latest within each
     * scheduling directive). If the scheduling directive is missing or
     * unknown NORMAL will be assumed.
     */
    private static class OrderOfProcessingKeyCreator
            extends TupleSerialKeyCreator {

        /**
         * Constructor. Invokes parent constructor.
         *
         * @param classCatalog is the catalog to hold shared class information
         *                     and for a database should be a
         *                     StoredClassCatalog.
         * @param dataClass is the CrawlURI class.
         */
        public OrderOfProcessingKeyCreator(ClassCatalog classCatalog,
                Class dataClass) {
            super(classCatalog, dataClass);
        }

        /**
         * Writes the secondary key: an int rank for the scheduling directive
         * (HIGHEST=0 .. NORMAL=3, unknown treated as NORMAL) followed by the
         * long time of next processing.
         *
         * @see com.sleepycat.bind.serial.TupleSerialKeyCreator#createSecondaryKey(com.sleepycat.bind.tuple.TupleInput, java.lang.Object, com.sleepycat.bind.tuple.TupleOutput)
         * @return always true (every record gets a secondary key)
         */
        public boolean createSecondaryKey(TupleInput primaryKeyInput,
                Object dataInput, TupleOutput indexKeyOutput) {
            CrawlURI curi = (CrawlURI) dataInput;
            int directive = curi.getSchedulingDirective();
            // Can not rely on the default directive constants having a good
            // sort order, so map them to explicit ranks.
            switch (directive) {
                case CandidateURI.HIGHEST:
                    directive = 0;
                    break;
                case CandidateURI.HIGH:
                    directive = 1;
                    break;
                case CandidateURI.MEDIUM:
                    directive = 2;
                    break;
                case CandidateURI.NORMAL:
                    directive = 3;
                    break;
                default:
                    directive = 3; // If directive missing or unknown
            }
            indexKeyOutput.writeInt(directive);
            long timeOfNextProcessing = curi.getLong(A_TIME_OF_NEXT_PROCESSING);
            indexKeyOutput.writeLong(timeOfNextProcessing);
            return true;
        }
    }

    /**
     * Returns the substats object of this HQ.
     *
     * @see org.archive.crawler.datamodel.CrawlSubstats.HasCrawlSubstats#getSubstats()
     * @return the substats object
     */
    public CrawlSubstats getSubstats() {
        return substats;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -