MergeSegments.java
package net.nutch.util;
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
import net.nutch.fs.FileUtil;
import net.nutch.fs.NutchFileSystem;
import net.nutch.io.MD5Hash;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import net.nutch.indexer.IndexSegment;
import net.nutch.segment.SegmentReader;
import net.nutch.segment.SegmentWriter;
public class MergeSegments implements Runnable {
public static final Logger LOG = Logger.getLogger("segment");
/** Log progress update every LOG_STEP items. */
public static int LOG_STEP = 20000;
/** Temporary de-dup index size. Larger indexes tend to slow down indexing.
* Too many indexes slow down the subsequent index merging. It's a tradeoff value...
*/
public static int INDEX_SIZE = 250000;
public static int INDEX_MERGE_FACTOR = 30;
public static int INDEX_MIN_MERGE_DOCS = 100;
private NutchFileSystem nfs = null;
private File[] segments = null;
private int stage = SegmentMergeStatus.STAGE_OPENING;
private long totalRecords = 0L;
private long processedRecords = 0L;
private long start = 0L;
private long maxCount = Long.MAX_VALUE;
private File outputPath = null;
private String outSegment = null;
private String tempSegment = null;
private List segdirs = null;
private List allsegdirs = null;
private boolean runIndexer = false;
private boolean delSegs = false;
private HashMap readers = new HashMap();
/**
* Create a MergeSegments tool.
* @param nfs filesystem
* @param segments list of input segments
* @param output output directory, where output segments will be created
* @param maxCount maximum number of records per output segment. If this
* value is 0, then the default value {@link Long#MAX_VALUE} is used.
* @param runIndexer run indexer on output segment(s)
* @param delSegs delete input segments when finished
* @throws IOException
*/
public MergeSegments(NutchFileSystem nfs, File[] segments, File output, long maxCount, boolean runIndexer, boolean delSegs) throws IOException {
this.nfs = nfs;
this.segments = segments;
this.runIndexer = runIndexer;
this.delSegs = delSegs;
if (maxCount > 0) this.maxCount = maxCount;
allsegdirs = Arrays.asList(segments);
this.outputPath = output;
this.outSegment = SegmentWriter.getNewSegmentName();
this.tempSegment = outSegment;
/*
if (nfs.exists(output)) {
if( !nfs.isDirectory(output) )
throw new Exception("MergeSegment Error! " + output.toString() + " is not a directory!");
int i = 0;
for(; i<segments.length; i++){
if( output.compareTo(segments[i]) == 0 )
break;
}
if (i < segments.length){ // output is one of the input segments, so merge into a temp segment first
File parent = output.getParentFile();
tempSegment = SegmentWriter.getNewSegmentName();
}
} else nfs.mkdirs(output);
*/
if (!nfs.exists(output)){
nfs.mkdirs(output);
}
}
public static class SegmentMergeStatus {
public static final int STAGE_OPENING = 0;
public static final int STAGE_MASTERIDX = 1;
public static final int STAGE_MERGEIDX = 2;
public static final int STAGE_DEDUP = 3;
public static final int STAGE_WRITING = 4;
public static final int STAGE_INDEXING = 5;
public static final int STAGE_DELETING = 6;
public static final String[] stages = {
"opening input segments",
"creating master index",
"merging sub-indexes",
"deduplicating",
"writing output segment(s)",
"indexing output segment(s)",
"deleting input segments"
};
public int stage;
public File[] inputSegments;
public long startTime, curTime;
public long totalRecords;
public long processedRecords;
public SegmentMergeStatus() {}
public SegmentMergeStatus(int stage, File[] inputSegments, long startTime,
long totalRecords, long processedRecords) {
this.stage = stage;
this.inputSegments = inputSegments;
this.startTime = startTime;
this.curTime = System.currentTimeMillis();
this.totalRecords = totalRecords;
this.processedRecords = processedRecords;
}
}
public SegmentMergeStatus getStatus() {
SegmentMergeStatus status = new SegmentMergeStatus(stage, segments, start,
totalRecords, processedRecords);
return status;
}
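/*
 * A caller on another thread can poll getStatus() while run() executes.
 * A minimal sketch (the merger/monitor wiring is assumed, not part of
 * this class):
 *
 *   SegmentMergeStatus s = merger.getStatus();
 *   LOG.info(SegmentMergeStatus.stages[s.stage] + ": " +
 *       s.processedRecords + "/" + s.totalRecords + " records, " +
 *       (s.curTime - s.startTime) / 1000 + "s elapsed");
 */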
/** Returns the new output segment's absolute path. */
public String getNewSegmentName(){
File newSegment = new File(outputPath,outSegment);
return newSegment.getAbsolutePath();
}
/** Run the tool, periodically reporting progress. */
public void run() {
start = System.currentTimeMillis();
stage = SegmentMergeStatus.STAGE_OPENING;
long delta;
LOG.info("* Opening " + allsegdirs.size() + " segments:");
try {
segdirs = new ArrayList();
// open all segments
for (int i = 0; i < allsegdirs.size(); i++) {
File dir = (File) allsegdirs.get(i);
SegmentReader sr = null;
try {
// try to autofix it if corrupted...
sr = new SegmentReader(nfs, dir, true);
} catch (Exception e) {
// this segment is hosed beyond repair, don't use it
continue;
}
segdirs.add(dir);
totalRecords += sr.size;
LOG.info(" - segment " + dir.getName() + ": " + sr.size + " records.");
readers.put(dir.getName(), sr);
}
long total = totalRecords;
LOG.info("* TOTAL " + total + " input records in " + segdirs.size() + " segments.");
LOG.info("* Creating master index...");
stage = SegmentMergeStatus.STAGE_MASTERIDX;
// XXX Note that Lucene indexes don't work with NutchFileSystem for now.
// XXX For now always assume LocalFileSystem here...
Vector masters = new Vector();
File fsmtIndexDir = new File(outputPath, ".fastmerge_index"+System.currentTimeMillis());
File masterDir = new File(fsmtIndexDir, "0");
if (!masterDir.mkdirs()) {
LOG.error("Could not create a master index dir: " + masterDir);
return;
}
masters.add(masterDir);
IndexWriter iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
iw.setUseCompoundFile(false);
iw.mergeFactor = INDEX_MERGE_FACTOR;
iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
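// mergeFactor controls how many on-disk segments Lucene accumulates
// before merging them; minMergeDocs is how many documents are buffered
// in RAM first. High values favor write speed over open-file count,
// which suits a short-lived throwaway index like this one.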
long s1 = System.currentTimeMillis();
Iterator it = readers.values().iterator();
processedRecords = 0L;
delta = System.currentTimeMillis();
while (it.hasNext()) {
SegmentReader sr = (SegmentReader) it.next();
String name = sr.segmentDir.getName();
//FetcherOutput fo = new FetcherOutput();
ParseText parseText = new ParseText();
ParseData parseData = new ParseData();
for (long i = 0; i < sr.size; i++) {
try {
if (!sr.get(i, parseText, parseData)) break;
Properties prop = parseData.getMetadata();
Document doc = new Document();
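// One tiny document per input record: "sd" points back to the source as
// "<segmentName>|<recordNumber>", while "uh" (MD5 of the URL) and "ch"
// (content digest) are the keys the later dedup stage matches on.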
doc.add(new Field("sd", name + "|" + i, true, false, false));
//doc.add(new Field("uh", MD5Hash.digest(fo.getUrl().toString()).toString(), true, true, false));
//doc.add(new Field("ch", fo.getMD5Hash().toString(), true, true, false));
String url = prop.getProperty("url");
if( url != null && url.length()>0 )
doc.add(new Field("uh", MD5Hash.digest(url).toString(), true, true, false));
String md5 = prop.getProperty("digest");
if( md5!=null && md5.length()>0 )
doc.add(new Field("ch", prop.getProperty("digest"), true, true, false));
//doc.add(new Field("time", DateField.timeToString(fo.getFetchDate()), true, false, false));
//doc.add(new Field("time", prop.getProperty("pubTime"), true, false, false));
//doc.add(new Field("from",prop.getProperty("from"),true,false,false));
//doc.add(new Field("gid",prop.getProperty("gid"),true,false,false));
iw.addDocument(doc);
} catch (Throwable t) {
// this record is scrambled beyond repair, skip it and keep going
LOG.info(" - skipping record #" + i + " in " + name);
}
processedRecords++;
if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
LOG.info(" Processed " + processedRecords + " records (" +
(float) (LOG_STEP * 1000) / (float) (System.currentTimeMillis() - delta) + " rec/s)");
delta = System.currentTimeMillis();
}
if (processedRecords % INDEX_SIZE == 0) {
// flush the current sub-index and start a new one, so individual
// indexes stay small (see the INDEX_SIZE comment above)
iw.optimize();
iw.close();
LOG.info(" - creating next subindex...");
masterDir = new File(fsmtIndexDir, "" + masters.size());
if (!masterDir.mkdirs()) {
LOG.error("Could not create a master index dir: " + masterDir);
return;
}
masters.add(masterDir);
iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
iw.setUseCompoundFile(false);
iw.mergeFactor = INDEX_MERGE_FACTOR;
iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
}
}
}
iw.optimize();
iw.close();
LOG.info("* Creating master index took " + (System.currentTimeMillis() - s1) + " ms.");
// NOTE: the remaining stages (STAGE_MERGEIDX through STAGE_DELETING -
// merging sub-indexes, deduplication, writing and optionally indexing
// the output segment(s), deleting inputs) are truncated in this listing.
} catch (Exception e) {
LOG.error("Unexpected error " + e.getMessage() + ", aborting merge.", e);
}
}
}
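/*
 * A hypothetical command-line driver, not part of the original file,
 * sketching how the class above could be wired together. The argument
 * layout and the NutchFileSystem.get() factory call are assumptions.
 */
class MergeSegmentsDriver {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: MergeSegmentsDriver <outputDir> <inputSegment> ...");
return;
}
NutchFileSystem nfs = NutchFileSystem.get();
File output = new File(args[0]);
File[] segs = new File[args.length - 1];
for (int i = 1; i < args.length; i++) {
segs[i - 1] = new File(args[i]);
}
// maxCount = 0 means "no per-segment limit"; don't index, keep inputs
MergeSegments merger = new MergeSegments(nfs, segs, output, 0L, false, false);
Thread t = new Thread(merger);
t.start();
t.join();
System.out.println("New segment: " + merger.getNewSegmentName());
}
}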