⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 segmentreader.java

📁 nutch0.8源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        try {          List res = getMapRecords(new Path(segment, ParseData.DIR_NAME), key);          results.put("pd", res);        } catch (Exception e) {          e.printStackTrace(LogUtil.getWarnStream(LOG));        }      }    });    if (pt) threads.add(new Thread() {      public void run() {        try {          List res = getMapRecords(new Path(segment, ParseText.DIR_NAME), key);          results.put("pt", res);        } catch (Exception e) {          e.printStackTrace(LogUtil.getWarnStream(LOG));        }      }    });    Iterator it = threads.iterator();    while (it.hasNext()) ((Thread)it.next()).start();    int cnt = 0;    do {      try {        Thread.sleep(5000);      } catch (Exception e) {};      it = threads.iterator();      while (it.hasNext()) {        if (((Thread)it.next()).isAlive()) cnt++;      }      if ((cnt > 0) && (LOG.isDebugEnabled())) {        LOG.debug("(" + cnt + " to retrieve)");      }    } while (cnt > 0);    for (int i = 0; i < keys.length; i++) {      List res = (List)results.get(keys[i][0]);      if (res != null && res.size() > 0) {        for (int k = 0; k < res.size(); k++) {          writer.write(keys[i][1]);          writer.write(res.get(k) + "\n");        }      }      writer.flush();    }  }    private List getMapRecords(Path dir, UTF8 key) throws Exception {    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());    ArrayList res = new ArrayList();    Class keyClass = readers[0].getKeyClass();    Class valueClass = readers[0].getValueClass();    if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8"))      throw new IOException("Incompatible key (" + keyClass.getName() + ")");    Writable value = (Writable)valueClass.newInstance();    // we don't know the partitioning schema    for (int i = 0; i < readers.length; i++) {      if (readers[i].get(key, value) != null)        res.add(value);      readers[i].close();    }    return res;  }  private List getSeqRecords(Path dir, UTF8 key) throws Exception {    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), dir);    ArrayList res = new ArrayList();    Class keyClass = readers[0].getKeyClass();    Class valueClass = readers[0].getValueClass();    if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8"))      throw new IOException("Incompatible key (" + keyClass.getName() + ")");    Writable aKey = (Writable)keyClass.newInstance();    Writable value = (Writable)valueClass.newInstance();    for (int i = 0; i < readers.length; i++) {      while (readers[i].next(aKey, value)) {        if (aKey.equals(key))          res.add(value);      }      readers[i].close();    }    return res;  }  public static class SegmentReaderStats {    public long start = -1L;    public long end = -1L;    public long generated = -1L;    public long fetched = -1L;    public long fetchErrors = -1L;    public long parsed = -1L;    public long parseErrors = -1L;  }    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");    public void list(List dirs, Writer writer) throws Exception {    writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n");    for (int i = 0; i < dirs.size(); i++) {      Path dir = (Path)dirs.get(i);      SegmentReaderStats stats = new SegmentReaderStats();      getStats(dir, stats);      writer.write(dir.getName() + "\t");      if (stats.generated == -1) writer.write("?");      else writer.write(stats.generated + "");      writer.write("\t\t");      if (stats.start == -1) writer.write("?\t");      else writer.write(sdf.format(new Date(stats.start)));      writer.write("\t");      if (stats.end == -1) writer.write("?");      else writer.write(sdf.format(new Date(stats.end)));      writer.write("\t");      if (stats.fetched == -1) writer.write("?");      else writer.write(stats.fetched + "");      writer.write("\t");      if (stats.parsed == -1) writer.write("?");      else writer.write(stats.parsed + "");      writer.write("\n");      writer.flush();    }  }    public void getStats(Path segment, final SegmentReaderStats stats) throws Exception {    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME));    long cnt = 0L;    UTF8 key = new UTF8();    for (int i = 0; i < readers.length; i++) {      while (readers[i].next(key)) cnt++;      readers[i].close();    }    stats.generated = cnt;    Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);    if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) {      cnt = 0L;      long start = Long.MAX_VALUE;      long end = Long.MIN_VALUE;      CrawlDatum value = new CrawlDatum();      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, getConf());      for (int i = 0; i < mreaders.length; i++) {        while (mreaders[i].next(key, value)) {          cnt++;          if (value.getFetchTime() < start) start = value.getFetchTime();          if (value.getFetchTime() > end) end = value.getFetchTime();        }        mreaders[i].close();      }      stats.start = start;      stats.end = end;      stats.fetched = cnt;    }    Path parseDir = new Path(segment, ParseData.DIR_NAME);    if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) {      cnt = 0L;      long errors = 0L;      ParseData value = new ParseData();      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, getConf());      for (int i = 0; i < mreaders.length; i++) {        while (mreaders[i].next(key, value)) {          cnt++;          if (!value.getStatus().isSuccess()) errors++;        }        mreaders[i].close();      }      stats.parsed = cnt;      stats.parseErrors = errors;    }  }    private static final int MODE_DUMP = 0;  private static final int MODE_LIST = 1;  private static final int MODE_GET = 2;  public static void main(String[] args) throws Exception {    if (args.length < 2) {      usage();      return;    }    int mode = -1;    if (args[0].equals("-dump"))      mode = MODE_DUMP;    else if (args[0].equals("-list"))      mode = MODE_LIST;    else if (args[0].equals("-get")) mode = MODE_GET;    boolean co = true;    boolean fe = true;    boolean ge = true;    boolean pa = true;    boolean pd = true;    boolean pt = true;    // collect general options    for (int i = 1; i < args.length; i++) {      if (args[i].equals("-nocontent")) {        co = false;        args[i] = null;      } else if (args[i].equals("-nofetch")) {        fe = false;        args[i] = null;      } else if (args[i].equals("-nogenerate")) {        ge = false;        args[i] = null;      } else if (args[i].equals("-noparse")) {        pa = false;        args[i] = null;      } else if (args[i].equals("-noparsedata")) {        pd = false;        args[i] = null;      } else if (args[i].equals("-noparsetext")) {        pt = false;        args[i] = null;      }    }    Configuration conf = NutchConfiguration.create();    final FileSystem fs = FileSystem.get(conf);    SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, pt);    // collect required args    switch (mode) {      case MODE_DUMP:        String input = args[1];        if (input == null) {          System.err.println("Missing required argument: <segment_dir>");          usage();          return;        }        String output = args.length > 2 ? args[2] : null;        if (output == null) {          System.err.println("Missing required argument: <output>");          usage();          return;        }        segmentReader.dump(new Path(input), new Path(output));        return;      case MODE_LIST:        ArrayList dirs = new ArrayList();        for (int i = 1; i < args.length; i++) {          if (args[i] == null) continue;          if (args[i].equals("-dir")) {            Path dir = new Path(args[++i]);            Path[] files = fs.listPaths(dir, new PathFilter() {              public boolean accept(Path pathname) {                try {                  if (fs.isDirectory(pathname)) return true;                } catch (IOException e) {};                return false;              }            });            if (files != null && files.length > 0) {              dirs.addAll(Arrays.asList(files));            }          } else dirs.add(new Path(args[i]));        }        segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));        return;      case MODE_GET:        input = args[1];        if (input == null) {          System.err.println("Missing required argument: <segment_dir>");          usage();          return;        }        String key = args.length > 2 ? args[2] : null;        if (key == null) {          System.err.println("Missing required argument: <keyValue>");          usage();          return;        }        segmentReader.get(new Path(input), new UTF8(key), new OutputStreamWriter(System.out, "UTF-8"), new HashMap());        return;      default:        System.err.println("Invalid operation: " + args[0]);        usage();        return;    }  }  private static void usage() {    System.err.println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n");    System.err.println("* General options:");    System.err.println("\t-nocontent\tignore content directory");    System.err.println("\t-nofetch\tignore crawl_fetch directory");    System.err.println("\t-nogenerate\tignore crawl_generate directory");    System.err.println("\t-noparse\tignore crawl_parse directory");    System.err.println("\t-noparsedata\tignore parse_data directory");    System.err.println("\t-noparsetext\tignore parse_text directory");    System.err.println();    System.err.println("* SegmentReader -dump <segment_dir> <output> [general options]");    System.err.println("  Dumps content of a <segment_dir> as a text file to <output>.\n");    System.err.println("\t<segment_dir>\tname of the segment directory.");    System.err.println("\t<output>\tname of the (non-existent) output directory.");    System.err.println();    System.err.println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]");    System.err.println("  List a synopsis of segments in specified directories, or all segments in");    System.err.println("  a directory <segments>, and print it on System.out\n");    System.err.println("\t<segment_dir1> ...\tlist of segment directories to process");    System.err.println("\t-dir <segments>\t\tdirectory that contains multiple segments");    System.err.println();    System.err.println("* SegmentReader -get <segment_dir> <keyValue> [general options]");    System.err.println("  Get a specified record from a segment, and print it on System.out.\n");    System.err.println("\t<segment_dir>\tname of the segment directory.");    System.err.println("\t<keyValue>\tvalue of the key (url).");    System.err.println("\t\tNote: put double-quotes around strings with spaces.");  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -