📄 bdbmultipleworkqueues.java
字号:
cursor = this.pendingUrisDB.openCursor(null, null); // get cap; headKey at this point should always point to // a queue-beginning cap entry (zero-length value) status = cursor.getSearchKey(headKey, result, null); if(status!=OperationStatus.SUCCESS || result.getData().length > 0) { // cap missing throw new DatabaseException("bdb queue cap missing"); } // get next item (real first item of queue) status = cursor.getNext(headKey,result,null); } finally { if(cursor!=null) { cursor.close(); } } return status; } /** * Put the given CrawlURI in at the appropriate place. * * @param curi * @throws DatabaseException */ public void put(CrawlURI curi, boolean overwriteIfPresent) throws DatabaseException { DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey(); if (insertKey == null) { insertKey = calculateInsertKey(curi); curi.setHolderKey(insertKey); } DatabaseEntry value = new DatabaseEntry(); crawlUriBinding.objectToEntry(curi, value); // Output tally on avg. size if level is FINE or greater. if (LOGGER.isLoggable(Level.FINE)) { tallyAverageEntrySize(curi, value); } OperationStatus status; if(overwriteIfPresent) { status = pendingUrisDB.put(null, insertKey, value); } else { status = pendingUrisDB.putNoOverwrite(null, insertKey, value); } if(status!=OperationStatus.SUCCESS) { LOGGER.severe("failed; "+status+ " "+curi); } } private long entryCount = 0; private long entrySizeSum = 0; private int largestEntry = 0; /** * Log average size of database entry. * @param curi CrawlURI this entry is for. * @param value Database entry value. */ private synchronized void tallyAverageEntrySize(CrawlURI curi, DatabaseEntry value) { entryCount++; int length = value.getData().length; entrySizeSum += length; int avg = (int) (entrySizeSum/entryCount); if(entryCount % 1000 == 0) { LOGGER.fine("Average entry size at "+entryCount+": "+avg); } if (length>largestEntry) { largestEntry = length; LOGGER.fine("Largest entry: "+length+" "+curi); if(length>(2*avg)) { LOGGER.fine("excessive?"); } } } /** * Calculate the 'origin' key for a virtual queue of items * with the given classKey. This origin key will be a * prefix of the keys for all items in the queue. * * @param classKey String key to derive origin byte key from * @return a byte array key */ static byte[] calculateOriginKey(String classKey) { byte[] classKeyBytes = null; int len = 0; try { classKeyBytes = classKey.getBytes("UTF-8"); len = classKeyBytes.length; } catch (UnsupportedEncodingException e) { // should be impossible; all JVMs must support UTF-8 e.printStackTrace(); } byte[] keyData = new byte[len+1]; System.arraycopy(classKeyBytes,0,keyData,0,len); keyData[len]=0; return keyData; } /** * Calculate the insertKey that places a CrawlURI in the * desired spot. First bytes are always classKey (usu. host) * based -- ensuring grouping by host -- terminated by a zero * byte. Then 8 bytes of data ensuring desired ordering * within that 'queue' are used. The first byte of these 8 is * priority -- allowing 'immediate' and 'soon' items to * sort above regular. Next 1 byte is 'cost'. Last 6 bytes * are ordinal serial number, ensuring earlier-discovered * URIs sort before later. * * NOTE: Dangers here are: * (1) priorities or costs over 2^7 (signed byte comparison) * (2) ordinals over 2^48 * * Package access & static for testing purposes. * * @param curi * @return a DatabaseEntry key for the CrawlURI */ static DatabaseEntry calculateInsertKey(CrawlURI curi) { byte[] classKeyBytes = null; int len = 0; try { classKeyBytes = curi.getClassKey().getBytes("UTF-8"); len = classKeyBytes.length; } catch (UnsupportedEncodingException e) { // should be impossible; all JVMs must support UTF-8 e.printStackTrace(); } byte[] keyData = new byte[len+9]; System.arraycopy(classKeyBytes,0,keyData,0,len); keyData[len]=0; long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL; ordinalPlus = ((long)curi.getSchedulingDirective() << 56) | ordinalPlus; ordinalPlus = ((((long)curi.getHolderCost()) & 0xFFL) << 48) | ordinalPlus; ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1); return new DatabaseEntry(keyData); } /** * Delete the given CrawlURI from persistent store. Requires * the key under which it was stored be available. * * @param item * @throws DatabaseException */ public void delete(CrawlURI item) throws DatabaseException { OperationStatus status; status = pendingUrisDB.delete(null, (DatabaseEntry) item.getHolderKey()); if (status != OperationStatus.SUCCESS) { LOGGER.severe("expected item not present: " + item + "(" + (new BigInteger(((DatabaseEntry) item.getHolderKey()) .getData())).toString(16) + ")"); } } /** * Method used by BdbFrontier during checkpointing. * <p>The backing bdbje database has been marked deferred write so we save * on writes to disk. Means no guarantees disk will have whats in memory * unless a sync is called (Calling sync on the bdbje Environment is not * sufficent). * <p>Package access only because only Frontiers of this package would ever * need access. * @see <a href="http://www.sleepycat.com/jedocs/GettingStartedGuide/DB.html">Deferred Write Databases</a> */ void sync() { if (this.pendingUrisDB == null) { return; } try { this.pendingUrisDB.sync(); } catch (DatabaseException e) { e.printStackTrace(); } } /** * clean up * */ public void close() { try { this.pendingUrisDB.close(); } catch (DatabaseException e) { e.printStackTrace(); } } /** * Marker for remembering a position within the BdbMultipleWorkQueues. * * @author gojomo */ public class BdbFrontierMarker implements FrontierMarker { DatabaseEntry startKey; Pattern pattern; int nextItemNumber; /** * Create a marker pointed at the given start location. * * @param startKey * @param regexpr */ public BdbFrontierMarker(DatabaseEntry startKey, String regexpr) { this.startKey = startKey; pattern = Pattern.compile(regexpr); nextItemNumber = 1; } /** * @param curi * @return whether the marker accepts the given CrawlURI */ public boolean accepts(CrawlURI curi) { boolean retVal = pattern.matcher(curi.toString()).matches(); if(retVal==true) { nextItemNumber++; } return retVal; } /** * @param key position for marker */ public void setStartKey(DatabaseEntry key) { startKey = key; } /** * @return startKey */ public DatabaseEntry getStartKey() { return startKey; } /* (non-Javadoc) * @see org.archive.crawler.framework.FrontierMarker#getMatchExpression() */ public String getMatchExpression() { return pattern.pattern(); } /* (non-Javadoc) * @see org.archive.crawler.framework.FrontierMarker#getNextItemNumber() */ public long getNextItemNumber() { return nextItemNumber; } /* (non-Javadoc) * @see org.archive.crawler.framework.FrontierMarker#hasNext() */ public boolean hasNext() { // as long as any startKey is stated, consider as having next return startKey != null; } } /** * Add a dummy 'cap' entry at the given insertion key. Prevents * 'seeks' to queue heads from holding lock on last item of * 'preceding' queue. See: * http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102 * * @param origin key at which to insert the cap */ public void addCap(byte[] origin) { try { pendingUrisDB.put(null, new DatabaseEntry(origin), new DatabaseEntry(new byte[0])); } catch (DatabaseException e) { throw new RuntimeException(e); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -