⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 segmentreader.java

📁 nutch0.8源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/** * Copyright 2005 The Apache Software Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.nutch.segment;import java.io.*;import java.text.SimpleDateFormat;import java.util.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;import org.apache.hadoop.util.Progressable;import org.apache.nutch.crawl.CrawlDatum;import org.apache.nutch.parse.ParseData;import org.apache.nutch.parse.ParseText;import org.apache.nutch.protocol.Content;import org.apache.nutch.util.LogUtil;import org.apache.nutch.util.NutchConfiguration;/** Dump the content of a segment. */public class SegmentReader extends Configured implements Reducer {  public static final Log LOG = LogFactory.getLog(SegmentReader.class);  long recNo = 0L;    private boolean co, fe, ge, pa, pd, pt;  private FileSystem fs;  /**   * Wraps inputs in an {@link ObjectWritable}, to permit merging different   * types in reduce.   */  public static class InputFormat extends SequenceFileInputFormat {    public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)            throws IOException {      reporter.setStatus(split.toString());      return new SequenceFileRecordReader(job, split) {        public synchronized boolean next(Writable key, Writable value) throws IOException {          ObjectWritable wrapper = (ObjectWritable) value;          try {            wrapper.set(getValueClass().newInstance());          } catch (Exception e) {            throw new IOException(e.toString());          }          return super.next(key, (Writable) wrapper.get());        }      };    }  }  /** Implements a text output format */  public static class TextOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase {    public RecordWriter getRecordWriter(final FileSystem fs, JobConf job, String name, Progressable progress) throws IOException {      final Path segmentDumpFile = new Path(job.getOutputPath(), name);      // Get the old copy out of the way      fs.delete(segmentDumpFile);      final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));      return new RecordWriter() {        public synchronized void write(WritableComparable key, Writable value) throws IOException {          ObjectWritable writable = (ObjectWritable) value;          printStream.println((String) writable.get());        }        public synchronized void close(Reporter reporter) throws IOException {          printStream.close();        }      };    }  }  public SegmentReader() {    super(null);  }    public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, boolean pa,          boolean pd, boolean pt) {    super(conf);    this.co = co;    this.fe = fe;    this.ge = ge;    this.pa = pa;    this.pd = pd;    this.pt = pt;    try {      this.fs = FileSystem.get(getConf());    } catch (IOException e) {      e.printStackTrace(LogUtil.getWarnStream(LOG));    }  }  public void configure(JobConf job) {    setConf(job);    this.co = getConf().getBoolean("segment.reader.co", true);    this.fe = getConf().getBoolean("segment.reader.fe", true);    this.ge = getConf().getBoolean("segment.reader.ge", true);    this.pa = getConf().getBoolean("segment.reader.pa", true);    this.pd = getConf().getBoolean("segment.reader.pd", true);    this.pt = getConf().getBoolean("segment.reader.pt", true);    try {      this.fs = FileSystem.get(getConf());    } catch (IOException e) {      e.printStackTrace(LogUtil.getWarnStream(LOG));    }  }  private JobConf createJobConf() {    JobConf job = new JobConf(getConf());    job.setBoolean("segment.reader.co", this.co);    job.setBoolean("segment.reader.fe", this.fe);    job.setBoolean("segment.reader.ge", this.ge);    job.setBoolean("segment.reader.pa", this.pa);    job.setBoolean("segment.reader.pd", this.pd);    job.setBoolean("segment.reader.pt", this.pt);    return job;  }    public void close() {}  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)          throws IOException {    StringBuffer dump = new StringBuffer();    dump.append("\nRecno:: ").append(recNo++).append("\n");    dump.append("URL:: " + key.toString() + "\n");    while (values.hasNext()) {      Object value = ((ObjectWritable) values.next()).get(); // unwrap      if (value instanceof CrawlDatum) {        dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());      } else if (value instanceof Content) {        dump.append("\nContent::\n").append(((Content) value).toString());      } else if (value instanceof ParseData) {        dump.append("\nParseData::\n").append(((ParseData) value).toString());      } else if (value instanceof ParseText) {        dump.append("\nParseText::\n").append(((ParseText) value).toString());      } else if (LOG.isWarnEnabled()) {        LOG.warn("Unrecognized type: " + value.getClass());      }    }    output.collect(key, new ObjectWritable(dump.toString()));  }  public void dump(Path segment, Path output) throws IOException {        if (LOG.isInfoEnabled()) {      LOG.info("SegmentReader: dump segment: " + segment);    }    JobConf job = createJobConf();    job.setJobName("read " + segment);    if (ge) job.addInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));    if (fe) job.addInputPath(new Path(segment, CrawlDatum.FETCH_DIR_NAME));    if (pa) job.addInputPath(new Path(segment, CrawlDatum.PARSE_DIR_NAME));    if (co) job.addInputPath(new Path(segment, Content.DIR_NAME));    if (pd) job.addInputPath(new Path(segment, ParseData.DIR_NAME));    if (pt) job.addInputPath(new Path(segment, ParseText.DIR_NAME));    job.setInputFormat(InputFormat.class);    job.setInputKeyClass(UTF8.class);    job.setInputValueClass(ObjectWritable.class);    job.setReducerClass(SegmentReader.class);    Path tempDir = new Path("/tmp/segread-" + new java.util.Random().nextInt());    fs.delete(tempDir);        job.setOutputPath(tempDir);    job.setOutputFormat(TextOutputFormat.class);    job.setOutputKeyClass(UTF8.class);    job.setOutputValueClass(ObjectWritable.class);    JobClient.runJob(job);    // concatenate the output    Path dumpFile = new Path(output, job.get("segment.dump.dir", "dump"));    // remove the old file    fs.delete(dumpFile);    Path[] files = fs.listPaths(tempDir);    PrintWriter writer = null;    int currentRecordNumber = 0;    if (files.length > 0) {      writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));      try {        for (int i = 0; i < files.length; i++) {          Path partFile = (Path) files[i];          try {            currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber);          } catch (IOException exception) {            if (LOG.isWarnEnabled()) {              LOG.warn("Couldn't copy the content of " + partFile.toString() +                       " into " + dumpFile.toString());              LOG.warn(exception.getMessage());            }          }        }      } finally {        writer.close();      }    }    fs.delete(tempDir);    if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: done"); }  }  /** Appends two files and updates the Recno counter */  private int append(FileSystem fs, Configuration conf, Path src, PrintWriter writer, int currentRecordNumber)          throws IOException {    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src)));    try {      String line = reader.readLine();      while (line != null) {        if (line.startsWith("Recno:: ")) {          line = "Recno:: " + currentRecordNumber++;        }        writer.println(line);        line = reader.readLine();      }      return currentRecordNumber;    } finally {      reader.close();    }  }  private static final String[][] keys = new String[][] {          {"co", "Content::\n"},          {"ge", "Crawl Generate::\n"},          {"fe", "Crawl Fetch::\n"},          {"pa", "Crawl Parse::\n"},          {"pd", "ParseData::\n"},          {"pt", "ParseText::\n"}  };  public void get(final Path segment, final UTF8 key, Writer writer,          final Map results) throws Exception {    if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: get '" + key + "'"); }    ArrayList threads = new ArrayList();    if (co) threads.add(new Thread() {      public void run() {        try {          List res = getMapRecords(new Path(segment, Content.DIR_NAME), key);          results.put("co", res);        } catch (Exception e) {          e.printStackTrace(LogUtil.getWarnStream(LOG));        }      }    });    if (fe) threads.add(new Thread() {      public void run() {        try {          List res = getMapRecords(new Path(segment, CrawlDatum.FETCH_DIR_NAME), key);          results.put("fe", res);        } catch (Exception e) {          e.printStackTrace(LogUtil.getWarnStream(LOG));        }      }    });    if (ge) threads.add(new Thread() {      public void run() {        try {          List res = getSeqRecords(new Path(segment, CrawlDatum.GENERATE_DIR_NAME), key);          results.put("ge", res);        } catch (Exception e) {          e.printStackTrace(LogUtil.getWarnStream(LOG));        }      }    });    if (pa) threads.add(new Thread() {      public void run() {        try {          List res = getSeqRecords(new Path(segment, CrawlDatum.PARSE_DIR_NAME), key);          results.put("pa", res);        } catch (Exception e) {          e.printStackTrace(LogUtil.getWarnStream(LOG));        }      }    });    if (pd) threads.add(new Thread() {      public void run() {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -