⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fetcher.java

📁 爬虫数据的改进,并修正了一些bug
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            parseTextWriter.append(text);
            parseDataWriter.append(parseData);
          }
        }
      } catch (Throwable t) {
        LOG.severe("error writing output:" + t.toString());
      }
    }
                                       
  }
			
  public Fetcher(NutchFileSystem nfs, String directory, boolean parsing)
    throws IOException {

    this.parsing = parsing;

    // Set up in/out streams
    fetchList = new ArrayFile.Reader
      (nfs, new File(directory, FetchListEntry.DIR_NAME).toString());
    if (this.parsing) {
      fetcherWriter = new ArrayFile.Writer
        (nfs, new File(directory, FetcherOutput.DIR_NAME).toString(),
        FetcherOutput.class);
    } else {
      fetcherWriter = new ArrayFile.Writer
        (nfs, new File(directory, FetcherOutput.DIR_NAME_NP).toString(),
        FetcherOutput.class);
    }
    contentWriter = new ArrayFile.Writer
      (nfs, new File(directory, Content.DIR_NAME).toString(), Content.class);
    if (this.parsing) {
      parseTextWriter = new ArrayFile.Writer(nfs,
        new File(directory, ParseText.DIR_NAME).toString(), ParseText.class);
      parseDataWriter = new ArrayFile.Writer(nfs,
        new File(directory, ParseData.DIR_NAME).toString(), ParseData.class);
    }
    name = new File(directory).getName();
  }

  /**
   * Sets the number of fetcher threads that {@link #run()} spawns.
   *
   * @param threadCount number of threads to start
   */
  public void setThreadCount(int threadCount) {
    this.threadCount = threadCount;
  }

  /**
   * Applies <code>level</code> to this class's logger and to the loggers
   * of PluginRepository and ParserFactory, then reports the new level
   * through this class's logger at info level.
   *
   * @param level the logging level to apply
   */
  public static void setLogLevel(Level level) {
    LOG.setLevel(level);
    PluginRepository.LOG.setLevel(level);
    ParserFactory.LOG.setLevel(level);

    String announcement = "logging at " + level;
    LOG.info(announcement);
  }

  /**
   * Runs the fetcher: records the start time, starts
   * <code>threadCount</code> FetcherThreads and then polls once per second
   * until the fetch is considered finished, finally closing the fetch list
   * and all writers.
   *
   * @throws IOException declared for closing the fetch list and writers
   * @throws InterruptedException if the monitoring sleep is interrupted
   * @throws RuntimeException if LogFormatter reports that a SEVERE message
   *         has been logged
   */
  public void run() throws IOException, InterruptedException {
    start = System.currentTimeMillis();

    // Spawn the workers; each is named THREAD_GROUP_NAME plus its index.
    for (int t = 0; t < threadCount; t++) {
      new FetcherThread(THREAD_GROUP_NAME + t).start();
    }

    // Monitoring stops once no FetcherThread is left in the group.  Other
    // threads may still be alive (for instance runaway threads started by
    // external libraries from within a FetcherThread); they are ignored
    // because the FetcherThreads themselves have finished.  As an extra
    // precaution the loop only exits after the page, error and byte
    // counters have stayed unchanged between two consecutive checks.
    int lastPages = pages;
    int lastErrors = errors;
    long lastBytes = bytes;

    for (;;) {
      Thread.sleep(1000);

      if (LogFormatter.hasLoggedSevere()) {
        throw new RuntimeException("SEVERE error logged.  Exiting fetcher.");
      }

      int activeCount = group.activeCount();
      Thread[] active = new Thread[activeCount];
      group.enumerate(active);

      boolean fetcherThreadAlive = false;         // until one is found
      for (int k = 0; k < activeCount; k++) {
        Thread candidate = active[k];
        if (candidate == null) {
          // slot left empty: the thread went away in the meantime
          continue;
        }
        if (candidate.getName().startsWith(THREAD_GROUP_NAME)) {
          fetcherThreadAlive = true;
        }
        if (LOG.isLoggable(Level.FINE)) {
          LOG.fine(candidate.toString());
        }
      }

      if (fetcherThreadAlive) {
        continue;                                 // keep polling
      }

      if (LOG.isLoggable(Level.FINE)) {
        LOG.fine("number of active threads: "+activeCount);
      }
      if (pages == lastPages && errors == lastErrors && bytes == lastBytes) {
        break;                                    // counters are stable
      }

      // Counters still moved: report and take a new baseline.
      status();
      lastPages = pages;
      lastErrors = errors;
      lastBytes = bytes;
    }

    // close databases
    fetchList.close();
    fetcherWriter.close();
    contentWriter.close();
    if (this.parsing) {
      parseTextWriter.close();
      parseDataWriter.close();
    }

  }
  
  /**
   * FetcherStatus encapsulates a snapshot of the Fetcher progress status.
   * The snapshot time is taken from the system clock when the object is
   * constructed.
   */
  public static class FetcherStatus {
    private String name;
    private long startTime;
    private long curTime;
    private int pageCount;
    private int errorCount;
    private long byteCount;

    /**
     * Builds a snapshot from the given counters and stamps it with the
     * current time.
     *
     * @param name short name of the segment being processed
     * @param start the time in millisec. this fetcher was started
     * @param pages number of pages fetched
     * @param errors number of fetching errors
     * @param bytes number of bytes fetched
     */
    public FetcherStatus(String name, long start, int pages, int errors, long bytes) {
      this.name = name;
      this.startTime = start;
      this.curTime = System.currentTimeMillis();
      this.pageCount = pages;
      this.errorCount = errors;
      this.byteCount = bytes;
    }

    /** Returns the segment name given at construction. */
    public String getName() {
      return name;
    }

    /** Returns the fetcher start time in milliseconds. */
    public long getStartTime() {
      return startTime;
    }

    /** Returns the time in milliseconds at which this snapshot was built. */
    public long getCurTime() {
      return curTime;
    }

    /** Returns the snapshot time minus the start time, in milliseconds. */
    public long getElapsedTime() {
      return curTime - startTime;
    }

    /** Returns the number of pages fetched. */
    public int getPageCount() {
      return pageCount;
    }

    /** Returns the number of fetching errors. */
    public int getErrorCount() {
      return errorCount;
    }

    /** Returns the number of bytes fetched. */
    public long getByteCount() {
      return byteCount;
    }

    /**
     * Formats the snapshot as a single line listing segment name, pages,
     * errors, bytes and elapsed milliseconds.
     */
    public String toString() {
      long elapsedMillis = curTime - startTime;
      StringBuffer line = new StringBuffer("status: segment ");
      line.append(name).append(", ");
      line.append(pageCount).append(" pages, ");
      line.append(errorCount).append(" errors, ");
      line.append(byteCount).append(" bytes, ");
      line.append(elapsedMillis).append(" ms");
      return line.toString();
    }
  }
  
  /**
   * Captures the segment name, start time and the current page, error and
   * byte counters in a new FetcherStatus.  Synchronized on this fetcher.
   *
   * @return a freshly built status snapshot
   */
  public synchronized FetcherStatus getStatus() {
    FetcherStatus snapshot =
      new FetcherStatus(name, start, pages, errors, bytes);
    return snapshot;
  }

  /**
   * Logs the status of the fetcher run at info level: first the snapshot
   * summary line, then a second line with the derived rates (pages per
   * second, kilobits per second and average bytes per page).
   */
  public synchronized void status() {
    FetcherStatus snapshot = getStatus();
    LOG.info(snapshot.toString());

    // All rates are computed in float arithmetic over the same snapshot.
    float elapsedSeconds = snapshot.getElapsedTime() / 1000.0f;
    float fetchedBytes = (float) snapshot.getByteCount();

    float pagesPerSecond = ((float) snapshot.getPageCount()) / elapsedSeconds;
    float kilobitsPerSecond = (fetchedBytes * 8 / 1024) / elapsedSeconds;
    float bytesPerPage = fetchedBytes / snapshot.getPageCount();

    LOG.info("status: "
             + pagesPerSecond + " pages/s, "
             + kilobitsPerSecond + " kb/s, "
             + bytesPerPage + " bytes/page");
  }

  /**
   * Command-line entry point: parses the options, builds a Fetcher for the
   * given segment directory and runs it.
   *
   * <p>Recognized options are <code>-threads n</code>,
   * <code>-logLevel level</code>, <code>-noParsing</code> and
   * <code>-showThreadID</code>; any other non-null argument is taken as the
   * segment directory.  The file system is obtained from
   * <code>NutchFileSystem.parseArgs</code> and is closed when the run ends,
   * whether it completes normally or with an exception.
   *
   * <p>The usage message is printed and the JVM exits with status -1 when
   * no arguments are given, when an option that needs a value has none, or
   * when no segment directory is supplied.
   *
   * @param args command-line arguments
   * @throws Exception anything raised while constructing or running the
   *         fetcher, or while closing the file system
   */
  public static void main(String[] args) throws Exception {
    int threadCount = -1;
    String logLevel = "info";
    boolean parsing = true;
    boolean showThreadID = false;
    String directory = null;

    String usage = "Usage: Fetcher (-local | -ndfs <namenode:port>) [-logLevel level] [-noParsing] [-showThreadID] [-threads n] <dir>";

    if (args.length == 0) {
      System.err.println(usage);
      System.exit(-1);
    }

    int i = 0;
    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i);
    for (; i < args.length; i++) {       // parse command line
      String arg = args[i];
      if (arg == null) {
        // null entries are skipped (presumably already consumed by
        // NutchFileSystem.parseArgs -- confirm against that method)
        continue;
      }

      // Options that consume the following argument must actually have one;
      // otherwise args[++i] would run past the end of the array.
      boolean takesValue = arg.equals("-threads") || arg.equals("-logLevel");
      if (takesValue && (i + 1 >= args.length || args[i + 1] == null)) {
        System.err.println("Missing value for option " + arg);
        System.err.println(usage);
        System.exit(-1);
      }

      if (arg.equals("-threads")) {               // found -threads option
        threadCount = Integer.parseInt(args[++i]);
      } else if (arg.equals("-logLevel")) {
        logLevel = args[++i];
      } else if (arg.equals("-noParsing")) {
        parsing = false;
      } else if (arg.equals("-showThreadID")) {
        showThreadID = true;
      } else {                                    // root is required parameter
        directory = arg;
      }
    }

    // The segment directory is mandatory; without it the Fetcher
    // constructor would fail with a NullPointerException.
    if (directory == null) {
      System.err.println(usage);
      System.exit(-1);
    }

    try {
      // Constructed inside the try so the file system is closed even when
      // construction or option handling fails.
      Fetcher fetcher = new Fetcher(nfs, directory, parsing);
      if (threadCount != -1) {                    // set threadCount option
        fetcher.setThreadCount(threadCount);
      }

      // set log level; Locale.ROOT keeps the upper-casing independent of
      // the default locale so that Level.parse sees e.g. "INFO"
      Fetcher.setLogLevel(
        Level.parse(logLevel.toUpperCase(java.util.Locale.ROOT)));

      if (showThreadID) {
        LogFormatter.setShowThreadIDs(showThreadID);
      }

      fetcher.run();                              // run the Fetcher
    } finally {
      nfs.close();
    }

  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -