📄 webdbwriter.java
字号:
//
// This situation indicates that the Link's
// target page has been deleted, probably
// because we repeatedly failed to fetch the URL.
// So, we should delete the Link.
//
returnCode = LINK_INVALID;
}
}
return returnCode;
}
/**
 * Releases this object's resources by closing the underlying {@code pagedb}.
 *
 * @throws IOException if closing {@code pagedb} fails
 */
public void close() throws IOException {
    this.pagedb.close();
}
}
/**
 * Closes down and merges changes to the URL-ordered link table.
 *
 * <p>Besides producing the new URL-ordered table, the merge re-checks each
 * surviving link's target against the page db (via {@code TargetTester}) and
 * appends follow-up link instructions to {@code futureEdits}:
 * {@code DEL_SINGLE_LINK} for links whose target is invalid, and
 * {@code ADD_LINK} for links whose {@code targetHasOutlink} flag was
 * (re)computed. Every instruction appended there is counted in
 * {@code targetOutlinkEdits}.
 */
private class LinksByURLProcessor extends CloseProcessor {
    /** Page table used to test each link's target; handed to TargetTester in mergeEdits. */
    MapFile.Reader pageDb;
    /** Receives the follow-up link instructions generated while merging. */
    SequenceFile.Writer futureEdits;

    /**
     * Creates the processor for the LINKS_BY_URL table.
     *
     * @param db existing link table, ordered by {@code Link.UrlComparator}
     * @param editWriter edit-stream writer handed to {@code CloseProcessor}
     * @param pageDb page table consulted to test each link's target
     * @param futureEdits writer that receives follow-up link instructions
     */
    public LinksByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, MapFile.Reader pageDb, SequenceFile.Writer futureEdits) {
        super(LINKS_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new LinkInstruction.UrlComparator(), NullWritable.class), new Link.UrlComparator(), Link.class, NullWritable.class);
        this.pageDb = pageDb;
        this.futureEdits = futureEdits;
    }

    /**
     * Delegates to {@code CloseProcessor.closeDown}.
     *
     * <p>{@code pageDb} is deliberately not closed here: mergeEdits closes the
     * TargetTester built over it, whose close() closes its reader (presumably
     * this same {@code pageDb} instance — confirm against TargetTester's
     * constructor).
     */
    public long closeDown(File workingDir, File outputDir, long numEdits) throws IOException {
        return super.closeDown(workingDir, outputDir, numEdits);
    }

    /**
     * Merges the existing db with the edit-stream into a brand-new file.
     *
     * <p>Both inputs are walked in the order of {@code Link.UrlComparator}
     * (the edits are assumed to be sorted into that same order) and
     * interleaved. ADD_LINK edits replace an equal db entry or are inserted.
     * DEL_LINK edits drop an equal db entry. Every link written to
     * {@code newDb} has its targetHasOutlink flag re-checked. Links whose
     * target is invalid are not written and are reported to
     * {@code futureEdits} as DEL_SINGLE_LINK.
     *
     * @param db existing table, read sequentially
     * @param sortedEdits edit instructions, read sequentially
     * @param newDb destination for the merged table
     * @throws IOException if any read or write fails; the TargetTester is
     *         closed in every case
     */
    void mergeEdits(MapFile.Reader db, SequenceFile.Reader sortedEdits, MapFile.Writer newDb) throws IOException {
        WritableComparator comparator = new Link.UrlComparator();
        // Reusable key objects, refilled by each next() call.
        LinkInstruction editItem = new LinkInstruction();
        Link readerItem = new Link();

        // Prime both streams.
        boolean hasEntries = db.next(readerItem, NullWritable.get());
        boolean hasEdits = sortedEdits.next(editItem, NullWritable.get());
        TargetTester targetTester = new TargetTester(pageDb);
        try {
            // Both streams non-empty: interleave them.
            while (hasEntries && hasEdits) {
                int curInstruction = editItem.getInstruction();
                if (curInstruction == ADD_LINK) {
                    // An ADD may replace a previous link with identical URL
                    // and MD5 values; the comparator tests both.
                    int comparison = comparator.compare(readerItem, editItem.getLink());
                    if (comparison < 0) {
                        // The db entry comes first: pass it along and keep the edit.
                        carryOverLink(readerItem, targetTester, newDb);
                        hasEntries = db.next(readerItem, NullWritable.get());
                    } else if (comparison == 0) {
                        // Write the edit, "replacing" the equal db entry.
                        refreshAndWriteLink(editItem.getLink(), targetTester, newDb);
                        hasEntries = db.next(readerItem, NullWritable.get());
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    } else {
                        // The edit comes first: insert it and stay on the db entry.
                        refreshAndWriteLink(editItem.getLink(), targetTester, newDb);
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    }
                } else if (curInstruction == DEL_LINK) {
                    // Deletes are applied by MD5 in an earlier stage. A single
                    // delete there may remove many items, but here we can expect
                    // a separate DEL instruction for every item removed.
                    int comparison = comparator.compare(readerItem, editItem.getLink());
                    if (comparison < 0) {
                        // The db entry comes first: pass it along and keep the edit.
                        // NOTE(review): unlike the ADD_LINK path and the trailing
                        // copy loop, this re-emits ADD_LINK to futureEdits even when
                        // the outlink flag is unchanged. Kept as-is; confirm whether
                        // the extra writes are intended.
                        refreshAndWriteLink(readerItem, targetTester, newDb);
                        hasEntries = db.next(readerItem, NullWritable.get());
                    } else if (comparison == 0) {
                        // "Delete" the entry by not copying it.
                        hasEntries = db.next(readerItem, NullWritable.get());
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    } else {
                        // Nothing to delete for this instruction; move on.
                        hasEdits = sortedEdits.next(editItem, NullWritable.get());
                    }
                } else {
                    // Unrecognized instruction: skip it, as the edits-only loop
                    // below does. Without this, neither stream advances and the
                    // loop never terminates.
                    hasEdits = sortedEdits.next(editItem, NullWritable.get());
                }
            }

            // Only edits remain; there is nothing left to delete.
            while (!hasEntries && hasEdits) {
                if (editItem.getInstruction() == ADD_LINK) {
                    refreshAndWriteLink(editItem.getLink(), targetTester, newDb);
                }
                hasEdits = sortedEdits.next(editItem, NullWritable.get());
            }

            // Only preexisting items remain: copy them over, in order.
            while (hasEntries && !hasEdits) {
                carryOverLink(readerItem, targetTester, newDb);
                hasEntries = db.next(readerItem, NullWritable.get());
            }
        } finally {
            // Release the tester (and the page reader it uses) even on failure.
            targetTester.close();
        }
    }

    /**
     * Writes {@code link} to {@code newDb} with a freshly computed
     * targetHasOutlink flag, or drops it if its target is invalid.
     *
     * <p>In both cases exactly one instruction (ADD_LINK or DEL_SINGLE_LINK)
     * is appended to futureEdits and counted in targetOutlinkEdits.
     */
    private void refreshAndWriteLink(Link link, TargetTester targetTester, MapFile.Writer newDb) throws IOException {
        int linkTest = targetTester.hasOutlinks(link.getURL());
        if (linkTest == LINK_INVALID) {
            liwriter.appendInstructionInfo(futureEdits, link, DEL_SINGLE_LINK, NullWritable.get());
        } else {
            link.setTargetHasOutlink(linkTest == HAS_OUTLINKS);
            liwriter.appendInstructionInfo(futureEdits, link, ADD_LINK, NullWritable.get());
            newDb.append(link, NullWritable.get());
            itemsWritten++;
        }
        targetOutlinkEdits++;
    }

    /**
     * Copies an existing {@code link} into {@code newDb}, emitting a
     * follow-up instruction only when something changed.
     *
     * <p>An invalid target yields DEL_SINGLE_LINK and no copy. A flipped
     * targetHasOutlink flag yields ADD_LINK. Unchanged links generate no
     * instruction, which minimizes unnecessary mod-writes.
     */
    private void carryOverLink(Link link, TargetTester targetTester, MapFile.Writer newDb) throws IOException {
        int linkTest = targetTester.hasOutlinks(link.getURL());
        if (linkTest == LINK_INVALID) {
            liwriter.appendInstructionInfo(futureEdits, link, DEL_SINGLE_LINK, NullWritable.get());
            targetOutlinkEdits++;
            return;
        }
        boolean newOutlinkStatus = (linkTest == HAS_OUTLINKS);
        if (link.targetHasOutlink() != newOutlinkStatus) {
            link.setTargetHasOutlink(newOutlinkStatus);
            liwriter.appendInstructionInfo(futureEdits, link, ADD_LINK, NullWritable.get());
            targetOutlinkEdits++;
        }
        newDb.append(link, NullWritable.get());
        itemsWritten++;
    }
}
/**
 * Initializes a brand-new WebDB under {@code dbDir}.
 *
 * <p>A writer is opened in create mode, which makes the directory if it is
 * missing, and is then closed straight away.
 *
 * @param nfs filesystem on which to create the db
 * @param dbDir directory that will hold the db
 * @throws IOException if the db cannot be created or closed
 */
public static void createWebDB(NutchFileSystem nfs, File dbDir) throws IOException {
    new WebDBWriter(nfs, dbDir, true).close();
}
boolean haveEdits = false;
NutchFileSystem fs = null;
File dbDir, dbFile, oldDbFile, newDbFile, tmp;
MapFile.Reader pagesByURL, pagesByMD5, linksByURL, linksByMD5;
SequenceFile.Writer pagesByURLWriter, pagesByMD5Writer, linksByURLWriter, linksByMD5Writer;
long pagesByURLEdits = 0, pagesByMD5Edits = 0, linksByURLEdits = 0, linksByMD5Edits = 0, targetOutlinkEdits = 0;
PageInstructionWriter piwriter = new PageInstructionWriter();
LinkInstructionWriter liwriter = new LinkInstructionWriter();
DataInputBuffer inBuf = new DataInputBuffer();
DataOutputBuffer outBuf = new DataOutputBuffer();
/**
 * Opens an existing WebDB in {@code dbDir} for writing and takes its write
 * lock.
 *
 * <p>Unlike {@code createWebDB}, this never creates the directory.
 *
 * @param fs filesystem holding the db
 * @param dbDir directory of an existing db
 * @throws IOException if {@code dbDir} is not an existing directory, or if
 *         opening the db fails
 */
public WebDBWriter(NutchFileSystem fs, File dbDir) throws IOException {
    this(fs, dbDir, /* create = */ false);
}
/**
* Private constructor, so we can either open or create the db files.
*/
private WebDBWriter(NutchFileSystem fs, File dbDir, boolean create) throws IOException {
this.fs = fs;
this.dbDir = dbDir;
this.dbFile = new File(dbDir, "webdb");
this.oldDbFile = new File(dbDir, "webdb.old");
this.newDbFile = new File(dbDir, "webdb.new");
this.tmp = new File(newDbFile, "tmp");
if ((! fs.exists(dbDir)) && create) {
fs.mkdirs(dbDir);
}
if (! fs.exists(dbDir) || ! fs.isDirectory(dbDir)) {
throw new IOException("Database " + dbDir + " is not a directory.");
}
// Lock the writeLock immediately.
fs.lock(new File(dbDir, "dbwritelock"), false);
// Resolve any partial-state dirs from the last run.
if (fs.exists(oldDbFile)) {
if (fs.exists(dbFile)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -