
📄 mergesegments.java

📁 Improved Nutch search tools and related crawler-optimization utilities
💻 JAVA
📖 Page 1 of 2
package net.nutch.util;

import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Vector;

import net.nutch.fs.FileUtil;
import net.nutch.fs.NutchFileSystem;
import net.nutch.io.MD5Hash;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;

import org.apache.log4j.Logger;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;

import net.nutch.indexer.IndexSegment;
import net.nutch.segment.SegmentReader;
import net.nutch.segment.SegmentWriter;

public class MergeSegments implements Runnable {
	  public static final Logger LOG = Logger.getLogger("segment");

	  /** Log a progress update every LOG_STEP items. */
	  public static int LOG_STEP = 20000;
	  /** Temporary de-dup index size. Larger indexes tend to slow down indexing;
	   * too many small indexes slow down the subsequent index merging. It's a
	   * trade-off value.
	   */
	  public static int INDEX_SIZE = 250000;
	  public static int INDEX_MERGE_FACTOR = 30;
	  public static int INDEX_MIN_MERGE_DOCS = 100;
	  
	  private NutchFileSystem nfs = null;
	  private File[] segments = null;
	  private int stage = SegmentMergeStatus.STAGE_OPENING;
	  private long totalRecords = 0L;
	  private long processedRecords = 0L;
	  private long start = 0L;
	  private long maxCount = Long.MAX_VALUE;
	  private File outputPath = null;
	  private String outSegment = null;
	  private String tempSegment = null;
	  private List segdirs = null;
	  private List allsegdirs = null;
	  private boolean runIndexer = false;
	  private boolean delSegs = false;
	  private HashMap readers = new HashMap();

	  /**
	   * Create a MergeSegments tool.
	   * @param nfs filesystem
	   * @param segments list of input segments
	   * @param output output directory, where output segments will be created
	   * @param maxCount maximum number of records per output segment. If this
	   * value is not positive, the default of {@link Long#MAX_VALUE} is used.
	   * @param runIndexer run the indexer on the output segment(s)
	   * @param delSegs delete the input segments when finished
	   * @throws IOException
	   */
	  public MergeSegments(NutchFileSystem nfs, File[] segments, File output, long maxCount, boolean runIndexer, boolean delSegs) throws IOException {
	    this.nfs = nfs;
	    this.segments = segments;
	    this.runIndexer = runIndexer;
	    this.delSegs = delSegs;
	    if (maxCount > 0) this.maxCount = maxCount;
	    allsegdirs = Arrays.asList(segments);
	    this.outputPath = output;
	    this.outSegment = SegmentWriter.getNewSegmentName();
	    this.tempSegment = outSegment;
	    /*
	    if (nfs.exists(output)) {
	    	if( !nfs.isDirectory(output) )
	    		throw new Exception("MergeSegment Error! " + output.toString() + " is not a path!");
	    	int i = 0;
	    	for(; i<segments.length; i++){
	    		if( output.compareTo(segments[i]) == 0 )
	    			break;
	    	}
	    	if (i < segments.length){ // merging into one of the input segments
	    		File parent = output.getParentFile();
	    		tempSegment = SegmentWriter.getNewSegmentName();
	    	}
	    } else nfs.mkdirs(output);
	    */
	    if (!nfs.exists(output)){
	    	nfs.mkdirs(output);
	    }
	  }
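
	  /*
	   * Usage sketch (not part of the original source). Assumes
	   * NutchFileSystem.get() returns the configured (local) filesystem and that
	   * a "segments" directory holds the input segment directories; a maxCount
	   * of 0 keeps the Long.MAX_VALUE default, i.e. a single output segment.
	   *
	   *   NutchFileSystem nfs = NutchFileSystem.get();
	   *   File[] segs = new File("segments").listFiles();
	   *   MergeSegments merger =
	   *       new MergeSegments(nfs, segs, new File("merged"), 0L, false, false);
	   *   new Thread(merger).start();   // MergeSegments implements Runnable
	   */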

	  public static class SegmentMergeStatus {
	    public static final int STAGE_OPENING     = 0;
	    public static final int STAGE_MASTERIDX   = 1;
	    public static final int STAGE_MERGEIDX    = 2;
	    public static final int STAGE_DEDUP       = 3;
	    public static final int STAGE_WRITING     = 4;
	    public static final int STAGE_INDEXING    = 5;
	    public static final int STAGE_DELETING    = 6;
	    public static final String[] stages = {
	            "opening input segments",
	            "creating master index",
	            "merging sub-indexes",
	            "deduplicating",
	            "writing output segment(s)",
	            "indexing output segment(s)",
	            "deleting input segments"
	    };
	    public int stage;
	    public File[] inputSegments;
	    public long startTime, curTime;
	    public long totalRecords;
	    public long processedRecords;
	    
	    public SegmentMergeStatus() {}
	    
	    public SegmentMergeStatus(int stage, File[] inputSegments, long startTime,
	            long totalRecords, long processedRecords) {
	      this.stage = stage;
	      this.inputSegments = inputSegments;
	      this.startTime = startTime;
	      this.curTime = System.currentTimeMillis();
	      this.totalRecords = totalRecords;
	      this.processedRecords = processedRecords;
	    }    
	  }
	  
	  public SegmentMergeStatus getStatus() {
	    SegmentMergeStatus status = new SegmentMergeStatus(stage, segments, start,
	            totalRecords, processedRecords);
	    return status;
	  }
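
	  /*
	   * Progress-reporting sketch (not part of the original source): another
	   * thread can poll getStatus() while run() executes, mapping the stage code
	   * to its human-readable label via SegmentMergeStatus.stages.
	   *
	   *   SegmentMergeStatus s = merger.getStatus();
	   *   LOG.info(SegmentMergeStatus.stages[s.stage] + ": "
	   *       + s.processedRecords + "/" + s.totalRecords + " records, "
	   *       + ((s.curTime - s.startTime) / 1000) + "s elapsed");
	   */
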
	  /** Returns the new segment's absolute path. */
	  public String getNewSegmentName() {
	    File newSegment = new File(outputPath, outSegment);
	    return newSegment.getAbsolutePath();
	  }
	  
	  /** Run the tool, periodically reporting progress. */
	  public void run() {
	    start = System.currentTimeMillis();
	    stage = SegmentMergeStatus.STAGE_OPENING;
	    long delta;
	    LOG.info("* Opening " + allsegdirs.size() + " segments:");
	    try {
	      segdirs = new ArrayList();
	      // open all segments
	      for (int i = 0; i < allsegdirs.size(); i++) {
	        File dir = (File) allsegdirs.get(i);
	        SegmentReader sr = null;
	        try {
	          // try to autofix it if corrupted...
	          sr = new SegmentReader(nfs, dir, true);
	        } catch (Exception e) {
	          // this segment is hosed beyond repair, don't use it
	          continue;
	        }
	        segdirs.add(dir);
	        totalRecords += sr.size;
	        LOG.info(" - segment " + dir.getName() + ": " + sr.size + " records.");
	        readers.put(dir.getName(), sr);
	      }
	      long total = totalRecords;
	      LOG.info("* TOTAL " + total + " input records in " + segdirs.size() + " segments.");
	      LOG.info("* Creating master index...");
	      stage = SegmentMergeStatus.STAGE_MASTERIDX;
	      // XXX Note that Lucene indexes don't work with NutchFileSystem for now.
	      // XXX For now always assume LocalFileSystem here...
	      Vector masters = new Vector();
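	      // The temporary de-dup index lives under a timestamped ".fastmerge_index"
	      // directory inside the output path; "masters" collects the sub-index
	      // directories (the first is "0") for the later sub-index merge
	      // (cf. STAGE_MERGEIDX).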
	      File fsmtIndexDir = new File(outputPath, ".fastmerge_index"+System.currentTimeMillis());
	      File masterDir = new File(fsmtIndexDir, "0");
	      if (!masterDir.mkdirs()) {
	        LOG.error("Could not create a master index dir: " + masterDir);
	        return;
	      }
	      masters.add(masterDir);
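	      // The index holds only untokenized hash/pointer fields, so the trivial
	      // WhitespaceAnalyzer suffices. Compound files are disabled and the
	      // Lucene 1.x mergeFactor/minMergeDocs knobs are raised to favor bulk
	      // indexing throughput.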
	      IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
	      iw.setUseCompoundFile(false);
	      iw.mergeFactor = INDEX_MERGE_FACTOR;
	      iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
	      long s1 = System.currentTimeMillis();
	      Iterator it = readers.values().iterator();
	      processedRecords = 0L;
	      delta = System.currentTimeMillis();
	      while (it.hasNext()) {
	        SegmentReader sr = (SegmentReader) it.next();
	        String name = sr.segmentDir.getName();
	        //FetcherOutput fo = new FetcherOutput();
	        ParseText parseText = new ParseText();
	        ParseData parseData = new ParseData();
	        for (long i = 0; i < sr.size; i++) {
	          try {
	            if (!sr.get(i, parseText, parseData)) break;
	            Properties prop = parseData.getMetadata();
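	            // One Document per record: "sd" is a stored-only back-pointer
	            // ("segmentName|recordNumber"); "uh" (MD5 of the URL) and "ch"
	            // (the content digest) are stored, indexed, untokenized keys for
	            // the later URL- and content-duplicate detection.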
	            
	            Document doc = new Document();
	            doc.add(new Field("sd", name + "|" + i, true, false, false));
	            //doc.add(new Field("uh", MD5Hash.digest(fo.getUrl().toString()).toString(), true, true, false));
	            //doc.add(new Field("ch", fo.getMD5Hash().toString(), true, true, false));
	            String url = prop.getProperty("url");
	            if( url != null && url.length()>0 )
	            	doc.add(new Field("uh", MD5Hash.digest(url).toString(), true, true, false));
	            String md5 = prop.getProperty("digest");
	            if( md5!=null && md5.length()>0 )
	            	doc.add(new Field("ch", prop.getProperty("digest"), true, true, false));
	            //doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
	            //doc.add(new Field("time", prop.getProperty("pubTime"), true, false, false));
	            //doc.add(new Field("from",prop.getProperty("from"),true,false,false));
	            //doc.add(new Field("gid",prop.getProperty("gid"),true,false,false));
	            iw.addDocument(doc);
	            
