📄 distributedanalysistool.java
字号:
public void computeRound(int processId, File distDir) throws IOException {
    //
    // Performs one process's share of a scoring round:
    //   1. reads this process's assignment (start index and extent)
    //      from the file ASSIGN_FILE_PREFIX + "." + processId in distDir;
    //   2. walks the webdb pages, and for every page in the assigned
    //      slice emits one score contribution per outlink into
    //      SCORE_EDITS_FILE_PREFIX + "." + processId + ".unsorted";
    //   3. sorts that file into "....sorted" and deletes the unsorted one.
    //
    // Throws IOException if the assignment cannot be read or if the
    // webdb or score-edit files cannot be read/written.
    //
    File assignFile = new File(distDir, ASSIGN_FILE_PREFIX + "." + processId);
    long startIndex = 0, extent = 0;
    DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(assignFile)));
    try {
        startIndex = in.readLong();
        extent = in.readLong();
    } finally {
        in.close();
    }
    LOG.info("Start at: "+ startIndex);
    LOG.info("Extent: "+ extent);

    //
    // Open scoreEdits file for this process.  Write down
    // all the score-edits we want to perform.
    //
    File scoreEdits = new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + processId);
    String unsortedPath = scoreEdits.getPath() + ".unsorted";
    String sortedPath = scoreEdits.getPath() + ".sorted";
    SequenceFile.Writer scoreWriter = new SequenceFile.Writer(nfs, unsortedPath, UTF8.class, ScoreValue.class);

    //
    // Go through the appropriate WebDB range, and figure out
    // next scores
    //
    try {
        // Iterate through the webdb pages in the order produced by
        // pagesByMD5(); curIndex is the position in that enumeration.
        long curIndex = 0;
        ScoreValue score = new ScoreValue();   // reused for every append
        IWebDBReader reader = new WebDBReader(nfs, dbDir);
        try {
            for (Enumeration e = reader.pagesByMD5(); e.hasMoreElements(); curIndex++) {
                //
                // Find our starting place: consume and discard the
                // pages that precede our slice.
                //
                if (curIndex < startIndex) {
                    e.nextElement();
                    continue;
                }

                //
                // Stop once exactly 'extent' pages have been consumed,
                // i.e. the slice is [startIndex, startIndex + extent).
                // Using '>' here would admit one page too many, which
                // would be processed twice if slices are contiguous.
                // NOTE(review): assumes 'extent' is a page count, as
                // written by initRound() -- confirm against that method.
                //
                if (curIndex - startIndex >= extent) {
                    break;
                }

                //
                // OK, do some analysis!
                //
                Page curPage = (Page) e.nextElement();
                Link outLinks[] = reader.getLinks(curPage.getMD5());

                // Count the outlinks whose target itself has outlinks.
                int targetOutlinkers = 0;
                for (int i = 0; i < outLinks.length; i++) {
                    if (outLinks[i].targetHasOutlink()) {
                        targetOutlinkers++;
                    }
                }

                //
                // For our purposes here, assume every Page
                // has an inlink, even though that might not
                // really be true.  It's close enough.
                //

                //
                // In case there's no previous nextScore, grab
                // score as an approximation.
                //
                float curNextScore = curPage.getNextScore();
                if (outLinks.length > 0 && curNextScore == 0.0f) {
                    curNextScore = curPage.getScore();
                }

                //
                // Compute contributions: the page's value is split
                // evenly over all outlinks, and separately over only
                // those outlinks whose target has outlinks.
                //
                float contributionForAll = (outLinks.length > 0) ? (curNextScore / outLinks.length) : 0.0f;
                float contributionForOutlinkers = (targetOutlinkers > 0) ? (curNextScore / targetOutlinkers) : 0.0f;
                for (int i = 0; i < outLinks.length; i++) {
                    // emit the target URL and the contribution
                    score.setScore(contributionForAll);
                    score.setNextScore(outLinks[i].targetHasOutlink() ? contributionForOutlinkers : 0.0f);
                    scoreWriter.append(outLinks[i].getURL(), score);
                }

                // Periodic progress report.
                if (((curIndex - startIndex) % 5000) == 0) {
                    LOG.info("Pages consumed: " + (curIndex - startIndex) + " (at index " + curIndex + ")");
                }
            }
        } finally {
            reader.close();
        }
    } finally {
        scoreWriter.close();
    }

    // Now sort the resulting score-edits file, then drop the
    // unsorted intermediate.
    SequenceFile.Sorter sorter = new SequenceFile.Sorter(nfs, new UTF8.Comparator(), ScoreValue.class);
    sorter.sort(unsortedPath, sortedPath);
    nfs.delete(new File(unsortedPath));
}
/**
 * This method collates and executes all the instructions
 * computed by the many executors of computeRound().  It
 * figures out what to write by looking at all the flat
 * files found in the distDir.  These files are labelled
 * according to the processes that filled them.
 *
 * If the processors are distributed, you might have to
 * copy all the instruction files to a single distDir before
 * starting this method.
 *
 * Of course, this method is executed on only one process.
 * It is run last.
 *
 * Every page enumerated from the webdb is rewritten with a new
 * score: pages that received score-edits get the decayed sum of
 * their contributions, and pages that received none get the
 * minimum value (1 - DECAY_VALUE).  Afterwards the score
 * distribution is written to scoreFile and distDir is deleted.
 *
 * @param distDir   directory holding the "assignComplete" file and
 *                  the per-process score-edit files
 * @param scoreFile file that receives the score distribution report
 * @throws IOException on any read/write failure, or if a score-edit
 *                     refers to a URL that sorts before the current
 *                     page (i.e. has no matching page)
 */
public void completeRound(File distDir, File scoreFile) throws IOException {
    //
    // Load the overall assignment file, so we can
    // see how many processes we have and how many
    // pages were assigned in total.
    //
    int numProcesses = 0;
    long totalPages = 0;
    File overall = new File(distDir, "assignComplete");
    DataInputStream in = new DataInputStream(new BufferedInputStream(nfs.open(overall)));
    try {
        numProcesses = in.readInt();
        totalPages = in.readLong();
        // One long per process follows.  The values are not needed
        // here, but reading them still fails fast on a truncated file.
        for (int i = 0; i < numProcesses; i++) {
            in.readLong();
        }
    } finally {
        in.close();
    }

    //
    // Go through each instructions file we have, and
    // apply each one to the webdb.
    //
    ScoreStats scoreStats = new ScoreStats();
    IWebDBReader reader = new WebDBReader(nfs, dbDir);
    IWebDBWriter writer = null;
    EditSet editSet = null;
    try {
        // Opened inside the try so that a failing constructor does
        // not leak the resources that were already opened.
        writer = new WebDBWriter(nfs, dbDir);
        editSet = new EditSet(distDir, numProcesses);

        int count = 0;
        UTF8 curEditURL = new UTF8();
        ScoreValue curContribution = new ScoreValue();
        boolean hasEdit = editSet.next(curEditURL, curContribution);

        //
        // Go through all the Pages, in URL-sort order.
        // We also read from the score-edit file in URL-sort order.
        //
        for (Enumeration e = reader.pages(); e.hasMoreElements(); count++) {
            Page curPage = (Page) e.nextElement();

            //
            // Compare the page against the pending score-edit.  Once
            // the edits are exhausted, every remaining page has no
            // contributions, which is the same situation as "the edit
            // belongs to a later page" -- so treat it that way rather
            // than stopping early and leaving those pages unscored.
            //
            int comparison = hasEdit ? curPage.getURL().compareTo(curEditURL) : -1;
            float newScore = 0.0f, newNextScore = 0.0f;

            if (comparison < 0) {
                // Fine.  The edit (if any) applies to a Page we will
                // hit later.  This should only happen with Pages
                // that have no incoming links, which are necessarily
                // special-case Pages.
                //
                // However, that means the Page's score should
                // be set to the minimum possible, as we have no
                // incoming links.
                newScore = (1 - DECAY_VALUE);
                newNextScore = (1 - DECAY_VALUE);
            } else if (comparison > 0) {
                // Error!  We should never hit this situation.
                // It means we have a score-edit for an item
                // that's not found in the database!
                throw new IOException("Impossible situation.  There is a score-edit for " + curEditURL + ", which comes after the current Page " + curPage.getURL());
            } else {
                //
                // The only really interesting case is when the
                // score-edit and the curPage are the same.
                //
                // Sum all the contributions addressed to this URL.
                while (hasEdit && curPage.getURL().compareTo(curEditURL) == 0) {
                    newScore += curContribution.score();
                    newNextScore += curContribution.nextScore();
                    hasEdit = editSet.next(curEditURL, curContribution);
                }

                newScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newScore);
                newNextScore = (1 - DECAY_VALUE) + (DECAY_VALUE * newNextScore);
            }

            // Finally, assign it.
            curPage.setScore(newScore, newNextScore);
            writer.addPageWithScore(curPage);
            scoreStats.addScore(newScore);

            if ((count % 5000) == 0) {
                LOG.info("Pages written: " + count);
            }
        }

        LOG.info("Pages encountered: " + count);
        LOG.info("Target pages from init(): " + totalPages);
    } finally {
        // Close in the order reader, editSet, writer, and make sure a
        // failure in one close() cannot prevent the others from running.
        try {
            reader.close();
        } finally {
            try {
                if (editSet != null) {
                    editSet.close();
                }
            } finally {
                if (writer != null) {
                    writer.close();
                }
            }
        }
    }

    //
    // Emit the score distribution info
    //
    // NOTE(review): exists()/delete() act on java.io.File directly,
    // while the file is created through nfs -- confirm this is the
    // intended behaviour when nfs is not the local filesystem.
    if (scoreFile.exists()) {
        scoreFile.delete();
    }
    PrintStream pout = new PrintStream(new BufferedOutputStream(nfs.create(scoreFile)));
    try {
        scoreStats.emitDistribution(pout);
    } finally {
        pout.close();
    }

    //
    // Delete all the distributed overhead files
    //
    FileUtil.fullyDelete(nfs, distDir);
}
/**
 * Kick off the link analysis.  Submit the locations of the
 * Webdb and the directive to run.
 *
 * DAT -initRound <n_processes> <dist_dir> <db_dir>
 * DAT -computeRound <process_id> <dist_dir> <db_dir>
 * DAT -completeRound <dist_dir> <db_dir>
 *
 * If a directive is given without all of its operands, the usage
 * text is printed and nothing is run.
 *
 * @param argv command-line arguments; filesystem options are handled
 *             by NutchFileSystem.parseArgs()
 * @throws IOException if the selected round fails or the filesystem
 *                     cannot be closed
 */
public static void main(String argv[]) throws IOException {
    final String usage = "usage: java net.nutch.tools.DistributedAnalysisTool (-local | -ndfs <namenode:port>) -initRound|-computeRound|-completeRound (numProcesses | processId) <dist_dir> <db_dir>";
    if (argv.length < 2) {
        System.out.println(usage);
        return;
    }

    String command = null;
    int numProcesses = 0, processId = 0;
    File distDir = null, dbDir = null;

    NutchFileSystem nfs = NutchFileSystem.parseArgs(argv, 0);
    boolean started = false;
    try {
        for (int i = 0; i < argv.length; i++) {
            boolean isInit = "-initRound".equals(argv[i]);
            boolean isCompute = "-computeRound".equals(argv[i]);
            boolean isComplete = "-completeRound".equals(argv[i]);
            if (!isInit && !isCompute && !isComplete) {
                continue;
            }

            // -completeRound takes <dist_dir> <db_dir>; the other two
            // directives take a leading integer as well.
            int operands = isComplete ? 2 : 3;
            if (i + operands >= argv.length) {
                // Not enough arguments after the directive: report it
                // instead of failing with an array-index exception.
                System.out.println(usage);
                return;
            }

            command = argv[i];
            if (isInit) {
                numProcesses = Integer.parseInt(argv[i+1]);
                distDir = new File(argv[i+2]);
                dbDir = new File(argv[i+3]);
            } else if (isCompute) {
                processId = Integer.parseInt(argv[i+1]);
                distDir = new File(argv[i+2]);
                dbDir = new File(argv[i+3]);
            } else {
                distDir = new File(argv[i+1]);
                dbDir = new File(argv[i+2]);
            }
            i += operands;
        }

        System.out.println("Started at " + new Date(System.currentTimeMillis()));
        started = true;

        DistributedAnalysisTool dat =
            new DistributedAnalysisTool(nfs, dbDir);
        if ("-initRound".equals(command)) {
            dat.initRound(numProcesses, distDir);
        } else if ("-computeRound".equals(command)) {
            dat.computeRound(processId, distDir);
        } else if ("-completeRound".equals(command)) {
            dat.completeRound(distDir, new File(dbDir, "linkstats.txt"));
        } else {
            System.out.println("No directive.");
        }
    } finally {
        // Only report a finish time if a start time was reported, but
        // always release the filesystem, including on the usage path.
        if (started) {
            System.out.println("Finished at " + new Date(System.currentTimeMillis()));
        }
        nfs.close();
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -