/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.tools;
import java.io.*;
import java.util.*;
import java.util.logging.*;
import net.nutch.io.*;
import net.nutch.db.*;
import net.nutch.fs.*;
import net.nutch.net.*;
import net.nutch.util.*;
import net.nutch.linkdb.*;
import net.nutch.pagedb.*;
import net.nutch.fetcher.*;
/***************************************
* DistributedAnalysisTool performs link-analysis by reading
* exclusively from an IWebDBReader, and writing to
* an IWebDBWriter.
*
* This tool can be used in phases via the command line
* to compute the LinkAnalysis score across many machines.
*
* For a single iteration of LinkAnalysis, you must have:
*
* 1) An "initRound" step that writes down how the work should be
* divided. This outputs a "dist" directory which must be made
* available to later steps. It requires the input db directory.
*
* 2) As many simultaneous "computeRound" steps as you like, but this
* number must be determined in step 1. Each step may be run
* on different machines, or on the same, or however you like.
* It requires the "db" and "dist" directories (or copies) as
* inputs. Each run will output an "instructions file".
*
* 3) A "completeRound" step, which integrates the results of all the
* many "computeRound" steps. It writes to a "db" directory. It
* assumes that all the instructions files have been gathered into
* a single "dist" input directory. If you're running everything
* on a single filesystem, this will happen easily. If not, then
* you will have to gather the files by hand (or with a script).
*
* For more iterations, repeat steps 1 - 3!
*
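* A sketch of one full iteration driven from Java rather than the
* command line. Everything here other than initRound is an
* assumption: the computeRound/completeRound names and arguments,
* and the LocalFileSystem constructor, are inferred from the step
* descriptions above, not from code shown in this file:
*
* <pre>
*   NutchFileSystem nfs = new LocalFileSystem();
*   DistributedAnalysisTool dat =
*       new DistributedAnalysisTool(nfs, new File("db"));
*   dat.initRound(4, new File("dist"));          // step 1: 4 workers
*   // step 2: one call per process, on any machine:
*   //   dat.computeRound(i, new File("dist"));  // i = 0..3
*   // step 3: after gathering all edits files into "dist":
*   //   dat.completeRound(new File("dist"));
* </pre>
*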
* @author Mike Cafarella
***************************************/
public class DistributedAnalysisTool {
final private static String ASSIGN_FILE_PREFIX = "assignment";
final private static String SCORE_EDITS_FILE_PREFIX = "scoreEdits";
final private static String ASSIGN_COMPLETE = "assignComplete";
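// DEFAULT_SCORE and DECAY_VALUE match the standard PageRank
// parameters; presumably each page's new score is computed as
//   score = DEFAULT_SCORE + DECAY_VALUE * (sum of inlink shares),
// i.e. a damping factor of 0.85.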
final private static float DEFAULT_SCORE = 0.15f;
final private static float DECAY_VALUE = 0.85f;
public static final Logger LOG = LogFormatter.getLogger("net.nutch.tools.DistributedAnalysisTool");
/**
* The EditSet inner class represents all of the sorted edits
* files we must process. The edit-loop can repeatedly ask
* an EditSet for the "next item", and the EditSet will
* seamlessly deal with opening and closing files.
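*
* How the edit-loop consumes an EditSet (a minimal sketch; key
* and val stand for pre-allocated Writable instances matching
* whatever types the edit files were written with):
*
* <pre>
*   EditSet edits = new EditSet(distDir, numEditFiles);
*   while (edits.next(key, val)) {
*       // apply one score edit
*   }
*   edits.close();
* </pre>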
*/
class EditSet {
File distDir;
int numEditFiles;
int curEditFile;
SequenceFile.Reader curReader;
/**
* The "distDir" is where we find all the edit files.
* The "numEditFiles" is now many we can expect to get there.
*/
public EditSet(File distDir, int numEditFiles) throws IOException {
this.distDir = distDir;
this.numEditFiles = numEditFiles;
this.curEditFile = 0;
getNextReader();
}
/**
* Get the next item for reading, closing and opening
* files if necessary. Return false if there are no
* more items to return.
*/
public synchronized boolean next(Writable key, Writable val) throws IOException {
    //
    // Read from the current file, advancing to the next
    // file whenever the current one is exhausted (this also
    // tolerates empty edit files).
    //
    while (curReader != null) {
        if (curReader.next(key, val)) {
            return true;
        }
        getNextReader();
    }
    return false;
}
/**
* Close the current reader, if any, and open the next edit
* file. Leaves curReader null when no files remain.
*/
private void getNextReader() throws IOException {
    if (curReader != null) {
        curReader.close();
        curReader = null;
    }
    if (curEditFile < numEditFiles) {
        curReader = new SequenceFile.Reader(nfs, new File(distDir, SCORE_EDITS_FILE_PREFIX + "." + curEditFile + ".sorted").getPath());
        LOG.info("Opened stream to file " + curEditFile);
        curEditFile++;
    }
}
/**
* Close any open reader and mark the edit set as exhausted.
*/
public synchronized void close() throws IOException {
    if (curReader != null) {
        curReader.close();
        curReader = null;
    }
    curEditFile = numEditFiles;
}
}
/**
* A Writable pair of floats (the current score and the score
* computed for the next iteration), so that scores can be
* stored in a SequenceFile.
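*
* Round-trip sketch (out and in stand for any DataOutput and
* DataInput, e.g. streams backing a SequenceFile record):
*
* <pre>
*   ScoreValue sv = new ScoreValue();
*   sv.setScore(0.15f);
*   sv.setNextScore(0.30f);
*   sv.write(out);       // writes the two floats
*   sv.readFields(in);   // reads them back in the same order
* </pre>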
*/
class ScoreValue implements Writable {
float score;
float nextScore;
/**
* Creates an empty ScoreValue; fields are set via the
* setters or readFields().
*/
public ScoreValue() {
}
/**
* Set the current score.
*/
public void setScore(float f) {
this.score = f;
}
/**
* Set the score for the next iteration.
*/
public void setNextScore(float f) {
this.nextScore = f;
}
/**
* Return the current score.
*/
public float score() {
return score;
}
/**
* Return the score for the next iteration.
*/
public float nextScore() {
return nextScore;
}
/**
* Serialize both floats to the given DataOutput.
*/
public void write(DataOutput out) throws IOException {
out.writeFloat(score);
out.writeFloat(nextScore);
}
/**
* Read both floats back, in the order they were written.
*/
public void readFields(DataInput in) throws IOException {
this.score = in.readFloat();
this.nextScore = in.readFloat();
}
}
NutchFileSystem nfs;
File dbDir;
/**
* Takes the NutchFileSystem to use and the directory holding
* the web db to analyze.
*/
public DistributedAnalysisTool(NutchFileSystem nfs, File dbDir) throws IOException, FileNotFoundException {
this.nfs = nfs;
this.dbDir = dbDir;
}
/**
* This method prepares the ground for a set of processes
* to distribute a round of LinkAnalysis work. It writes out
* the "assignments" to a directory. This directory must be
* made accessible to all the processes. (It may be mounted by
* all of them, or copied to all of them.)
*
* This is run by a single process, and it is run first.
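*
* For example, to split the work four ways (nfs is an existing
* NutchFileSystem; paths are illustrative):
*
* <pre>
*   DistributedAnalysisTool dat =
*       new DistributedAnalysisTool(nfs, new File("db"));
*   boolean ok = dat.initRound(4, new File("dist"));
*   // "dist" now holds assignment.0 .. assignment.3, plus the
*   // assignComplete summary file
* </pre>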
*/
public boolean initRound(int numProcesses, File distDir) throws IOException {
//
// The distDir must be empty or non-existent.
//
if (nfs.exists(distDir) &&
    (nfs.isFile(distDir) || nfs.listFiles(distDir).length != 0)) {
    LOG.severe("Must be an empty or non-existent dir: " + distDir);
    return false;
}
if (! nfs.exists(distDir)) {
nfs.mkdirs(distDir);
}
//
// Figure out how many db items we have, and how many
// processes they are allocated to.
//
long startPages[] = new long[numProcesses];
long totalPages = 0;
IWebDBReader reader = new WebDBReader(nfs, dbDir);
try {
totalPages = reader.numPages();
} finally {
reader.close();
}
long chunkSize = totalPages / numProcesses;
long pagesProcessedSoFar = 0;
//
// From zero to the 2nd-to-last item, assign a
// chunk's worth of pages. The value at each index
// indicates the start page for that process.
//
startPages[0] = 0;
for (int i = 1; i < numProcesses; i++) {
startPages[i] = startPages[i-1] + chunkSize;
}
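//
// Worked example: totalPages = 10, numProcesses = 3 gives
// chunkSize = 3 and startPages = {0, 3, 6}; the last process
// picks up the remainder (4 pages) below.
//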
//
// Emit the assignments for the processes
//
try {
// Write out each file
for (int i = 0; i < numProcesses; i++) {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(new File(distDir, ASSIGN_FILE_PREFIX + "." + i))));
try {
// Start page
out.writeLong(startPages[i]);
// How many pages to process
if (i != numProcesses - 1) {
out.writeLong(chunkSize);
} else {
// in last index, make up for remainders
out.writeLong(totalPages - ((numProcesses - 1) * chunkSize));
}
} finally {
out.close();
}
}
//
// Write a file that indicates we finished correctly.
// This makes it easier for controlling scripts to
// check whether this process completed.
//
// It also holds some overall instruction information,
// so we can do some error-checking at complete-time.
//
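// The layout is: an int (numProcesses), a long (totalPages),
// then one long per process recording its extent.
//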
File completeFile = new File(distDir, ASSIGN_COMPLETE);
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(nfs.create(completeFile)));
try {
out.writeInt(numProcesses);
out.writeLong(totalPages);
// Compute extents
long extent[] = new long[numProcesses];
for (int i = 0; i < numProcesses - 1; i++) {
extent[i] = chunkSize * (i + 1);
}
extent[numProcesses-1] = totalPages - (chunkSize * (numProcesses - 1));
// Emit extents
for (int i = 0; i < extent.length; i++) {
out.writeLong(extent[i]);
}
} finally {
out.close();
}
return true;
} catch (IOException ex) {
LOG.severe(ex.toString());
LOG.severe("Sorry, could not finish assignments");
}
return false;
}
/**
* This method is invoked by one of the many processes involved
* in LinkAnalysis. Many instances run at the same time; that is
* fine, since no locking is needed between them.
*
* This computes the LinkAnalysis score for a given region
* of the database. It writes its ID, the region params, and
* the scores-to-be-written into a flat file, labeled with the
* process ID and placed inside distDir.
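*
* A hypothetical invocation for worker 3 (the computeRound
* signature is assumed from this description; the method body
* is not shown here):
*
* <pre>
*   DistributedAnalysisTool dat =
*       new DistributedAnalysisTool(nfs, dbDir);
*   // reads assignment.3, writes scoreEdits.3 into distDir
*   dat.computeRound(3, distDir);
* </pre>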
*/