📄 webdbwriter.java
字号:
} else if (curInstruction == DEL_PAGE) {
if (comparison < 0) {
// Write the readerKey, just passing it along.
// Don't process the edit yet.
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
newReaderItem = true;
} else if (comparison == 0) {
// Delete it! Remember only one entry can
// be deleted at a time!
//
// "Delete" the entry by skipping over the reader
// item. We move onto the next item in the existing
// index, as well as the next edit instruction.
hasEntries = db.next(readerItem, NullWritable.get());
newReaderItem = true;
hasEdits = sortedEdits.next(editItem, NullWritable.get());
// We need to set this flag for GC'ing.
justDeletedItem = true;
} else if (comparison > 0) {
// This should never happen! We should only be
// deleting items that actually appear!
throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem);
}
}
// GARBAGE COLLECTION
// We want to detect when we have deleted the
// last MD5 of a certain value. We can have
// multiple MD5s in the same index, as long as
// they have different URLs. When the last MD5
// is deleted, we want to know so we can modify
// the LinkDB.
if (newReaderItem) {
// If we have a different readerItem which is just
// the same as our last one, then we know it's a
// repeat!
if (hasEntries && readerItem.getMD5().compareTo(lastItem.getMD5()) == 0) {
itemRepeats++;
} else {
// The current readerItem and the lastItem
// MD5s are not equal.
//
// If the last item was deleted, AND if the
// deleted item is not a repeat of the current item,
// then that MD5 should be garbage collected.
if (justDeletedItem && itemRepeats == 0) {
deleteLink(lastItem.getMD5());
}
// The current readerItem is the new "last key".
outBuf.reset();
readerItem.write(outBuf);
inBuf.reset(outBuf.getData(), outBuf.getLength());
lastItem.readFields(inBuf);
itemRepeats = 0;
}
// Clear "new-reader-item" bit
newReaderItem = false;
}
// Clear "last-deleted" bit
justDeletedItem = false;
}
// Now we have only edits. No more preexisting items!
while (! hasEntries && hasEdits) {
int curInstruction = editItem.getInstruction();
if (curInstruction == ADD_PAGE) {
// Just write down the new page!
newDb.append(editItem.getPage(), NullWritable.get());
itemsWritten++;
} else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem);
} else if (curInstruction == DEL_PAGE) {
// This should never happen! We should only be
// deleting items that actually appear!
throw new IOException("An unapplicable DEL_PAGE should never appear during index-merge: " + editItem);
}
hasEdits = sortedEdits.next(editItem, NullWritable.get());
}
// Now we have only preexisting items. We just copy them
// to the new file, in order
while (hasEntries && ! hasEdits) {
// Simply copy through the remaining database items
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
newReaderItem = true;
}
}
}
/**
* The LinksByMD5Processor is used during close() for
* the pagesByMD5 table. It processes all the edits to
* this table, and also generates edits for the linksByURL
* table.
*/
private class LinksByMD5Processor extends CloseProcessor {
SequenceFile.Writer futureEdits;
/**
*/
public LinksByMD5Processor(MapFile.Reader db, SequenceFile.Writer editWriter, SequenceFile.Writer futureEdits) {
super(LINKS_BY_MD5, db, editWriter, new SequenceFile.Sorter(fs, new LinkInstruction.MD5Comparator(), NullWritable.class), new Link.MD5Comparator(), Link.class, NullWritable.class);
this.futureEdits = futureEdits;
}
/**
* Merges edits into the md5-driven link table. Also generates
* edit sequence to apply to the URL-driven table.
*/
void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
WritableComparator comparator = new Link.MD5Comparator();
DeduplicatingLinkSequenceReader edits = new DeduplicatingLinkSequenceReader(sortedEdits);
// Create the keys and vals we'll use
LinkInstruction editItem = new LinkInstruction();
Link readerItem = new Link();
// Read the first items from both streams
boolean hasEntries = db.next(readerItem, NullWritable.get());
boolean hasEdits = edits.next(editItem);
// As long as we have both edits and entries to process,
// we need to interleave them
while (hasEntries && hasEdits) {
int curInstruction = editItem.getInstruction();
// Perform operations
if (curInstruction == ADD_LINK) {
// When we add a link, we may replace a previous
// link with identical URL and MD5 values. The
// MD5FirstComparator will use both values.
//
int comparison = comparator.compare(readerItem, editItem.getLink());
if (comparison < 0) {
// Write the readerKey, just passing it along.
// Don't process the edit yet.
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
} else if (comparison == 0) {
// 1. Write down the item for table-edits
if (futureEdits != null) {
linksByURLEdits++;
liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
}
// 2. Write the new item, "replacing" the old one.
// We move to the next edit instruction and move
// past the replaced db entry.
newDb.append(editItem.getLink(), NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
hasEdits = edits.next(editItem);
} else if (comparison > 0) {
// 1. Write down the item for table-edits
if (futureEdits != null) {
linksByURLEdits++;
liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
}
// 2. Write the new item. We stay at the current
// db entry.
newDb.append(editItem.getLink(), NullWritable.get());
itemsWritten++;
hasEdits = edits.next(editItem);
}
} else if ((curInstruction == DEL_LINK) ||
(curInstruction == DEL_SINGLE_LINK)) {
// When we delete a link, we might delete many
// at once! We are interested only in the MD5
// here. If there are entries with identical MD5
// values, but different URLs, we get rid of them
// all.
int comparison = 0;
if (curInstruction == DEL_LINK) {
comparison = readerItem.getFromID().compareTo(editItem.getLink().getFromID());
} else {
comparison = readerItem.md5Compare(editItem.getLink());
}
if (comparison < 0) {
// Write the readerKey, just passing it along.
// Don't process the edit yet.
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
} else if (comparison == 0) {
// Delete it (or them!)
// 1. Write the full instruction for the next
// delete-stage. That includes the read-in
// value
// 2. "Delete" the entry by skipping the
// readerKey. We DO NOT go to the next edit
// instruction! There might still be more
// entries in the database to which we should
// apply this delete-edit.
//
// Step 1. Write entry for future table-edits
if (futureEdits != null) {
linksByURLEdits++;
liwriter.appendInstructionInfo(futureEdits, readerItem, DEL_LINK, NullWritable.get());
}
// Step 2.
// We might want to delete multiple MD5s with
// a single delete() operation, so keep this
// edit instruction around
hasEntries = db.next(readerItem, NullWritable.get());
if (curInstruction == DEL_SINGLE_LINK) {
hasEdits = edits.next(editItem);
}
} else if (comparison > 0) {
// Ignore, move on to next instruction
hasEdits = edits.next(editItem);
}
}
}
// Now we have only edits. No more preexisting items!
while (! hasEntries && hasEdits) {
int curInstruction = editItem.getInstruction();
if (curInstruction == ADD_LINK) {
// 1. Write down the item for future table-edits
if (futureEdits != null) {
linksByURLEdits++;
liwriter.appendInstructionInfo(futureEdits, editItem.getLink(), ADD_LINK, NullWritable.get());
}
// 2. Just add the item from the edit list
newDb.append(editItem.getLink(), NullWritable.get());
itemsWritten++;
} else if (curInstruction == DEL_LINK) {
// Ignore operation
}
// Move on to next edit
hasEdits = edits.next(editItem);
}
// Now we have only preexisting items. Just copy them
// to the new file, in order.
while (hasEntries && ! hasEdits) {
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
}
}
}
/**
* This class helps the LinksByURLProcessor test a list of
* Page objects, sorted by URL, for outlink-counts. We query
* this class with a series of questions, based on Links sorted
* by target URL.
*/
private class TargetTester {
MapFile.Reader pagedb;
boolean hasPage = false;
UTF8 pageURL = null;
Page page = null;
/**
 * Wraps the given page reader and immediately reads its first
 * entry, so that pageURL/page hold the current position and
 * hasPage records whether an entry was available.
 *
 * @param pagedb reader that is stepped through via next(key, value)
 * @throws IOException propagated from the initial pagedb.next() call
 */
public TargetTester(MapFile.Reader pagedb) throws IOException {
    // Holders that pagedb.next() fills in on every read.
    pageURL = new UTF8();
    page = new Page();
    this.pagedb = pagedb;

    // Prime the cursor with the first entry, if there is one.
    hasPage = this.pagedb.next(pageURL, page);
}
/**
* Match the given URL against the sorted series of Page URLs.
*/
public int hasOutlinks(UTF8 curURL) throws IOException {
int returnCode = NO_OUTLINKS;
int comparison = pageURL.compareTo(curURL);
while (hasPage && comparison < 0) {
hasPage = pagedb.next(pageURL, page);
if (hasPage) {
comparison = pageURL.compareTo(curURL);
}
}
if (hasPage) {
if (comparison == 0) {
returnCode = (page.getNumOutlinks() > 0) ? HAS_OUTLINKS : NO_OUTLINKS;
} else if (comparison > 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -