⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 parsesegment.java

📁 一些简要的公爵类一些简要的公爵类一些简要的公爵类
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* Copyright (c) 2004 The Nutch Organization.  All rights reserved.   */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */

package net.nutch.tools;

import net.nutch.pagedb.FetchListEntry;
import net.nutch.io.*;
import net.nutch.fs.*;
import net.nutch.util.*;
import net.nutch.protocol.*;
import net.nutch.parse.*;
import net.nutch.plugin.*;

import net.nutch.fetcher.FetcherOutput;

import java.io.EOFException;
import java.io.File;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import java.util.Properties;
import java.util.logging.*;

/**
 * Parse contents in one segment.
 *
 * <p>
 * It assumes, under given segment, existence of ./fetcher_output/,
 * which is typically generated after a non-parsing fetcher run
 * (i.e., fetcher is started with option -noParsing).
 *
 * <p> Contents in one segment are parsed and saved in these steps:
 * <li> (1) ./fetcher_output/ and ./content/ are looped together
 * (possibly by multiple ParserThreads), and content is parsed for each entry.
 * The entry number and resultant ParserOutput are saved in ./parser.unsorted.
 * <li> (2) ./parser.unsorted is sorted by entry number, result saved as
 * ./parser.sorted.
 * <li> (3) ./parser.sorted and ./fetcher_output/ are looped together.
 * At each entry, ParserOutput is split into ParseData and ParseText,
 * which are saved in ./parse_data/ and ./parse_text/ respectively. Also
 * updated is FetcherOutput with parsing status, which is saved in ./fetcher/.
 *
 * <p> In the end, ./fetcher/ should be identical to the one resulting from
 * a fetcher run WITHOUT option -noParsing.
 *
 * <p> By default, intermediates ./parser.unsorted and ./parser.sorted
 * are removed at the end, unless option -noClean is used. However
 * ./fetcher_output/ is kept intact.
 *
 * <p> Check Fetcher.java and FetcherOutput.java for further discussion.
 *
 * @author John Xing
 */

public class ParseSegment {

  // logger for this tool, named after the class and obtained through
  // LogFormatter
  public static final Logger LOG =
    LogFormatter.getLogger(ParseSegment.class.getName());

  // value of the "parser.threads.parse" configuration property; 10 is the
  // second argument handed to NutchConf.getInt (presumably the default used
  // when the property is unset -- confirm against NutchConf)
  private int threadCount =                       // max number of threads
    NutchConf.getInt("parser.threads.parse", 10);

  // file system handle (its use lies outside the part of the file shown here)
  private NutchFileSystem nfs;

  // segment dir
  private String directory;

  // readers for FetcherOutput (no-parsing) and Content.
  // ParserThread.run() advances both together while holding the lock on
  // this ParseSegment, so that record N of one pairs with record N of the
  // other.
  private ArrayFile.Reader fetcherNPReader;
  private ArrayFile.Reader contentReader;

  // SequenceFile (unsorted) for ParserOutput.
  // ParserThread.outputPage() appends (entry number, ParserOutput) pairs to
  // parserOutputWriter while synchronizing on the writer itself.
  private File unsortedFile;
  private SequenceFile.Writer parserOutputWriter;

  // SequenceFile (sorted) for ParserOutput
  private File sortedFile;

  // whether dryRun only (i.e., no real parsing is done).
  // When set, ParserThread only logs how an entry would be handled and
  // writes nothing to parserOutputWriter.
  private boolean dryRun = false;

  // whether clean intermediate files
  private boolean clean = true;

  // entry (record number) in fetcherNPReader (same in contentReader).
  // Incremented under the ParseSegment lock for each record pair read, so
  // the first record gets number 0.
  private long entry = -1;

  // for stats; pages, bytes and errors are updated by the parser threads
  // inside synchronized (ParseSegment.this) blocks
  private long start;                             // start time
  private long bytes;                             // total bytes parsed
  private int pages;                              // total pages parsed
  private int errors;                             // total pages errored

  // every ParserThread is created inside this group
  private ThreadGroup group = new ThreadGroup("parser"); // our thread group

  /**
   * Inner class ParserThread: a worker that repeatedly takes the next
   * (FetcherOutput, Content) record pair of the segment, parses it and
   * appends the result to the unsorted ParserOutput file.
   *
   * <p> All ParserThreads of one ParseSegment share its readers, writer and
   * counters. Reading the next record pair and updating the counters is
   * done while holding the lock on the enclosing ParseSegment; appending
   * is done while holding the lock on parserOutputWriter. Parsing itself
   * happens outside both locks.
   */
  private class ParserThread extends Thread {

    // current entry that this thread is parsing; assigned under the
    // ParseSegment lock right after the record pair has been read
    private long myEntry = -1;

    // for detailed stats (milliseconds, per entry):
    //   t0 start of loop iteration     t1 read lock acquired
    //   t2 read finished               t3 outputPage() entered
    //   t4 writer lock acquired        t5 append finished
    private long t0,t1,t2,t3,t4,t5;

    public ParserThread() { super(group, "myThread"); }

    /**
     * This thread participates in looping through
     * entries of FetcherOutput and Content.
     *
     * <p> The loop ends when either reader has no more records (or hits
     * EOFException), or when LogFormatter reports that a severe message
     * has been logged.
     */
    public void run() {

      // handed to the readers again on every iteration
      FetcherOutput fetcherOutput = new FetcherOutput();
      Content content = new Content();

      while (true) {
        if (LogFormatter.hasLoggedSevere())       // something bad happened
          break;                                  // exit

        // Deliberately scoped to a single iteration. The catch-all handler
        // below uses "fle != null" to decide whether the failure can be
        // attributed to the current entry; values left over from a previous
        // iteration would make it log the wrong url and write a record
        // under an entry number that has already been written.
        FetchListEntry fle = null;
        String url = null;

        t0 = System.currentTimeMillis();

        try {

          // must be read in order! thus synchronize threads.
          synchronized (ParseSegment.this) {
            t1 = System.currentTimeMillis();

            try {
              if (fetcherNPReader.next(fetcherOutput) == null ||
                  contentReader.next(content) == null) {
                return;                           // no more entries
              }
            } catch (EOFException eof) {
              // only partial data available, stop this thread,
              // other threads will be stopped also.
              return;
            }

            entry++;
            myEntry = entry;
            if (LOG.isLoggable(Level.FINE))
              LOG.fine("Read in entry "+entry);
          }

          t2 = System.currentTimeMillis();

          fle = fetcherOutput.getFetchListEntry();
          url = fle.getPage().getURL().toString();

          if (LOG.isLoggable(Level.FINE))
            LOG.fine("parsing " + url);           // parse the page

          // safe guard against mismatched files: both records of a pair
          // must describe the same url
          if (!url.equals(content.getUrl())) {
            LOG.severe("Mismatched entries under "
              + FetcherOutput.DIR_NAME_NP + " and " + Content.DIR_NAME);
            continue;
          }

          // if fetch was successful or
          // previously unable to parse (so try again)
          if (fetcherOutput.getStatus() == FetcherOutput.SUCCESS ||
              fetcherOutput.getStatus() == FetcherOutput.CANT_PARSE) {
            handleContent(url, content);
            synchronized (ParseSegment.this) {
              pages++;                    // record successful parse
              bytes += content.getContent().length;
              if ((pages % 100) == 0)
                status();
            }
          } else {
            // errored at fetch step
            logError(url, new ProtocolException("Error at fetch stage"));
            handleNoContent(ParserOutput.NOFETCH);
          }

        } catch (ParseException e) {
          logError(url, e);
          handleNoContent(ParserOutput.FAILURE);

        } catch (Throwable t) {                   // an unchecked exception
          if (fle != null) {
            // the failure belongs to the entry read in this iteration
            logError(url, t);
            handleNoContent(ParserOutput.UNKNOWN);
          } else {
            // failed before the entry could be identified; keep the
            // throwable so the cause is not lost
            LOG.log(Level.SEVERE, "Unexpected exception", t);
          }
        }
      }
    }

    /**
     * Logs a failed parse at INFO level (with the stack trace at FINE
     * level) and increments the shared error counter.
     *
     * @param url url of the page that failed
     * @param t the cause of the failure
     */
    private void logError(String url, Throwable t) {
      LOG.info("parse of " + url + " failed with: " + t);
      if (LOG.isLoggable(Level.FINE))
        LOG.log(Level.FINE, "stack", t);               // stack trace
      synchronized (ParseSegment.this) {               // record failure
        errors++;
      }
    }

    /**
     * Parses one page and writes the result with status SUCCESS.
     * In dry-run mode only the content type is logged.
     *
     * @param url url of the page, passed to ParserFactory together with
     *        the content type
     * @param content fetched content of the page
     * @throws ParseException declared for the parser lookup / parse calls
     */
    private void handleContent(String url, Content content)
      throws ParseException {

      // taken from the "Content-Type" metadata property rather than from
      // content.getContentType()
      String contentType = content.getMetadata().getProperty("Content-Type");

      if (ParseSegment.this.dryRun) {
        LOG.info("To be handled as Content-Type: "+contentType);
        return;
      }

      Parser parser = ParserFactory.getParser(contentType, url);
      Parse parse = parser.getParse(content);

      outputPage
        (new ParseText(parse.getText()), parse.getData(),ParserOutput.SUCCESS);
    }

    /**
     * Writes an empty ParseText / ParseData pair for an entry that could
     * not be parsed. In dry-run mode this is only logged.
     *
     * @param status one of the ParserOutput status constants describing
     *        why there is no content
     */
    private void handleNoContent(int status) {
      if (ParseSegment.this.dryRun) {
        LOG.info("To be handled as no content");
        return;
      }
      outputPage(new ParseText(""),
                 new ParseData("", new Outlink[0], new Properties()),
                 status);
    }

    /**
     * Appends the result for the current entry (myEntry) to the shared
     * parserOutputWriter and, at FINE level, logs the timing breakdown of
     * this entry. Failures are logged as severe and not rethrown.
     *
     * @param parseText extracted text of the page
     * @param parseData parse data of the page
     * @param status ParserOutput status stored with the record
     */
    private void outputPage
      (ParseText parseText, ParseData parseData, int status) {
      try {
        t3 = System.currentTimeMillis();
        synchronized (parserOutputWriter) {
          t4 = System.currentTimeMillis();
          parserOutputWriter.append(new LongWritable(myEntry),
            new ParserOutput(parseData, parseText, status));
          t5 = System.currentTimeMillis();
          if (LOG.isLoggable(Level.FINE))
            LOG.fine("Entry: "+myEntry
              +" "+parseData.getMetadata().getProperty("Content-Length")
              +" wait="+(t1-t0) +" read="+(t2-t1) +" parse="+(t3-t2)
              +" wait="+(t4-t3) +" write="+(t5-t4) +"ms");
        }
      } catch (Throwable t) {
        // same message as before, but with the throwable attached so the
        // stack trace is available to the log handler
        LOG.log(Level.SEVERE, "error writing output:" + t.toString(), t);
      }
    }

  }

  /**
   * Inner class ParserOutput: ParseData + ParseText + status
   */
  private class ParserOutput extends VersionedWritable {
    public static final String DIR_NAME = "parser";

    private final static byte VERSION = 1;

    // could be more detailed
    public final static byte UNKNOWN = (byte)0; // unknown problem in parsing
    public final static byte SUCCESS = (byte)1; // parsing succeeded
    public final static byte FAILURE = (byte)2; // parsing failed
    public final static byte NOFETCH = (byte)3; // fetch was not a SUCCESS

    private int status;

    private ParseData parseData = new ParseData();
    private ParseText parseText = new ParseText();

    public ParserOutput() {}
    
    public ParserOutput(ParseData parseData, ParseText parseText, int status) {
      this.parseData = parseData;
      this.parseText = parseText;
      this.status = status;
    }

    public byte getVersion() { return VERSION; }

    public ParseData getParseData() {
      return this.parseData;
    }

    public ParseText getParseText() {
      return this.parseText;
    }

    public int getStatus() {
      return this.status;
    }

    public final void readFields(DataInput in) throws IOException {
      super.readFields(in);                         // check version
      status = in.readByte();
      parseData.readFields(in);
      parseText.readFields(in);
      return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -