📄 webdbwriter.java
字号:
currentUrl.compareTo(current.getPage().getURL()) == 0);
return true;
}
}
/**
 * Pairs a Link with an integer instruction (opcode) so the two can be
 * written, read and sorted together as a single record.
 *
 * The serialized form is one opcode byte followed by the serialized Link.
 * Natural ordering and both nested comparators look only at the link; the
 * instruction value never takes part in any comparison.
 */
public static class LinkInstruction implements WritableComparable {
    Link link;
    int instruction;

    /**
     * Creates an empty instance. The link stays null until one of the
     * set() methods or readFields() is called.
     */
    public LinkInstruction() {
    }

    /**
     * Creates an instance holding the given link and instruction.
     */
    public LinkInstruction(Link link, int instruction) {
        set(link, instruction);
    }

    /**
     * Re-initializes this object from another LinkInstruction. The other
     * object's link is handed to Link.set() on this object's own Link
     * (allocated on first use) instead of the reference being shared.
     */
    public void set(LinkInstruction that) {
        this.instruction = that.instruction;
        if (this.link == null) {
            this.link = new Link();
        }
        this.link.set(that.link);
    }

    /**
     * Re-initializes this object with a Link and an instruction. The Link
     * reference is stored as-is, not copied.
     */
    public void set(Link link, int instruction) {
        this.link = link;
        this.instruction = instruction;
    }

    //
    // WritableComparable
    //

    /**
     * Orders two LinkInstructions by delegating to Link.compareTo() on
     * their links.
     */
    public int compareTo(Object o) {
        LinkInstruction other = (LinkInstruction) o;
        return this.link.compareTo(other.link);
    }

    /**
     * Writes the instruction as a single byte, then the link.
     */
    public void write(DataOutput out) throws IOException {
        out.writeByte(instruction);
        link.write(out);
    }

    /**
     * Reads the instruction byte, then the link, reusing the existing Link
     * object when there is one.
     */
    public void readFields(DataInput in) throws IOException {
        this.instruction = in.readByte();
        if (link == null) {
            link = new Link();
        }
        link.readFields(in);
    }

    /** Returns the link currently held (may be null if never set). */
    public Link getLink() {
        return link;
    }

    /** Returns the instruction value currently held. */
    public int getInstruction() {
        return instruction;
    }

    /**
     * Sorts instructions by delegating to Link.md5Compare() on their links.
     * The opcode is not consulted.
     */
    public static class MD5Comparator extends WritableComparator {
        private static final Link.MD5Comparator MD5_COMPARATOR =
            new Link.MD5Comparator();

        public MD5Comparator() {
            super(LinkInstruction.class);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            LinkInstruction left = (LinkInstruction) a;
            LinkInstruction right = (LinkInstruction) b;
            return left.link.md5Compare(right.link);
        }

        /**
         * Optimized comparator working on serialized bytes. The first byte
         * of each record is the opcode written by write(), so it is skipped
         * and only the link bytes are passed on.
         */
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            return MD5_COMPARATOR.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
        }
    }

    /**
     * Sorts instructions by delegating to Link.urlCompare() on their links.
     * The opcode is not consulted.
     */
    public static class UrlComparator extends WritableComparator {
        private static final Link.UrlComparator URL_COMPARATOR =
            new Link.UrlComparator();

        public UrlComparator() {
            super(LinkInstruction.class);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            LinkInstruction left = (LinkInstruction) a;
            LinkInstruction right = (LinkInstruction) b;
            return left.link.urlCompare(right.link);
        }

        /**
         * Optimized comparator working on serialized bytes. The first byte
         * of each record is the opcode written by write(), so it is skipped
         * and only the link bytes are passed on.
         */
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            return URL_COMPARATOR.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
        }
    }
}
/**
 * Appends link instructions to a SequenceFile.Writer while reusing one
 * LinkInstruction object as the key, rather than allocating a fresh key for
 * every record as "writer.append(new LinkInstruction())" would.
 */
public static class LinkInstructionWriter {
    LinkInstruction li;

    /**
     * Allocates the single LinkInstruction that every append reuses.
     */
    public LinkInstructionWriter() {
        li = new LinkInstruction();
    }

    /**
     * Loads the given link and opcode into the reusable LinkInstruction and
     * appends it, together with val, to the indicated SequenceFile. The
     * LinkInstruction is kept for the next call.
     */
    public synchronized void appendInstructionInfo(SequenceFile.Writer writer, Link link, int opcode, Writable val) throws IOException {
        li.set(link, opcode);
        writer.append(li, val);
    }
}
/**
 * Reads link instructions from a SequenceFile.Reader and collapses each run
 * of consecutive entries whose links compare equal into a single result.
 * Within a run, the last entry read is the one handed back.
 */
class DeduplicatingLinkSequenceReader {
    Link currentKey = new Link();
    LinkInstruction current = new LinkInstruction();
    SequenceFile.Reader edits;
    boolean haveCurrent;

    /**
     * Wraps the given reader and pre-reads its first entry, so that
     * haveCurrent tells whether anything is left to return.
     */
    public DeduplicatingLinkSequenceReader(SequenceFile.Reader edits) throws IOException {
        this.edits = edits;
        this.haveCurrent = edits.next(current, NullWritable.get());
    }

    /**
     * Fills key with the next de-duplicated instruction.
     *
     * The incoming stream of edits is expected to be sorted first by MD5,
     * then by URL, with MD5-only values ahead of MD5+URL ones, so that
     * equal links sit next to each other.
     *
     * Returns false, leaving key untouched, once the underlying reader has
     * no more entries; returns true otherwise.
     */
    public boolean next(LinkInstruction key) throws IOException {
        if (!haveCurrent) {
            return false;
        }

        // Remember which link this run belongs to before reading ahead.
        currentKey.set(current.getLink());
        for (;;) {
            // Later entries of the same run overwrite earlier ones in key.
            key.set(current);
            haveCurrent = edits.next(current, NullWritable.get());
            if (!haveCurrent || currentKey.compareTo(current.getLink()) != 0) {
                // Either the input is finished or the read-ahead entry opens
                // a new run; it stays in current for the next call.
                return true;
            }
        }
    }
}
/**
 * The CloseProcessor class is used when we close down the webdb. It is given
 * the basename, readers/writers, sorter and key/value classes needed to
 * apply pending edits to one of the data tables.
 *
 * This is an abstract class. Each subclass must define the exact merge
 * procedure in mergeEdits(); the file handling and edit sorting around it
 * are shared here.
 */
private abstract class CloseProcessor {
    String basename;
    MapFile.Reader oldDb;
    SequenceFile.Writer editWriter;
    SequenceFile.Sorter sorter;
    WritableComparator comparator;
    Class keyClass, valueClass;
    // Returned by closeDown(). Never modified in this class; presumably
    // updated by the mergeEdits() implementations -- confirm in subclasses.
    long itemsWritten = 0;

    /**
     * Store away these members for later use.
     *
     * When comparator is null, the new db is created from keyClass;
     * otherwise the comparator is used and keyClass is ignored.
     */
    CloseProcessor(String basename, MapFile.Reader oldDb, SequenceFile.Writer editWriter, SequenceFile.Sorter sorter, WritableComparator comparator, Class keyClass, Class valueClass) {
        this.basename = basename;
        this.oldDb = oldDb;
        this.editWriter = editWriter;
        this.sorter = sorter;
        this.comparator = comparator;
        this.keyClass = keyClass;
        this.valueClass = valueClass;
    }

    /**
     * Perform the shutdown sequence for this Processor.
     *
     * Closes the edit writer, then either sorts the edits and merges them
     * with the old db into a new db under outputDir, or, when numEdits is
     * zero, copies the current db into place unchanged. The edits file is
     * deleted afterwards in both cases.
     *
     * The sorted-edits reader and the new db writer are closed even when
     * the merge fails, so a failed close-down does not leak them.
     *
     * @param workingDir directory holding the "basename.out" edits file
     * @param outputDir directory the new db is written to
     * @param numEdits number of edits recorded; zero selects the copy path
     * @return how many items were written out by this close (itemsWritten)
     * @throws IOException if sorting, merging, copying or closing fails
     */
    long closeDown(File workingDir, File outputDir, long numEdits) throws IOException {
        File editsFile = new File(workingDir, basename + ".out");
        File newDbFile = new File(outputDir, basename);
        File sortedEditsFile = new File(editsFile.getPath() + ".sorted");

        editWriter.close();

        // If there are edits, then process them.
        if (numEdits != 0) {
            // Sort the edits
            long startSort = System.currentTimeMillis();
            sorter.sort(editsFile.getPath(), sortedEditsFile.getPath());
            long endSort = System.currentTimeMillis();
            double sortSeconds = (endSort - startSort) / 1000.0;

            LOG.info("Processing " + basename + ": Sorted " + numEdits + " instructions in " + sortSeconds + " seconds.");
            LOG.info("Processing " + basename + ": Sorted " + (numEdits / sortSeconds) + " instructions/second");

            // Replace the unsorted edits with the sorted ones
            fs.delete(editsFile);
            fs.rename(sortedEditsFile, editsFile);

            // Read the sorted edits
            SequenceFile.Reader sortedEdits = new SequenceFile.Reader(fs, editsFile.getPath());
            MapFile.Writer newDb = null;
            try {
                // Create a brand-new output db for the integrated data
                newDb = (comparator == null)
                    ? new MapFile.Writer(fs, newDbFile.getPath(), keyClass, valueClass)
                    : new MapFile.Writer(fs, newDbFile.getPath(), comparator, valueClass);

                // Iterate through the edits, and merge changes with the
                // existing db into the brand-new file
                oldDb.reset();

                long startMerge = System.currentTimeMillis();
                mergeEdits(oldDb, sortedEdits, newDb);
                long endMerge = System.currentTimeMillis();
                double mergeSeconds = (endMerge - startMerge) / 1000.0;

                LOG.info("Processing " + basename + ": Merged to new DB containing " + itemsWritten + " records in " + mergeSeconds + " seconds");
                LOG.info("Processing " + basename + ": Merged " + (itemsWritten / mergeSeconds) + " records/second");
            } finally {
                // Close down readers, writers. The order matches the
                // original (reader first), and the writer is still closed
                // if closing the reader fails.
                try {
                    sortedEdits.close();
                } finally {
                    if (newDb != null) {
                        newDb.close();
                    }
                }
            }
        } else {
            // Otherwise, simply copy the file into place,
            // without all the processing overhead.
            long startCopy = System.currentTimeMillis();
            File curFile = new File(dbFile, basename);
            FileUtil.recursiveCopy(fs, curFile, newDbFile);
            long endCopy = System.currentTimeMillis();

            // NOTE(review): the copy goes through fs, but the size comes
            // from File.length() -- confirm this is meaningful when fs is
            // not the local filesystem.
            LOG.info("Processing " + basename + ": Copied file (" + newDbFile.length()+ " bytes) in " + ((endCopy - startCopy) / 1000.0) + " secs.");
        }

        // Delete the now-consumed edits file to save space
        fs.delete(editsFile);

        return itemsWritten;
    }

    /**
     * The loop that actually applies the changes and writes to
     * a new db. This is different for every subclass!
     *
     * @param db the existing db, already reset by closeDown()
     * @param edits the sorted edits to apply
     * @param newDb the writer for the merged result
     */
    abstract void mergeEdits(MapFile.Reader db, SequenceFile.Reader edits, MapFile.Writer newDb) throws IOException;
}
/***
* The PagesByURLProcessor is used during close() time for
* the pagesByURL table. We instantiate one of these, and it
* takes care of the entire shutdown process.
*/
private class PagesByURLProcessor extends CloseProcessor {
SequenceFile.Writer futureEdits;
/**
* We store "futureEdits" so we can write edits for the
* next table-db step
*/
PagesByURLProcessor(MapFile.Reader db, SequenceFile.Writer editWriter, SequenceFile.Writer futureEdits) {
super(PAGES_BY_URL, db, editWriter, new SequenceFile.Sorter(fs, new PageInstruction.UrlComparator(), NullWritable.class), new UTF8.Comparator(), null, Page.class);
this.futureEdits = futureEdits;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -