📄 distributedsearch.java
字号:
/* Copyright (c) 2003 The Nutch Organization. All rights reserved. */
/* Use subject to the conditions in http://www.nutch.org/LICENSE.txt. */
package net.nutch.searcher;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.nutch.indexer.IndexSegment;
import net.nutch.io.ArrayWritable;
import net.nutch.io.BooleanWritable;
import net.nutch.io.BytesWritable;
import net.nutch.io.IntWritable;
import net.nutch.io.LongWritable;
import net.nutch.io.NullWritable;
import net.nutch.io.UTF8;
import net.nutch.io.Writable;
import net.nutch.parse.ParseData;
import net.nutch.parse.ParseText;
import net.nutch.util.MovePath;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/** Implements the search API over IPC connections. */
public class DistributedSearch {
public static final String MODE_MIX = "MIX";
//** 短桶 Short Server
public static final String MODE_RAM = "RAM";
//** 长桶 Long Server
public static final String MODE_REG = "REG";
private DistributedSearch() {} // no public ctor
// op codes for IPC calls
private static final byte SEGMENTS = (byte)0;
private static final byte SEARCH = (byte)1;
private static final byte EXPLAIN = (byte)2;
private static final byte DETAILS = (byte)3;
private static final byte SUMMARY = (byte)4;
private static final byte CONTENT = (byte)5;
private static final byte ANCHORS = (byte)6;
private static final byte PARSEDATA = (byte)7;
private static final byte PARSETEXT = (byte)8;
private static final byte FETCHDATE = (byte)9;
private static final byte ADDSEGMENT = (byte)10;
private static final byte MAXDOCS = (byte)11;
private static final byte DELSEGMENTS = (byte)12;
private static final byte MERGESEGMENTS = (byte)13;
private static final byte DELDOC = (byte)14;
private static final byte INDEXS = (byte)15;
private static final byte FINDSEGMENT =(byte)16;
private static final byte SUMMARY_NEW = (byte)17;
private static final byte LIMITNUM = (byte)18;
/** Names of the op codes. */
private static final String[] NAMES = new String[19];
static {
NAMES[SEGMENTS] = "getSegmentNames";
NAMES[SEARCH] = "search";
NAMES[EXPLAIN] = "getExplanation";
NAMES[DETAILS] = "getDetails";
NAMES[SUMMARY] = "getSummary";
NAMES[CONTENT] = "getContent";
NAMES[ANCHORS] = "getAnchors";
NAMES[PARSEDATA] = "getParseData";
NAMES[PARSETEXT] = "getParseText";
NAMES[FETCHDATE] = "getFetchDate";
NAMES[ADDSEGMENT] = "addSegment";
NAMES[MAXDOCS] = "getMaxDocs";
NAMES[DELSEGMENTS] = "delSegments";
NAMES[MERGESEGMENTS] = "mergeSegments";
NAMES[DELDOC] = "delDoc";
NAMES[INDEXS] = "getIndexServers";
NAMES[FINDSEGMENT] = "findSegment";
NAMES[SUMMARY_NEW] = "getSummaryNew";
NAMES[LIMITNUM] = "getLimitNum";
}
/**
* The parameter passed with IPC requests. Public only so that {@link Server} can construct instances.
*
*/
public static class Param implements Writable {
private byte op; // the op code
private Writable first; // the first operand
private Writable second; // the second operand
private Writable third;
private Writable forth;
/**
* Add by liubin.2006-02-13
*/
private Writable fifth;
private Writable sixth;
//private Writable seventh;
public Param() {}
Param(byte op, Writable first) {
this(op, first, NullWritable.get());
}
Param(byte op, Writable first, Writable second) {
this(op, first, second, NullWritable.get());
}
/*
* Add By xie.
*/
Param(byte op, Writable first, Writable second, Writable third) {
this(op, first, second, third, NullWritable.get());
}
/**
* Add by liubin.2006-02-13.op = search
*/
Param(byte op, Writable first, Writable second, Writable third, Writable forth) {
this(op, first, second, third, forth, NullWritable.get());
}
/**
* Add by liubin 2006-02-13
*/
Param(byte op, Writable first, Writable second,Writable third,Writable forth, Writable fifth) {
this(op, first, second, third, forth, fifth, NullWritable.get());
}
Param(byte op, Writable first, Writable second,Writable third,Writable forth, Writable fifth, Writable sixth) {
this.op = op;
this.first = first;
this.second = second;
this.third = third;
this.forth = forth;
this.fifth = fifth;
this.sixth = sixth;
}
/**
* Add by liubin.2006-02-13. op = search
*/
/*
Param(byte op, Writable first, Writable second,Writable third,Writable forth,
Writable fifth, Writable sixth, Writable seventh) {
this.op = op;
this.first = first;
this.second = second;
this.third = third;
this.forth = forth;
this.fifth = fifth;
this.sixth = sixth;
this.seventh = seventh;
}
*/
public void write(DataOutput out) throws IOException {
out.writeByte(op);
first.write(out);
second.write(out);
third.write(out);
forth.write(out);
fifth.write(out);
sixth.write(out);
//seventh.write(out);
}
public void readFields(DataInput in) throws IOException {
op = in.readByte();
switch (op) {
case SEGMENTS:
first = NullWritable.get();
second = NullWritable.get();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
sixth = NullWritable.get();
break;
case MAXDOCS:
first = new BytesWritable(); //** Search mode flag added by DingZhenbo
second = NullWritable.get();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
sixth = NullWritable.get();
break;
case INDEXS:
first = new BytesWritable(); //** Search mode flag added by DingZhenbo
second = NullWritable.get();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case SEARCH:
first = new Query();
second = new IntWritable();
third = new IntWritable();
forth = new LongWritable();
fifth = new LongWritable();
sixth = new BytesWritable();
//seventh = new LongWritable();
break;
case EXPLAIN:
first = new Query();
second = new Hit();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case DETAILS:
case DELDOC:
first = new Hit();
second = NullWritable.get();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case SUMMARY:
first = new HitDetails();
second = new Query();
//third = NullWritable.get();
third = new BooleanWritable();
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case SUMMARY_NEW:
first = new HitDetails();
second = new Query();
third = new IntWritable();
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case CONTENT:
case ANCHORS:
case PARSEDATA:
case PARSETEXT:
case FETCHDATE:
first = new HitDetails();
second = NullWritable.get();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case ADDSEGMENT:
case DELSEGMENTS:
case FINDSEGMENT:
first = new BytesWritable();
second = new BytesWritable(); //** mode param
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case MERGESEGMENTS:
first = new BytesWritable();
second = new BytesWritable();
third = new BytesWritable(); //** mode param
forth = NullWritable.get();
fifth = NullWritable.get();
//sixth = NullWritable.get();
//seventh = NullWritable.get();
break;
case LIMITNUM:
first = NullWritable.get();
second = NullWritable.get();
third = NullWritable.get();
forth = NullWritable.get();
fifth = NullWritable.get();
break;
default:
throw new RuntimeException("Unknown op code: " + op);
}
first.readFields(in);
second.readFields(in);
third.readFields(in);
forth.readFields(in);
fifth.readFields(in);
//sixth.readFields(in);
//seventh.readFields(in);
}
}
/** The parameter returned with IPC responses. Public only so that {@link
* Client} can construct instances. */
public static class Result implements Writable {
private byte op;
private Writable value;
public Result() {}
Result(byte op, Writable value) {
this.op = op;
this.value = value;
}
public void write(DataOutput out) throws IOException {
out.writeByte(op);
value.write(out);
}
public void readFields(DataInput in) throws IOException {
op = in.readByte();
switch (op) {
case SEGMENTS:
case INDEXS:
case ANCHORS:
value = new ArrayWritable(UTF8.class);
break;
case SEARCH:
value = new Hits();
break;
case EXPLAIN:
value = new UTF8();
break;
case DETAILS:
value = new HitDetails();
break;
case SUMMARY:
case SUMMARY_NEW:
case CONTENT:
case FINDSEGMENT:
value = new BytesWritable();
break;
case PARSEDATA:
value = new ParseData();
break;
case PARSETEXT:
value = new ParseText();
break;
case FETCHDATE:
case MAXDOCS:
value = new LongWritable();
break;
case ADDSEGMENT:
case DELSEGMENTS:
case MERGESEGMENTS:
case DELDOC:
value = new BooleanWritable();
break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -