📄 distributedsearch.java
字号:
//检查要合并段的服务器
String oldSegmentsName = new String(((BytesWritable)param.first).get());
if (oldSegmentsName == null || oldSegmentsName.length() == 0)
return true;
String newSegment = new String(((BytesWritable)param.second).get());
if (newSegment == null || newSegment.length() == 0)
return true;
String mode = new String(((BytesWritable)param.third).get());
File segmentFile = new File(newSegment);
boolean result = true;
if (mode.equals(MODE_MIX)) {
// boolean b = false;
//如果是MIX状态就使用常规服务器的地址 因为长桶短桶的Segment是一致的
result = merge(oldSegmentsName, segmentFile, servers.get(MODE_REG), client.servers.get(MODE_REG), mode);
// if (!b) {
// LOG.warn("Merge in [RAM] mode failure");
// }
// result = result && b;
// b = merge(oldSegmentsName, segmentFile, servers.get(mode), client.servers.get(mode), MODE_REG);
// if (!b) {
// LOG.warn("Merge in [REG] mode failure");
// }
// result = result && b;
// b = merge(oldSegmentsName, segmentFile, servers.get(mode), client.servers.get(mode), MODE_SUM);
//
// if (!b) {
// LOG.warn("Merge in [SUM] mode failure");
// }
// result = result && b;
if (!result) {
LOG.warn("Merge in mode["+mode+"] failure");
}
}
else {
result = merge(oldSegmentsName, segmentFile, servers.get(mode), client.servers.get(mode), mode);
if (!result) {
LOG.warn("Merge in mode["+mode+"] failure");
}
}
return result;
}
private boolean merge(String oldSegmentsName, File segmentFile,
IndexServer proxy_control, IndexServer proxy_client, final String mode)
throws IOException {
String[] srcDirs = getDirNamesByMode(mode);
//不是valid mode值直接返回true
if (srcDirs.length == 0) return true;
StringTokenizer token = new StringTokenizer(oldSegmentsName,"|");
ArrayList<String> oldList = new ArrayList<String>();
String oldSegmentsPath = null;
while(token.hasMoreTokens()){
File oldSegmentFile = new File(token.nextToken());
if (oldSegmentsPath == null){
oldSegmentsPath = oldSegmentFile.getParent();
}
oldList.add(oldSegmentFile.getName());
}
if (oldSegmentsPath == null || oldList.size() == 0)
return true;
String[] oldSegments = (String[])oldList.toArray(new String[oldList.size()]);
oldList.clear();
int i_SEGENTS_PATH = -1;
for (int i=0; i<SEGMENTS_PATH.length; i++){
if (oldSegmentsPath.endsWith(SEGMENTS_PATH[i])){
i_SEGENTS_PATH = i;
break;
}
}
InetSocketAddress[] indexs =
(InetSocketAddress[])proxy_client.getSegment(oldSegments[0]);
if (indexs == null){
int i = selectIndexs(proxy_control, segmentFile.getName(),SEGMENTS_PATH[i_SEGENTS_PATH+1]);
if (i < 0){
selectIndexs(proxy_control, oldSegments[0],SEGMENTS_PATH[i_SEGENTS_PATH]);
}
if (i < 0)
return true;
indexs = proxy_control.get(i);
}
/*
for (int i=0; i<indexs.length; i++){
if (!addresses.getStat(indexs[i])){
LOG.warn("TaskQueue: addSegment()" + indexs[i].getAddress().getHostAddress() +
":" + indexs[i].getPort() + " is offline!");
return false;
}
}
*/
//拷贝合并的新段到索引服务器
//Modified by dingzhenbo
if (! (segmentFile.exists() && segmentFile.isDirectory())){
LOG.warn("Segment ["+segmentFile.getName()+"] not valid. can not add to IndexServer");
return false;
}
File doneFile = new File(segmentFile, IndexSegment.IDX_DONE_NAME);
if (!doneFile.exists()) {
LOG.warn("Segment ["+segmentFile.getName()+"] not done. can not add to IndexServer");
return false;
}
List<File> srcFiles = new ArrayList<File>();
for (String dir : srcDirs) {
File src = new File(segmentFile, dir);
if (src.exists()) {
srcFiles.add(src);
}
}
if (srcFiles.size() == 0) {
LOG.warn("Source semengt ["+segmentFile.getCanonicalPath()+"] not valid, can not merge!");
return false;
}
for (int i=0; i<indexs.length; i++){
InetSocketAddress address = indexs[i];
LOG.info("Copy merged Segment to :"+ address.getAddress().getHostAddress() +
":" + address.getPort());
//** {current today week ...}
File destFile = new File(address.getAddress().getHostAddress() + "." +
address.getPort(), SEGMENTS_PATH[i_SEGENTS_PATH + 1]);
if (!destFile.exists() || !destFile.isDirectory()){
LOG.warn("Index server:"+address.getAddress().getHostAddress() +
":" + address.getPort() + " mounted path NOT find!");
return false;
}
//如果没有新增的段就创建出来 by dingzhenbo
File newDestFile = new File(destFile, segmentFile.getName());
if (!newDestFile.exists()) {
newDestFile.mkdir();
}
for (File src : srcFiles) {
MovePath.movePath(src,newDestFile,false);
}
// done file
MovePath.movePath(doneFile, newDestFile, false);
}
MovePath.delete(segmentFile);
for (int i=0; i<indexs.length; i++){
InetSocketAddress address = indexs[i];
proxy_control.setOnlineStat( address, false );
//通知前台服务器切换(offline)
LOG.info("Inform front server change indexserver status(offline)");
for (int j=0; j<frontServer.length; j++){
if (!inform(frontServer[j],address,false)){
LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" +
address.getPort() + " offline on frontserver:"+ frontServer[j]);
return false;
}
}
//addresses.setStat(address,false);
String host = address.getAddress().getHostAddress();
String port = String.valueOf(address.getPort());
//通知下线服务器段合并
LOG.info("Inform offline server:"+ host + ":" + port +" merge segments:" +
oldSegmentsName + " to "+ segmentFile.getName());
if (!client.mergeSegments(oldSegments, segmentFile.getName(), host, port)){
LOG.warn("Merge Segments to :" + segmentFile.getName() +" on "+ host +
":"+ port + " Error!");
return false;
}
//删除旧的段
for (int j=0; j<oldSegments.length; j++){
File delSegment = new File(host + "." + port + File.separator + SEGMENTS_PATH[i_SEGENTS_PATH],
oldSegments[j]);
MovePath.delete(delSegment);
}
proxy_control.setOnlineStat(address, true);
//通知前台服务器切换(online)
LOG.info("Inform front server change indexserver status(online)");
for (int j=0; j<frontServer.length; j++){
if (!inform(frontServer[j],address,true)){
LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" +
address.getPort() + " offline on frontserver:"+ frontServer[j]);
return false;
}
}
}
/**
* IndexServer替换
* 这里原来是 client.IndexServer
* Modified by DingZhenbo 2007-09-13
*/
proxy_client.addSegment(segmentFile.getName(),indexs);
for (int i=0; i<oldSegments.length; i++)
proxy_client.delSegment(oldSegments[i]);
return true;
}
public void run(){
while(true){
try{
Task task = (Task)taskList.peek();
if (task == null){
sleep(10000);
continue;
}
int taskID = task.getID();
byte op = task.getOp();
Param param = task.getParam();
LOG.info("" + op);
LOG.info("" + param);
switch(op){
case ADDSEGMENT:
if (!addSegment(param))
LOG.warn("Add Segment :"+ new String(((BytesWritable)param.first).get()) + " in mode ["+new String(((BytesWritable)param.second).get()) +"] Error!");
else{
TASK_LOG.info(taskID + " " + "success");
taskList.poll();
}
break;
case MERGESEGMENTS:
if (!mergeSegments(param))
LOG.warn("Merge Segments :" + new String(((BytesWritable)param.first).get()) +
" to " + new String(((BytesWritable)param.second).get()) +
" in mode["+new String(((BytesWritable)param.third).get())+"] Error!");
else{
TASK_LOG.info(taskID + " " + "success");
taskList.poll();
}
break;
default:
LOG.warn("Unknown OP:" + op);
}
sleep(5000);
}catch(Exception e){
LOG.warn("Controller TaskQueue thread error!"+ e.toString());
e.printStackTrace(System.err);
taskList.poll();
try{
sleep(10000);
}catch(Exception e1){
LOG.warn("Controller TaskQueue thread error! Must return."+ e.toString());
break;
}
}
}
}
}
//任务队列
private TaskQueue taskQueue;
//前台服务器IP数组
private String[] frontServer;
private String setStatServlet;
private String hostParam;
private String portParam;
private String statParam;
private File taskLogFile;
public Control(File configfile, int port) throws IOException {
super(port, Param.class, 5);
java.util.Properties properties = new java.util.Properties();
FileInputStream in = new FileInputStream(configfile);
properties.load(in);
in.close();
String frontservers = properties.getProperty("FRONT_SERVERS");
if (frontservers == null || frontservers.length() == 0)
throw new IOException("Not find FRONT_SERVERS property in file "+ configfile);
else{
StringTokenizer token = new StringTokenizer(frontservers," ");
frontServer = new String[token.countTokens()];
for (int i=0; i<frontServer.length; i++)
frontServer[i] = token.nextToken();
}
setStatServlet = properties.getProperty("SET_STAT_SERVLET");
if (setStatServlet == null || setStatServlet.length() == 0)
throw new IOException("Not find SET_STAT_SERVLET property in file "+ configfile);
hostParam = properties.getProperty("HOST_PARAM_NAME");
if (hostParam == null || hostParam.length() == 0)
throw new IOException("Not find HOST_PARAM_NAME property in file "+ configfile);
portParam = properties.getProperty("PORT_PARAM_NAME");
if (portParam == null || portParam.length() == 0)
throw new IOException("Not find PORT_PARAM_NAME property in file "+ configfile);
statParam = properties.getProperty("STAT_PARAM_NAME");
if (statParam == null || statParam.length() == 0)
throw new IOException("Not find STAT_PARAM_NAME property in file "+ configfile);
String idxSrvFileName = properties.getProperty("INDEX_SERVER_FILE");
String sumSrvFileName = properties.getProperty("SUMMARY_SERVER_FILE");
//check index server config , 二者不允许同时都是null
if (StringUtils.isEmpty(idxSrvFileName)) {
throw new RuntimeException("NO INDEX_SERVER_FILE property in file "+ configfile);
}
new_segment_path = properties.getProperty("NEW_SEGMENT_PATH");
if (new_segment_path == null || new_segment_path.length() == 0)
throw new IOException("Not find NEW_SEGMENT_PATH property in file" + configfile);
merge_temp_path = properties.getProperty("MERGE_TEMP_PATH");
if (merge_temp_path == null || merge_temp_path.length() == 0)
throw new IOException("Not find MERGE_TEMP_PATH property in file" + configfile);
/***************
String max_segments_str = properties.getProperty("MAX_SEGMENTS");
if (max_segments_str == null || max_segments_str.length() == 0)
throw new IOException("Not find MAX_SEGMENTS property in file" + configfile);
max_segments = Integer.parseInt(max_segments_str);
****************/
String taskLog = properties.getProperty("TASK_LOG");
if (taskLog == null || taskLog.length() == 0)
throw new IOException("Not find TASK_LOG property in file" + configfile);
taskLogFile = new File(taskLog);
if (!taskLogFile.exists() || taskLogFile.isDirectory() || !taskLogFile.canRead() || !taskLogFile.canWrite())
throw new IOException("Task LOG File :" + taskLogFile.getAbsolutePath() + " Error!");
//** Added by DingZhenbo 2007-09
File idxSrvFile = StringUtils.isEmpty(idxSrvFileName) ? null : new File(idxSrvFileName);
File sumSrvFile = StringUtils.isEmpty(sumSrvFileName) ? null : new File(sumSrvFileName);
//** 短桶限制数量
String limit = properties.getProperty("INDEX_LIMIT_NUM");
if (StringUtils.isEmpty(limit)) {
limit = "1"; // the default value
}
client = new Client(idxSrvFile, idxSrvFile, sumSrvFile);
//add by xie shuqiang. 2006.09.27
client.setTimeout(600000);
client.setLimitNum(Integer.valueOf(limit));
//获取摘要服务器段
IndexServer server = null;
// server = client.servers.get(MODE_SUM);
// if (server != null) {
// initSegments(server);
// servers.put(MODE_REG, server);
// }
// Load regular index server
server = client.servers.get(MODE_REG);
if ( server != null) {
initSegments(server);
initDocs(MODE_REG, server);
servers.put(MODE_REG, server);
}
// Load optimize options when we using optimizing
server = client.servers.get(MODE_RAM);
if (server != null) {
initSegments(server);
initDocs(MODE_RAM, server);
servers.put(MODE_RAM, server);
}
for (String key : servers.keySet()) {
server = servers.get(key);
System.out.println("["+key + "] -> "+servers.get(key).getLength());
for (long docs : server.maxDocs) {
System.out.println("******* ["+key+" ] MaxDocs: "+ docs);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -