📄 mergesegments.java
字号:
// NOTE(review): this is the tail of a much larger method (presumably
// SegmentMergeTool.run() from Apache Nutch, against the Lucene 1.x API --
// see iw.mergeFactor / iw.minMergeDocs public fields). The enclosing try
// block, the per-segment loops, and the declarations of processedRecords,
// iw, masters, masterDir, readers, nfs, stage, delta, s1, start, total,
// allsegdirs etc. all begin before the visible region.
processedRecords++;
// Periodic progress logging: every LOG_STEP records, report throughput
// since the last report, then reset the interval timer.
if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
LOG.info(" Processed " + processedRecords + " records (" +
(float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
delta = System.currentTimeMillis();
}
// Roll over to a fresh Lucene sub-index every INDEX_SIZE records so no
// single index grows too large; finished sub-index dirs accumulate in
// 'masters' and are merged together further below.
if (processedRecords > 0 && (processedRecords % INDEX_SIZE == 0)) {
iw.optimize();
iw.close();
LOG.info(" - creating next subindex...");
// Sub-index directories are named 0, 1, 2, ... under fsmtIndexDir.
masterDir = new File(fsmtIndexDir, "" + masters.size());
if (!masterDir.mkdirs()) {
// NOTE(review): bails out of the whole method here without closing
// already-opened readers/writers -- confirm cleanup happens elsewhere.
LOG.error("Could not create a master index dir: " + masterDir);
return;
}
masters.add(masterDir);
// create=true: start an empty index; compound files disabled and merge
// parameters tuned via the Lucene 1.x public fields below.
iw = new IndexWriter(masterDir, new WhitespaceAnalyzer(), true);
iw.setUseCompoundFile(false);
iw.mergeFactor = INDEX_MERGE_FACTOR;
iw.minMergeDocs = INDEX_MIN_MERGE_DOCS;
}
} catch (Throwable t) {
// we can assume the data is invalid from now on - break here
LOG.info(" - segment " + name + " truncated to " + (i + 1) + " records");
break;
}
}
}
iw.optimize();
LOG.info("* Creating index took " + (System.currentTimeMillis() - s1) + " ms");
s1 = System.currentTimeMillis();
// merge all other indexes using the latest IndexWriter (still open):
if (masters.size() > 1) {
LOG.info(" - merging subindexes...");
stage = SegmentMergeStatus.STAGE_MERGEIDX;
// All sub-indexes except the last (which 'iw' still has open) are
// opened read-only and folded into iw, then closed and deleted.
IndexReader[] ireaders = new IndexReader[masters.size() - 1];
for (int i = 0; i < masters.size() - 1; i++) ireaders[i] = IndexReader.open((File)masters.get(i));
iw.addIndexes(ireaders);
for (int i = 0; i < masters.size() - 1; i++) {
ireaders[i].close();
FileUtil.fullyDelete((File)masters.get(i));
}
}
iw.close();
LOG.info("* Optimizing index took " + (System.currentTimeMillis() - s1) + " ms");
// NOTE(review): the block below is the (disabled) de-duplication stage --
// it kept only the newest document per content-hash ("ch") / URL-hash
// ("uh") term. Kept as-is for reference; consider removing dead code or
// re-enabling deliberately.
/******************************************************
LOG.info("* Removing duplicate entries...");
stage = SegmentMergeStatus.STAGE_DEDUP;
IndexReader ir = IndexReader.open(masterDir);
int i = 0;
long cnt = 0L;
processedRecords = 0L;
s1 = System.currentTimeMillis();
delta = s1;
TermEnum te = ir.terms();
while(te.next()) {
Term t = te.term();
if (t == null) continue;
if (!(t.field().equals("ch") || t.field().equals("uh"))) continue;
cnt++;
processedRecords = cnt / 2;
if (cnt > 0 && (cnt % (LOG_STEP * 2) == 0)) {
LOG.info(" Processed " + processedRecords + " records (" +
(float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
delta = System.currentTimeMillis();
}
// Enumerate all docs with the same URL hash or content hash
TermDocs td = ir.termDocs(t);
if (td == null) continue;
int id = -1;
String time = null;
Document doc = null;
// Keep only the latest version of the document with
// the same hash (url or content). Note: even if the content
// hash is identical, other metadata may be different, so even
// in this case it makes sense to keep the latest version.
while (td.next()) {
int docid = td.doc();
if (!ir.isDeleted(docid)) {
doc = ir.document(docid);
if (time == null) {
time = doc.get("time");
id = docid;
continue;
}
String dtime = doc.get("time");
// "time" is a DateField, and can be compared lexicographically
if (dtime.compareTo(time) > 0) {
if (id != -1) {
ir.delete(id);
}
time = dtime;
id = docid;
} else {
ir.delete(docid);
}
}
}
}
//
// keep the IndexReader open...
//
LOG.info("* Deduplicating took " + (System.currentTimeMillis() - s1) + " ms");
************************************************************/
// Writing stage: walk every surviving document in the merged master
// index and copy its record from the originating segment into a single
// new output segment.
stage = SegmentMergeStatus.STAGE_WRITING;
IndexReader ir = IndexReader.open(masterDir);
processedRecords = 0L;
File outDir = new File(outputPath, tempSegment);
LOG.info("* Merging all segments into " + outDir.getName());
s1 = System.currentTimeMillis();
delta = s1;
nfs.mkdirs(outDir);
SegmentWriter sw = new SegmentWriter(nfs, outDir, true);
LOG.info(" - opening first output segment in " + outDir.getName());
//FetcherOutput fo = new FetcherOutput();
//Content co = new Content();
// Reusable value holders, refilled by SegmentReader.get() each iteration.
ParseText pt = new ParseText();
ParseData pd = new ParseData();
int outputCnt = 0;
for (int n = 0; n < ir.maxDoc(); n++) {
if (ir.isDeleted(n)) {
//System.out.println("-del");
continue;
}
Document doc = ir.document(n);
// The "sd" field encodes "<segmentName>|<docId>"; split on the pipe
// to locate the source segment and the record within it.
String segDoc = doc.get("sd");
int idx = segDoc.indexOf('|');
String segName = segDoc.substring(0, idx);
String docName = segDoc.substring(idx + 1);
SegmentReader sr = (SegmentReader) readers.get(segName);
long docid;
try {
docid = Long.parseLong(docName);
} catch (Exception e) {
// Malformed doc id -- skip this record rather than abort the merge.
continue;
}
try {
// get data from the reader
sr.get(docid, pt, pd);
} catch (Throwable thr) {
// don't break the loop, because only one of the segments
// may be corrupted...
LOG.warn(" - corrupt record no. " + docid + " in segment " + sr.segmentDir.getName() + " - skipping.");
continue;
}
sw.append(pt, pd);
outputCnt++;
processedRecords++;
// Same periodic throughput logging as in the indexing phase above.
if (processedRecords > 0 && (processedRecords % LOG_STEP == 0)) {
LOG.info(" Processed " + processedRecords + " records (" +
(float)(LOG_STEP * 1000)/(float)(System.currentTimeMillis() - delta) + " rec/s)");
delta = System.currentTimeMillis();
}
}
LOG.info("* Merging took " + (System.currentTimeMillis() - s1) + " ms");
// Cleanup: close the master index, the output segment writer, remove the
// temporary index dir, and close every per-segment reader.
ir.close();
sw.close();
FileUtil.fullyDelete(fsmtIndexDir);
for (Iterator iter = readers.keySet().iterator(); iter.hasNext();) {
SegmentReader sr = (SegmentReader) readers.get(iter.next());
sr.close();
}
// If a distinct final segment name was requested, replace any existing
// directory of that name with the freshly written temp segment.
if (!outSegment.equals(tempSegment)){
File dir = new File(outputPath,outSegment);
if( nfs.exists(dir) )
FileUtil.fullyDelete(dir);
nfs.rename(outDir,dir);
outDir = dir;
}
/**************************/
// Optional re-indexing of the merged output segment.
if (runIndexer) {
// Initialize the word segmenter (tokenizer) -- currently disabled.
//System.out.println("Init WordsSegment begin ...");
//WordsSegment.InitWordsSegment(3);
stage = SegmentMergeStatus.STAGE_INDEXING;
LOG.info("* Creating new segment index(es)...");
// Timestamped working dir avoids clashing with a previous run's leftovers.
File workingDir = new File(outputPath, "indexsegment-workingdir"+System.currentTimeMillis());
if (workingDir.exists()) {
FileUtil.fullyDelete(workingDir);
}
IndexSegment indexer = new IndexSegment(nfs, Integer.MAX_VALUE, outDir, workingDir);
indexer.indexPages();
FileUtil.fullyDelete(workingDir);
//WordsSegment.DestroyWordsSegment();
}
/*****************************/
// Optionally delete all input segments (including corrupt ones).
if (delSegs) {
// This deletes also all corrupt segments, which are
// unusable anyway
stage = SegmentMergeStatus.STAGE_DELETING;
totalRecords = allsegdirs.size();
processedRecords = 0L;
LOG.info("* Deleting old segments...");
for (int k = 0; k < allsegdirs.size(); k++) {
processedRecords++;
File src = (File) allsegdirs.get(k);
FileUtil.fullyDelete(src);
}
}
// Final summary: overall wall-clock time and input/output record counts.
delta = System.currentTimeMillis() - start;
float eps = (float) total / (float) (delta / 1000);
LOG.info("Finished SegmentMergeTool: INPUT: " + total + " -> OUTPUT: " + outputCnt + " entries in "
+ ((float) delta / 1000f) + " s (" + eps + " entries/sec).");
} catch (Exception e) {
// NOTE(review): broad catch that only logs -- the cause/stack trace is
// printed but the failure is otherwise swallowed; callers cannot tell
// the merge failed. Consider rethrowing or setting an error status.
e.printStackTrace();
LOG.error(e.getMessage());
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -