📄 parsesegment.java
字号:
/* Copyright (c) 2004 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.tools;
import net.nutch.pagedb.FetchListEntry;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.protocol.*;
import net.nutch.parse.*;
import net.nutch.plugin.*;
import net.nutch.fetcher.FetcherOutput;
import java.io.EOFException;
import java.io.File;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import java.util.logging.*;
/**
* Parse contents in one segment.
*
* <p>
* It assumes, under given segment, existence of ./fetcher_output/,
* which is typically generated after a non-parsing fetcher run
* (i.e., fetcher is started with option -noParsing).
*
* <p> Contents in one segment are parsed and saved in these steps:
* <li> (1) ./fetcher_output/ and ./content/ are looped together
* (possibly by multiple ParserThreads), and content is parsed for each entry.
* The entry number and resultant ParserOutput are saved in ./parser.unsorted.
* <li> (2) ./parser.unsorted is sorted by entry number, result saved as
* ./parser.sorted.
* <li> (3) ./parser.sorted and ./fetcher_output/ are looped together.
* At each entry, ParserOutput is split into ParseData and ParseText,
* which are saved in ./parse_data/ and ./parse_text/ respectively. Also
* updated is FetcherOutput with parsing status, which is saved in ./fetcher/.
*
* <p> In the end, ./fetcher/ should be identical to one resulted from
* fetcher run WITHOUT option -noParsing.
*
* <p> By default, intermediates ./parser.unsorted and ./parser.sorted
* are removed at the end, unless option -noClean is used. However
* ./fetcher_output/ is kept intact.
*
* <p> Check Fetcher.java and FetcherOutput.java for further discussion.
*
* @author John Xing
*/
public class ParseSegment {
// logger used by this class and by its inner ParserThread
public static final Logger LOG =
LogFormatter.getLogger(ParseSegment.class.getName());
// read from configuration key "parser.threads.parse"
// (10 is presumably the fallback when the key is unset -- confirm in NutchConf)
private int threadCount = // max number of threads
NutchConf.getInt("parser.threads.parse", 10);
private NutchFileSystem nfs;
// segment dir
private String directory;
// readers for FetcherOutput (no-parsing) and Content;
// ParserThread.run() advances both together, one record each,
// while synchronized on this ParseSegment instance
private ArrayFile.Reader fetcherNPReader;
private ArrayFile.Reader contentReader;
// SequenceFile (unsorted) for ParserOutput
private File unsortedFile;
// parser threads append (entry number, ParserOutput) pairs through this
// writer, synchronizing on the writer object itself
private SequenceFile.Writer parserOutputWriter;
// SequenceFile (sorted) for ParserOutput
private File sortedFile;
// whether dryRun only (i.e., no real parsing is done);
// when set, ParserThread only logs what it would do and writes nothing
private boolean dryRun = false;
// whether clean intermediate files
private boolean clean = true;
// entry (record number) in fetcherNPReader (same in contentReader);
// starts at -1 and is incremented, under the ParseSegment lock,
// each time a record pair has been read
private long entry = -1;
// for stats; bytes, pages and errors are updated by parser threads
// while synchronized on this ParseSegment instance
private long start; // start time
private long bytes; // total bytes parsed
private int pages; // total pages parsed
private int errors; // total pages errored
// every ParserThread is created inside this group
private ThreadGroup group = new ThreadGroup("parser"); // our thread group
/**
 * Inner class ParserThread.
 *
 * <p>Worker thread that reads (FetcherOutput, Content) record pairs from
 * the segment, parses each page and appends the result, keyed by entry
 * number, through parserOutputWriter.
 *
 * <p>Several ParserThreads may run at once. Reading is serialized on the
 * enclosing ParseSegment instance because both readers must be advanced
 * in step; writing is serialized on parserOutputWriter.
 */
private class ParserThread extends Thread {
  // entry number of the record this thread is currently parsing
  private long myEntry = -1;

  // timestamps (ms) used only for the FINE-level timing line in outputPage():
  //   t0 top of loop iteration, t1 read lock acquired, t2 record read,
  //   t3 about to wait for the writer, t4 writer lock acquired, t5 appended
  private long t0,t1,t2,t3,t4,t5;

  /** Creates a thread named "myThread" inside the shared parser thread group. */
  public ParserThread() { super(group, "myThread"); }

  /**
   * This thread participates in looping through
   * entries of FetcherOutput and Content.
   *
   * <p>The loop ends when either reader has no more records (or hits an
   * EOFException on partial data), or as soon as any SEVERE message has
   * been logged anywhere.
   */
  public void run() {
    FetcherOutput fetcherOutput = new FetcherOutput();
    Content content = new Content();

    while (true) {
      if (LogFormatter.hasLoggedSevere())   // something bad happened
        break;                              // exit

      // Per-record state lives inside the loop so that a failure while
      // reading the next record is never attributed to the previous
      // record's URL and entry number in the catch blocks below.
      FetchListEntry fle = null;
      String url = null;

      t0 = System.currentTimeMillis();

      try {
        // must be read in order! thus synchronize threads.
        synchronized (ParseSegment.this) {
          t1 = System.currentTimeMillis();

          try {
            if (fetcherNPReader.next(fetcherOutput) == null ||
                contentReader.next(content) == null)
              return;
          } catch (EOFException eof) {
            // only partial data available, stop this thread,
            // other threads will be stopped also.
            return;
          }

          entry++;
          myEntry = entry;
          if (LOG.isLoggable(Level.FINE))
            LOG.fine("Read in entry "+entry);
        }

        t2 = System.currentTimeMillis();

        fle = fetcherOutput.getFetchListEntry();
        url = fle.getPage().getURL().toString();

        if (LOG.isLoggable(Level.FINE))
          LOG.fine("parsing " + url);       // parse the page

        // safe guard against mismatched files: logging SEVERE here makes
        // every thread leave its loop at the next hasLoggedSevere() check
        if (!url.equals(content.getUrl())) {
          LOG.severe("Mismatched entries under "
            + FetcherOutput.DIR_NAME_NP + " and " + Content.DIR_NAME);
          continue;
        }

        // if fetch was successful or
        // previously unable to parse (so try again)
        if (fetcherOutput.getStatus() == FetcherOutput.SUCCESS ||
            fetcherOutput.getStatus() == FetcherOutput.CANT_PARSE) {
          handleContent(url, content);
          synchronized (ParseSegment.this) {
            pages++;                        // record successful parse
            bytes += content.getContent().length;
            if ((pages % 100) == 0)
              status();
          }
        } else {
          // errored at fetch step
          logError(url, new ProtocolException("Error at fetch stage"));
          handleNoContent(ParserOutput.NOFETCH);
        }

      } catch (ParseException e) {
        logError(url, e);
        handleNoContent(ParserOutput.FAILURE);

      } catch (Throwable t) {               // an unchecked exception
        if (fle != null) {
          // the record was read, so myEntry is valid: emit a placeholder
          logError(url, t);
          handleNoContent(ParserOutput.UNKNOWN);
        } else {
          // failed before a record was obtained; nothing can be written
          // for it, so report the cause and let the SEVERE flag stop us
          LOG.severe("Unexpected exception: " + t);
        }
      }
    }
  }

  /**
   * Logs a failed parse at INFO level (with the stack trace at FINE) and
   * bumps the shared error counter under the ParseSegment lock.
   */
  private void logError(String url, Throwable t) {
    LOG.info("parse of " + url + " failed with: " + t);
    if (LOG.isLoggable(Level.FINE))
      LOG.log(Level.FINE, "stack", t);      // stack trace
    synchronized (ParseSegment.this) {      // record failure
      errors++;
    }
  }

  /**
   * Parses one page and writes the result with status SUCCESS.
   * The parser is chosen from the "Content-Type" metadata property and
   * the URL. In dry-run mode only the content type is logged.
   *
   * @throws ParseException propagated from parser lookup or parsing
   */
  private void handleContent(String url, Content content)
    throws ParseException {

    String contentType = content.getMetadata().getProperty("Content-Type");

    if (ParseSegment.this.dryRun) {
      LOG.info("To be handled as Content-Type: "+contentType);
      return;
    }

    Parser parser = ParserFactory.getParser(contentType, url);
    Parse parse = parser.getParse(content);

    outputPage
      (new ParseText(parse.getText()), parse.getData(),ParserOutput.SUCCESS);
  }

  /**
   * Writes an empty placeholder (empty text, no outlinks, no metadata)
   * carrying the given status for the current entry. In dry-run mode it
   * only logs.
   */
  private void handleNoContent(int status) {
    if (ParseSegment.this.dryRun) {
      LOG.info("To be handled as no content");
      return;
    }
    outputPage(new ParseText(""),
               new ParseData("", new Outlink[0], new Properties()),
               status);
  }

  /**
   * Appends (myEntry, ParserOutput) through the shared writer while
   * holding the writer's lock. Any failure is logged as SEVERE rather
   * than thrown.
   */
  private void outputPage
    (ParseText parseText, ParseData parseData, int status) {
    try {
      t3 = System.currentTimeMillis();
      synchronized (parserOutputWriter) {
        t4 = System.currentTimeMillis();
        parserOutputWriter.append(new LongWritable(myEntry),
          new ParserOutput(parseData, parseText, status));
        t5 = System.currentTimeMillis();
        if (LOG.isLoggable(Level.FINE))
          LOG.fine("Entry: "+myEntry
            +" "+parseData.getMetadata().getProperty("Content-Length")
            +" wait="+(t1-t0) +" read="+(t2-t1) +" parse="+(t3-t2)
            +" wait="+(t4-t3) +" write="+(t5-t4) +"ms");
      }
    } catch (Throwable t) {
      LOG.severe("error writing output:" + t.toString());
    }
  }
}
/**
* Inner class ParserOutput: ParseData + ParseText + status
*/
private class ParserOutput extends VersionedWritable {
// NOTE(review): presumably the base name of the intermediate
// parser.unsorted / parser.sorted files -- its use is not visible here, confirm
public static final String DIR_NAME = "parser";
// version reported through getVersion()
private final static byte VERSION = 1;
// parsing status codes
// could be more detailed
public final static byte UNKNOWN = (byte)0; // unknown problem in parsing
public final static byte SUCCESS = (byte)1; // parsing succeeded
public final static byte FAILURE = (byte)2; // parsing failed
public final static byte NOFETCH = (byte)3; // fetch was not a SUCCESS
// held as an int, but readFields() restores it from a single byte
private int status;
// pre-allocated so that readFields() can fill them in place
private ParseData parseData = new ParseData();
private ParseText parseText = new ParseText();
/** Creates an empty instance whose fields are filled by readFields(). */
public ParserOutput() {}
/**
 * Creates an instance wrapping the given parse results.
 *
 * @param parseData parse data stored in this output
 * @param parseText parse text stored in this output
 * @param status value stored as the status; callers in this file pass
 * UNKNOWN, SUCCESS, FAILURE or NOFETCH
 */
public ParserOutput(ParseData parseData, ParseText parseText, int status) {
this.parseData = parseData;
this.parseText = parseText;
this.status = status;
}
/** Returns the version constant of this class. */
public byte getVersion() { return VERSION; }
/** Returns the ParseData held by this output. */
public ParseData getParseData() {
return this.parseData;
}
/** Returns the ParseText held by this output. */
public ParseText getParseText() {
return this.parseText;
}
/** Returns the stored status value. */
public int getStatus() {
return this.status;
}
public final void readFields(DataInput in) throws IOException {
super.readFields(in); // check version
status = in.readByte();
parseData.readFields(in);
parseText.readFields(in);
return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -