⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedsearch.java

📁 nutch搜索的改进型工具和优化爬虫的相关工具
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
  			//检查要合并段的服务器
  			String oldSegmentsName = new String(((BytesWritable)param.first).get());
  			
  			if (oldSegmentsName == null || oldSegmentsName.length() == 0)
  				return true;
  			
  			String newSegment = new String(((BytesWritable)param.second).get());
  			
  			if (newSegment == null || newSegment.length() == 0)
  				return true;
  			
  			String mode = new String(((BytesWritable)param.third).get());
  			
  			File segmentFile = new File(newSegment);
  			  			
  			boolean result = true;
  			
  			if (mode.equals(MODE_MIX)) {
//  				boolean b = false;
  				//如果是MIX状态就使用常规服务器的地址 因为长桶短桶的Segment是一致的
  				result = merge(oldSegmentsName, segmentFile, servers.get(MODE_REG), client.servers.get(MODE_REG), mode);  				
//  				if (!b) {
//  					LOG.warn("Merge in [RAM] mode failure");
//  				}
//  				result = result && b;
//  				b = merge(oldSegmentsName, segmentFile, servers.get(mode), client.servers.get(mode), MODE_REG);
//  				if (!b) {
//  					LOG.warn("Merge in [REG] mode failure");
//  				}
//  				result = result && b;
//  				b = merge(oldSegmentsName, segmentFile, servers.get(mode), client.servers.get(mode), MODE_SUM);
//  				
//  				if (!b) {
//  					LOG.warn("Merge in [SUM] mode failure");
//  				}
//  				result = result && b;
  				if (!result) {
  					LOG.warn("Merge in mode["+mode+"] failure");
  				}
  			}
  			else {
  				result = merge(oldSegmentsName, segmentFile, servers.get(mode), client.servers.get(mode), mode);
  				if (!result) {
  					LOG.warn("Merge in mode["+mode+"] failure");
  				}
  			}
  			  			
  			return result; 
  			
  		}

		/**
		 * Replaces a group of old segments with one merged segment on the index servers.
		 *
		 * <p>Steps, in order:
		 * <ol>
		 *   <li>Resolve the index servers holding the first old segment (falling back to
		 *       {@code selectIndexs} on {@code proxy_control} when the segment is unknown
		 *       to {@code proxy_client}).</li>
		 *   <li>Validate the merged segment directory and its done file.</li>
		 *   <li>Transfer the mode-specific sub-directories and the done file into
		 *       {@code <host>.<port>/<next SEGMENTS_PATH level>/<segment name>} for every
		 *       server, then delete the local merged segment.</li>
		 *   <li>For every server: mark it offline, inform the front servers, ask it to
		 *       merge, delete the old segment directories, mark it online again.</li>
		 *   <li>Register the new segment and unregister the old ones on
		 *       {@code proxy_client}.</li>
		 * </ol>
		 *
		 * @param oldSegmentsName '|'-separated list of old segment paths; the parent of the
		 *                        first path that has one determines the SEGMENTS_PATH level
		 * @param segmentFile     directory of the merged segment to distribute
		 * @param proxy_control   server proxy used to select server groups and toggle the
		 *                        online state
		 * @param proxy_client    server proxy whose segment registry is queried and updated
		 * @param mode            mode passed to {@code getDirNamesByMode} to choose the
		 *                        sub-directories to transfer
		 * @return {@code true} on success or when there is nothing to do (mode without
		 *         directories, no old segments, no server group selected); {@code false}
		 *         when a validation or a remote step fails
		 * @throws IOException if the old segments' directory is not a SEGMENTS_PATH level
		 *                     that has a following level, or if a file operation fails
		 */
		private boolean merge(String oldSegmentsName, File segmentFile,
				IndexServer proxy_control, IndexServer proxy_client, final String mode)
				throws IOException {

			String[] srcDirs = getDirNamesByMode(mode);
			// Not a valid mode value: nothing to merge, report success.
			if (srcDirs.length == 0) return true;

			// Split the '|'-separated list into bare segment names and remember the
			// parent directory of the first entry that has one.
			StringTokenizer token = new StringTokenizer(oldSegmentsName, "|");
			List<String> oldList = new ArrayList<String>();
			String oldSegmentsPath = null;

			while (token.hasMoreTokens()) {
				File oldSegmentFile = new File(token.nextToken());
				if (oldSegmentsPath == null) {
					oldSegmentsPath = oldSegmentFile.getParent();
				}
				oldList.add(oldSegmentFile.getName());
			}

			if (oldSegmentsPath == null || oldList.isEmpty())
				return true;

			String[] oldSegments = oldList.toArray(new String[oldList.size()]);

			// Locate the SEGMENTS_PATH level the old segments live in.
			int level = -1;
			for (int i = 0; i < SEGMENTS_PATH.length; i++) {
				if (oldSegmentsPath.endsWith(SEGMENTS_PATH[i])) {
					level = i;
					break;
				}
			}

			// Both SEGMENTS_PATH[level] and SEGMENTS_PATH[level + 1] are indexed below.
			// Fail before touching any file or server instead of hitting an
			// ArrayIndexOutOfBoundsException half-way through the merge.
			if (level < 0 || level + 1 >= SEGMENTS_PATH.length) {
				throw new IOException("Old segments path [" + oldSegmentsPath
						+ "] has no following level in SEGMENTS_PATH, can not merge");
			}

			InetSocketAddress[] indexs =
				(InetSocketAddress[])proxy_client.getSegment(oldSegments[0]);

			if (indexs == null) {
				int i = selectIndexs(proxy_control, segmentFile.getName(), SEGMENTS_PATH[level + 1]);
				if (i < 0) {
					// Fallback: select by the old segment at its current level.
					// The result must be kept, otherwise the fallback has no effect.
					i = selectIndexs(proxy_control, oldSegments[0], SEGMENTS_PATH[level]);
				}
				if (i < 0)
					return true;
				indexs = proxy_control.get(i);
			}

			// Copy the merged new segment to the index servers.
			// Modified by dingzhenbo
			if (!(segmentFile.exists() && segmentFile.isDirectory())) {
				LOG.warn("Segment ["+segmentFile.getName()+"] not valid. can not add to IndexServer");
				return false;
			}

			File doneFile = new File(segmentFile, IndexSegment.IDX_DONE_NAME);

			if (!doneFile.exists()) {
				LOG.warn("Segment ["+segmentFile.getName()+"] not done. can not add to IndexServer");
				return false;
			}

			// Only the sub-directories that actually exist are transferred.
			List<File> srcFiles = new ArrayList<File>();
			for (String dir : srcDirs) {
				File src = new File(segmentFile, dir);
				if (src.exists()) {
					srcFiles.add(src);
				}
			}

			if (srcFiles.isEmpty()) {
				LOG.warn("Source semengt ["+segmentFile.getCanonicalPath()+"] not valid, can not merge!");
				return false;
			}

			for (int i = 0; i < indexs.length; i++) {
				InetSocketAddress address = indexs[i];
				String hostAddress = address.getAddress().getHostAddress();
				LOG.info("Copy merged Segment to :"+ hostAddress +
						":" + address.getPort());
				//** {current today week ...}
				File destFile = new File(hostAddress + "." +
						address.getPort(), SEGMENTS_PATH[level + 1]);

				if (!destFile.exists() || !destFile.isDirectory()) {
					LOG.warn("Index server:"+ hostAddress +
							":" + address.getPort() + " mounted path NOT find!");
					return false;
				}

				// Create the directory for the new segment if it does not exist yet (by dingzhenbo).
				File newDestFile = new File(destFile, segmentFile.getName());
				if (!newDestFile.exists() && !newDestFile.mkdir() && !newDestFile.isDirectory()) {
					// Surface the failure; the transfer below is still attempted as before.
					LOG.warn("Can not create directory [" + newDestFile.getPath() + "]");
				}

				// NOTE(review): the trailing 'false' presumably keeps the source in place,
				// since the same sources are transferred to every server — confirm in MovePath.
				for (File src : srcFiles) {
					MovePath.movePath(src, newDestFile, false);
				}
				// done file
				MovePath.movePath(doneFile, newDestFile, false);
			}

			MovePath.delete(segmentFile);

			// NOTE(review): the early 'return false' exits below leave the server marked
			// offline on proxy_control; confirm that the retry path recovers from this.
			for (int i = 0; i < indexs.length; i++) {
				InetSocketAddress address = indexs[i];
				proxy_control.setOnlineStat(address, false);
				// Tell the front servers to switch this index server offline.
				LOG.info("Inform front server change indexserver status(offline)");
				for (int j = 0; j < frontServer.length; j++) {
					if (!inform(frontServer[j], address, false)) {
						LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" +
								address.getPort() + " offline on frontserver:"+ frontServer[j]);
						return false;
					}
				}
				//addresses.setStat(address,false);
				String host = address.getAddress().getHostAddress();
				String port = String.valueOf(address.getPort());
				// Tell the offline server to merge the segments.
				LOG.info("Inform offline server:"+ host + ":" + port +" merge segments:" +
						oldSegmentsName + " to "+ segmentFile.getName());
				if (!client.mergeSegments(oldSegments, segmentFile.getName(), host, port)) {
					LOG.warn("Merge Segments to :" + segmentFile.getName() +" on "+ host +
							":"+ port + " Error!");
					return false;
				}
				// Delete the old segments.
				for (int j = 0; j < oldSegments.length; j++) {
					File delSegment = new File(host + "." + port + File.separator + SEGMENTS_PATH[level],
							oldSegments[j]);
					MovePath.delete(delSegment);
				}
				proxy_control.setOnlineStat(address, true);
				// Tell the front servers to switch this index server back online.
				LOG.info("Inform front server change indexserver status(online)");
				for (int j = 0; j < frontServer.length; j++) {
					if (!inform(frontServer[j], address, true)) {
						LOG.warn("Cant set "+ address.getAddress().getHostAddress() + ":" +
								address.getPort() + " online on frontserver:"+ frontServer[j]);
						return false;
					}
				}
			}

			/**
			 * IndexServer replacement.
			 * This used to be client.IndexServer.
			 * Modified by DingZhenbo 2007-09-13
			 */
			proxy_client.addSegment(segmentFile.getName(), indexs);
			for (int i = 0; i < oldSegments.length; i++)
				proxy_client.delSegment(oldSegments[i]);

			return true;
		}
  		
  		/**
  		 * Worker loop of the task queue.
  		 *
  		 * <p>Peeks at the head task and dispatches on its op code. A task is removed
  		 * from the queue when it succeeds, when its op code is unknown, or when
  		 * processing it throws; a task whose handler returns {@code false} stays at
  		 * the head and is tried again on the next iteration. The loop sleeps 10 s when
  		 * the queue is empty or after an error, and 5 s between tasks. It ends when a
  		 * sleep is interrupted.
  		 */
  		public void run(){
  			while(true){
  				try{
  					Task task = (Task)taskList.peek();
  					if (task == null){
  						// Nothing queued: wait before looking again.
  						sleep(10000);
  						continue;
  					}
  					int taskID = task.getID();
  					byte op = task.getOp();
  					Param param = task.getParam();
  					LOG.info("" + op);
  					LOG.info("" + param);
  					switch(op){
  					case ADDSEGMENT:
  						if (!addSegment(param))
  							LOG.warn("Add Segment :"+ new String(((BytesWritable)param.first).get()) + " in mode ["+new String(((BytesWritable)param.second).get()) +"] Error!");
  						else{
  							TASK_LOG.info(taskID + " " + "success");
  							taskList.poll();
  						}
  						break;
  					case MERGESEGMENTS:
  						if (!mergeSegments(param))
  							LOG.warn("Merge Segments :" + new String(((BytesWritable)param.first).get()) +
  									" to " + new String(((BytesWritable)param.second).get()) + 
  									" in mode["+new String(((BytesWritable)param.third).get())+"] Error!");
  						else{
  							TASK_LOG.info(taskID + " " + "success");
  							taskList.poll();
  						}
  						break;
  					default:
  						LOG.warn("Unknown OP:" + op);
  						// An unknown op can never succeed; leaving it at the head
  						// would block every task queued behind it.
  						taskList.poll();
  					}
  					sleep(5000);
  				}catch(Exception e){
  					if (e instanceof InterruptedException){
  						// Interrupted while sleeping: the head task did not fail, so it
  						// must not be dropped. Restore the flag and stop the worker.
  						Thread.currentThread().interrupt();
  						LOG.warn("Controller TaskQueue thread error! Must return."+ e.toString());
  						break;
  					}
  					LOG.warn("Controller TaskQueue thread error!"+ e.toString());
  					e.printStackTrace(System.err);
  					// Drop the task that raised the exception so the queue can progress.
  					taskList.poll();
  					try{
  						sleep(10000);
  					}catch(Exception e1){
  						if (e1 instanceof InterruptedException){
  							Thread.currentThread().interrupt();
  						}
  						// Report the exception that actually ended the loop.
  						LOG.warn("Controller TaskQueue thread error! Must return."+ e1.toString());
  						break;
  					}
  				}
  			}  			
  		}
  	}
  	// Task queue.
  	private TaskQueue taskQueue;
  	// Array of front-end server IPs; the constructor fills it from the
  	// space-separated FRONT_SERVERS property.
  	private String[] frontServer;
  	
  	// Value of the required SET_STAT_SERVLET property.
  	private String setStatServlet;
  	
  	// Values of the required HOST_PARAM_NAME, PORT_PARAM_NAME and STAT_PARAM_NAME
  	// properties, respectively.
  	private String hostParam;
  	private String portParam;
  	private String statParam;
  	// File named by the TASK_LOG property; the constructor rejects it unless it
  	// exists, is not a directory, and is both readable and writable.
  	private File taskLogFile;

  	
  	public Control(File configfile, int port) throws IOException {
  		super(port, Param.class, 5);
  		
  		java.util.Properties properties = new java.util.Properties();
  		FileInputStream in = new FileInputStream(configfile);
  		properties.load(in);
  		in.close();
  		String frontservers = properties.getProperty("FRONT_SERVERS");
  		if (frontservers == null || frontservers.length() == 0)
  			throw new IOException("Not find FRONT_SERVERS property in file "+ configfile);
  		else{
  			StringTokenizer token = new StringTokenizer(frontservers," ");
  			frontServer = new String[token.countTokens()];
  			for (int i=0; i<frontServer.length; i++)
  				frontServer[i] = token.nextToken();
  		}
  		
  		setStatServlet = properties.getProperty("SET_STAT_SERVLET");
  		if (setStatServlet == null || setStatServlet.length() == 0)
  			throw new IOException("Not find SET_STAT_SERVLET property in file "+ configfile);
  		
  		hostParam = properties.getProperty("HOST_PARAM_NAME");
  		if (hostParam == null || hostParam.length() == 0)
  			throw new IOException("Not find HOST_PARAM_NAME property in file "+ configfile);
  		
  		portParam = properties.getProperty("PORT_PARAM_NAME");
  		if (portParam == null || portParam.length() == 0)
  			throw new IOException("Not find PORT_PARAM_NAME property in file "+ configfile);
  		
  		statParam = properties.getProperty("STAT_PARAM_NAME");
  		if (statParam == null || statParam.length() == 0)
  			throw new IOException("Not find STAT_PARAM_NAME property in file "+ configfile);
  		
  		String idxSrvFileName = properties.getProperty("INDEX_SERVER_FILE");
  		String sumSrvFileName = properties.getProperty("SUMMARY_SERVER_FILE");
  		
  		//check index server config , 二者不允许同时都是null
  		if (StringUtils.isEmpty(idxSrvFileName)) {
  			throw new RuntimeException("NO INDEX_SERVER_FILE property in file "+ configfile);
  		}
  		
  		new_segment_path = properties.getProperty("NEW_SEGMENT_PATH");
  		if (new_segment_path == null || new_segment_path.length() == 0)
  			throw new IOException("Not find NEW_SEGMENT_PATH property in file" + configfile);
  		
  		merge_temp_path = properties.getProperty("MERGE_TEMP_PATH");
  		if (merge_temp_path == null || merge_temp_path.length() == 0)
  			throw new IOException("Not find MERGE_TEMP_PATH property in file" + configfile);
  		/***************
  		String max_segments_str = properties.getProperty("MAX_SEGMENTS");
  		if (max_segments_str == null || max_segments_str.length() == 0)
  			throw new IOException("Not find MAX_SEGMENTS property in file" + configfile);
  		max_segments = Integer.parseInt(max_segments_str);
  		****************/
  		String taskLog = properties.getProperty("TASK_LOG");
  		if (taskLog == null || taskLog.length() == 0)
  			throw new IOException("Not find TASK_LOG property in file" + configfile);
  		
  		taskLogFile = new File(taskLog);
  		if (!taskLogFile.exists() || taskLogFile.isDirectory() || !taskLogFile.canRead() || !taskLogFile.canWrite())
  			throw new IOException("Task LOG File :" + taskLogFile.getAbsolutePath() + " Error!");
  		
  		//** Added by DingZhenbo 2007-09
  		File idxSrvFile = StringUtils.isEmpty(idxSrvFileName) ? null : new File(idxSrvFileName);
  		File sumSrvFile = StringUtils.isEmpty(sumSrvFileName) ? null : new File(sumSrvFileName);
  		//** 短桶限制数量
  		String limit = properties.getProperty("INDEX_LIMIT_NUM");
  		if (StringUtils.isEmpty(limit)) {
  			limit = "1"; // the default value
  		}
  		
  		client = new Client(idxSrvFile, idxSrvFile, sumSrvFile);
  		//add by xie shuqiang. 2006.09.27
  		client.setTimeout(600000);
  		client.setLimitNum(Integer.valueOf(limit));
  		
  		
  		//获取摘要服务器段
  		IndexServer server = null;
  		
//  		server = client.servers.get(MODE_SUM);  		
//  		if (server != null) {
//  			initSegments(server);
//  			servers.put(MODE_REG, server);
//  		}
  		// Load regular index server
  		server = client.servers.get(MODE_REG);
  		if ( server != null) {
  			initSegments(server);
  			initDocs(MODE_REG, server);
  			servers.put(MODE_REG, server);
  		}
  		// Load optimize options when we using optimizing
  		server = client.servers.get(MODE_RAM);
  		if (server != null) {
	  		initSegments(server);
	  		initDocs(MODE_RAM, server);
  			servers.put(MODE_RAM, server);
  		}
  		
  		for (String key : servers.keySet()) {
  			server = servers.get(key);
  			System.out.println("["+key + "] -> "+servers.get(key).getLength());
	  		for (long docs : server.maxDocs) {
	  			System.out.println("******* ["+key+" ] MaxDocs: "+ docs);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -