📄 distributedsearch.java
字号:
}
}
if (!TASK_LOG.isInfoEnabled())
TASK_LOG.setLevel(Level.INFO);
//创建任务管理队列线程
taskQueue = new TaskQueue();
taskQueue.start();
//检查任务日志,没有完成的任务重新运行,获取当前的任务号
checkTaskLog(taskLogFile);
//检查新段目录,把没有添加的段放到任务队列
checkNewSegment(new_segment_path);
}
/**
 * Asks every index server for its segment list and registers those segments with
 * {@code server}, checking that all replicas of a group report the same list.
 *
 * <p>{@code server.get(i)} yields the addresses of group {@code i}; each group is queried
 * with one parallel SEGMENTS call. No mode parameter is needed because the concrete
 * addresses to call are already known.
 *
 * <p>If a replica returns no result, a warning is logged and that group is skipped; the
 * remaining groups are still processed.
 *
 * @param server the index servers to query and to register segments with
 * @throws IOException if a parallel call fails or the replicas of a group report
 *                     different segment lists
 */
public void initSegments(IndexServer server) throws IOException {
    groups:
    for (int i = 0; i < server.getLength(); i++) {
        InetSocketAddress[] address = server.get(i);
        Param param = new Param(SEGMENTS, NullWritable.get());
        LOG.info(">>>>" + address.length);
        Writable[] params = new Writable[address.length];
        Arrays.fill(params, param); // same request for every replica of the group
        Writable[] results = client.call(params, address); // make parallel call
        if (results == null || results.length != address.length) {
            throw new IOException("DistributedSearch: Control->getSegments(): error");
        }
        // Sorted segment list of the first replica; every other replica must match it.
        String[] expected = null;
        for (int j = 0; j < results.length; j++) {
            Result result = (Result) results[j];
            if (result == null) {
                LOG.warn("Client: no segments from: " + address[j]);
                // Skip only this group: later groups are independent and must still be
                // registered (returning here would silently drop all of them).
                continue groups;
            }
            String[] segments = ((ArrayWritable) result.value).toStrings();
            Arrays.sort(segments);
            if (expected == null) {
                expected = segments;
            } else if (!Arrays.equals(expected, segments)) {
                LOG.warn("Client: Segment not same :" + address[j]);
                throw new IOException("Client: Segment not same :" + address[j]);
            }
        }
        if (expected == null) {
            continue; // group has no replicas: nothing to register
        }
        for (String name : expected) {
            // Empty names are not registered as segments.
            if (name != null && name.length() > 0) {
                server.addSegment(name, address);
            }
        }
    }
}
/**
 * Fetches the document count of every server group for {@code mode} and stores it in
 * {@code server.maxDocs}, one entry per group.
 *
 * <p>The array is (re)allocated when it is missing or shorter than the number of groups.
 * Agreement between replicas of a group is checked by {@code getDocs}.
 *
 * @param mode   index mode passed through to {@code getDocs}
 * @param server the index servers whose counts are collected
 * @throws IOException if any group cannot report a consistent count
 */
public void initDocs(String mode, IndexServer server) throws IOException {
    boolean mustAllocate = server.maxDocs == null || server.maxDocs.length < server.getLength();
    if (mustAllocate) {
        server.maxDocs = new long[server.getLength()];
    }
    for (int group = 0; group < server.getLength(); group++) {
        server.maxDocs[group] = getDocs(server.get(group), mode);
    }
}
/**
 * Replays the task log: restores the highest task id seen and re-queues every task
 * that was logged but never marked as finished.
 *
 * <p>Only lines containing {@code '|'} are considered; the text after it is expected to be
 * {@code "<taskId> <rest>"}. A {@code <rest>} of exactly {@code "success"} marks the task
 * as finished; any other {@code <rest>} is kept as the task's parameter string and handed
 * to {@code redo}. Lines with a non-numeric task id are skipped with a warning.
 *
 * @param logFile the task log to scan
 * @throws IOException if the file cannot be opened or read
 */
private void checkTaskLog(File logFile) throws IOException {
    // Keyed by the numeric id so the sorted id array below can look entries up directly.
    // (A String-keyed map looked up with an int never matches, so nothing would be redone.)
    HashMap<Integer, String> taskMap = new HashMap<Integer, String>();
    BufferedReader in = new BufferedReader(new FileReader(logFile));
    try {
        String line;
        while ((line = in.readLine()) != null) {
            int begin = line.indexOf('|');
            if (begin < 0) {
                continue;
            }
            String taskStr = line.substring(begin + 1).trim();
            begin = taskStr.indexOf(" ");
            if (begin < 0) {
                continue;
            }
            int tid;
            try {
                tid = Integer.parseInt(taskStr.substring(0, begin));
            } catch (NumberFormatException e) {
                LOG.warn("Skipping task log line with bad task id: " + line);
                continue;
            }
            if (tid > taskID) {
                taskID = tid;
            }
            taskStr = taskStr.substring(begin + 1);
            if (taskStr.equals("success")) {
                taskMap.remove(tid);
            } else {
                taskMap.put(tid, taskStr);
            }
        }
    } finally {
        in.close();
    }
    System.out.println("******current task id is :" + taskID);
    if (taskMap.isEmpty()) {
        return;
    }
    System.out.println("******Re do tasks : " + taskMap.size());
    // Ids come from an increasing counter, so ascending order is allocation order.
    int[] tids = new int[taskMap.size()];
    int n = 0;
    for (Integer tid : taskMap.keySet()) {
        tids[n++] = tid;
    }
    Arrays.sort(tids);
    for (int tid : tids) {
        redo(String.valueOf(tid), taskMap.get(tid));
    }
}
/**
 * Re-queues one task that the task log shows as started but not finished.
 *
 * @param tid      the task id taken from the log line
 * @param paramStr the rest of the log line: the op code followed by its space-separated
 *                 arguments, in the layout {@code call} writes to TASK_LOG
 *                 ({@code op segment mode} for ADDSEGMENT,
 *                 {@code op oldSegments newSegment mode} for MERGESEGMENTS)
 */
private void redo(String tid, String paramStr) {
    if (paramStr == null || StringUtils.isEmpty(paramStr)) {
        return;
    }
    StringTokenizer tokens = new StringTokenizer(paramStr, " ");
    String[] params = new String[tokens.countTokens()];
    for (int i = 0; i < params.length; i++) {
        params[i] = tokens.nextToken().trim();
    }
    // This is called while replaying the task log; a corrupt or truncated entry is
    // skipped with a warning instead of throwing out of the replay.
    if (params.length == 0) {
        LOG.warn("Cannot redo task " + tid + ": empty task entry");
        return;
    }
    int op;
    int id;
    try {
        op = Integer.parseInt(params[0]);
        id = Integer.parseInt(tid);
    } catch (NumberFormatException e) {
        LOG.warn("Cannot redo task " + tid + ": malformed entry '" + paramStr + "'");
        return;
    }
    Param param;
    switch (op) {
    case ADDSEGMENT:
        if (params.length < 3) {
            LOG.warn("Cannot redo task " + tid + ": ADDSEGMENT needs segment and mode, got '"
                    + paramStr + "'");
            return;
        }
        System.out.println("*****Re do task : Add segment ");
        param = new Param(ADDSEGMENT, new BytesWritable(params[1].getBytes()),
                new BytesWritable(params[2].getBytes()));
        taskList.add(new Task(id, ADDSEGMENT, param));
        break;
    case MERGESEGMENTS:
        if (params.length < 4) {
            LOG.warn("Cannot redo task " + tid
                    + ": MERGESEGMENTS needs old segments, new segment and mode, got '"
                    + paramStr + "'");
            return;
        }
        System.out.println("*****Re do task : Merge segments ");
        param = new Param(MERGESEGMENTS, new BytesWritable(params[1].getBytes()),
                new BytesWritable(params[2].getBytes()),
                new BytesWritable(params[3].getBytes()));
        taskList.add(new Task(id, MERGESEGMENTS, param));
        break;
    default:
        LOG.warn("Unknown OP:" + op);
    }
}
/**
 * Scans the new-segment directory {@code path} and, for each configured index mode
 * (RAM first, then regular), checks its finished segments against that mode's server.
 *
 * <p>For each mode the directories checked are the parse-text and parse-data directories
 * plus that mode's index directory. A separate summary-mode (MODE_SUM) pass is currently
 * disabled.
 *
 * @param path directory containing the new segments
 * @throws IOException if the per-mode check fails
 */
public void checkNewSegment(String path) throws IOException {
    File segmentPath = new File(path);
    String[] entries = segmentPath.list();
    if (entries == null || entries.length == 0) {
        return;
    }
    // Summary directories, checked together with each mode's index directory.
    String summaryDirs = ParseText.DIR_NAME + "|" + ParseData.DIR_NAME + "|";

    // RAM-optimised index
    IndexServer ramServer = servers.get(MODE_RAM);
    if (ramServer != null) {
        String ramDirs = summaryDirs + IndexSegment.IDX_RAM_NAME;
        checkNewSegment(ramDirs.split("[\\|]"), segmentPath, ramServer, MODE_RAM);
    }

    // Regular index
    IndexServer regServer = servers.get(MODE_REG);
    if (regServer != null) {
        String regDirs = summaryDirs + IndexSegment.IDX_REG_NAME;
        checkNewSegment(regDirs.split("[\\|]"), segmentPath, regServer, MODE_REG);
    }
}
/**
 * Examines every finished segment (one containing the index-done marker) under
 * {@code segmentPath} for one index mode.
 *
 * <ul>
 * <li>Segments in which none of {@code checkDirs} exists are skipped.</li>
 * <li>Segments already known to {@code server}: their {@code checkDirs} are deleted, and
 *     the segment itself is deleted once fewer than two entries (i.e. only the done
 *     marker) remain.</li>
 * <li>Otherwise an ADDSEGMENT task for {@code mode} is queued. The add operation takes
 *     whichever source directories exist and skips missing ones.</li>
 * </ul>
 *
 * @param checkDirs   source sub-directory names to look for in each segment
 * @param segmentPath directory holding the new segments
 * @param server      index server of this mode
 * @param mode        mode name passed along with the ADDSEGMENT task
 * @throws IOException if deleting a directory fails
 */
private void checkNewSegment(String[] checkDirs, File segmentPath, IndexServer server, String mode) throws IOException {
    // File.list() returns null if the directory vanished or cannot be read since the
    // caller listed it; treat that as "nothing to do" instead of throwing an NPE.
    String[] segments = segmentPath.list();
    if (segments == null) {
        return;
    }
    for (String name : segments) {
        File segment = new File(segmentPath, name);
        File doneFile = new File(segment, net.nutch.indexer.IndexSegment.IDX_DONE_NAME);
        if (!doneFile.exists()) {
            continue;
        }
        // Only act if at least one source sub-directory is still present.
        boolean anySource = false;
        for (String dir : checkDirs) {
            if (new File(segment, dir).exists()) {
                anySource = true;
                break;
            }
        }
        if (!anySource) {
            continue;
        }
        if (server.getSegment(name) != null) {
            // Already served: remove the copied-in source directories.
            for (String dir : checkDirs) {
                File src = new File(segment, dir);
                if (src.exists()) {
                    MovePath.delete(src);
                }
            }
            // If only the index.done marker is left, remove the segment directory too.
            String[] remaining = segment.list();
            if (remaining != null && remaining.length < 2) {
                MovePath.delete(segment);
            }
            continue;
        }
        // The mode travels with the task so the add targets the right index.
        Param param = new Param(ADDSEGMENT, new BytesWritable(name.getBytes()), new BytesWritable(mode.getBytes()));
        taskList.add(new Task(getTaskID(), ADDSEGMENT, param));
    }
}
/**
 * Asks every replica in {@code address} for its document count in one parallel MAXDOCS
 * call and returns the count, which must be identical across replicas.
 *
 * <p>MODE_MIX is queried as MODE_REG.
 *
 * @param address replicas of one server group
 * @param mode    index mode to query
 * @return the common document count (0 when {@code address} is empty)
 * @throws IOException if the call fails, a replica returns nothing, or counts differ
 */
private long getDocs(InetSocketAddress[] address, String mode) throws IOException {
    Writable[] requests = new Writable[address.length];
    String queryMode = mode.equals(MODE_MIX) ? MODE_REG : mode;
    Param request = new Param(MAXDOCS, new BytesWritable(queryMode.getBytes()));
    Arrays.fill(requests, request); // one identical request per replica
    Writable[] replies = client.call(requests, address);
    boolean incomplete = replies == null || replies.length != address.length;
    if (incomplete) {
        throw new IOException("DistributedSearch: Control->getDocs(): error");
    }
    long agreed = 0;
    for (int r = 0; r < replies.length; r++) {
        Result reply = (Result) replies[r];
        if (reply == null) {
            LOG.warn("Client: no docs from: " + address[r]);
            throw new IOException("Control: Can't get maxdocs!");
        }
        long count = ((LongWritable) reply.value).get();
        if (r == 0) {
            agreed = count;
        } else if (count != agreed) {
            LOG.warn("Control: max docs not same!" + address[r]);
            throw new IOException("Control: max docs not same!");
        }
    }
    return agreed;
}
/**
 * Allocates the next task id by incrementing the {@code taskID} counter.
 *
 * <p>Synchronized so that concurrent calls through this method hand out distinct ids.
 *
 * @return the newly allocated id
 */
private synchronized int getTaskID() {
    taskID = taskID + 1;
    return taskID;
}
/**
 * RPC entry point: dispatches a {@code Param} request on its op code and wraps the reply
 * in a {@code Result}.
 *
 * <ul>
 * <li>INDEXS: {@code first} is the mode; returns one string per server group listing
 *     {@code "host:port:onlineStat "} for each replica, or an empty array if no server
 *     is configured for the mode.</li>
 * <li>FINDSEGMENT: {@code first} is the segment name, {@code second} the mode; returns
 *     {@code "host:port "} for each replica serving the segment, or {@code null} (not a
 *     Result) if the segment is unknown.</li>
 * <li>ADDSEGMENT / MERGESEGMENTS: allocates a task id, writes the task to TASK_LOG,
 *     queues it and returns {@code true}.</li>
 * <li>LIMITNUM: returns {@code client.limitNum}.</li>
 * </ul>
 *
 * @param param the request; must be a {@code Param}
 * @return the wrapped reply, or {@code null} for an unknown FINDSEGMENT segment
 * @throws RuntimeException for an unknown op code or an unknown FINDSEGMENT mode
 */
public Writable call(Writable param) throws IOException {
    Param p = (Param) param;
    logRequest(p);
    Writable value;
    switch (p.op) {
    case INDEXS: {
        // Mode selects which server set to describe.
        String mode = new String(((BytesWritable) p.first).get());
        IndexServer server = servers.get(mode);
        String[] serversArray;
        if (server == null) {
            serversArray = new String[0];
        } else {
            serversArray = new String[server.getLength()];
            for (int g = 0; g < serversArray.length; g++) {
                StringBuilder groupLine = new StringBuilder();
                for (InetSocketAddress replica : server.get(g)) {
                    groupLine.append(replica.getAddress().getHostAddress())
                            .append(':').append(replica.getPort())
                            .append(':').append(server.getOnlineStat(replica))
                            .append(' ');
                }
                serversArray[g] = groupLine.toString();
            }
        }
        value = new ArrayWritable(serversArray);
        break;
    }
    case FINDSEGMENT: {
        String segment = new String(((BytesWritable) p.first).get());
        // Mode selects which server set to search.
        String mode = new String(((BytesWritable) p.second).get());
        IndexServer server = client.servers.get(mode);
        if (server == null) {
            throw new RuntimeException("Unknown FINDSEGMENT MODE: " + mode);
        }
        InetSocketAddress[] address = (InetSocketAddress[]) server.getSegment(segment);
        if (address == null) {
            System.out.println("can't find this segment:" + segment);
            return null;
        }
        StringBuilder hosts = new StringBuilder();
        for (InetSocketAddress replica : address) {
            hosts.append(replica.getAddress().getHostAddress())
                    .append(':').append(replica.getPort())
                    .append(' ');
        }
        value = new BytesWritable(hosts.toString().getBytes());
        break;
    }
    case ADDSEGMENT: {
        String segmentName = new String(((BytesWritable) p.first).get());
        String mode = new String(((BytesWritable) p.second).get());
        int tid = getTaskID();
        // Logged before queueing so the task can be replayed after a restart.
        TASK_LOG.info(tid + " " + p.op + " " + segmentName + " " + mode);
        taskList.add(new Task(tid, ADDSEGMENT, p));
        value = new BooleanWritable(true);
        break;
    }
    case MERGESEGMENTS: {
        String oldSegmentsName = new String(((BytesWritable) p.first).get());
        String newSegment = new String(((BytesWritable) p.second).get());
        String mode = new String(((BytesWritable) p.third).get());
        int tid = getTaskID();
        TASK_LOG.info(tid + " " + p.op + " " + oldSegmentsName + " " + newSegment + " " + mode);
        taskList.add(new Task(tid, MERGESEGMENTS, p));
        value = new BooleanWritable(true);
        break;
    }
    case LIMITNUM:
        value = new IntWritable(client.limitNum);
        break;
    default:
        throw new RuntimeException("Unknown op code: " + p.op);
    }
    return new Result(p.op, value);
}
private static void logRequest(Param p) {
StringBuffer buffer = new StringBuffer();
buffer.append(Thread.currentThread().getName());
buffer.append(": ");
buffer.append(NAMES[p.op]);
buffer.append("(");
if (p.first != NullWritable.get()) {
buffer.append(p.first);
if (p.second != NullWritable.get()) {
buffer.append(", ");
buffer.append(p.second);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -