📄 webdbwriter.java
字号:
}
/**
 * Merge the existing db with the edit-stream into a brand-new file.
 *
 * <p>Entries from {@code db} and instructions from {@code sortedEdits}
 * are walked in step and compared by URL. The algorithm relies on both
 * streams being delivered in ascending URL order. Existing entries that
 * no edit touches are copied through unchanged. Every change that the
 * MD5-keyed table must also learn about is recorded in the future-edits
 * list via {@code piwriter.appendInstructionInfo}.
 *
 * <p>Handling per instruction while both streams still have items:
 * <ul>
 * <li>{@code ADD_PAGE} / {@code ADD_PAGE_WITH_SCORE}: replaces an entry
 *     with the same URL, or inserts a new one. A plain {@code ADD_PAGE}
 *     that replaces an entry inherits the old entry's scores.</li>
 * <li>{@code ADD_PAGE_IFN_PRESENT}: inserts only when the URL is absent;
 *     a Link attached to the instruction is added only on insertion.</li>
 * <li>{@code DEL_PAGE}: drops the entry with the same URL, if any.</li>
 * </ul>
 *
 * @param db          reader over the existing table (URL key, Page value)
 * @param sortedEdits edit instructions; read through a
 *                    {@code DeduplicatingPageSequenceReader}
 * @param newDb       writer that receives the merged table
 * @throws IOException if reading or writing fails, or if an instruction
 *                     other than the four listed above is met while
 *                     existing entries are still being interleaved
 */
void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
    // Create the keys and vals we'll be using
    DeduplicatingPageSequenceReader edits = new DeduplicatingPageSequenceReader(sortedEdits);
    WritableComparable readerKey = new UTF8();
    Page readerVal = new Page();
    PageInstruction editItem = new PageInstruction();

    // Read the first items from both streams
    boolean hasEntries = db.next(readerKey, readerVal);
    boolean hasEdits = edits.next(editItem);

    // As long as we have both edits and entries, we need to
    // interleave them....
    while (hasEntries && hasEdits) {
        int comparison = readerKey.compareTo(editItem.getPage().getURL());
        int curInstruction = editItem.getInstruction();

        // Perform operations
        if ((curInstruction == ADD_PAGE) ||
            (curInstruction == ADD_PAGE_WITH_SCORE) ||
            (curInstruction == ADD_PAGE_IFN_PRESENT)) {

            if (comparison < 0) {
                // Write readerKey, just passing it along.
                // Don't process the edit yet.
                newDb.append(readerKey, readerVal);
                itemsWritten++;
                hasEntries = db.next(readerKey, readerVal);
            } else if (comparison == 0) {
                // The keys are equal. If the instruction
                // is ADD_PAGE, we write the edit's key and
                // replace the old one.
                //
                // Otherwise, if it's ADD_IFN_PRESENT,
                // keep the reader's item intact.
                //
                if ((curInstruction == ADD_PAGE) ||
                    (curInstruction == ADD_PAGE_WITH_SCORE)) {
                    // An ADD_PAGE with an identical pair
                    // of pages replaces the existing one.
                    // We may need to note the fact for
                    // Garbage Collection.
                    //
                    // This happens in three stages.
                    // 1. We write necessary items to the future
                    //    edits-list.
                    //
                    pagesByMD5Edits++;

                    // If this is a replacing add, we don't want
                    // to disturb the score from the old Page! This
                    // way, we can run some link analysis scoring
                    // while the new Pages are being fetched and
                    // not lose the info when a Page is replaced.
                    //
                    // If it is an ADD_PAGE_WITH_SCORE, then we
                    // go ahead and replace the old one.
                    //
                    // Either way, from now on we treat it
                    // as an ADD_PAGE
                    //
                    Page editItemPage = editItem.getPage();
                    if (curInstruction == ADD_PAGE) {
                        editItemPage.setScore(readerVal.getScore(), readerVal.getNextScore());
                    }
                    piwriter.appendInstructionInfo(futureEdits, editItemPage, ADD_PAGE, NullWritable.get());

                    //
                    // 2. We write the edit-page to *this* table.
                    //
                    newDb.append(editItemPage.getURL(), editItemPage);

                    //
                    // 3. We want the ADD in the next step (the
                    //    MD5-driven table) to be a "replacing add".
                    //    But that won't happen if the readerItem and
                    //    the editItem Pages are not identical.
                    //    (In this scenario, that means their URLs
                    //    are the same, but their MD5s are different.)
                    //    So, we need to explicitly handle that
                    //    case by issuing a DELETE for the now-obsolete
                    //    item.
                    if (editItemPage.compareTo(readerVal) != 0) {
                        pagesByMD5Edits++;
                        piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());
                    }
                    itemsWritten++;

                    // "Delete" the readerVal by skipping it.
                    hasEntries = db.next(readerKey, readerVal);
                } else {
                    // ADD_PAGE_IFN_PRESENT. We only add IF_NOT
                    // present. And it was present! So, we treat
                    // this case like we treat a no-op.
                    // Just move to the next edit.
                }
                // In either case, we process the edit.
                hasEdits = edits.next(editItem);
            } else {
                // comparison > 0: we have inserted a Page that's
                // before some entry in the existing database. So, we
                // just need to write down the Page from the Edit file.
                // It's like the above case, except we don't tell
                // the future-edits to delete anything.
                writeInsertedPage(editItem, curInstruction, newDb);

                // Process the edit
                hasEdits = edits.next(editItem);
            }
        } else if (curInstruction == DEL_PAGE) {
            if (comparison < 0) {
                // Write the readerKey, just passing it along.
                // We don't process the edit yet.
                newDb.append(readerKey, readerVal);
                itemsWritten++;
                hasEntries = db.next(readerKey, readerVal);
            } else if (comparison == 0) {
                // Delete it! We can only delete one item
                // at a time, as all URLs are unique.
                // 1. Tell the future-edits what page will need to
                //    be deleted.
                pagesByMD5Edits++;
                piwriter.appendInstructionInfo(futureEdits, readerVal, DEL_PAGE, NullWritable.get());

                //
                // 2. "Delete" the entry by skipping the Reader
                //    key.
                hasEntries = db.next(readerKey, readerVal);

                // Process the edit
                hasEdits = edits.next(editItem);
            } else {
                // comparison > 0: ignore it. We tried to delete an
                // item that's not here.
                hasEdits = edits.next(editItem);
            }
        } else {
            // Neither stream would advance for an instruction we do not
            // recognise, so this loop would never terminate. Fail
            // loudly instead of hanging.
            throw new IOException("Unrecognized instruction " + curInstruction
                                  + " while merging edits: " + editItem);
        }
    }

    // Now we have only edits. No more preexisting items!
    while (! hasEntries && hasEdits) {
        int curInstruction = editItem.getInstruction();
        if (curInstruction == ADD_PAGE ||
            curInstruction == ADD_PAGE_WITH_SCORE ||
            curInstruction == ADD_PAGE_IFN_PRESENT) {
            // No more reader entries, so ADD_PAGE_IFN_PRESENT
            // is treated like a simple ADD_PAGE.
            writeInsertedPage(editItem, curInstruction, newDb);
        } else if (curInstruction == DEL_PAGE) {
            // Ignore it. We tried to delete an item
            // that's not here.
        }
        // Either way, we always process the edit. (Any other
        // instruction value is consumed here without effect.)
        hasEdits = edits.next(editItem);
    }

    // Now we have only preexisting items. We just copy
    // them to the new file, in order.
    while (hasEntries && ! hasEdits) {
        newDb.append(readerKey, readerVal);
        itemsWritten++;
        hasEntries = db.next(readerKey, readerVal);
    }
}

/**
 * Writes the Page carried by an ADD-style edit as a brand-new entry.
 *
 * <p>Counts the edit for the MD5-keyed table, adds the attached Link
 * when the instruction is {@code ADD_PAGE_IFN_PRESENT} and a Link is
 * present, records an {@code ADD_PAGE} in the future-edits list, and
 * appends the Page to {@code newDb}. The caller advances the edit
 * stream.
 *
 * @param editItem       the edit whose Page is being inserted
 * @param curInstruction the instruction code read from {@code editItem}
 * @param newDb          writer that receives the new entry
 * @throws IOException if recording the edit or appending the entry fails
 */
private void writeInsertedPage(PageInstruction editItem, int curInstruction, MapFile.Writer newDb) throws IOException {
    // 1. Write the item down for the future.
    pagesByMD5Edits++;

    //
    // If this is an ADD_PAGE_IFN_PRESENT, then
    // we may also have a Link we have to take care of!
    //
    if (curInstruction == ADD_PAGE_IFN_PRESENT) {
        Link editLink = editItem.getLink();
        if (editLink != null) {
            addLink(editLink);
        }
    }
    Page insertedPage = editItem.getPage();
    piwriter.appendInstructionInfo(futureEdits, insertedPage, ADD_PAGE, NullWritable.get());

    //
    // 2. Write the edit-page to *this* table
    newDb.append(insertedPage.getURL(), insertedPage);
    itemsWritten++;
}
}
/***
* The PagesByMD5Processor is used during close() time for
* the pagesByMD5 table. We instantiate one of these, and it
* takes care of the entire shutdown process.
*/
private class PagesByMD5Processor extends CloseProcessor {
/**
 * Creates the processor for the table identified by
 * {@code PAGES_BY_MD5}.
 *
 * <p>All work is delegated to the superclass constructor, which is
 * handed a {@code SequenceFile.Sorter} built from the enclosing
 * writer's {@code fs}, a {@code PageInstruction.PageComparator} and
 * {@code NullWritable} values, plus {@code Page} / {@code NullWritable}
 * as the last two class arguments.
 * NOTE(review): the fifth argument is passed as {@code null}; its
 * meaning is defined by {@code CloseProcessor} -- confirm there.
 *
 * @param db         reader over the existing table
 * @param editWriter writer for the edits belonging to this table
 */
PagesByMD5Processor(MapFile.Reader db, SequenceFile.Writer editWriter) {
    super(
        PAGES_BY_MD5,
        db,
        editWriter,
        new SequenceFile.Sorter(
            fs,
            new PageInstruction.PageComparator(),
            NullWritable.class),
        null,
        Page.class,
        NullWritable.class);
}
/**
*/
void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
// Create the keys and vals
Page readerItem = new Page();
PageInstruction editItem = new PageInstruction();
// For computing the GC list
Page deletedItem = new Page(), lastItem = new Page();
boolean justDeletedItem = false;
boolean newReaderItem = false;
int itemRepeats = 0;
// Read the first items from both streams
boolean hasEntries = db.next(readerItem, NullWritable.get());
boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
if (hasEntries) {
// The first thing we read should become
// the "previous key". We need this for
// garbage collection.
outBuf.reset();
readerItem.write(outBuf);
inBuf.reset(outBuf.getData(), outBuf.getLength());
lastItem.readFields(inBuf);
itemRepeats = 0;
}
// As long as we have both edits and entries, we need to
// interleave them.
while (hasEdits && hasEntries) {
int comparison = readerItem.compareTo(editItem.getPage());
int curInstruction = editItem.getInstruction();
//
// OK! Now perform operations
//
if (curInstruction == ADD_PAGE) {
if (comparison < 0) {
// Write readerItem, just passing it along.
// Don't process the edit yet.
newDb.append(readerItem, NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
newReaderItem = true;
} else if (comparison == 0) {
//
// This is a "replacing ADD", which is generated
// by the above-sequence. We should skip over the
// existing item, and add the new one instead.
//
// Note that by this point, the new version of the
// Page from the edit sequence is guaranteed to
// have the correct score. We make sure of it in
// the mergeEdits() for PagesByURLProcessor.
//
newDb.append(editItem.getPage(), NullWritable.get());
itemsWritten++;
hasEntries = db.next(readerItem, NullWritable.get());
newReaderItem = true;
hasEdits = sortedEdits.next(editItem, NullWritable.get());
} else if (comparison > 0) {
// Write the edit item. We've inserted an item
// that comes before any others.
newDb.append(editItem.getPage(), NullWritable.get());
itemsWritten++;
hasEdits = sortedEdits.next(editItem, NullWritable.get());
}
} else if (curInstruction == ADD_PAGE_IFN_PRESENT) {
throw new IOException("Should never process ADD_PAGE_IFN_PRESENT for the index: " + editItem);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -