
mergesegments.java

Improved tools for Nutch search and related crawler-optimization tools
Java
	            processedRecords++;
	            if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
	              LOG.info(" Processed " + processedRecords + " records (" +
	                      (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
	              delta = System.currentTimeMillis();
	            }
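	            // Cap each subindex at INDEX_SIZE documents: optimize and close
	            // the full one, then open a fresh IndexWriter in a new directory.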
	            if (processedRecords > 0 && (processedRecords % INDEX_SIZE == 0)) {
	              iw.optimize();
	              iw.close();
	              LOG.info(" - creating next subindex...");
	              masterDir = new File(fsmtIndexDir, "" + masters.size());
	              if (!masterDir.mkdirs()) {
	                LOG.error("Could not create a master index dir: " + masterDir);
	                return;
	              }
	              masters.add(masterDir);
	              iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
	              iw.setUseCompoundFile(false);
	              iw.mergeFactor = INDEX_MERGE_FACTOR;
	              iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
	            }
	          } catch (Throwable t) {
	            // we can assume the data is invalid from now on - break here
	            LOG.info(" - segment " + name + " truncated to " + (i + 1) + " records");
	            break;
	          }
	        }
	      }
	      iw.optimize();
	      LOG.info("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
	      s1 = System.currentTimeMillis();
	      // merge all other indexes using the latest IndexWriter (still open):
	      if (masters.size() > 1) {
	        LOG.info(" - merging subindexes...");
	        stage = SegmentMergeStatus.STAGE_MERGEIDX;
	        IndexReader[] ireaders = new IndexReader[masters.size() - 1];
	        for (int i = 0; i < masters.size() - 1; i++) {
	          ireaders[i] = IndexReader.open((File) masters.get(i));
	        }
	        iw.addIndexes(ireaders);
	        for (int i = 0; i < masters.size() - 1; i++) {
	          ireaders[i].close();
	          FileUtil.fullyDelete((File)masters.get(i));
	        }
	      }
	      iw.close();
	      LOG.info("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
	      /******************************************************
	      LOG.info("* Removing duplicate entries...");
	      stage = SegmentMergeStatus.STAGE_DEDUP;
	      IndexReader ir = IndexReader.open(masterDir);
	      int i = 0;
	      long cnt = 0L;
	      processedRecords = 0L;
	      s1 = System.currentTimeMillis();
	      delta = s1;
	      TermEnum te = ir.terms();
	      while(te.next()) {
	        Term t = te.term();
	        if (t == null) continue;
	        if (!(t.field().equals("ch") || t.field().equals("uh"))) continue;
	        cnt++;
	        processedRecords = cnt / 2;
	        if (cnt > 0 && (cnt % (LOG_STEP  * 2) == 0)) {
	          LOG.info(" Processed " + processedRecords + " records (" +
	                  (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
	          delta = System.currentTimeMillis();
	        }
	        // Enumerate all docs with the same URL hash or content hash
	        TermDocs td = ir.termDocs(t);
	        if (td == null) continue;
	        int id = -1;
	        String time = null;
	        Document doc = null;
	        // Keep only the latest version of the document with
	        // the same hash (url or content). Note: even if the content
	        // hash is identical, other metadata may be different, so even
	        // in this case it makes sense to keep the latest version.
	        while (td.next()) {
	          int docid = td.doc();
	          if (!ir.isDeleted(docid)) {
	            doc = ir.document(docid);
	            if (time == null) {
	              time = doc.get("time");
	              id = docid;
	              continue;
	            }
	            String dtime = doc.get("time");
	            // "time" is a DateField, and can be compared lexicographically
	            if (dtime.compareTo(time) > 0) {
	              if (id != -1) {
	                ir.delete(id);
	              }
	              time = dtime;
	              id = docid;
	            } else {
	              ir.delete(docid);
	            }
	          }
	        }
	      }
	      //
	      // keep the IndexReader open...
	      //
	      
	      LOG.info("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
	      ************************************************************/
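	      // Writing stage: stream every non-deleted document from the master
	      // index back out into a single merged segment.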
	      stage = SegmentMergeStatus.STAGE_WRITING;
	      
	      IndexReader ir = IndexReader.open(masterDir);
	      
	      processedRecords = 0L;

	      File outDir = new File(outputPath, tempSegment);
	      LOG.info("* Merging all segments into " + outDir.getName());
	      s1 = System.currentTimeMillis();
	      delta = s1;
	      nfs.mkdirs(outDir);
	      SegmentWriter sw = new SegmentWriter(nfs, outDir, true);
	      LOG.info(" - opening first output segment in " + outDir.getName());
	      //FetcherOutput fo = new FetcherOutput();
	      //Content co = new Content();
	      ParseText pt = new ParseText();
	      ParseData pd = new ParseData();
	      int outputCnt = 0;
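	      // Each indexed document carries an "sd" field of the form
	      // "<segmentName>|<docid>" pointing back to its source segment record.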
	      for (int n = 0; n < ir.maxDoc(); n++) {
	        if (ir.isDeleted(n)) {
	          //System.out.println("-del");
	          continue;
	        }
	        Document doc = ir.document(n);
	        String segDoc = doc.get("sd");
	        int idx = segDoc.indexOf('|');
	        String segName = segDoc.substring(0, idx);
	        String docName = segDoc.substring(idx + 1);
	        SegmentReader sr = (SegmentReader) readers.get(segName);
	        long docid;
	        try {
	          docid = Long.parseLong(docName);
	        } catch (Exception e) {
	          continue;
	        }
	        try {
	          // get data from the reader
	          sr.get(docid, pt, pd);
	        } catch (Throwable thr) {
	          // don't break the loop, because only one of the segments
	          // may be corrupted...
	          LOG.warn(" - corrupt record no. " + docid + " in segment " + sr.segmentDir.getName() + " - skipping.");
	          continue;
	        }
	        sw.append(pt, pd);
	        outputCnt++;
	        processedRecords++;
	        if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
	          LOG.info(" Processed " + processedRecords + " records (" +
	                  (float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
	          delta = System.currentTimeMillis();
	        }
	      }
	      LOG.info("* Merging took " + (System.currentTimeMillis() - s1) + " ms");
	      ir.close();
	      sw.close();
	      FileUtil.fullyDelete(fsmtIndexDir);
	      for (Iterator iter = readers.keySet().iterator(); iter.hasNext();) {
	        SegmentReader sr = (SegmentReader) readers.get(iter.next());
	        sr.close();
	      }
	      
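	      // Move the temporary segment to its final output name, replacing
	      // any previous segment of the same name.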
	      if (!outSegment.equals(tempSegment)) {
	        File dir = new File(outputPath, outSegment);
	        if (nfs.exists(dir)) {
	          FileUtil.fullyDelete(dir);
	        }
	        nfs.rename(outDir, dir);
	        outDir = dir;
	      }
	      /**************************/
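	      // Optional indexing stage: build a Lucene index for the merged
	      // segment in a timestamped working directory, then remove the
	      // working directory.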
	      if (runIndexer) {
	        // initialize the word segmenter (disabled in this build)
	        //System.out.println("Init WordsSegment begin ...");
	        //WordsSegment.InitWordsSegment(3);

	        stage = SegmentMergeStatus.STAGE_INDEXING;
	        LOG.info("* Creating new segment index(es)...");
	        File workingDir = new File(outputPath, "indexsegment-workingdir"+System.currentTimeMillis());
	        if (workingDir.exists()) {
	              FileUtil.fullyDelete(workingDir);
	        }
	        IndexSegment indexer = new IndexSegment(nfs, Integer.MAX_VALUE, outDir, workingDir);
	        indexer.indexPages();
	        FileUtil.fullyDelete(workingDir);
	        //WordsSegment.DestroyWordsSegment();
	      }
	      /*****************************/
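	      // Optionally delete the input segments now that the merged
	      // segment (and its index) are in place.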
	      if (delSegs) {
	        // This deletes also all corrupt segments, which are
	        // unusable anyway
	        stage = SegmentMergeStatus.STAGE_DELETING;
	        totalRecords = allsegdirs.size();
	        processedRecords = 0L;
	        LOG.info("* Deleting old segments...");
	        for (int k = 0; k < allsegdirs.size(); k++) {
	          processedRecords++;
	          File src = (File) allsegdirs.get(k);
	          FileUtil.fullyDelete(src);
	        }
	      }
	      delta = System.currentTimeMillis() - start;
	      // compute entries/sec in float millisecond arithmetic
	      // (guards against truncation and division by zero for sub-second runs)
	      float eps = (delta > 0) ? (float) total * 1000f / (float) delta : 0f;
	      LOG.info("Finished SegmentMergeTool: INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
	              + ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
	    } catch (Exception e) {
	      LOG.error("SegmentMergeTool failed: " + e.getMessage(), e);
	    }
	  }
}
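
A note on the structure above: the subindex rotation is a general Lucene pattern — write at most INDEX_SIZE documents per index, then fold all finished subindexes into the last, still-open writer. Below is a minimal, self-contained sketch of that pattern, assuming the same Lucene 1.x-era API this listing compiles against; the directory names, document fields, and driver loop are illustrative, not part of the tool.

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;

public class SubIndexRotation {
  public static void main(String[] args) throws Exception {
    final int INDEX_SIZE = 1000;      // documents per subindex, as in the tool
    File root = new File("subidx");   // illustrative output root
    List masters = new ArrayList();   // directories of finished subindexes

    File dir = new File(root, "0");
    dir.mkdirs();
    masters.add(dir);
    IndexWriter iw = new IndexWriter(dir, new WhitespaceAnalyzer(), true);

    for (int i = 0; i < 2500; i++) {  // stand-in for the record loop
      Document doc = new Document();
      doc.add(Field.Keyword("id", String.valueOf(i)));
      iw.addDocument(doc);
      // rotate: close the full subindex and start the next one
      if ((i + 1) % INDEX_SIZE == 0) {
        iw.optimize();
        iw.close();
        dir = new File(root, "" + masters.size());
        dir.mkdirs();
        masters.add(dir);
        iw = new IndexWriter(dir, new WhitespaceAnalyzer(), true);
      }
    }
    iw.optimize();

    // fold every finished subindex into the last, still-open writer
    if (masters.size() > 1) {
      IndexReader[] readers = new IndexReader[masters.size() - 1];
      for (int i = 0; i < masters.size() - 1; i++) {
        readers[i] = IndexReader.open((File) masters.get(i));
      }
      iw.addIndexes(readers);
      for (int i = 0; i < readers.length; i++) {
        readers[i].close();
      }
    }
    iw.close();
  }
}

Keeping each subindex bounded limits the cost of any single optimize() call, while the final addIndexes() pass still yields one optimized master index — the same trade-off the tool makes before its writing stage.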
