📄 distributedsearch.java
字号:
}
buffer.append(")");
LOG.info(buffer.toString());
}
}
/** The search server. */
public static class Server extends net.nutch.ipc.Server {
  /** Search backend that every incoming request is delegated to. */
  private SearchBean bean;

  /**
   * Construct a search server on the index and segments in the named
   * directory, listening on the named port.
   *
   * @param directory directory handed to {@code SearchBean}
   * @param port port handed to the IPC server superclass
   * @throws IOException propagated from the superclass or {@code SearchBean} constructor
   */
  public Server(String directory, int port) throws IOException {
    // NOTE(review): the third argument (15) is presumably the handler count
    // of net.nutch.ipc.Server -- confirm against that class.
    super(port, Param.class, 15);
    this.bean = new SearchBean(directory);
  }

  /**
   * Construct a search server on the indexes and segments in the named
   * directories, listening on the named port.
   *
   * @param directory directories handed to {@code SearchBean}
   * @param port port handed to the IPC server superclass
   * @throws IOException propagated from the superclass or {@code SearchBean} constructor
   */
  public Server(String[] directory, int port) throws IOException {
    super(port, Param.class, 15);
    this.bean = new SearchBean(directory);
  }

  /**
   * Dispatches one request to the {@code SearchBean} according to its op
   * code and wraps the outcome in a {@code Result} carrying the same op code.
   *
   * @param param the request; must be a {@code Param}
   * @return a {@code Result} holding the op code and the produced value
   * @throws IOException propagated from the {@code SearchBean} calls
   * @throws RuntimeException if the op code is not one of the handled cases
   */
  public Writable call(Writable param) throws IOException {
    Param p = (Param) param;
    logRequest(p);

    Writable value;
    String mode;
    switch (p.op) {
    case SEGMENTS:
      LOG.info(">>> Segments request");
      value = new ArrayWritable(bean.getSegmentNames());
      break;
    case SEARCH:
      value = bean.search((Query) p.first,
          ((IntWritable) p.second).get(),
          ((IntWritable) p.third).get(),
          ((LongWritable) p.forth).get(),
          ((LongWritable) p.fifth).get(),
          bytesToString((BytesWritable) p.sixth));
      LOG.info(">>> Hits length:" + ((Hits) value).getLength());
      break;
    case EXPLAIN:
      value = new UTF8(bean.getExplanation((Query) p.first, (Hit) p.second));
      break;
    case DETAILS:
      value = bean.getDetails((Hit) p.first);
      break;
    case SUMMARY:
      value = new BytesWritable(
          bean.getSummary((HitDetails) p.first, (Query) p.second,
              ((BooleanWritable) p.third).get()).getBytes());
      break;
    case SUMMARY_NEW:
      value = new BytesWritable(
          bean.getSummaryNew((HitDetails) p.first, (Query) p.second,
              ((IntWritable) p.third).get()).getBytes());
      break;
    case CONTENT:
      value = new BytesWritable(bean.getContent((HitDetails) p.first));
      break;
    case ANCHORS:
      value = new ArrayWritable(bean.getAnchors((HitDetails) p.first));
      break;
    case PARSEDATA:
      value = bean.getParseData((HitDetails) p.first);
      break;
    case PARSETEXT:
      value = bean.getParseText((HitDetails) p.first);
      break;
    case FETCHDATE:
      value = new LongWritable(bean.getFetchDate((HitDetails) p.first));
      break;
    case ADDSEGMENT:
      String segmentName = bytesToString((BytesWritable) p.first);
      mode = bytesToString((BytesWritable) p.second);
      LOG.info("***********Add Segment:" + segmentName + "|" + mode);
      value = new BooleanWritable(bean.addSegment(segmentName, mode));
      LOG.info("***********Add Segment End[" + ((BooleanWritable) value).get());
      break;
    case MAXDOCS:
      mode = bytesToString((BytesWritable) p.first);
      value = new LongWritable(bean.getMaxDocs(mode));
      break;
    case DELSEGMENTS:
      // First argument is a '|'-separated list of segment names.
      String segments = bytesToString((BytesWritable) p.first);
      mode = bytesToString((BytesWritable) p.second);
      LOG.info("***************del segments:" + segments + " mode:" + mode);
      value = new BooleanWritable(bean.delSegments(splitSegmentNames(segments), mode));
      break;
    case MERGESEGMENTS:
      // First argument is a '|'-separated list of the segments to merge.
      String oldSegs = bytesToString((BytesWritable) p.first);
      LOG.info("***************merge segments:" + oldSegs);
      String[] old = splitSegmentNames(oldSegs);
      String newSeg = bytesToString((BytesWritable) p.second);
      mode = bytesToString((BytesWritable) p.third);
      value = new BooleanWritable(bean.mergeSegments(old, newSeg, mode));
      break;
    case DELDOC:
      int docID = ((Hit) p.first).getIndexDocNo();
      LOG.info("***********Delete Doc:" + docID);
      value = new BooleanWritable(bean.delDoc(docID));
      break;
    default:
      throw new RuntimeException("Unknown op code: " + p.op);
    }
    return new Result(p.op, value);
  }

  /**
   * Decodes the payload of a {@code BytesWritable} into a String.
   *
   * <p>The platform default charset is used on purpose: the client side of
   * this file encodes with {@code String.getBytes()}, so both ends must keep
   * using the same (default) charset.
   */
  private static String bytesToString(BytesWritable bytes) {
    return new String(bytes.get());
  }

  /** Splits a '|'-separated list of segment names into an array. */
  private static String[] splitSegmentNames(String joined) {
    StringTokenizer tokens = new StringTokenizer(joined, "|");
    String[] names = new String[tokens.countTokens()];
    for (int i = 0; i < names.length; i++) {
      names[i] = tokens.nextToken();
    }
    return names;
  }

  /**
   * Logs the calling thread name, the operation name and up to the first two
   * arguments of a request. The second argument is only printed when the
   * first one is present (i.e. is not the {@code NullWritable} instance).
   */
  private static void logRequest(Param p) {
    StringBuilder buffer = new StringBuilder();
    buffer.append(Thread.currentThread().getName());
    buffer.append(": ");
    // Guard the lookup: this runs before call() validates the op code, and
    // an out-of-range op must reach the "Unknown op code" error there
    // instead of failing here with ArrayIndexOutOfBoundsException.
    if (p.op >= 0 && p.op < NAMES.length) {
      buffer.append(NAMES[p.op]);
    } else {
      buffer.append("UNKNOWN(" + p.op + ")");
    }
    buffer.append("(");
    if (p.first != NullWritable.get()) {
      buffer.append(p.first);
      if (p.second != NullWritable.get()) {
        buffer.append(", ");
        buffer.append(p.second);
      }
    }
    buffer.append(")");
    LOG.info(buffer.toString());
  }

  /**
   * Runs a search server.
   *
   * <p>Expects {@code <port> <index dir>}. The second argument is split on
   * single spaces, so it may carry several index directories in one quoted
   * argument. Any further arguments are ignored. On a missing or invalid
   * argument the usage line is printed to stderr and the JVM exits with -1.
   */
  public static void main(String[] args) throws Exception {
    String usage = "DistributedSearch$Server <port> <index dir>";

    // Modified by DingZhenbo
    if (args.length < 2 || !StringUtils.isNumeric(args[0])) {
      System.err.println(usage);
      System.exit(-1);
    }

    int port;
    try {
      port = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      // isNumeric() can accept input parseInt() rejects (e.g. an empty
      // string in some Commons Lang versions, or a value beyond int range).
      System.err.println(usage);
      System.exit(-1);
      return;
    }
    String[] servers = args[1].split(" ");

    Server server = new Server(servers, port);
    //server.setTimeout(Integer.MAX_VALUE);
    server.start();
    server.join();
  }
}
/** The search client. */
public static class Client extends net.nutch.ipc.Client implements Searcher, HitDetailer, HitSummarizer, HitContent {
private static Logger TIME_OUT_LOG = Logger.getLogger("timeout");
private InetSocketAddress controller;
private Map<String, IndexServer> servers = new HashMap<String, IndexServer>();
private int limitNum = 1;
/**
 * Constructor used by the controller.
 *
 * Construct a client talking to the index servers listed in the named files.
 * Each file is parsed by {@code initIndexServer(File, IndexServer)}: every
 * line that does not start with '#' lists space-separated {@code host:port}
 * entries. A mode is only registered when its file exists and yields at
 * least one server group.
 *
 * @param regIndexFile configuration file listing the REG-mode ("long bucket") index servers; may be null
 * @param ramIndexFile configuration file listing the RAM-mode ("short bucket") index servers; may be null
 * @param summayFile currently ignored; the SUMMARY (MODE_SUM) server set-up is disabled
 * @throws IOException if a configuration file cannot be read
 */
public Client(File regIndexFile, File ramIndexFile, File summayFile) throws IOException {
  super(Result.class);

  // Regular (REG) index servers.
  final boolean regConfigured = regIndexFile != null && regIndexFile.exists();
  if (regConfigured) {
    LOG.info("init REG index server");
    final IndexServer regServer = new IndexServer();
    initIndexServer(regIndexFile, regServer);
    regServer.setModeFlag(MODE_REG);
    if (regServer.getLength() > 0) {
      servers.put(MODE_REG, regServer);
    }
  }

  // In-memory (RAM) index servers.
  final boolean ramConfigured = ramIndexFile != null && ramIndexFile.exists();
  if (ramConfigured) {
    LOG.info("init RAM index server");
    final IndexServer ramServer = new IndexServer();
    initIndexServer(ramIndexFile, ramServer);
    ramServer.setModeFlag(MODE_RAM);
    if (ramServer.getLength() > 0) {
      servers.put(MODE_RAM, ramServer);
    }
  }

  // The equivalent SUMMARY set-up (summayFile -> MODE_SUM) is intentionally
  // disabled, so summayFile is not read here.
}
/**
* For 搜索客户端 Only
*
* Modified by DingZhenbo 2007-09-17
*/
public Client(File searchConfigFile) throws IOException {
super(Result.class);
init(readConfig(searchConfigFile)); //搜索客户端
}
/**
 * For the index creator only.
 *
 * Creates a client that remembers the given address as its controller.
 *
 * @param address address stored as the controller
 * @throws IOException declared for callers; propagated from the superclass constructor if it throws
 */
public Client(InetSocketAddress address) throws IOException {
  super(Result.class);
  this.init(address);
}
/**
 * Stores the given address as this client's controller.
 *
 * @param address the controller address
 */
public void init(InetSocketAddress address) {
  controller = address;
}
// /**
// * for control center only
// */
//
// public void init(File idxFileReg, File idxFileOpt) throws IOException {
//
// BufferedReader reader;
//
// reader = new BufferedReader(new FileReader(idxFileReg));
// try {
// initIndexServer(reader, idxServerReg);
// } finally {
// reader.close();
// }
//
// // Set the optimized search mode. DingZhenbo 2007-09-17
// reader = new BufferedReader(new FileReader(idxFileOpt));
// try {
// initIndexServer(reader, idxServerRam);
// } finally {
// reader.close();
// }
// //buildSegToAddrMap();
// }
/**
 * Initialize an index server from a configuration file.
 *
 * <p>Lines starting with '#' are skipped. Every other line is split on
 * spaces into {@code host:port} entries, and the addresses found on one line
 * are added to {@code server} as one group. Entries without a port are
 * ignored, and a line that yields no address at all (for example a blank
 * line) adds no group, so a group never contains {@code null} slots.
 * After loading, every group and the online status reported by
 * {@code server} for each address is printed to standard output.
 *
 * @author DingZhenbo 2007-09-12
 * @param file configuration file to read
 * @param server index server that receives the parsed address groups
 * @throws IOException if the file cannot be opened or read
 * @throws NumberFormatException if a port is not a valid integer
 */
private void initIndexServer(File file, IndexServer server) throws IOException {
  BufferedReader reader = new BufferedReader(new FileReader(file));
  try {
    String line;
    while ((line = reader.readLine()) != null) {
      if (line.startsWith("#")) continue;

      StringTokenizer tokens = new StringTokenizer(line, " ");
      // Collect into a list so skipped (malformed) entries do not leave
      // null holes behind, as a pre-sized array would.
      ArrayList<InetSocketAddress> group =
          new ArrayList<InetSocketAddress>(tokens.countTokens());
      while (tokens.hasMoreTokens()) {
        String serverAddress = tokens.nextToken();
        StringTokenizer st = new StringTokenizer(serverAddress, ":");
        if (!st.hasMoreTokens()) continue;
        String host = st.nextToken();
        if (!st.hasMoreTokens()) continue; // no port given: ignore entry
        String port = st.nextToken();
        group.add(new InetSocketAddress(host, Integer.parseInt(port)));
        LOG.info("Client adding server " + host + ":" + port);
      }

      if (group.isEmpty()) continue; // nothing usable on this line
      server.add(group.toArray(new InetSocketAddress[group.size()]));
    }
  } finally {
    // Release the file handle even when parsing fails.
    reader.close();
  }

  // Dump what was loaded, one group at a time.
  for (int i = 0; i < server.getLength(); i++) {
    System.out.println("************" + i);
    InetSocketAddress[] indexs = server.get(i);
    for (int j = 0; j < indexs.length; j++) {
      System.out.println(indexs[j].toString() + ":" + server.getOnlineStat(indexs[j]));
    }
  }
}
/**
 * Fills {@code indexServer} with the server groups the controller reports
 * for the given mode, then tags it with that mode.
 *
 * <p>An {@code INDEXS} request carrying the mode is sent to the controller.
 * Each string of the reply describes one group as space-separated
 * {@code host:port:online} entries. The online flag is parsed and logged
 * but not otherwise used here. When the controller returns no result a
 * warning is logged and {@code indexServer} is left untouched (its mode
 * flag is not set either).
 *
 * @param indexServer receiver of the address groups
 * @param mode mode to query; must not be null
 * @throws IOException propagated from the call to the controller
 */
private void initIndexServer(IndexServer indexServer, final String mode) throws IOException {
  assert mode != null;
  LOG.info(">>> initIndexServer, mode :"+mode);

  Result result = (Result) call(new Param(INDEXS, new BytesWritable(mode.getBytes())), controller);
  if (result == null) {
    LOG.warn("Client: no index server from: " + controller);
    return;
  }

  String[] groups = ((ArrayWritable) result.value).toStrings();
  for (String group : groups) {
    LOG.info("Client: Index server "+ group);
    StringTokenizer entries = new StringTokenizer(group, " ");
    InetSocketAddress[] addresses = new InetSocketAddress[entries.countTokens()];
    for (int slot = 0; entries.hasMoreTokens(); slot++) {
      StringTokenizer fields = new StringTokenizer(entries.nextToken(), ":");
      String host = fields.nextToken();
      int port = Integer.parseInt(fields.nextToken());
      // The third field is mandatory in the wire format; it is only logged.
      boolean onLine = Boolean.parseBoolean(fields.nextToken());
      LOG.info("on line?"+onLine);
      addresses[slot] = new InetSocketAddress(host, port);
    }
    indexServer.add(addresses);
  }
  indexServer.setModeFlag(mode);
}
/**
 * Reads the controller addresses from a configuration file.
 *
 * <p>Lines starting with '#' are skipped. On every other line the first two
 * whitespace-separated tokens are taken as host and port; lines with fewer
 * than two tokens are ignored. The reader is always closed, also when
 * parsing fails.
 *
 * @param config configuration file to read
 * @return the parsed addresses in file order (possibly empty)
 * @throws IOException if the file cannot be opened or read
 * @throws NumberFormatException if a port is not a valid integer
 */
private static InetSocketAddress[] readConfig(File config) throws IOException {
  ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
  BufferedReader reader = new BufferedReader(new FileReader(config));
  try {
    String line;
    while ((line = reader.readLine()) != null) {
      if (line.startsWith("#")) continue;
      StringTokenizer tokens = new StringTokenizer(line);
      if (tokens.hasMoreTokens()) {
        String host = tokens.nextToken();
        if (tokens.hasMoreTokens()) {
          String port = tokens.nextToken();
          addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
          LOG.info("Client adding server " + host + ":" + port);
        }
      }
    }
  } finally {
    // The original never closed the reader, leaking the file handle.
    reader.close();
  }
  return addrs.toArray(new InetSocketAddress[addrs.size()]);
}
/** Construct a client talking to the named servers. */
public void init(InetSocketAddress[] addresses) throws IOException {
// 首先设置controller 在initIndexServer中会使用到的
this.controller = addresses[0];
// Modify by xie shuqiang. 2005-9-20
// 查询控制器得到所有Index Server
// Modified by DingZhenbo. 2007-09-18
IndexServer server = null;
server = new IndexServer();
initIndexServer(server, MODE_REG);
if (server.getLength() > 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -