📄 adaptiverevisithostqueue.java
字号:
CrawlURI curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry); return curi; } /** * Update CrawlURI that has completed processing. * * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's * {@link #next() next()} method. * @param needWait If true then the URI was processed successfully, * requiring a period of suspended action on that host. If * valence is > 1 then seperate times are maintained for * each slot. * @param wakeupTime If new state is * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed} * then this parameter should contain the time (in * milliseconds) when it will be safe to wake the HQ up * again. Otherwise this parameter will be ignored. * * @throws IllegalStateException if the CrawlURI * does not match a CrawlURI issued for crawling by this HQ's * {@link AdaptiveRevisitHostQueue#next() next()}. * @throws IOException if an error occurs accessing the database */ public void update(CrawlURI curi, boolean needWait, long wakeupTime) throws IllegalStateException, IOException{ update(curi,needWait,wakeupTime,false); } /** * Update CrawlURI that has completed processing. * * @param curi The CrawlURI. This must be a CrawlURI issued by this HQ's * {@link #next() next()} method. * @param needWait If true then the URI was processed successfully, * requiring a period of suspended action on that host. If * valence is > 1 then seperate times are maintained for * each slot. * @param wakeupTime If new state is * {@link AdaptiveRevisitHostQueue#HQSTATE_SNOOZED snoozed} * then this parameter should contain the time (in * milliseconds) when it will be safe to wake the HQ up * again. Otherwise this parameter will be ignored. * @param forgetURI If true, the URI will be deleted from the queue. * * @throws IllegalStateException if the CrawlURI * does not match a CrawlURI issued for crawling by this HQ's * {@link AdaptiveRevisitHostQueue#next() next()}. * @throws IOException if an error occurs accessing the database */ public void update(CrawlURI curi, boolean needWait, long wakeupTime, boolean forgetURI) throws IllegalStateException, IOException{ if (logger.isLoggable(Level.FINE)) { logger.fine("Updating " + curi.toString()); } try{ // First add it to the regular queue (if not forgetting it). if (forgetURI == false){ OperationStatus opStatus = strictAdd(curi,false); if(opStatus != OperationStatus.SUCCESS){ if(opStatus == OperationStatus.KEYEXIST){ throw new IllegalStateException("Trying to update a" + " CrawlURI failed because it was in the queue" + " of URIs waiting for processing. URIs currently" + " being processsed can never be in that queue." + " HQ: " + hostName + ", CrawlURI: " + curi.toString()); } } // Check if we need to update nextReadyTime long curiTimeOfNextProcessing = curi.getLong( A_TIME_OF_NEXT_PROCESSING); if(nextReadyTime > curiTimeOfNextProcessing){ setNextReadyTime(curiTimeOfNextProcessing); } } else { size--; } // Then remove from list of in processing URIs deleteInProcessing(curi.toString()); inProcessing--; // Update the wakeUpTime slot. if(needWait==false){ // Ok, no wait then. Set wake up time to 0. wakeupTime = 0; } updateWakeUpTimeSlot(wakeupTime); } catch (DatabaseException e) { // Blanket catch all DBExceptions and convert to IOExceptions. IOException e2 = new IOException(e.getMessage()); e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace throw e2; } } /** * Returns the 'top' URI in the AdaptiveRevisitHostQueue. * <p> * HQ state will be set to {@link AdaptiveRevisitHostQueue#HQSTATE_BUSY busy} if this * method returns normally. * * @return a CrawlURI ready for processing * * @throws IllegalStateException if the HostQueues current state is not * ready {@link AdaptiveRevisitHostQueue#HQSTATE_READY ready} * @throws IOException if an error occurs reading from the database */ public CrawlURI next() throws IllegalStateException, IOException{ try{ // Ok, lets issue a URI, first check state and reserve slot. if(getState()!=HQSTATE_READY || useWakeUpTimeSlot()==false){ throw new IllegalStateException("Can not issue next URI when " + "HQ " + hostName + " state is " + getStateByName()); } DatabaseEntry keyEntry = new DatabaseEntry(); // Get the top URI CrawlURI curi = peek(); // Add it to processingUriDB addInProcessing(curi); // Delete it from the primaryUriDB primaryKeyBinding.objectToEntry(curi.toString(),keyEntry); OperationStatus opStatus = primaryUriDB.delete(null,keyEntry); if(opStatus != OperationStatus.SUCCESS){ throw new DatabaseException("Error occured removing URI: " + curi.toString() + " from HQ " + hostName + " priority queue for processing. " + opStatus.toString()); } // Finally update nextReadyTime with new top if one exists. CrawlURI top = peek(); long nextReady = Long.MAX_VALUE; if(top != null){ nextReady = top.getLong( A_TIME_OF_NEXT_PROCESSING); } inProcessing++; setNextReadyTime(nextReady); logger.fine("Issuing " + curi.toString()); return curi; } catch (DatabaseException e) { // Blanket catch all DBExceptions and convert to IOExceptions. IOException e2 = new IOException(e.getMessage()); e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace throw e2; } } /** * Returns the URI with the earliest time of next processing. I.e. the URI * at the head of this host based priority queue. * <p> * Note: This method will return the head CrawlURI regardless of wether it * is safe to start processing it or not. CrawlURI will remain in the queue. * The returned CrawlURI should only be used for queue inspection, it can * <i>not</i> be updated and returned to the queue. To get URIs ready for * processing use {@link #next() next()}. * * @return the URI with the earliest time of next processing or null if * the queue is empty or all URIs are currently being processed. * @throws IllegalStateException * * @throws IOException if an error occurs reading from the database */ public CrawlURI peek() throws IllegalStateException, IOException{ try{ DatabaseEntry keyEntry = new DatabaseEntry(); DatabaseEntry dataEntry = new DatabaseEntry(); CrawlURI curi = null; Cursor secondaryCursor = secondaryUriDB.openCursor(null,null); OperationStatus opStatus = secondaryCursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT); if( opStatus == OperationStatus.SUCCESS){ curi = (CrawlURI)crawlURIBinding.entryToObject(dataEntry); } else { if( opStatus == OperationStatus.NOTFOUND ){ curi = null; } else { throw new IOException("Error occured in " + "AdaptiveRevisitHostQueue.peek()." + opStatus.toString()); } } secondaryCursor.close(); return curi; } catch (DatabaseException e) { // Blanket catch all DBExceptions and convert to IOExceptions. IOException e2 = new IOException(e.getMessage()); e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace throw e2; } } /** * Returns the current state of the HQ. * * @return the current state of the HQ. * * @see AdaptiveRevisitHostQueue#HQSTATE_BUSY * @see AdaptiveRevisitHostQueue#HQSTATE_EMPTY * @see AdaptiveRevisitHostQueue#HQSTATE_READY * @see AdaptiveRevisitHostQueue#HQSTATE_SNOOZED */ public int getState(){ if(state != HQSTATE_EMPTY){ // Need to confirm state if(isBusy()){ state = HQSTATE_BUSY; } else { long currentTime = System.currentTimeMillis(); long wakeTime = getEarliestWakeUpTimeSlot(); if(wakeTime > currentTime || nextReadyTime > currentTime){ state = HQSTATE_SNOOZED; } else { state = HQSTATE_READY; } } } return state; } /** * Returns the time when the HQ will next be ready to issue a URI. * <p> * If the queue is in a {@link #HQSTATE_SNOOZED snoozed} state then this * time will be in the future and reflects either the time when the HQ will * again be able to issue URIs for processing because politness constraints * have ended, or when a URI next becomes available for visit, whichever is * larger. * <p> * If the queue is in a {@link #HQSTATE_READY ready} state this time will * be in the past and reflect the earliest time when the HQ had a URI ready * for processing, taking time spent snoozed for politness concerns into * account. * <p> * If the HQ is in any other state then the return value of this method is * equal to Long.MAX_VALUE. * <p> * This value may change each time a URI is added, issued or updated. * * @return the time when the HQ will next be ready to issue a URI */ public long getNextReadyTime(){ if(getState()==HQSTATE_BUSY || getState()==HQSTATE_EMPTY){ // Have no idea when HQ next be able issue a URI return Long.MAX_VALUE; } long wakeTime = getEarliestWakeUpTimeSlot(); return nextReadyTime > wakeTime ? nextReadyTime : wakeTime; } /** * Updates nextReadyTime (if smaller) with the supplied value * @param newTime the new value of nextReady Time; */ protected void setNextReadyTime(long newTime){ if(logger.isLoggable(Level.FINEST)){ logger.finest("Setting next ready to new value " + newTime + " from " + getNextReadyTime()); } nextReadyTime=newTime; reorder(); } /** * Method is called whenever something has been done that might have * changed the value of the 'published' time of next ready. If an owner * has been specified it will be notified that the value may have changed.. */ protected void reorder(){ if(owner != null){ owner.reorder(this);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -