📄 fetchlisttool.java
字号:
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.tools;
import java.io.*;
import java.net.*;
import java.util.*;
import java.text.*;
import java.util.logging.*;
import net.nutch.db.*;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.pagedb.*;
import net.nutch.linkdb.*;
/**********************************************
* This class takes an IWebDBReader, computes a relevant subset,
* and then emits the subset.
*
* @author Mike Cafarella
***********************************************/
public class FetchListTool {
/** Logger for fetchlist-generation progress and timing output. */
public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.FetchListTool");
// NOTE(review): neither of the next two constants is referenced in the
// visible portion of this file; presumably used by top-N selection code
// further down — confirm before removing.
private static String TOP_N_SORTER = "topNSorter";
// One week, in milliseconds.
private static final long FETCH_GENERATION_DELAY_MS = 7 * 24 * 60 * 60 * 1000;
// Config switch: score pages by link count instead of the stored score.
private boolean scoreByLinkCount =
NutchConf.getBoolean("fetchlist.score.by.link.count", false);
// Filesystem that all reads and writes go through (set in the constructor).
NutchFileSystem nfs;
// Directory of the web db to read from (set in the constructor).
File dbDir;
// Selection flags supplied to the constructor.
boolean refetchOnly, anchorOptimize;
// Score threshold supplied to the constructor.
float cutoffScore;
// Seed mixed into the host hash when spreading entries across tables
// (see TableSet.append()).
int seed;
/**
 * The TableSet class allocates each FetchListEntry into one of several
 * ArrayFiles.  The target file is chosen by a hash of the URL's host
 * name, so each table receives a roughly even share of entries while
 * all pages from one host stay together in a single file (useful for
 * fetch-time efficiency).  Within a file the entries end up in MD5
 * order, i.e. effectively random, so the fetcher does not hammer the
 * same site over and over.
 *
 * If the dataset is heavily skewed toward a few large domains, the
 * distribution across files can be uneven.
 */
class TableSet {
    Vector outputPaths = new Vector();   // final destination path per table
    Vector tables = new Vector();        // open staging writers, parallel to outputPaths
    long appendCounts[];                 // entries appended per table, for logging
    boolean hasAppended = false;         // once true, add() is refused

    public TableSet() {
    }

    /**
     * Register a new output table.  Cannot be called after entries
     * have started being appended.
     *
     * @param outputPath final path of the ArrayFile to produce
     * @return false if appending has already begun
     */
    public synchronized boolean add(String outputPath) throws IOException {
        if (hasAppended) {
            return false;
        }
        // Remember the destination and open an unsorted staging file
        // that collects the entries appended to this table.
        outputPaths.add(outputPath);
        tables.add(new SequenceFile.Writer(nfs, outputPath + ".unsorted", MD5Hash.class, FetchListEntry.class));
        return true;
    }

    /**
     * Append a FetchListEntry to the table selected by hashing the
     * URL's host name.
     *
     * @return false if the entry's URL is malformed (entry is dropped)
     */
    public synchronized boolean append(FetchListEntry newEntry) throws IOException {
        hasAppended = true;
        if (appendCounts == null) {
            appendCounts = new long[outputPaths.size()];
        }
        Page fetchPage = newEntry.getPage();

        // Extract the (lowercased) host name from the URL.
        String host = null;
        try {
            host = new URL(fetchPage.getURL().toString()).getHost().toLowerCase();
        } catch (MalformedURLException e) {
            return false;   // drop entries with unparseable URLs
        }

        // Pick the target table from a seeded hash of the host.  Mask with
        // Integer.MAX_VALUE rather than Math.abs(): abs(Integer.MIN_VALUE)
        // is still negative and would yield a negative array index.
        MD5Hash hash = MD5Hash.digest(host);
        int index = ((hash.hashCode() ^ seed) & Integer.MAX_VALUE) % tables.size();

        SequenceFile.Writer writer = (SequenceFile.Writer) tables.elementAt(index);
        writer.append(fetchPage.getMD5(), newEntry);
        appendCounts[index]++;
        return true;
    }

    /**
     * Close down the TableSet; no more FetchListEntries are expected.
     * We now:
     *   a) Close all the SequenceFile.Writer staging files.
     *   b) Sort each staging file by MD5 key.
     *   c) Copy each newly-sorted file into its final ArrayFile.
     */
    public synchronized void close() throws IOException {
        hasAppended = true;

        // A) Close all the staging writers.
        for (Enumeration e = tables.elements(); e.hasMoreElements(); ) {
            ((SequenceFile.Writer) e.nextElement()).close();
        }

        // B) Sort each staging file (timing the work), deleting the
        //    unsorted version on completion.
        SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, new MD5Hash.Comparator(), FetchListEntry.class);
        long totalEntries = 0;
        double totalTime = 0;
        int i = 0;
        for (Enumeration e = outputPaths.elements(); e.hasMoreElements(); i++) {
            String name = (String) e.nextElement();
            String unsortedName = name + ".unsorted";
            long localStart = System.currentTimeMillis();
            sorter.sort(unsortedName, name + ".sorted");
            long localEnd = System.currentTimeMillis();
            if (appendCounts != null) {
                double localSecs = ((localEnd - localStart) / 1000.0);
                LOG.info("Processing " + unsortedName + ": Sorted " + appendCounts[i] + " entries in " + localSecs + " seconds.");
                LOG.info("Processing " + unsortedName + ": Sorted " + (appendCounts[i] / localSecs) + " entries/second");
                totalEntries += appendCounts[i];
                totalTime += localSecs;
            }
            nfs.delete(new File(unsortedName));
        }
        LOG.info("Overall processing: Sorted " + totalEntries + " entries in " + totalTime + " seconds.");
        // Fix: throughput is entries divided by time; the original divided
        // time by entries (seconds/entry) while labeling it entries/second.
        // Guard against a zero elapsed time to avoid printing Infinity/NaN.
        double throughput = (totalTime > 0) ? (totalEntries / totalTime) : 0.0;
        LOG.info("Overall processing: Sorted " + throughput + " entries/second");

        // C) Copy each sorted file into its final ArrayFile, then delete
        //    the intermediate sorted file.
        for (Enumeration e = outputPaths.elements(); e.hasMoreElements(); ) {
            String name = (String) e.nextElement();
            SequenceFile.Reader reader = new SequenceFile.Reader(nfs, name + ".sorted");
            ArrayFile.Writer af = new ArrayFile.Writer(nfs, name, FetchListEntry.class);
            try {
                MD5Hash key = new MD5Hash();
                FetchListEntry fle = new FetchListEntry();
                while (reader.next(key, fle)) {
                    af.append(fle);
                }
            } finally {
                // Nested finally so the reader is closed even when
                // af.close() throws (the original leaked it in that case).
                try {
                    af.close();
                } finally {
                    reader.close();
                    nfs.delete(new File(name + ".sorted"));
                }
            }
        }
    }
}
/*************************************
 * SortableScore wraps a float score as a WritableComparable whose
 * natural ordering is DESCENDING: higher scores sort first.
 *************************************/
public static class SortableScore implements WritableComparable {
    float score;

    public SortableScore() {
    }

    /** Set the wrapped score. */
    public void set(float score) {
        this.score = score;
    }

    /** @return the wrapped score */
    public float getFloat() {
        return score;
    }

    ////////
    // WritableComparable
    ////////

    /**
     * Descending order: a larger score compares as "less than" so it
     * sorts to the front.  The branch structure deliberately mirrors
     * the raw float comparisons (rather than Float.compare) so that
     * NaN and signed-zero handling is unchanged.
     */
    public int compareTo(Object o) {
        float other = ((SortableScore) o).score;
        if (score < other) {
            return 1;
        }
        if (score == other) {
            return 0;
        }
        return -1;
    }

    /** Serialize the score to the given stream. */
    public void write(DataOutput out) throws IOException {
        out.writeFloat(score);
    }

    /** Deserialize the score from the given stream. */
    public void readFields(DataInput in) throws IOException {
        this.score = in.readFloat();
    }
}
/**
 * Construct a FetchListTool over the web db in dbDir; the emit*List()
 * methods then write out RECNO-based fetchlist subsets.
 *
 * @param nfs filesystem used for all file reads and writes
 * @param dbDir directory holding the web db
 * @param refetchOnly if true, restrict selection to refetch candidates
 *        — NOTE(review): flag is only stored here; confirm semantics
 *        against the selection code below
 * @param anchorOptimize anchor-handling flag (stored here; semantics
 *        defined by later selection code)
 * @param cutoffScore score threshold for selection (stored here)
 * @param seed mixed into the host hash when distributing entries
 *        across output tables
 */
public FetchListTool(NutchFileSystem nfs, File dbDir, boolean refetchOnly, boolean anchorOptimize, float cutoffScore, int seed) throws IOException, FileNotFoundException {
this.nfs = nfs;
this.dbDir = dbDir;
this.refetchOnly = refetchOnly;
this.anchorOptimize = anchorOptimize;
this.cutoffScore = cutoffScore;
this.seed = seed;
}
/**
 * Spit out several fetchlists, so that we can fetch across several
 * machines.  Creates one dated subdirectory of dir per list, plus a
 * temporary working directory that is removed when done.
 *
 * @param dir parent directory for the emitted fetchlists
 * @param numLists how many fetchlists (tables) to create
 * @param topN maximum number of entries to select overall
 * @param curTime current time, used by the selection logic
 */
public void emitMultipleLists(File dir, int numLists, long topN, long curTime) throws IOException {
    //
    // Create tables (and directories) for each fetchlist we want.
    // Add them all to a TableSet object.
    //
    TableSet tables = new TableSet();
    try {
        // Capture the timestamp ONCE: the original called getDate()
        // twice, so crossing a second boundary could give the temp
        // dir a different prefix than the per-list subdirs.
        String datePrefix = getDate();
        File workingDir = new File(dir, "tmp_" + datePrefix);
        nfs.mkdirs(workingDir);
        try {
            for (int i = 0; i < numLists; i++) {
                File subdir = new File(dir, datePrefix + "-" + i);
                nfs.mkdirs(subdir);
                File file = new File(subdir, FetchListEntry.DIR_NAME);
                tables.add(file.getPath());
            }
            // Now go through the fetchlist.
            emitFetchList(tables, workingDir, topN, curTime);
        } finally {
            FileUtil.fullyDelete(nfs, workingDir);
        }
    } finally {
        tables.close();
    }
}
/**
 * Spit out a single fetchlist into a dated subdirectory of segmentDir,
 * using a temporary working directory that is removed when done.
 *
 * @param segmentDir parent directory for the emitted fetchlist
 * @param topN maximum number of entries to select
 * @param curTime current time, used by the selection logic
 */
public void emitFetchList(File segmentDir, long topN, long curTime) throws IOException {
    TableSet tables = new TableSet();
    // Capture the timestamp ONCE so the temp dir and the output dir
    // share the same name even if the clock ticks over a second
    // between the two uses (the original called getDate() twice).
    String date = getDate();
    File workingDir = new File(segmentDir, "tmp_" + date);
    nfs.mkdirs(workingDir);
    File subdir = new File(segmentDir, date);
    nfs.mkdirs(subdir);
    try {
        tables.add(new File(subdir, FetchListEntry.DIR_NAME).getPath());
        try {
            emitFetchList(tables, workingDir, topN, curTime);
        } finally {
            tables.close();
        }
    } finally {
        FileUtil.fullyDelete(nfs, workingDir);
    }
}
/** @return the current wall-clock time formatted as yyyyMMddHHmmss */
private static String getDate() {
    DateFormat stamp = new SimpleDateFormat("yyyyMMddHHmmss");
    return stamp.format(new Date());
}
/**
* Emit the fetchlist, with the given TableSet. The TableSet is
* responsible for actually appending the item to the output file,
* which is from this function.
*/
void emitFetchList(TableSet tables, File workingDir, long topN, long curTime) throws IOException {
// Iterate through all the Pages, by URL. Iterating
// through by URL means we can save disk seeks when
// calling webdb.getLinks(URL).
//
// However, we don't really want the output to be in URL-ordered
// format. We would like the output to be URL-randomized, which
// an MD5-ordering preserves nicely. But we assume here that
// TableSet will do that randomizing for us. We just need to
// make sure we give it a good sampling of our data. (That is,
// if we are giving TableSet fewer than the max-possible items,
// we should make sure the items come evenly from all over the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -