⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedsearch.java

📁 nutch搜索的改进型工具和优化爬虫的相关工具
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      case LIMITNUM:
    	  value = new IntWritable();
    	  break;
      default:
        throw new RuntimeException("Unknown op code: " + op);
      }

      value.readFields(in);
    }
  }

  /** 
   * The Control Center.
   * 
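   *  需要特别注意的是：Control中的IndexServer和Client中的IndexServer，在修改者(DingZhenbo)看来是等价的，
   *  不清楚原作者为什么在使用时还区别对待它们。
   *  由于优化的需要，全部改为通过参数中的mode使用Proxy替代。
   *  (Note: the modifier, DingZhenbo, considers the IndexServer in Control and the IndexServer in
   *  Client to be equivalent; it is unclear why the original author treated them differently.
   *  For optimization, both are now replaced by a Proxy selected through the mode parameter.)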
   * 
   */
  public static class Control extends net.nutch.ipc.Server {
	
	/** Logger registered under the name "tasklog". */
	private static Logger TASK_LOG = Logger.getLogger("tasklog");
  	
  	/** Client used to send requests such as addSegment to individual index servers. */
  	private Client client;
  	//private ArrayList segments = new ArrayList();

  	/**
  	 * Segment sub-directories under each index server's mount directory
  	 * ("<host>.<port>/<name>"). Only SEGMENTS_PATH[0] ("current") is used in the code visible here.
  	 */
  	public static final String[] SEGMENTS_PATH = {"current", "today", "week", "month", "year"};
  	//** Temporary directory used when merging segments.
  	private String merge_temp_path = "merge.temp";
  	//** Directory holding newly produced segments; the Controller checks it at startup.
  	private String new_segment_path;
  	
  	/** Index servers keyed by mode string (e.g. MODE_REG, MODE_RAM); looked up by addSegment. */
  	private Map<String, IndexServer> servers = new HashMap<String, IndexServer>(); 
  	
  	//private int max_segments = 2;
  	// NOTE(review): presumably the counter used to assign Task ids -- confirm against the rest of the class.
  	private int taskID = 0;
  	  	
  	/** Pending tasks; ConcurrentLinkedQueue so producers and the worker thread can share it without locking. */
  	private ConcurrentLinkedQueue<Task> taskList = new ConcurrentLinkedQueue<Task>();
  	
  	/**
  	 * One queued unit of work: an id, an operation code and its parameter.
  	 * <p>
  	 * Instances are immutable (all fields are final), so they can be safely handed from the
  	 * thread that enqueues them on {@code taskList} to the thread that consumes them.
  	 */
  	class Task {
  		/** Identifier of this task. */
  		private final int tid;
  		/** Operation code describing what the task does. */
  		private final byte opID;
  		/** Operation-specific argument; may be null. */
  		private final Param param;
  		
  		/**
  		 * @param tid   identifier of the task
  		 * @param op    operation code
  		 * @param param operation-specific argument
  		 */
  		public Task(int tid, byte op, Param param){
  			this.tid = tid;
  			this.opID = op;
  			this.param = param;
  		}
  		
  		/** @return the operation code */
  		public byte getOp(){
  			return opID;
  		}
  		
  		/** @return the task identifier */
  		public int getID(){
  			return tid;
  		}
  		
  		/** @return the operation-specific argument */
  		public Param getParam(){
  			return param;
  		}
  	}

  	class TaskQueue extends Thread {
  		
  		/** Creates the worker thread as a daemon, so it does not by itself keep the JVM running. */
  		public TaskQueue(){
  			super();
  			setDaemon(true);
  		}
  		
  		/**
  		 * Fetches the given URL and returns its body as text with all line terminators removed.
  		 * <p>
  		 * This is best effort: any failure while connecting or reading is logged and reported as
  		 * {@code null} instead of being thrown. Connect and read timeouts are applied so that an
  		 * unresponsive server cannot block the calling thread indefinitely.
  		 *
  		 * @param urlin absolute URL to fetch
  		 * @return the concatenated response lines, or {@code null} if the request failed
  		 * @throws IOException kept for source compatibility with existing callers
  		 */
  		public String getHtml(String urlin) throws IOException {
  			// Without timeouts URL.openStream() can wait forever on a hung front server.
  			final int timeoutMillis = 60 * 1000;
  			BufferedReader in = null;
  			InputStream stream = null;
  			try {
  				java.net.URLConnection connection = new URL(urlin).openConnection();
  				connection.setConnectTimeout(timeoutMillis);
  				connection.setReadTimeout(timeoutMillis);
  				stream = connection.getInputStream();
  				in = new BufferedReader(new InputStreamReader(stream));
  				StringBuilder output = new StringBuilder();
  				String line;
  				while ((line = in.readLine()) != null) {
  					output.append(line);
  				}
  				return output.toString();
  			} catch (Exception e) {
  				LOG.warn(e.toString());
  				return null;
  			} finally {
  				// Closing the reader also closes the wrapped stream; close the stream directly
  				// only if the reader was never created. A close failure must not mask the result.
  				try {
  					if (in != null) {
  						in.close();
  					} else if (stream != null) {
  						stream.close();
  					}
  				} catch (IOException closeError) {
  					LOG.warn(closeError.toString());
  				}
  			}
  		}
  		/**
  		 * Tells one front server to switch an index server's status by requesting
  		 * {@code http://<server><setStatServlet>?<host>=..&<port>=..&<stat>=true|false}.
  		 *
  		 * @param server  front server "host:port" prefix of the URL
  		 * @param address index server whose status changes
  		 * @param line    true to mark it online, false for offline
  		 * @return false if the request failed or the front server answered "false"
  		 *         (case-insensitive), true otherwise
  		 */
  		private boolean inform(String server, InetSocketAddress address, boolean line) throws IOException{
  			StringBuilder url = new StringBuilder("http://");
  			url.append(server).append(setStatServlet).append('?')
  				.append(hostParam).append('=').append(address.getAddress().getHostAddress()).append('&')
  				.append(portParam).append('=').append(address.getPort()).append('&')
  				.append(statParam).append('=').append(line ? "true" : "false");
  			String urlStr = url.toString();
  			LOG.info("*****"+ urlStr);
  			String reply = getHtml(urlStr);
  			return reply != null && !reply.equalsIgnoreCase("false");
  		}
//
//  		private String mergeSegments( File[] segments, File destPath ) throws IOException {
//  			MergeSegments merge = new MergeSegments(new LocalFileSystem(), segments,
//					destPath, Long.MAX_VALUE, true, false);
//  			merge.run();
//  			return merge.getNewSegmentName();
//  		}
//
  		/*
  		 * Finds the first index-server group that already holds the segment, i.e. some address
  		 * in the group has a directory "<host>.<port>/<path>/<segmentName>".
  		 * Returns the group's position, or -1 when no group has it.
  		 */
  		private int selectIndexs(IndexServer server, String segmentName, String path){
  			for (int group = 0; group < server.getLength(); group++) {
  				for (InetSocketAddress address : server.get(group)) {
  					String mountDir = address.getAddress().getHostAddress() + "." + address.getPort();
  					File candidate = new File(new File(mountDir, path), segmentName);
  					if (candidate.exists() && candidate.isDirectory()) {
  						return group;
  					}
  				}
  			}
  			return -1;
  		}
  		
  		/**
  		 * Lists the segment sub-directories to copy for an index mode: always parse_text and
  		 * parse_data, followed by the RAM and/or REG index directory depending on the mode.
  		 * An unrecognised mode yields only the two parse directories.
  		 */
  		private String[] getDirNamesByMode(String mode) {
  			List<String> names = new ArrayList<String>();
  			names.add(ParseText.DIR_NAME);
  			names.add(ParseData.DIR_NAME);
  			if (mode.equals(MODE_MIX)) {
  				names.add(IndexSegment.IDX_RAM_NAME);
  				names.add(IndexSegment.IDX_REG_NAME);
  			} else if (mode.equals(MODE_RAM)) {
  				names.add(IndexSegment.IDX_RAM_NAME);
  			} else if (mode.equals(MODE_REG)) {
  				names.add(IndexSegment.IDX_REG_NAME);
  			}
  			return names.toArray(new String[names.size()]);
  		}
  		
		//** Reworked the segment-adding logic. modified by DingZhenbo 2007-09-19
  		//** Accepts several copy-source sub-directories.
  		/**
  		 * Stage 1 of adding a new segment: places the segment's files on every index server of a group.
  		 * <p>
  		 * For each address, the parse/index sub-directories selected by {@code mode} (see
  		 * getDirNamesByMode) are transferred from {@code segmentFile} into
  		 * "<host>.<port>/current/<segment name>", skipping those already present there.
  		 * The done marker is transferred afterwards.
  		 * NOTE(review): the transfers use MovePath.movePath(src, dest, false); the log text says
  		 * "Copy", so presumably false means "keep the source" -- confirm against MovePath.
  		 * <p>
  		 * Afterwards the source is cleaned up:
  		 * <ul>
  		 *   <li>if exactly three source directories were found, the third (the index directory) is deleted;</li>
  		 *   <li>the whole segment directory is deleted when mode is MIX or fewer than 4 entries remain.</li>
  		 * </ul>
  		 *
  		 * @param indexs      addresses of one index-server group
  		 * @param segmentFile segment directory in the new-segment area
  		 * @param mode        index mode selecting which sub-directories to transfer
  		 * @return false if the done marker is missing, none of the source directories exist, or an
  		 *         index server's "current" mount directory is missing; true otherwise
  		 */
  		private boolean addSegmentStep1(InetSocketAddress[] indexs, File segmentFile, String mode) throws Exception{
  			
  			
  			String[] srcDirs = getDirNamesByMode(mode);
  			
  			if (srcDirs.length == 0) return true;
 
			// Only segments carrying the done marker are complete enough to be served.
			File doneFile = new File(segmentFile, IndexSegment.IDX_DONE_NAME);			
			if (!doneFile.exists()) {
				LOG.warn("Segment ["+segmentFile.getName() + "] not done. can not add to IndexServer");
  				return false;								
			}
			
			// Keep only the source sub-directories that actually exist.
			List<File> srcFiles = new ArrayList<File>();
			
			for (String dir : srcDirs) {
				File srcFile = new File(segmentFile, dir);
				if (srcFile.exists()) {
					srcFiles.add(srcFile);
				}
			}
			
			if (srcFiles.size() == 0) {
				LOG.warn("No segment ["+segmentFile.getName()+"/] exist. can not add to IndexServer");
				return false;
			}
			
  			// NOTE(review): an early return below leaves replicas processed so far already populated.
  			for (int i=0; i<indexs.length; i++){
  				
  				InetSocketAddress address = indexs[i];
  				
  				LOG.info("Copy new Segment to :"+ address.getAddress().getHostAddress() + 
  						":" + address.getPort());
  				
  				// Each index server's storage is reached through the local directory "<host>.<port>".
  				File destFile = new File(
  						address.getAddress().getHostAddress() + "." +address.getPort(), 
  						SEGMENTS_PATH[0]);
  				
  				if (!destFile.exists() || !destFile.isDirectory()){
  					LOG.warn("Index server:"+address.getAddress().getHostAddress() + ":" + 
  							address.getPort() + " mounted path NOT find!");
  					return false;
  				}
 
  				//** Output now goes into the segment's own directory. modified by DingZhenbo 2007-09-19
  				destFile = new File(destFile, segmentFile.getName());
  				// NOTE(review): this condition reduces to !destFile.exists(); mkdir()'s result is not checked.
  				if (!destFile.exists() && !destFile.isDirectory()) {
  					destFile.mkdir();
  				}
  				// move src files (skipping any already present at the destination)
  				for (File srcFile : srcFiles) {
  					File checker = new File(destFile, srcFile.getName());
  					if (!checker.exists()) {
	  	  				MovePath.movePath(srcFile, destFile, false);
	  	  				LOG.info("Copy ["+srcFile.getCanonicalPath()+"] to ["+destFile.getCanonicalPath()+"]" );
  					}
  				}
  				// The done marker goes last, after the data directories.
  				MovePath.movePath(doneFile ,destFile, false);
  			}
  			// Delete the index dir. Note: this is a temporary trick -- with exactly three sources
  			// (parse_text, parse_data, one index dir) the third entry is the index directory.
  			if (srcFiles.size() == 3) {
  				MovePath.delete(srcFiles.get(2));
  			}  			
  			// Check whether anything else is left in the segment directory; if only the
  			// index.done file remains, delete the directory.
  			// NOTE(review): list() returns null if segmentFile is not a readable directory, which
  			// would throw a NullPointerException here when mode is not MIX.
  			String[] fs = segmentFile.list();
  			// Delete when only index.done, parse_text and parse_data remain (fewer than 4 entries), or in MIX mode.
  			if (mode.equals(DistributedSearch.MODE_MIX) || (fs.length < 4)) {
  				MovePath.delete(segmentFile);
  			}
  			
  			return true;
  		}
  		
  		/**
  		 * Stage 2 of adding a segment: activates it on every address of a group, one at a time.
  		 * For each address it:
  		 * <ol>
  		 *   <li>marks it offline locally and on every front server;</li>
  		 *   <li>registers the segment's last path component with {@code server};</li>
  		 *   <li>asks the index server, through {@code client}, to add the segment;</li>
  		 *   <li>marks it online again locally and on every front server.</li>
  		 * </ol>
  		 * NOTE(review): every failure returns immediately and leaves the current address marked
  		 * offline (locally and on the front servers already informed) -- confirm this is intended.
  		 *
  		 * @return true when all addresses were processed, false on the first failure
  		 */
  		private boolean addSegmentStep2(IndexServer server, InetSocketAddress[] indexs, String segmentName, String mode) throws Exception{
  			for (InetSocketAddress address : indexs) {
  				// Take this replica out of rotation before it is modified.
  				server.setOnlineStat(address, false);
  				LOG.info("Inform front server change indexserver status(offline)");
  				for (String front : frontServer) {
  					if (inform(front, address, false)) {
  						continue;
  					}
  					LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" + 
  							address.getPort() + " offline on frontserver:"+ front);
  					return false;
  				}

  				String host = address.getAddress().getHostAddress();
  				String port = String.valueOf(address.getPort());
  				LOG.info("Inform offline server:"+ host + ":" + port +" add segment");

  				server.addSegment(new File(segmentName).getName(), address);
  				boolean loaded = client.addSegment(segmentName, address, mode);
  				if (!loaded) {
  					LOG.warn("Add Segment:"+ segmentName + " to "+ host + ":"+ port + " Error!");
  					return false;
  				}

  				// Put the replica back into rotation.
  				server.setOnlineStat(address, true);
  				LOG.info("Inform front server change indexserver status(online)");
  				for (String front : frontServer) {
  					if (inform(front, address, true)) {
  						continue;
  					}
  					LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" + 
  							address.getPort() + " online on frontserver:"+ front);
  					return false;
  				}
  			}
  			return true;
  		}
  		
//  		private boolean addSegment(Param param) throws Exception {
//  			
//  			boolean result = true;
//
//  			String segmentName = new String(((BytesWritable)param.first).get());
//  			String mode = new String(((BytesWritable)param.second).get());
//  			
//  			if (mode.equals(MODE_MIX)) {
//  				boolean b = false;
//  				b = addSegment(segmentName, MODE_REG);
//  				result = result && b;
//  				if (!b) {
//  					LOG.warn("addSegment "+segmentName+" in mode [REG] error.");
//  				}
//  				b = addSegment(segmentName, MODE_RAM);
//  				result = result && b;
//  				if (!b) {
//  					LOG.warn("addSegment "+segmentName+" in mode [RAM] error.");
//  				}
//  				b = addSegment(segmentName, MODE_SUM);
//  				result = result && b;
//  				if (!b) {
//  					LOG.warn("addSegment "+segmentName+" in mode [SUM] error.");
//  				}  				
//  			} else {
//  				result = result && addSegment(segmentName, mode);
//  				if (!result) {
//  					LOG.warn("addSegment "+segmentName+" ["+mode+"] error.");
//  				}
//  			}
//  			
//  			return result;  			
//  		}

		/**
		 * Handles an add-segment task.
		 * <p>
		 * {@code param.first} holds the segment name and {@code param.second} the index mode, both as raw bytes.
		 * <ul>
		 *   <li>MIX mode is served by the REG index servers. If no server is registered for the
		 *       mode, nothing is done.</li>
		 *   <li>If the segment is waiting in the new-segment directory, it is placed on the group
		 *       that already holds it. Failing that, it goes to the group with the smallest maxDocs
		 *       (the last such group on ties). It is then copied (stage 1) and activated (stage 2).</li>
		 *   <li>Otherwise it is only re-activated (stage 2) on a group that already holds it, if any.</li>
		 * </ul>
		 * Finally the group's maxDocs entry is refreshed from getDocs.
		 *
		 * @return false if a stage fails or the document count cannot be obtained; true otherwise
		 */
		private boolean addSegment(Param param) throws Exception {
			String segmentName = new String(((BytesWritable)param.first).get());
			String mode = new String(((BytesWritable)param.second).get());

			// MIX-mode segments are added through the REG index servers.
			IndexServer server = servers.get(mode.equals(MODE_MIX) ? MODE_REG : mode);
			if (server == null) {
				// No index server has been assigned for this mode: nothing to do.
				LOG.info("There are no index server in mode["+mode+"]");
				return true;
			}

			File segmentFile = new File(new_segment_path, segmentName);
			boolean pending = segmentFile.exists();
			int index = selectIndexs(server, segmentName, SEGMENTS_PATH[0]);

			if (pending) {
				if (index < 0) {
					// Pick the group with the fewest documents; on ties the last one wins.
					long fewest = server.maxDocs[0];
					for (int group = 0; group < server.maxDocs.length; group++) {
						if (server.maxDocs[group] <= fewest) {
							fewest = server.maxDocs[group];
							index = group;
						}
					}
				}
				if (!addSegmentStep1(server.get(index), segmentFile, mode)) {
					return false;
				}
			} else if (index < 0) {
				// Not pending and not held by any group: nothing to activate.
				return true;
			}

			if (!addSegmentStep2(server, server.get(index), segmentName, mode)) {
				return false;
			}

			// Refresh the group's document count with the total now reported.
			long added;
			try {
				long total = getDocs(server.get(index), mode);
				if (total < 0) {
					LOG.info("Invalid docs number : "+total);
					return false;
				}
				added = total - server.maxDocs[index];
			} catch (Exception e) {
				LOG.info(e.getMessage());
				return false;
			}
			LOG.info("add "+ added + " docs........");
			server.maxDocs[index] += added;
			return true;
		}

  		private boolean mergeSegments(Param param) throws Exception {
  			
  			if (param == null) return true;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -