⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedsearch.java

📁 nutch搜索的改进型工具和优化爬虫的相关工具
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
	  		}
  		}
  		  		
  		if (!TASK_LOG.isInfoEnabled())
  			TASK_LOG.setLevel(Level.INFO);

  		//创建任务管理队列线程
  		taskQueue = new TaskQueue();
  		taskQueue.start();

  		//检查任务日志,没有完成的任务重新运行,获取当前的任务号  		
  		checkTaskLog(taskLogFile);
  		//检查新段目录,把没有添加的段放到任务队列
  		checkNewSegment(new_segment_path);
  	}
  	
  	
	
  	/**
  	 * Asks every index-server group for its segment list, checks that all
  	 * replicas in a group report the same segments, and registers each
  	 * segment against that group's addresses via server.addSegment().
  	 * No mode argument is needed because the concrete call addresses are
  	 * already known.
  	 *
  	 * A group in which some replica returns no result is logged and skipped;
  	 * the remaining groups are still initialized.
  	 *
  	 * @param server the index-server groups to query
  	 * @throws IOException if a parallel call fails, or if the replicas of one
  	 *         group report different segment lists
  	 */
  	public void initSegments(IndexServer server) throws IOException{
  		groups:
  		for (int i=0; i<server.getLength(); i++){
  			InetSocketAddress[] address = server.get(i);

  			Param param = new Param(SEGMENTS, NullWritable.get());

  			LOG.info(">>>>"+address.length);

  			// The same request goes to every replica of the group in one parallel call.
  			Writable[] params = new Writable[address.length];
  			Arrays.fill(params, param);

  			Writable[] results = client.call(params, address);
  			if (results == null || results.length != address.length){
  				throw new IOException("DistributedSearch: Control->getSegments(): error");
  			}

  			// Sorted segment list of the first replica; all other replicas must match it.
  			String[] reference = null;
  			for (int j=0; j<results.length; j++){
  				Result result = (Result)results[j];
  				if (result == null){
  					LOG.warn("Client: no segments from: " + address[j]);
  					// Skip only this group instead of abandoning all remaining groups.
  					continue groups;
  				}
  				String[] segments = ((ArrayWritable)result.value).toStrings();
  				Arrays.sort(segments);
  				if (reference == null) {
  					reference = segments;
  				} else if (!Arrays.equals(reference, segments)) {
  					LOG.warn("Client: Segment not same :"+ address[j]);
  					throw new IOException("Client: Segment not same :"+ address[j]);
  				}
  			}

  			if (reference != null) {
  				for (String name : reference) {
  					// Empty names were never registered before (the tokenizer dropped them); keep that.
  					if (name.length() > 0) {
  						server.addSegment(name, address);
  					}
  				}
  			}
  		}
  	}
  	
  	/**
  	 * Fills server.maxDocs with the maximum document count of each
  	 * index-server group, as reported by getDocs() for the given mode.
  	 * getDocs() also verifies that the replicas of a group agree.
  	 *
  	 * The array is (re)allocated when it is missing or shorter than the
  	 * number of groups.
  	 *
  	 * @param mode   index mode to query
  	 * @param server the index-server groups
  	 * @throws IOException propagated from getDocs()
  	 */
  	public void initDocs(String mode, IndexServer server) throws IOException {
  		boolean needsNewArray = server.maxDocs == null
  				|| server.maxDocs.length < server.getLength();
  		if (needsNewArray) {
  			server.maxDocs = new long[server.getLength()];
  		}

  		int group = 0;
  		while (group < server.getLength()) {
  			server.maxDocs[group] = getDocs(server.get(group), mode);
  			group++;
  		}
  	}
  	
  	/**
  	 * Replays the task log so that unfinished tasks are re-queued after a restart.
  	 *
  	 * Only lines containing '|' are considered. The text after the first '|'
  	 * is expected to be "<taskId> <rest>", where rest is either "success" or
  	 * the space-separated task parameters handed to redo(). The highest task
  	 * id seen is stored in taskID. Every task whose last entry is not
  	 * "success" is redone in ascending task-id order.
  	 *
  	 * NOTE(review): now that redo() is actually reached, an unfinished
  	 * ADDSEGMENT may also be rediscovered by checkNewSegment(); confirm the
  	 * task handler tolerates adding the same segment twice.
  	 *
  	 * @param logFile the task log to read
  	 * @throws IOException if the log cannot be opened or read
  	 */
  	private void checkTaskLog(File logFile) throws IOException{
  		// Keyed by the numeric id. The lookup below uses an int; with String keys
  		// the boxed Integer never matched, so no unfinished task was ever redone.
  		HashMap<Integer,String> taskMap = new HashMap<Integer,String>();
  		BufferedReader in = new BufferedReader(new FileReader(logFile));
  		try {
  			String line;
  			while((line = in.readLine())!=null){
  				int begin = line.indexOf('|');
  				if (begin < 0) continue;
  				String taskStr = line.substring(begin + 1).trim();

  				begin = taskStr.indexOf(" ");
  				if (begin < 0) continue;
  				Integer tid;
  				try {
  					tid = Integer.valueOf(taskStr.substring(0,begin));
  				} catch (NumberFormatException e) {
  					LOG.warn("Skipping malformed task log line: " + line);
  					continue;
  				}
  				if (tid.intValue() > taskID)
  					taskID = tid.intValue();
  				taskStr = taskStr.substring(begin +1);
  				if (taskStr.equals("success"))
  					taskMap.remove(tid);
  				else
  					taskMap.put(tid, taskStr);
  			}
  		} finally {
  			// Release the file handle even when reading fails part-way.
  			in.close();
  		}
  		System.out.println("******current task id is :"+taskID);
  		if (taskMap.size() > 0){
  			System.out.println("******Re do tasks : "+taskMap.size());
  			int[] tidArray = new int[taskMap.size()];
  			int i = 0;
  			for (Integer tid : taskMap.keySet())
  				tidArray[i++] = tid.intValue();
  			Arrays.sort(tidArray);
  			for (int tid : tidArray) {
  				redo(String.valueOf(tid), taskMap.get(Integer.valueOf(tid)));
  			}
  		}
  	}

  	/**
  	 * Re-queues one unfinished task recovered from the task log.
  	 *
  	 * Entries with too few arguments for their op are logged and skipped
  	 * instead of failing with an array index error.
  	 *
  	 * @param tid      the task id as written in the log
  	 * @param paramStr the space-separated remainder of the log entry: the op
  	 *                 code followed by its arguments, i.e. "op segment mode"
  	 *                 for ADDSEGMENT and "op oldSegments newSegment mode" for
  	 *                 MERGESEGMENTS (the order call() writes them to TASK_LOG)
  	 */
  	private void redo(String tid, String paramStr) {

  		if (paramStr == null || StringUtils.isEmpty(paramStr)) return;

  		StringTokenizer token = new StringTokenizer(paramStr, " ");
  		String params[] = new String[token.countTokens()];
  		for(int i=0; i<params.length; i++){
  			params[i] = token.nextToken().trim();
  		}
  		// A whitespace-only entry yields no tokens, hence no op code.
  		if (params.length == 0) return;
  		int op = Integer.parseInt(params[0]);
  		// Not named taskID on purpose: that would shadow the field getTaskID() increments.
  		int redoID = Integer.parseInt(tid);
  		Param param = null;
  		switch(op){
  		case ADDSEGMENT:
  			if (params.length < 3) {
  				LOG.warn("Malformed task log entry for task " + tid + ": " + paramStr);
  				return;
  			}
  			System.out.println("*****Re do task : Add segment ");
  			param = new Param(ADDSEGMENT, new BytesWritable(params[1].getBytes()), new BytesWritable(params[2].getBytes()));
  			taskList.add(new Task(redoID, ADDSEGMENT, param));
  			break;
  		case MERGESEGMENTS:
  			if (params.length < 4) {
  				LOG.warn("Malformed task log entry for task " + tid + ": " + paramStr);
  				return;
  			}
  			param = new Param(MERGESEGMENTS,new BytesWritable(params[1].getBytes()),
  					new BytesWritable(params[2].getBytes()),
  					new BytesWritable(params[3].getBytes()));
  			taskList.add(new Task(redoID, MERGESEGMENTS, param));
  			break;
  		default:
  			LOG.warn("Unknown OP:" + op);
  		}
  	}
  	
  	/**
  	 * Scans the new-segment directory and hands it to the per-mode check for
  	 * each configured index-server group: first the RAM (optimized) group,
  	 * then the regular group.
  	 *
  	 * For each mode, the directories checked inside a segment are
  	 * ParseText.DIR_NAME, ParseData.DIR_NAME and that mode's index directory.
  	 * A separate MODE_SUM (summary-only) pass is currently disabled.
  	 *
  	 * @param path directory that holds newly produced segments
  	 * @throws IOException propagated from the per-mode check
  	 */
  	public void checkNewSegment(String path) throws IOException {
  		File segmentPath = new File(path);
  		String[] listing = segmentPath.list();
  		// Nothing to do for a missing, unreadable or empty directory.
  		if (listing == null || listing.length == 0) {
  			return;
  		}

  		String sumDirStr = ParseText.DIR_NAME+"|"+ParseData.DIR_NAME+"|";

  		String[] modes = { MODE_RAM, MODE_REG };
  		String[] indexDirs = { IndexSegment.IDX_RAM_NAME, IndexSegment.IDX_REG_NAME };
  		for (int m = 0; m < modes.length; m++) {
  			IndexServer server = servers.get(modes[m]);
  			if (server == null) {
  				continue;
  			}
  			String[] checkDirs = (sumDirStr + indexDirs[m]).split("[\\|]");
  			checkNewSegment(checkDirs, segmentPath, server, modes[m]);
  		}
  	}

	/**
	 * Processes every entry under segmentPath for one index mode.
	 *
	 * A segment is considered only when its IDX_DONE_NAME marker file exists
	 * and at least one of checkDirs exists inside it. Then:
	 *  - if server already knows the segment, the existing checkDirs are
	 *    deleted from it, and the segment directory itself is deleted when
	 *    fewer than two entries remain (i.e. only the done marker is left);
	 *  - otherwise an ADDSEGMENT task carrying the segment name and mode is
	 *    queued. The add operation copies whichever of checkDirs exist.
	 *
	 * @param checkDirs   sub-directory names to look for inside each segment
	 * @param segmentPath directory holding the new segments
	 * @param server      index-server group for this mode
	 * @param mode        mode string passed along with the ADDSEGMENT task
	 * @throws IOException declared for callers; may come from MovePath.delete
	 */
	private void checkNewSegment(String[] checkDirs, File segmentPath, IndexServer server, String mode) throws IOException {

		String[] segments = segmentPath.list();
		// list() returns null if the directory vanished or cannot be read.
		if (segments == null) {
			return;
		}

		for( int i=0; i<segments.length; i++){
			File segment = new File(segmentPath, segments[i]);
			File doneFile = new File(segment, net.nutch.indexer.IndexSegment.IDX_DONE_NAME);
			if (!doneFile.exists())
				continue;

			// Skip the segment when none of the source sub-directories is present.
			boolean anySource = false;
			for (String dir : checkDirs) {
				if (new File(segment, dir).exists()) {
					anySource = true;
					break;
				}
			}
			if (!anySource) {
				continue;
			}

			if (server.getSegment(segments[i]) != null) {
				for (String dir : checkDirs) {
					File src = new File(segment, dir);
					if (src.exists()) {
						MovePath.delete(src);
					}
				}
				// Remove the segment once only the done marker is left; list() may be null.
				String[] remaining = segment.list();
				if (remaining != null && remaining.length < 2) {
					MovePath.delete(segment);
				}
				continue;
			}
			// The mode is sent along so the add targets the right index type.
			Param param = new Param(ADDSEGMENT, new BytesWritable(segments[i].getBytes()), new BytesWritable(mode.getBytes()));
			Task task = new Task(getTaskID(), ADDSEGMENT, param);
			taskList.add(task);
		}
	}

  	/**
  	 * Queries every replica of one index-server group for its maximum
  	 * document count and returns the value they agree on.
  	 *
  	 * @param address the replicas of the group
  	 * @param mode    index mode to query; MODE_MIX is queried as MODE_REG
  	 * @return the common maxDocs value
  	 * @throws IOException if the parallel call fails, a replica returns no
  	 *         result, or the replicas report different counts
  	 */
  	private long getDocs(InetSocketAddress[] address, String mode) throws IOException {
  		Writable[] params = new Writable[address.length];
  		String queryMode = mode.equals(MODE_MIX) ? MODE_REG : mode;
  		// One shared request object for the parallel call.
  		Arrays.fill(params, new Param(MAXDOCS, new BytesWritable(queryMode.getBytes())));

  		Writable[] results = client.call(params, address);
  		if (results == null || results.length != address.length) {
  			throw new IOException("DistributedSearch: Control->getDocs(): error");
  		}

  		long agreed = 0;
  		for (int j = 0; j < results.length; j++) {
  			Result reply = (Result) results[j];
  			if (reply == null) {
  				LOG.warn("Client: no docs from: " + address[j]);
  				throw new IOException("Control: Can't get maxdocs!");
  			}
  			long reported = ((LongWritable) reply.value).get();
  			if (j == 0) {
  				agreed = reported;
  			} else if (reported != agreed) {
  				LOG.warn("Control: max docs not same!" + address[j]);
  				throw new IOException("Control: max docs not same!");
  			}
  		}
  		return agreed;
  	}
  	
  	/**
  	 * Allocates the next task id by incrementing the taskID field while
  	 * holding this object's monitor.
  	 *
  	 * @return the newly allocated id
  	 */
  	private synchronized int getTaskID(){
  		taskID += 1;
  		return taskID;
  	}
  	
  	/**
  	 * Serves one controller RPC request and wraps the answer in a Result
  	 * tagged with the request's op code.
  	 *
  	 * INDEXS        - for the mode in first, one string per index-server group
  	 *                 listing "host:port:onlineStat " for each replica
  	 *                 (an empty array when the mode has no servers)
  	 * FINDSEGMENT   - "host:port " for every address holding the segment named
  	 *                 in first, looked up in client.servers for the mode in
  	 *                 second; returns null (no Result) for an unknown segment
  	 * ADDSEGMENT    - writes the task to TASK_LOG, queues it, answers true
  	 * MERGESEGMENTS - writes the task to TASK_LOG, queues it, answers true
  	 * LIMITNUM      - answers client.limitNum
  	 *
  	 * @param param the request; expected to be a Param
  	 * @return the Result for the request, or null for an unknown segment
  	 * @throws RuntimeException for an unknown op code or FINDSEGMENT mode
  	 */
  	public Writable call(Writable param) throws IOException {
  		Param request = (Param) param;
  		logRequest(request);
  		Writable value;
  		switch (request.op) {
  		case INDEXS: {
  			String mode = new String(((BytesWritable) request.first).get());
  			IndexServer server = servers.get(mode);
  			String[] groups;
  			if (server != null) {
  				groups = new String[server.getLength()];
  				for (int g = 0; g < server.getLength(); g++) {
  					InetSocketAddress[] replicas = server.get(g);
  					StringBuilder entry = new StringBuilder();
  					for (int r = 0; r < replicas.length; r++) {
  						InetAddress host = replicas[r].getAddress();
  						entry.append(host.getHostAddress())
  							.append(':')
  							.append(replicas[r].getPort())
  							.append(':')
  							.append(server.getOnlineStat(replicas[r]))
  							.append(' ');
  					}
  					groups[g] = entry.toString();
  				}
  			} else {
  				groups = new String[0];
  			}
  			value = new ArrayWritable(groups);
  			break;
  		}
  		// SEGMENTS is not served here; its former implementation was disabled.
  		case FINDSEGMENT: {
  			String segment = new String(((BytesWritable) request.first).get());
  			String mode = new String(((BytesWritable) request.second).get());
  			IndexServer server = client.servers.get(mode);
  			if (server == null) {
  				throw new RuntimeException("Unknown FINDSEGMENT MODE: " + mode);
  			}
  			InetSocketAddress[] holders = (InetSocketAddress[]) server.getSegment(segment);
  			if (holders == null) {
  				System.out.println("can't find this segment:" + segment);
  				return null;
  			}
  			StringBuilder found = new StringBuilder();
  			for (InetSocketAddress holder : holders) {
  				found.append(holder.getAddress().getHostAddress())
  					.append(':')
  					.append(holder.getPort())
  					.append(' ');
  			}
  			value = new BytesWritable(found.toString().getBytes());
  			break;
  		}
  		case ADDSEGMENT: {
  			String segmentName = new String(((BytesWritable) request.first).get());
  			String mode = new String(((BytesWritable) request.second).get());
  			int tid = getTaskID();
  			TASK_LOG.info(tid + " " + request.op + " " + segmentName + " " + mode);
  			taskList.add(new Task(tid, ADDSEGMENT, request));
  			value = new BooleanWritable(true);
  			break;
  		}
  		case MERGESEGMENTS: {
  			String oldSegmentsName = new String(((BytesWritable) request.first).get());
  			String newSegment = new String(((BytesWritable) request.second).get());
  			String mode = new String(((BytesWritable) request.third).get());
  			int tid = getTaskID();
  			TASK_LOG.info(tid + " " + request.op + " " + oldSegmentsName + " " + newSegment + " " + mode);
  			taskList.add(new Task(tid, MERGESEGMENTS, request));
  			value = new BooleanWritable(true);
  			break;
  		}
  		case LIMITNUM:
  			value = new IntWritable(client.limitNum);
  			break;
  		default:
  			throw new RuntimeException("Unknown op code: " + request.op);
  		}
  		return new Result(request.op, value);
  	}
  	
  	private static void logRequest(Param p) {
        StringBuffer buffer = new StringBuffer();
        buffer.append(Thread.currentThread().getName());
        buffer.append(": ");
        buffer.append(NAMES[p.op]);
        buffer.append("(");
        if (p.first != NullWritable.get()) {
          buffer.append(p.first);
          if (p.second != NullWritable.get()) {
            buffer.append(", ");
            buffer.append(p.second);
          }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -