📄 adaptiverevisithostqueue.java
字号:
// info on past crawls), check its scheduling directive // and (possibly) its time of next fetch and update if it // will promote the URI to an earlier processing time. boolean update = false; CrawlURI curiExisting = getCrawlURI(curi.toString()); long oldCuriProcessingTime = curiExisting.getLong( A_TIME_OF_NEXT_PROCESSING); if(curi.getSchedulingDirective() < curiExisting.getSchedulingDirective()){ // New scheduling directive is of higher importance, // update to promote URI. curiExisting.setSchedulingDirective( curi.getSchedulingDirective()); update = true; } if( (curiProcessingTime < oldCuriProcessingTime) && (overrideSetTimeOnDups || update)){ // We update the processing time if it is earlier then // the original and either overrideSetTimeOnDups was set // or update is true, meaning a higher priority scheduling // directive for this URI. curiExisting.putLong( A_TIME_OF_NEXT_PROCESSING, curiProcessingTime); update = true; } if(update){ opStatus = strictAdd(curiExisting,true); //Override } else { return; } } else if(opStatus == OperationStatus.SUCCESS) { // Just inserted a brand new CrawlURI into the queue. size++; } // Finally, check if insert (fresh add or override) into DB was // successful and if so check if we need to update nextReadyTime. if(opStatus == OperationStatus.SUCCESS){ if (curiProcessingTime < nextReadyTime){ // Update nextReadyTime to reflect new value. setNextReadyTime(curiProcessingTime); } if(state == HQSTATE_EMPTY){ // Definately no longer empty. state = HQSTATE_READY; } } else { // Something went wrong. Throw an exception. throw new DatabaseException("Error on add into database for " + "CrawlURI " + curi.toString() + ". " + opStatus.toString()); } } catch (DatabaseException e) { // Blanket catch all DBExceptions and convert to IOExceptions. IOException e2 = new IOException(e.getMessage()); e2.setStackTrace(e.getStackTrace()); //preserve original stacktrace throw e2; } reorder(); // May need a reorder. 
} /** * An internal method for adding URIs to the queue. * * @param curi The CrawlURI to add * @param overrideDuplicates If true then any existing CrawlURI in the DB * will be overwritten. If false insert into the * queue is only performed if the key doesn't * already exist. * @return The OperationStatus object returned by the put method. * * @throws DatabaseException */ protected OperationStatus strictAdd(CrawlURI curi, boolean overrideDuplicates) throws DatabaseException{ DatabaseEntry keyEntry = new DatabaseEntry(); DatabaseEntry dataEntry = new DatabaseEntry(); primaryKeyBinding.objectToEntry(curi.toString(), keyEntry); crawlURIBinding.objectToEntry(curi, dataEntry); OperationStatus opStatus = null; if(overrideDuplicates){ opStatus = primaryUriDB.put(null,keyEntry,dataEntry); } else { opStatus = primaryUriDB.putNoOverwrite(null,keyEntry,dataEntry); } return opStatus; } /** * Flush any CrawlURIs in the processingUriDB into the primaryUriDB. URIs * flushed will have their 'time of next fetch' maintained and the * nextReadyTime will be updated if needed. * <p> * No change is made to the list of available slots. * * @throws DatabaseException if one occurs while flushing */ protected void flushProcessingURIs() throws DatabaseException { Cursor processingCursor = processingUriDB.openCursor(null,null); DatabaseEntry keyEntry = new DatabaseEntry(); DatabaseEntry dataEntry = new DatabaseEntry(); while(true){ OperationStatus opStatus = processingCursor.getFirst( keyEntry, dataEntry, LockMode.DEFAULT); if(opStatus == OperationStatus.SUCCESS){ // Got one! CrawlURI curi = (CrawlURI) crawlURIBinding.entryToObject(dataEntry); // Delete it from processingUriDB deleteInProcessing(curi.toString()); // Add to processingUriDB; strictAdd(curi,false); // Ignore any duplicates. Go with the // ones already in the queue. // Update nextReadyTime if needed. 
long curiNextReadyTime = curi.getLong( A_TIME_OF_NEXT_PROCESSING); if(curiNextReadyTime<nextReadyTime){ setNextReadyTime(curiNextReadyTime); } } else { // No more entries in processingUriDB processingCursor.close(); return; } } } /** * Count all entries in both primaryUriDB and processingUriDB. * <p> * This method is needed since BDB does not provide a simple way of counting * entries. * <p> * Note: This is an expensive operation, requires a loop through the entire * queue! * @return the number of distinct CrawlURIs in the HQ. * @throws DatabaseException */ protected long countCrawlURIs() throws DatabaseException{ // TODO: Instead of all this, the value should be simply read from the // database. long count = 0; DatabaseEntry keyEntry = new DatabaseEntry(); DatabaseEntry dataEntry = new DatabaseEntry(); // Count URIs in the queue Cursor primaryCursor = primaryUriDB.openCursor(null,null); OperationStatus opStatus = primaryCursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT); while(opStatus == OperationStatus.SUCCESS){ count++; opStatus = primaryCursor.getNext(keyEntry, dataEntry, LockMode.DEFAULT); } primaryCursor.close(); // Now count URIs in the processingUriDB Cursor processingCursor = processingUriDB.openCursor(null,null); opStatus = processingCursor.getFirst(keyEntry, dataEntry, LockMode.DEFAULT); while(opStatus == OperationStatus.SUCCESS){ count++; opStatus = processingCursor.getNext(keyEntry, dataEntry, LockMode.DEFAULT); } processingCursor.close(); return count; } /** * Returns true if this HQ has a CrawlURI matching the uri string currently * being processed. False otherwise. * * @param uri Uri to check * @return true if this HQ has a CrawlURI matching the uri string currently * being processed. False otherwise. 
* * @throws DatabaseException */ protected boolean inProcessing(String uri) throws DatabaseException{ DatabaseEntry keyEntry = new DatabaseEntry(); DatabaseEntry dataEntry = new DatabaseEntry(); StringBinding.stringToEntry(uri,keyEntry); OperationStatus opStatus = processingUriDB.get(null, keyEntry, dataEntry, LockMode.DEFAULT); if (opStatus == OperationStatus.SUCCESS){ return true; } return false; //Not found } /** * Removes a URI from the list of URIs belonging to this HQ and are * currently being processed. * <p> * Returns true if successful, false if the URI was not found. * * @param uri The URI string of the CrawlURI to delete. * * @throws DatabaseException * @throws IllegalStateException if the URI was not on the list */ protected void deleteInProcessing(String uri) throws DatabaseException { DatabaseEntry keyEntry = new DatabaseEntry(); StringBinding.stringToEntry(uri, keyEntry); OperationStatus opStatus = processingUriDB.delete(null, keyEntry); if (opStatus != OperationStatus.SUCCESS) { if (opStatus == OperationStatus.NOTFOUND) { throw new IllegalStateException("Trying to deleta a " + "non-existant URI from the list of URIs being " + "processed. HQ: " + hostName + ", CrawlURI: " + uri); } throw new DatabaseException("Error occured deleting URI: " + uri + " from HQ " + hostName + " list " + "of URIs currently being processed. " + opStatus.toString()); } } /** * Adds a CrawlURI to the list of CrawlURIs belonging to this HQ and are * being processed at the moment. * * @param curi * The CrawlURI to add to the list * @throws DatabaseException * @throws IllegalStateException * if the CrawlURI is already in the list of URIs being * processed. 
*/
    protected void addInProcessing(CrawlURI curi)
            throws DatabaseException, IllegalStateException {
        DatabaseEntry insertKey = new DatabaseEntry();
        DatabaseEntry insertData = new DatabaseEntry();
        StringBinding.stringToEntry(curi.toString(), insertKey);
        crawlURIBinding.objectToEntry(curi, insertData);

        // putNoOverwrite: an existing entry under the same key is left alone
        // and reported as KEYEXIST.
        OperationStatus status =
            processingUriDB.putNoOverwrite(null, insertKey, insertData);
        if (status == OperationStatus.SUCCESS) {
            return;
        }
        if (status == OperationStatus.KEYEXIST) {
            throw new IllegalStateException(
                    "Can not insert duplicate URI into list of URIs being "
                    + "processed. HQ: " + hostName + ", CrawlURI: "
                    + curi.toString());
        }
        throw new DatabaseException(
                "Error occured adding CrawlURI: " + curi.toString()
                + " to HQ " + hostName
                + " list of URIs currently being processed. "
                + status.toString());
    }

    /**
     * Returns the CrawlURI associated with the specified URI (string) or null
     * if no such CrawlURI is queued in this HQ. If CrawlURI is being processed
     * it is not considered to be <i>queued </i> and this method will return
     * null for any such URIs.
     *
     * @param uri
     *            A string representing the URI
     * @return the CrawlURI associated with the specified URI (string) or null
     *         if no such CrawlURI is queued in this HQ.
     *
     * @throws DatabaseException
     *             if a errors occurs reading the database
     */
    protected CrawlURI getCrawlURI(String uri) throws DatabaseException{
        DatabaseEntry keyEntry = new DatabaseEntry();
        DatabaseEntry dataEntry = new DatabaseEntry();
        primaryKeyBinding.objectToEntry(uri,keyEntry);
        primaryUriDB.get(null,keyEntry,dataEntry,LockMode.DEFAULT);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -