📄 distributedsearch.java
字号:
case LIMITNUM:
value = new IntWritable();
break;
default:
throw new RuntimeException("Unknown op code: " + op);
}
value.readFields(in);
}
}
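/**
* The Control Center.
*
* Note: the IndexServer held by Control and the IndexServer held by Client are,
* in the view of the maintainer (DingZhenbo), equivalent; it is unclear why the
* original author treated them differently where they are used.
* For optimization, both have been replaced by a Proxy selected via the mode parameter.
*
*/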
public static class Control extends net.nutch.ipc.Server {
/** Logger for the "tasklog" category. */
private static Logger TASK_LOG = Logger.getLogger("tasklog");
/** Client used to issue requests such as addSegment to index servers. */
private Client client;
//private ArrayList segments = new ArrayList();
/**
 * Segment sub-directory names on each index-server mount; index 0 ("current")
 * is where newly added segments are placed and searched for.
 */
public static final String[] SEGMENTS_PATH = {"current", "today", "week", "month", "year"};
/** Temporary directory used when merging segments. */
private String merge_temp_path = "merge.temp";
/**
 * Directory holding newly produced segments waiting to be published.
 * Per the original note, the Controller checks this directory at startup.
 */
private String new_segment_path;
/** Index servers keyed by mode (e.g. MODE_REG, MODE_RAM); looked up when adding segments. */
private Map<String, IndexServer> servers = new HashMap<String, IndexServer>();
//private int max_segments = 2;
/** Counter for task IDs (presumably incremented when tasks are queued; not visible here). */
private int taskID = 0;
/**
 * Pending tasks. A concurrent queue, presumably because it is filled by request
 * handlers and drained by the TaskQueue thread; confirm against the rest of the class.
 */
private ConcurrentLinkedQueue<Task> taskList = new ConcurrentLinkedQueue<Task>();
/**
 * A queued unit of work for the Control: a task ID, an operation code and the
 * operation's parameter. All fields are final; a Task never changes after construction.
 */
class Task {
  /** Identifier assigned to this task. */
  private final int tid;
  /** Operation code selecting what the task does. */
  private final byte opID;
  /** Argument for the operation. */
  private final Param param;

  /**
   * @param tid   task identifier
   * @param op    operation code
   * @param param operation argument
   */
  public Task(int tid, byte op, Param param) {
    this.tid = tid;
    this.opID = op;
    this.param = param;
  }

  /** @return the operation code */
  public byte getOp() {
    return opID;
  }

  /** @return the task identifier */
  public int getID() {
    return tid;
  }

  /** @return the operation argument */
  public Param getParam() {
    return param;
  }
}
class TaskQueue extends Thread {
/**
 * Creates the task-queue thread, marked as a daemon thread so that it does
 * not by itself keep the JVM running.
 */
public TaskQueue() {
  super();
  setDaemon(true);
}
/**
 * Fetches the body of the given URL as a single string.
 * <p>
 * Lines are concatenated without their line terminators. Any failure while
 * opening or reading the URL is logged and reported as {@code null}.
 *
 * @param urlin the URL to fetch
 * @return the response body with line breaks removed, or {@code null} on failure
 * @throws IOException kept in the signature for caller compatibility; failures
 *         are reported by returning {@code null}
 */
public String getHtml(String urlin) throws IOException {
  InputStream stream = null;
  try {
    stream = new URL(urlin).openStream();
    // Decodes with the platform default charset, as before; callers in this
    // class only compare the result against ASCII literals.
    BufferedReader in = new BufferedReader(new InputStreamReader(stream));
    StringBuilder output = new StringBuilder();
    String line;
    while ((line = in.readLine()) != null) {
      output.append(line);
    }
    return output.toString();
  } catch (Exception e) {
    LOG.warn("getHtml(" + urlin + ") failed: " + e);
    return null;
  } finally {
    // Close exactly once on every path. The reader wrappers only buffer/decode
    // this stream, so closing it is sufficient. A failing close is logged rather
    // than masking the result, which the old duplicated cleanup could do.
    if (stream != null) {
      try {
        stream.close();
      } catch (IOException e) {
        LOG.warn("Failed to close stream for " + urlin + ": " + e);
      }
    }
  }
}
/**
 * Notifies one front server to switch an index server's status (online/offline).
 *
 * @param server  front-server authority placed after "http://" (presumably host:port)
 * @param address index server whose status is being changed
 * @param line    true to mark the index server online, false to mark it offline
 * @return false if the request failed or the front server answered "false"
 *         (case-insensitive); true for any other answer
 * @throws IOException as declared by getHtml
 */
private boolean inform(String server, InetSocketAddress address, boolean line) throws IOException {
  final String status = line ? "true" : "false";
  final String urlStr = "http://" + server + setStatServlet
      + "?" + hostParam + "=" + address.getAddress().getHostAddress()
      + "&" + portParam + "=" + address.getPort()
      + "&" + statParam + "=" + status;
  LOG.info("*****" + urlStr);
  final String answer = getHtml(urlStr);
  return answer != null && !answer.equalsIgnoreCase("false");
}
//
// private String mergeSegments( File[] segments, File destPath ) throws IOException {
// MergeSegments merge = new MergeSegments(new LocalFileSystem(), segments,
// destPath, Long.MAX_VALUE, true, false);
// merge.run();
// return merge.getNewSegmentName();
// }
//
/**
 * Finds the first index-server group that already holds the given segment.
 * <p>
 * Each replica address maps to a local mount directory named
 * "&lt;ip&gt;.&lt;port&gt;"; the segment counts as present when
 * "&lt;mount&gt;/&lt;path&gt;/&lt;segmentName&gt;" exists and is a directory.
 *
 * @param server      index server whose groups are searched
 * @param segmentName segment directory name to look for
 * @param path        sub-directory under each mount (e.g. SEGMENTS_PATH[0])
 * @return index of the first group containing the segment, or -1 if none does
 */
private int selectIndexs(IndexServer server, String segmentName, String path) {
  int group = 0;
  search:
  for (; group < server.getLength(); group++) {
    for (InetSocketAddress replica : server.get(group)) {
      File mount = new File(replica.getAddress().getHostAddress() + "." + replica.getPort(), path);
      File candidate = new File(mount, segmentName);
      if (candidate.exists() && candidate.isDirectory()) {
        break search;
      }
    }
  }
  return (group == server.getLength()) ? -1 : group;
}
/**
 * Returns the segment sub-directory names to publish for the given mode.
 * <p>
 * The parse-text and parse-data directories always come first, in that order,
 * followed by the index directories selected by {@code mode}:
 * <ul>
 *   <li>both RAM and REG (in that order) for MODE_MIX;</li>
 *   <li>only RAM for MODE_RAM;</li>
 *   <li>only REG for MODE_REG;</li>
 *   <li>none for any other mode.</li>
 * </ul>
 * The order matters to callers that pick entries by position.
 * <p>
 * Builds the array directly instead of joining with '|' and regex-splitting,
 * which would silently mis-split a name containing '|' or drop/insert empty entries.
 *
 * @param mode one of the MODE_* constants (null is treated as an unknown mode)
 * @return the directory names; always contains at least the two parse directories
 */
private String[] getDirNamesByMode(String mode) {
  List<String> names = new ArrayList<String>(4);
  names.add(ParseText.DIR_NAME);
  names.add(ParseData.DIR_NAME);
  // Constant-first comparisons: a null mode no longer throws NullPointerException.
  if (MODE_MIX.equals(mode)) {
    names.add(IndexSegment.IDX_RAM_NAME);
    names.add(IndexSegment.IDX_REG_NAME);
  } else if (MODE_RAM.equals(mode)) {
    names.add(IndexSegment.IDX_RAM_NAME);
  } else if (MODE_REG.equals(mode)) {
    names.add(IndexSegment.IDX_REG_NAME);
  }
  return names.toArray(new String[names.size()]);
}
/**
 * Step 1 of adding a segment: transfers the segment's data directories from the
 * local staging area into the "current" directory of every replica in a group.
 * <p>
 * Segment-add logic modified by DingZhenbo 2007-09-19 to accept multiple
 * source sub-directories (chosen by {@code mode} via getDirNamesByMode) and to
 * place them under a per-segment directory on each replica.
 * <p>
 * The segment must carry its done marker (IndexSegment.IDX_DONE_NAME), which is
 * transferred to each replica after the data directories. Transfers use
 * MovePath.movePath(src, dest, false); because the same sources are reused for
 * every replica, this presumably copies rather than moves (confirm in MovePath).
 * Afterwards the local index directory and/or the whole local segment directory
 * may be deleted (see the comments at the end).
 *
 * @param indexs      replicas of the chosen group; each maps to a local mount
 *                    directory named "&lt;ip&gt;.&lt;port&gt;"
 * @param segmentFile local segment directory to publish
 * @param mode        one of the MODE_* constants
 * @return true on success (or when there is nothing to transfer); false if the
 *         segment is not done, has no source directories, a replica's mount is
 *         missing, or the per-segment directory cannot be created
 * @throws Exception propagated from MovePath operations
 */
private boolean addSegmentStep1(InetSocketAddress[] indexs, File segmentFile, String mode) throws Exception {
  String[] srcDirs = getDirNamesByMode(mode);
  if (srcDirs.length == 0) return true;
  File doneFile = new File(segmentFile, IndexSegment.IDX_DONE_NAME);
  if (!doneFile.exists()) {
    LOG.warn("Segment ["+segmentFile.getName() + "] not done. can not add to IndexServer");
    return false;
  }
  // Keep only the source sub-directories that exist, preserving getDirNamesByMode's order.
  List<File> srcFiles = new ArrayList<File>();
  for (String dir : srcDirs) {
    File srcFile = new File(segmentFile, dir);
    if (srcFile.exists()) {
      srcFiles.add(srcFile);
    }
  }
  if (srcFiles.isEmpty()) {
    LOG.warn("No segment ["+segmentFile.getName()+"/] exist. can not add to IndexServer");
    return false;
  }
  for (InetSocketAddress address : indexs) {
    String host = address.getAddress().getHostAddress();
    LOG.info("Copy new Segment to :"+ host + ":" + address.getPort());
    File destFile = new File(host + "." + address.getPort(), SEGMENTS_PATH[0]);
    if (!destFile.exists() || !destFile.isDirectory()) {
      LOG.warn("Index server:"+host + ":" + address.getPort() + " mounted path NOT find!");
      return false;
    }
    // Output goes into a per-segment directory (DingZhenbo 2007-09-19).
    destFile = new File(destFile, segmentFile.getName());
    // Previously mkdir()'s result was ignored and a plain file in the way was
    // accepted, so the transfers below failed later with a less specific error.
    if (!destFile.isDirectory()) {
      if (destFile.exists() || !destFile.mkdir()) {
        LOG.warn("Can not create segment directory [" + destFile.getPath() + "]");
        return false;
      }
    }
    // Skip sub-directories already present on this replica (presumably left by an earlier attempt).
    for (File srcFile : srcFiles) {
      File checker = new File(destFile, srcFile.getName());
      if (!checker.exists()) {
        MovePath.movePath(srcFile, destFile, false);
        LOG.info("Copy ["+srcFile.getCanonicalPath()+"] to ["+destFile.getCanonicalPath()+"]" );
      }
    }
    // The done marker follows the data directories.
    MovePath.movePath(doneFile ,destFile, false);
  }
  // Delete the local index directory. NOTE: a known temporary trick: it assumes
  // that when exactly three sources exist, the third is the index directory
  // (true in RAM/REG mode when both parse directories and the index exist).
  if (srcFiles.size() == 3) {
    MovePath.delete(srcFiles.get(2));
  }
  // Delete the local segment directory in MIX mode, or when fewer than four
  // entries remain (per the original note: only index.done, parse_text and
  // parse_data left).
  if (mode.equals(DistributedSearch.MODE_MIX)) {
    MovePath.delete(segmentFile);
  } else {
    String[] remaining = segmentFile.list();
    if (remaining == null) {
      // File.list() returns null when the path is not a listable directory;
      // this used to throw NullPointerException. Leave the directory in place.
      LOG.warn("Can not list segment directory [" + segmentFile.getPath() + "], leaving it in place");
    } else if (remaining.length < 4) {
      MovePath.delete(segmentFile);
    }
  }
  return true;
}
/**
 * Step 2 of adding a segment: for each replica in the group, marks it offline
 * (locally and on every front server), asks it to add the segment, then marks
 * it online again.
 * <p>
 * Replicas are processed one at a time. Any failure returns false immediately;
 * the replica being processed is left in whatever state it had reached
 * (e.g. still offline).
 *
 * @param server      index server owning the group; setOnlineStat/addSegment are called on it
 * @param indexs      replicas of the group to update
 * @param segmentName segment to add; only its last path component is passed to server.addSegment
 * @param mode        forwarded to client.addSegment
 * @return true if every replica added the segment and every front server acknowledged
 *         both status changes, false otherwise
 * @throws Exception propagated from inform / client / server calls
 */
private boolean addSegmentStep2(IndexServer server, InetSocketAddress[] indexs, String segmentName, String mode) throws Exception {
  for (InetSocketAddress address : indexs) {
    // Take the replica out of rotation before it loads the segment.
    server.setOnlineStat(address, false);
    LOG.info("Inform front server change indexserver status(offline)");
    if (!announceToFrontServers(address, false)) {
      return false;
    }
    String host = address.getAddress().getHostAddress();
    String port = String.valueOf(address.getPort());
    LOG.info("Inform offline server:"+ host + ":" + port +" add segment");
    server.addSegment(new File(segmentName).getName(), address);
    if (!client.addSegment(segmentName, address, mode)) {
      LOG.warn("Add Segment:"+ segmentName + " to "+ host + ":"+ port + " Error!");
      return false;
    }
    // Put the replica back into rotation.
    server.setOnlineStat(address, true);
    LOG.info("Inform front server change indexserver status(online)");
    if (!announceToFrontServers(address, true)) {
      return false;
    }
  }
  return true;
}

/**
 * Sends the online/offline status of {@code address} to each front server in
 * turn, stopping at (and logging) the first one that does not accept it.
 *
 * @param address index server whose status is announced
 * @param online  true to announce online, false to announce offline
 * @return true if every front server accepted the change
 * @throws IOException as declared by inform
 */
private boolean announceToFrontServers(InetSocketAddress address, boolean online) throws IOException {
  for (String front : frontServer) {
    if (!inform(front, address, online)) {
      LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" +
          address.getPort() + (online ? " online" : " offline") + " on frontserver:"+ front);
      return false;
    }
  }
  return true;
}
// private boolean addSegment(Param param) throws Exception {
//
// boolean result = true;
//
// String segmentName = new String(((BytesWritable)param.first).get());
// String mode = new String(((BytesWritable)param.second).get());
//
// if (mode.equals(MODE_MIX)) {
// boolean b = false;
// b = addSegment(segmentName, MODE_REG);
// result = result && b;
// if (!b) {
// LOG.warn("addSegment "+segmentName+" in mode [REG] error.");
// }
// b = addSegment(segmentName, MODE_RAM);
// result = result && b;
// if (!b) {
// LOG.warn("addSegment "+segmentName+" in mode [RAM] error.");
// }
// b = addSegment(segmentName, MODE_SUM);
// result = result && b;
// if (!b) {
// LOG.warn("addSegment "+segmentName+" in mode [SUM] error.");
// }
// } else {
// result = result && addSegment(segmentName, mode);
// if (!result) {
// LOG.warn("addSegment "+segmentName+" ["+mode+"] error.");
// }
// }
//
// return result;
// }
/**
 * Handles an add-segment task: publishes a segment to a group of the index
 * server for the requested mode, then refreshes that group's document count.
 * <p>
 * MODE_MIX segments are added through the MODE_REG server.
 * <ul>
 *   <li>If the segment is not in the local new-segment directory but a group
 *       already holds it, only step 2 (re-add on that group) runs.</li>
 *   <li>Otherwise the segment goes to the group that already holds it or,
 *       failing that, to the group with the fewest documents (ties go to the
 *       last such group).</li>
 * </ul>
 *
 * @param param task parameter; first = segment name bytes, second = mode bytes
 * @return true on success or when there is nothing to do; false on failure
 * @throws Exception propagated from the add steps
 */
private boolean addSegment(Param param) throws Exception {
  // NOTE(review): decodes with the platform charset and uses the whole array
  // returned by BytesWritable.get(); fine for ASCII names if get() is exactly sized; confirm.
  String segmentName = new String(((BytesWritable)param.first).get());
  String mode = new String(((BytesWritable)param.second).get());

  // MIX mode uses the REG servers for addSegment.
  IndexServer server = servers.get(mode.equals(MODE_MIX) ? MODE_REG : mode);
  // No index server configured for this mode: nothing to do.
  if (server == null) {
    LOG.info("There are no index server in mode["+mode+"]");
    return true;
  }

  // Group that already holds the segment under "current", or -1.
  int index = selectIndexs(server, segmentName, SEGMENTS_PATH[0]);
  File segmentFile = new File(new_segment_path, segmentName);
  if (!segmentFile.exists()) {
    // Not staged locally: only re-add on a group that already has it.
    if (index < 0) {
      return true;
    }
    if (!addSegmentStep2(server, server.get(index), segmentName, mode)) {
      return false;
    }
  } else {
    if (index < 0) {
      // Previously an empty maxDocs caused ArrayIndexOutOfBoundsException here.
      if (server.maxDocs.length == 0) {
        LOG.warn("No index server group in mode["+mode+"] to add segment " + segmentName);
        return false;
      }
      // Pick the group with the fewest documents; '>=' makes ties go to the last one.
      long minDocs = server.maxDocs[0];
      for (int i = 0; i < server.maxDocs.length; i++) {
        if (minDocs >= server.maxDocs[i]) {
          minDocs = server.maxDocs[i];
          index = i;
        }
      }
    }
    if (!addSegmentStep1(server.get(index), segmentFile, mode)) {
      return false;
    }
    if (!addSegmentStep2(server, server.get(index), segmentName, mode)) {
      return false;
    }
  }
  // Note: doc counting used to be skipped for summary (MODE_SUM) servers; that check is disabled.
  // Update the group's document count from what its servers now report.
  long docs = 0;
  try {
    docs = getDocs(server.get(index), mode);
    if (docs < 0) {
      LOG.info("Invalid docs number : "+docs);
      return false;
    }
    docs -= server.maxDocs[index];
  } catch (Exception e) {
    // Was LOG.info(e.getMessage()), which hid the failure at info level
    // and logged "null" for exceptions without a message.
    LOG.warn("Get docs for segment " + segmentName + " failed: " + e);
    return false;
  }
  LOG.info("add "+ docs + " docs........");
  server.maxDocs[index] += docs;
  return true;
}
private boolean mergeSegments(Param param) throws Exception {
if (param == null) return true;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -