SegmentReader.java
/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.segment;

import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Progressable;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.LogUtil;
import org.apache.nutch.util.NutchConfiguration;

/** Dump the content of a segment. */
public class SegmentReader extends Configured implements Reducer {

  public static final Log LOG = LogFactory.getLog(SegmentReader.class);

  long recNo = 0L;

  private boolean co, fe, ge, pa, pd, pt;
  private FileSystem fs;

  /**
   * Wraps inputs in an {@link ObjectWritable}, to permit merging different
   * types in reduce.
   */
  public static class InputFormat extends SequenceFileInputFormat {
    public RecordReader getRecordReader(FileSystem fs, FileSplit split,
        JobConf job, Reporter reporter) throws IOException {
      reporter.setStatus(split.toString());
      return new SequenceFileRecordReader(job, split) {
        public synchronized boolean next(Writable key, Writable value)
            throws IOException {
          ObjectWritable wrapper = (ObjectWritable) value;
          try {
            wrapper.set(getValueClass().newInstance());
          } catch (Exception e) {
            throw new IOException(e.toString());
          }
          return super.next(key, (Writable) wrapper.get());
        }
      };
    }
  }

  /** Implements a text output format. */
  public static class TextOutputFormat
      extends org.apache.hadoop.mapred.OutputFormatBase {
    public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,
        String name, Progressable progress) throws IOException {
      final Path segmentDumpFile = new Path(job.getOutputPath(), name);

      // Get the old copy out of the way
      fs.delete(segmentDumpFile);

      final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
      return new RecordWriter() {
        public synchronized void write(WritableComparable key, Writable value)
            throws IOException {
          ObjectWritable writable = (ObjectWritable) value;
          printStream.println((String) writable.get());
        }

        public synchronized void close(Reporter reporter) throws IOException {
          printStream.close();
        }
      };
    }
  }

  public SegmentReader() {
    super(null);
  }

  public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge,
      boolean pa, boolean pd, boolean pt) {
    super(conf);
    this.co = co;
    this.fe = fe;
    this.ge = ge;
    this.pa = pa;
    this.pd = pd;
    this.pt = pt;
    try {
      this.fs = FileSystem.get(getConf());
    } catch (IOException e) {
      e.printStackTrace(LogUtil.getWarnStream(LOG));
    }
  }

  public void configure(JobConf job) {
    setConf(job);
    this.co = getConf().getBoolean("segment.reader.co", true);
    this.fe = getConf().getBoolean("segment.reader.fe", true);
    this.ge = getConf().getBoolean("segment.reader.ge", true);
    this.pa = getConf().getBoolean("segment.reader.pa", true);
    this.pd = getConf().getBoolean("segment.reader.pd", true);
    this.pt = getConf().getBoolean("segment.reader.pt", true);
    try {
      this.fs = FileSystem.get(getConf());
    } catch (IOException e) {
      e.printStackTrace(LogUtil.getWarnStream(LOG));
    }
  }

  private JobConf createJobConf() {
    JobConf job = new JobConf(getConf());
    job.setBoolean("segment.reader.co", this.co);
    job.setBoolean("segment.reader.fe", this.fe);
    job.setBoolean("segment.reader.ge", this.ge);
    job.setBoolean("segment.reader.pa", this.pa);
    job.setBoolean("segment.reader.pd", this.pd);
    job.setBoolean("segment.reader.pt", this.pt);
    return job;
  }

  public void close() {}

  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter) throws IOException {
    StringBuffer dump = new StringBuffer();

    dump.append("\nRecno:: ").append(recNo++).append("\n");
    dump.append("URL:: " + key.toString() + "\n");
    while (values.hasNext()) {
      Object value = ((ObjectWritable) values.next()).get(); // unwrap
      if (value instanceof CrawlDatum) {
        dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
      } else if (value instanceof Content) {
        dump.append("\nContent::\n").append(((Content) value).toString());
      } else if (value instanceof ParseData) {
        dump.append("\nParseData::\n").append(((ParseData) value).toString());
      } else if (value instanceof ParseText) {
        dump.append("\nParseText::\n").append(((ParseText) value).toString());
      } else if (LOG.isWarnEnabled()) {
        LOG.warn("Unrecognized type: " + value.getClass());
      }
    }
    output.collect(key, new ObjectWritable(dump.toString()));
  }

  public void dump(Path segment, Path output) throws IOException {

    if (LOG.isInfoEnabled()) {
      LOG.info("SegmentReader: dump segment: " + segment);
    }

    JobConf job = createJobConf();
    job.setJobName("read " + segment);

    if (ge) job.addInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
    if (fe) job.addInputPath(new Path(segment, CrawlDatum.FETCH_DIR_NAME));
    if (pa) job.addInputPath(new Path(segment, CrawlDatum.PARSE_DIR_NAME));
    if (co) job.addInputPath(new Path(segment, Content.DIR_NAME));
    if (pd) job.addInputPath(new Path(segment, ParseData.DIR_NAME));
    if (pt) job.addInputPath(new Path(segment, ParseText.DIR_NAME));

    job.setInputFormat(InputFormat.class);
    job.setInputKeyClass(UTF8.class);
    job.setInputValueClass(ObjectWritable.class);

    job.setReducerClass(SegmentReader.class);

    Path tempDir = new Path("/tmp/segread-" + new java.util.Random().nextInt());
    fs.delete(tempDir);

    job.setOutputPath(tempDir);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(UTF8.class);
    job.setOutputValueClass(ObjectWritable.class);

    JobClient.runJob(job);

    // concatenate the output
    Path dumpFile = new Path(output, job.get("segment.dump.dir", "dump"));

    // remove the old file
    fs.delete(dumpFile);
    Path[] files = fs.listPaths(tempDir);

    PrintWriter writer = null;
    int currentRecordNumber = 0;
    if (files.length > 0) {
      writer = new PrintWriter(new BufferedWriter(
          new OutputStreamWriter(fs.create(dumpFile))));
      try {
        for (int i = 0; i < files.length; i++) {
          Path partFile = (Path) files[i];
          try {
            currentRecordNumber =
                append(fs, job, partFile, writer, currentRecordNumber);
          } catch (IOException exception) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Couldn't copy the content of " + partFile.toString()
                  + " into " + dumpFile.toString());
              LOG.warn(exception.getMessage());
            }
          }
        }
      } finally {
        writer.close();
      }
    }
    fs.delete(tempDir);
    if (LOG.isInfoEnabled()) {
LOG.info("SegmentReader: done"); } } /** Appends two files and updates the Recno counter */ private int append(FileSystem fs, Configuration conf, Path src, PrintWriter writer, int currentRecordNumber) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src))); try { String line = reader.readLine(); while (line != null) { if (line.startsWith("Recno:: ")) { line = "Recno:: " + currentRecordNumber++; } writer.println(line); line = reader.readLine(); } return currentRecordNumber; } finally { reader.close(); } } private static final String[][] keys = new String[][] { {"co", "Content::\n"}, {"ge", "Crawl Generate::\n"}, {"fe", "Crawl Fetch::\n"}, {"pa", "Crawl Parse::\n"}, {"pd", "ParseData::\n"}, {"pt", "ParseText::\n"} }; public void get(final Path segment, final UTF8 key, Writer writer, final Map results) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: get '" + key + "'"); } ArrayList threads = new ArrayList(); if (co) threads.add(new Thread() { public void run() { try { List res = getMapRecords(new Path(segment, Content.DIR_NAME), key); results.put("co", res); } catch (Exception e) { e.printStackTrace(LogUtil.getWarnStream(LOG)); } } }); if (fe) threads.add(new Thread() { public void run() { try { List res = getMapRecords(new Path(segment, CrawlDatum.FETCH_DIR_NAME), key); results.put("fe", res); } catch (Exception e) { e.printStackTrace(LogUtil.getWarnStream(LOG)); } } }); if (ge) threads.add(new Thread() { public void run() { try { List res = getSeqRecords(new Path(segment, CrawlDatum.GENERATE_DIR_NAME), key); results.put("ge", res); } catch (Exception e) { e.printStackTrace(LogUtil.getWarnStream(LOG)); } } }); if (pa) threads.add(new Thread() { public void run() { try { List res = getSeqRecords(new Path(segment, CrawlDatum.PARSE_DIR_NAME), key); results.put("pa", res); } catch (Exception e) { e.printStackTrace(LogUtil.getWarnStream(LOG)); } } }); if (pd) threads.add(new Thread() { public void run() {