FetchListTool.java
// db.)
//
long count = 0;
TreeMap anchorTable = new TreeMap();
Vector unknownDomainLinks = new Vector();
//
// Create a comparator that matches the domainIDs for
// Link objects.
//
Comparator domainComparator = new Comparator() {
public int compare(Object o1, Object o2) {
Link l1 = (Link) o1;
Link l2 = (Link) o2;
if (l1.getDomainID() < l2.getDomainID()) {
return -1;
} else if (l1.getDomainID() == l2.getDomainID()) {
return 0;
} else {
return 1;
}
}
};
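//
// (TreeSets built with this comparator, used below, treat
// two links from the same source domain as equal; that is
// what collapses duplicate anchors per domain.)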
//
// Go through all the pages by URL. Filter the ones
// we really don't want, and save the others for possible
// emission.
//
SortableScore curScore = new SortableScore();
File unsortedFile = new File(workingDir, TOP_N_SORTER + ".unsorted");
SequenceFile.Writer writer = new SequenceFile.Writer(nfs, unsortedFile.getPath(), SortableScore.class, FetchListEntry.class);
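//
// Each record in the unsorted file pairs a page's score
// (the key) with its FetchListEntry (the value); sorting
// by key later yields a score-ordered list to cut at topN.
//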
try {
IWebDBReader webdb = new WebDBReader(nfs, dbDir);
try {
for (Enumeration e = webdb.pages(); e.hasMoreElements(); count++) {
// Grab the next Page.
Page page = (Page) e.nextElement();
boolean shouldFetch = true;
if (((count % 10000) == 0) && (count != 0)) {
LOG.info("Processing page " + count + "...");
}
//
// Don't emit it if the Page's score doesn't meet
// our cutoff value
//
if ((cutoffScore >= 0) && (page.getScore() < cutoffScore)) {
continue;
}
//
// If the item is not yet ready to be fetched, move on.
//
// Also, if getNextFetchTime is set to Long.MAX_VALUE,
// then it should never be fetched.
//
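// (Strictly speaking, the first test below covers the
// second whenever curTime is less than Long.MAX_VALUE;
// the explicit check mainly documents the intent.)
//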
if (page.getNextFetchTime() > curTime ||
page.getNextFetchTime() == Long.MAX_VALUE) {
continue;
}
//
// If we're in refetchOnly mode, set shouldFetch to FALSE
// for any Pages whose URL's MD5 is the same as the
// listed MD5. That indicates that no content has been
// downloaded in the past.
//
if (refetchOnly) {
MD5Hash urlHash = MD5Hash.digest(page.getURL());
if (page.getMD5().equals(urlHash)) {
shouldFetch = false;
}
}
//
// If anchorOptimize mode is on, AND shouldFetch is
// false, then we might apply a further optimization.
// Since a non-fetched Page (that is, a URL-only
// item) can only be discovered via the incoming
// anchor text, we can skip those Pages that have
// only *empty* incoming anchor text.
//
Link inlinks[] = webdb.getLinks(page.getURL());
if ((! shouldFetch) && anchorOptimize) {
boolean foundUsefulAnchor = false;
for (int i = 0; i < inlinks.length; i++) {
UTF8 anchorText = inlinks[i].getAnchorText();
if ((anchorText != null) &&
(anchorText.toString().trim().length() > 0)) {
foundUsefulAnchor = true;
break;
}
}
if (! foundUsefulAnchor) {
continue;
}
}
//
// Uniquify identical anchor text strings by source
// domain. If the anchor text is identical, and
// the domains are identical, then the anchor should
// only be included once.
//
// Links will arrive in the array sorted first by URL,
// and then by source-MD5.
//
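// For example, the anchor "news" arriving from domain 42
// twice and from domain 99 once counts as two unique
// anchors, not three.
//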
int uniqueAnchors = 0;
for (int i = 0; i < inlinks.length; i++) {
// getAnchorText() may be null (see the check above), so guard it.
UTF8 anchorText = inlinks[i].getAnchorText();
String anchor = (anchorText == null) ? "" : anchorText.toString().trim();
if (anchor.length() > 0) {
if (inlinks[i].getDomainID() == 0) {
unknownDomainLinks.add(anchor);
} else {
Set domainUniqueLinks = (Set) anchorTable.get(anchor);
if (domainUniqueLinks == null) {
domainUniqueLinks = new TreeSet(domainComparator);
anchorTable.put(anchor, domainUniqueLinks);
}
if (domainUniqueLinks.add(inlinks[i])) {
uniqueAnchors++;
}
}
}
}
//
// Finally, collect the incoming anchor text for
// the current URL. Step one is to add the incoming
// anchors whose links' source-domains are unknown.
// (The target, obviously, is the URL we're currently
// processing.)
//
int i = 0;
String results[] = new String[uniqueAnchors + unknownDomainLinks.size()];
for (Enumeration e2 = unknownDomainLinks.elements(); e2.hasMoreElements(); i++) {
results[i] = (String) e2.nextElement();
}
unknownDomainLinks.clear();
//
// Step 2, add the anchors that have actually been
// uniquified by source-domain.
//
for (Iterator it = anchorTable.keySet().iterator(); it.hasNext(); ) {
String key = (String) it.next();
Set domainUniqueLinks = (Set) anchorTable.get(key);
for (int j = 0; j < domainUniqueLinks.size(); j++) {
results[i++] = key;
}
}
anchorTable.clear();
//
// Last, add the FetchListEntry to a file so we can
// sort by score. Be sure to modify the Page's
// fetch time; that way a fetchlist generated soon
// afterward will not include this Page. That's
// helpful because with two distinct fetchlists, it
// should be possible to fetch and perform dbupdate
// at the same time.
//
// Optionally set the score to the log of the number
// of incoming anchors.
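// (Math.log(1) is 0 and Math.log(0) is negative infinity,
// so anchor-less pages end up at the very bottom of the
// score order when scoreByLinkCount is on.)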
curScore.set(scoreByLinkCount ?
(float)Math.log(results.length) : page.getScore());
page.setNextFetchTime(System.currentTimeMillis() + FETCH_GENERATION_DELAY_MS);
writer.append(curScore, new FetchListEntry(shouldFetch, page, results));
}
} finally {
webdb.close();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
writer.close();
}
//
// The next step is to sort the file we created above.
// After sorting, we add the "topN" items to the
// table set.
//
File sortedFile = new File(workingDir, TOP_N_SORTER + ".sorted");
SequenceFile.Sorter topNSorter = new SequenceFile.Sorter(nfs, SortableScore.class, FetchListEntry.class);
topNSorter.sort(unsortedFile.getPath(), sortedFile.getPath());
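//
// (This assumes SortableScore's compareTo ranks higher
// scores first, so the sorted file begins with the
// best-scoring pages.)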
//
// Last of all, add the topN items to the table set.
//
// This is also where we rewrite the WebDB - we need to do
// this so we can modify the "date" field. Rewriting the
// db can be expensive, but it's that modification that will
// allow us to interleave fetching and db-update.
//
WebDBWriter dbwriter = new WebDBWriter(nfs, dbDir);
try {
SequenceFile.Reader reader = new SequenceFile.Reader(nfs, sortedFile.getPath());
try {
SortableScore key = new SortableScore();
FetchListEntry value = new FetchListEntry();
while (topN > 0 && reader.next(key, value)) {
tables.append(value);
topN--;
//
// Modify the Page in the webdb so that its date
// is set forward a week. This way, we can
// generate two consecutive different fetchlists
// without an intervening update. So, we generate
// lists A and B, and start fetching A. Upon
// completion, we use A to update the db, and start
// fetching B. This way we have simultaneous
// dbupdate and page fetch, which should double
// our throughput.
//
dbwriter.addPage(value.getPage());
}
} finally {
reader.close();
}
} finally {
dbwriter.close();
}
}
/**
* Generate a fetchlist from the pagedb and linkdb
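*
* Example invocation (hypothetical paths, assuming the
* local filesystem):
*
*   FetchListTool -local ./db ./segments -topN 1000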
*/
public static void main(String argv[]) throws IOException, FileNotFoundException {
if (argv.length < 2) {
System.out.println("Usage: FetchListTool (-local | -ndfs <namenode:port>) <db> <segment_dir> [-refetchonly] [-anchoroptimize linkdb] [-topN N] [-cutoff cutoffscore] [-numFetchers numFetchers] [-adddays numDays]");
return;
}
//
// Required args
//
int i = 0;
NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, i);
File dbDir = new File(argv[i++]);
File segmentDir = new File(argv[i++]);
long curTime = System.currentTimeMillis();
//
// Optional args
//
boolean refetchOnly = false, anchorOptimize = false;
long topN = Long.MAX_VALUE;
float cutoffScore = -1.0f;
int numFetchers = 1;
int seed = new Random().nextInt();
try {
for (; i < argv.length; i++) {
if ("-refetchonly".equals(argv[i])) {
refetchOnly = true;
} else if ("-anchoroptimize".equals(argv[i])) {
anchorOptimize = true;
} else if ("-topN".equals(argv[i])) {
if (i+1 < argv.length) {
topN = Long.parseLong(argv[i+1]);
i++;
} else {
System.out.println("No argument present for -topN");
return;
}
} else if ("-cutoff".equals(argv[i])) {
if (i+1 < argv.length) {
cutoffScore = Float.parseFloat(argv[i+1]);
i++;
} else {
System.out.println("No argument present for -cutoffscore");
return;
}
} else if ("-numFetchers".equals(argv[i])) {
if (i+1 < argv.length) {
numFetchers = Integer.parseInt(argv[i+1]);
i++;
} else {
System.out.println("No argument present for -numFetchers");
return;
}
} else if ("-adddays".equals(argv[i])) {
if (i+1 < argv.length) {
long numDays = Integer.parseInt(argv[i+1]);
curTime += numDays * 1000L * 60 * 60 * 24;
i++;
} else {
System.out.println("No argument present for -adddays");
return;
}
}
}
} catch (NumberFormatException nfe) {
System.out.println("Badly-formatted number:: " + nfe);
return;
}
//
// Check that args are consistent
//
if (anchorOptimize && !refetchOnly) {
System.out.println("Tool cannot use -anchoroptimize option without -refetchonly option as well.");
return;
}
//
// Finally, start things up.
//
LOG.info("FetchListTool started");
if (topN != Long.MAX_VALUE) {
LOG.info("topN:" + topN);
}
if (cutoffScore >= 0) {
LOG.info("cutoffscore:" + cutoffScore);
}
if (numFetchers > 1) {
LOG.info("seed:" + seed);
}
FetchListTool flt = new FetchListTool(nfs, dbDir, refetchOnly, anchorOptimize, cutoffScore, seed);
if (numFetchers > 1) {
flt.emitMultipleLists(segmentDir, numFetchers, topN, curTime);
} else {
flt.emitFetchList(segmentDir, topN, curTime);
}
nfs.close();
LOG.info("FetchListTool completed");
}
}