📄 segmentreader.java
字号:
try { List res = getMapRecords(new Path(segment, ParseData.DIR_NAME), key); results.put("pd", res); } catch (Exception e) { e.printStackTrace(LogUtil.getWarnStream(LOG)); } } }); if (pt) threads.add(new Thread() { public void run() { try { List res = getMapRecords(new Path(segment, ParseText.DIR_NAME), key); results.put("pt", res); } catch (Exception e) { e.printStackTrace(LogUtil.getWarnStream(LOG)); } } }); Iterator it = threads.iterator(); while (it.hasNext()) ((Thread)it.next()).start(); int cnt = 0; do { try { Thread.sleep(5000); } catch (Exception e) {}; it = threads.iterator(); while (it.hasNext()) { if (((Thread)it.next()).isAlive()) cnt++; } if ((cnt > 0) && (LOG.isDebugEnabled())) { LOG.debug("(" + cnt + " to retrieve)"); } } while (cnt > 0); for (int i = 0; i < keys.length; i++) { List res = (List)results.get(keys[i][0]); if (res != null && res.size() > 0) { for (int k = 0; k < res.size(); k++) { writer.write(keys[i][1]); writer.write(res.get(k) + "\n"); } } writer.flush(); } } private List getMapRecords(Path dir, UTF8 key) throws Exception { MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf()); ArrayList res = new ArrayList(); Class keyClass = readers[0].getKeyClass(); Class valueClass = readers[0].getValueClass(); if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8")) throw new IOException("Incompatible key (" + keyClass.getName() + ")"); Writable value = (Writable)valueClass.newInstance(); // we don't know the partitioning schema for (int i = 0; i < readers.length; i++) { if (readers[i].get(key, value) != null) res.add(value); readers[i].close(); } return res; } private List getSeqRecords(Path dir, UTF8 key) throws Exception { SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), dir); ArrayList res = new ArrayList(); Class keyClass = readers[0].getKeyClass(); Class valueClass = readers[0].getValueClass(); if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8")) throw new IOException("Incompatible key (" + keyClass.getName() + ")"); Writable aKey = (Writable)keyClass.newInstance(); Writable value = (Writable)valueClass.newInstance(); for (int i = 0; i < readers.length; i++) { while (readers[i].next(aKey, value)) { if (aKey.equals(key)) res.add(value); } readers[i].close(); } return res; } public static class SegmentReaderStats { public long start = -1L; public long end = -1L; public long generated = -1L; public long fetched = -1L; public long fetchErrors = -1L; public long parsed = -1L; public long parseErrors = -1L; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); public void list(List dirs, Writer writer) throws Exception { writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n"); for (int i = 0; i < dirs.size(); i++) { Path dir = (Path)dirs.get(i); SegmentReaderStats stats = new SegmentReaderStats(); getStats(dir, stats); writer.write(dir.getName() + "\t"); if (stats.generated == -1) writer.write("?"); else writer.write(stats.generated + ""); writer.write("\t\t"); if (stats.start == -1) writer.write("?\t"); else writer.write(sdf.format(new Date(stats.start))); writer.write("\t"); if (stats.end == -1) writer.write("?"); else writer.write(sdf.format(new Date(stats.end))); writer.write("\t"); if (stats.fetched == -1) writer.write("?"); else writer.write(stats.fetched + ""); writer.write("\t"); if (stats.parsed == -1) writer.write("?"); else writer.write(stats.parsed + ""); writer.write("\n"); writer.flush(); } } public void getStats(Path segment, final SegmentReaderStats stats) throws Exception { SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); long cnt = 0L; UTF8 key = new UTF8(); for (int i = 0; i < readers.length; i++) { while (readers[i].next(key)) cnt++; readers[i].close(); } stats.generated = cnt; Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME); if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) { cnt = 0L; long start = Long.MAX_VALUE; long end = Long.MIN_VALUE; CrawlDatum value = new CrawlDatum(); MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, getConf()); for (int i = 0; i < mreaders.length; i++) { while (mreaders[i].next(key, value)) { cnt++; if (value.getFetchTime() < start) start = value.getFetchTime(); if (value.getFetchTime() > end) end = value.getFetchTime(); } mreaders[i].close(); } stats.start = start; stats.end = end; stats.fetched = cnt; } Path parseDir = new Path(segment, ParseData.DIR_NAME); if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) { cnt = 0L; long errors = 0L; ParseData value = new ParseData(); MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, getConf()); for (int i = 0; i < mreaders.length; i++) { while (mreaders[i].next(key, value)) { cnt++; if (!value.getStatus().isSuccess()) errors++; } mreaders[i].close(); } stats.parsed = cnt; stats.parseErrors = errors; } } private static final int MODE_DUMP = 0; private static final int MODE_LIST = 1; private static final int MODE_GET = 2; public static void main(String[] args) throws Exception { if (args.length < 2) { usage(); return; } int mode = -1; if (args[0].equals("-dump")) mode = MODE_DUMP; else if (args[0].equals("-list")) mode = MODE_LIST; else if (args[0].equals("-get")) mode = MODE_GET; boolean co = true; boolean fe = true; boolean ge = true; boolean pa = true; boolean pd = true; boolean pt = true; // collect general options for (int i = 1; i < args.length; i++) { if (args[i].equals("-nocontent")) { co = false; args[i] = null; } else if (args[i].equals("-nofetch")) { fe = false; args[i] = null; } else if (args[i].equals("-nogenerate")) { ge = false; args[i] = null; } else if (args[i].equals("-noparse")) { pa = false; args[i] = null; } else if (args[i].equals("-noparsedata")) { pd = false; args[i] = null; } else if (args[i].equals("-noparsetext")) { pt = false; args[i] = null; } } Configuration conf = NutchConfiguration.create(); final FileSystem fs = FileSystem.get(conf); SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, pt); // collect required args switch (mode) { case MODE_DUMP: String input = args[1]; if (input == null) { System.err.println("Missing required argument: <segment_dir>"); usage(); return; } String output = args.length > 2 ? args[2] : null; if (output == null) { System.err.println("Missing required argument: <output>"); usage(); return; } segmentReader.dump(new Path(input), new Path(output)); return; case MODE_LIST: ArrayList dirs = new ArrayList(); for (int i = 1; i < args.length; i++) { if (args[i] == null) continue; if (args[i].equals("-dir")) { Path dir = new Path(args[++i]); Path[] files = fs.listPaths(dir, new PathFilter() { public boolean accept(Path pathname) { try { if (fs.isDirectory(pathname)) return true; } catch (IOException e) {}; return false; } }); if (files != null && files.length > 0) { dirs.addAll(Arrays.asList(files)); } } else dirs.add(new Path(args[i])); } segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8")); return; case MODE_GET: input = args[1]; if (input == null) { System.err.println("Missing required argument: <segment_dir>"); usage(); return; } String key = args.length > 2 ? args[2] : null; if (key == null) { System.err.println("Missing required argument: <keyValue>"); usage(); return; } segmentReader.get(new Path(input), new UTF8(key), new OutputStreamWriter(System.out, "UTF-8"), new HashMap()); return; default: System.err.println("Invalid operation: " + args[0]); usage(); return; } } private static void usage() { System.err.println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n"); System.err.println("* General options:"); System.err.println("\t-nocontent\tignore content directory"); System.err.println("\t-nofetch\tignore crawl_fetch directory"); System.err.println("\t-nogenerate\tignore crawl_generate directory"); System.err.println("\t-noparse\tignore crawl_parse directory"); System.err.println("\t-noparsedata\tignore parse_data directory"); System.err.println("\t-noparsetext\tignore parse_text directory"); System.err.println(); System.err.println("* SegmentReader -dump <segment_dir> <output> [general options]"); System.err.println(" Dumps content of a <segment_dir> as a text file to <output>.\n"); System.err.println("\t<segment_dir>\tname of the segment directory."); System.err.println("\t<output>\tname of the (non-existent) output directory."); System.err.println(); System.err.println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]"); System.err.println(" List a synopsis of segments in specified directories, or all segments in"); System.err.println(" a directory <segments>, and print it on System.out\n"); System.err.println("\t<segment_dir1> ...\tlist of segment directories to process"); System.err.println("\t-dir <segments>\t\tdirectory that contains multiple segments"); System.err.println(); System.err.println("* SegmentReader -get <segment_dir> <keyValue> [general options]"); System.err.println(" Get a specified record from a segment, and print it on System.out.\n"); System.err.println("\t<segment_dir>\tname of the segment directory."); System.err.println("\t<keyValue>\tvalue of the key (url)."); System.err.println("\t\tNote: put double-quotes around strings with spaces."); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -