⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distributedsearch.java

📁 nutch搜索的改进型工具和优化爬虫的相关工具
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        }
        buffer.append(")");
        LOG.info(buffer.toString());
      }
  }
  /**
   * The search server: an IPC server that answers {@code Param} requests by
   * delegating to a {@link SearchBean} built over one or more index directories.
   */
  public static class Server extends net.nutch.ipc.Server {

    private final SearchBean bean;

    /** Construct a search server on the index and segments in the named
     * directory, listening on the named port. */
    public Server(String directory, int port) throws IOException {
      super(port, Param.class, 15);
      this.bean = new SearchBean(directory);
    }

    /** Construct a search server over several index directories, listening
     * on the named port. */
    public Server(String[] directory, int port) throws IOException {
      super(port, Param.class, 15);
      this.bean = new SearchBean(directory);
    }

    /**
     * Dispatches one IPC request to the {@link SearchBean} according to its op
     * code and wraps the answer in a {@code Result} tagged with the same op.
     *
     * @param param the request; must be a {@code Param}
     * @return the op-specific answer wrapped in a {@code Result}
     * @throws IOException if the underlying bean call fails
     * @throws RuntimeException if the op code is not recognized
     */
    public Writable call(Writable param) throws IOException {
      Param p = (Param) param;
      logRequest(p);

      Writable value;
      String mode;

      switch (p.op) {
      case SEGMENTS:
        LOG.info(">>> Segments request");
        value = new ArrayWritable(bean.getSegmentNames());
        break;
      case SEARCH:
        value = bean.search((Query) p.first,
            ((IntWritable) p.second).get(),
            ((IntWritable) p.third).get(),
            ((LongWritable) p.forth).get(),
            ((LongWritable) p.fifth).get(),
            bytesToString(p.sixth));
        LOG.info(">>> Hits length:" + ((Hits) value).getLength());
        break;
      case EXPLAIN:
        value = new UTF8(bean.getExplanation((Query) p.first, (Hit) p.second));
        break;
      case DETAILS:
        value = bean.getDetails((Hit) p.first);
        break;
      case SUMMARY:
        value = new BytesWritable(bean.getSummary((HitDetails) p.first, (Query) p.second,
            ((BooleanWritable) p.third).get()).getBytes());
        break;
      case SUMMARY_NEW:
        value = new BytesWritable(bean.getSummaryNew((HitDetails) p.first, (Query) p.second,
            ((IntWritable) p.third).get()).getBytes());
        break;
      case CONTENT:
        value = new BytesWritable(bean.getContent((HitDetails) p.first));
        break;
      case ANCHORS:
        value = new ArrayWritable(bean.getAnchors((HitDetails) p.first));
        break;
      case PARSEDATA:
        value = bean.getParseData((HitDetails) p.first);
        break;
      case PARSETEXT:
        value = bean.getParseText((HitDetails) p.first);
        break;
      case FETCHDATE:
        value = new LongWritable(bean.getFetchDate((HitDetails) p.first));
        break;
      case ADDSEGMENT:
        String segmentName = bytesToString(p.first);
        mode = bytesToString(p.second);
        LOG.info("***********Add Segment:" + segmentName + "|" + mode);
        value = new BooleanWritable(bean.addSegment(segmentName, mode));
        LOG.info("***********Add Segment End[" + ((BooleanWritable) value).get() + "]");
        break;
      case MAXDOCS:
        mode = bytesToString(p.first);
        value = new LongWritable(bean.getMaxDocs(mode));
        break;
      case DELSEGMENTS:
        String segments = bytesToString(p.first);
        mode = bytesToString(p.second);
        LOG.info("***************del segments:" + segments + " mode:" + mode);
        value = new BooleanWritable(bean.delSegments(splitSegments(segments), mode));
        break;
      case MERGESEGMENTS:
        String oldSegs = bytesToString(p.first);
        LOG.info("***************merge segments:" + oldSegs);
        String newSeg = bytesToString(p.second);
        mode = bytesToString(p.third);
        value = new BooleanWritable(bean.mergeSegments(splitSegments(oldSegs), newSeg, mode));
        break;
      case DELDOC:
        int docID = ((Hit) p.first).getIndexDocNo();
        LOG.info("***********Delete Doc:" + docID);
        value = new BooleanWritable(bean.delDoc(docID));
        break;
      default:
        throw new RuntimeException("Unknown op code: " + p.op);
      }
      return new Result(p.op, value);
    }

    /**
     * Decodes a {@code BytesWritable} request argument into a String using the
     * platform default charset (the same decoding the protocol has always used).
     */
    private static String bytesToString(Object bytesWritable) {
      return new String(((BytesWritable) bytesWritable).get());
    }

    /**
     * Splits a {@code |}-separated segment list into its names; empty tokens
     * (e.g. from {@code "a||b"}) are skipped, as {@link StringTokenizer} does.
     */
    private static String[] splitSegments(String joined) {
      StringTokenizer tokens = new StringTokenizer(joined, "|");
      String[] names = new String[tokens.countTokens()];
      for (int i = 0; i < names.length; i++) {
        names[i] = tokens.nextToken();
      }
      return names;
    }

    /** Logs the thread name, op name and (up to) the first two request arguments. */
    private static void logRequest(Param p) {
      StringBuilder buffer = new StringBuilder();
      buffer.append(Thread.currentThread().getName());
      buffer.append(": ");
      buffer.append(NAMES[p.op]);
      buffer.append("(");
      if (p.first != NullWritable.get()) {
        buffer.append(p.first);
        if (p.second != NullWritable.get()) {
          buffer.append(", ");
          buffer.append(p.second);
        }
      }
      buffer.append(")");
      LOG.info(buffer.toString());
    }

    /**
     * Runs a search server.
     *
     * <p>Arguments: {@code <port> <index dirs>}, where the second argument holds
     * one or more index directories separated by whitespace (quote it when
     * passing several). Prints the usage and exits with status -1 on bad input.
     */
    public static void main(String[] args) throws Exception {
      String usage = "DistributedSearch$Server <port> <index dir>";

      if (args.length < 2 || !StringUtils.isNumeric(args[0])) {
        System.err.println(usage);
        System.exit(-1);
      }

      int port;
      try {
        port = Integer.parseInt(args[0]);
      } catch (NumberFormatException e) {
        // A digits-only check does not rule out values that overflow an int.
        System.err.println(usage);
        System.exit(-1);
        return; // not reached; satisfies definite assignment of 'port'
      }

      // Split on runs of whitespace so doubled spaces do not yield empty directory names.
      String[] directories = args[1].trim().split("\\s+");

      Server server = new Server(directories, port);
      server.start();
      server.join();
    }
  }

  /** The search client. */
  public static class Client extends net.nutch.ipc.Client implements Searcher, HitDetailer, HitSummarizer, HitContent {
	
	private static Logger TIME_OUT_LOG = Logger.getLogger("timeout");
	
  	private InetSocketAddress controller;
  	
  	private Map<String, IndexServer> servers = new HashMap<String, IndexServer>();
  	
  	private int limitNum = 1;

    /**
     * Controller-side constructor.
     *
     * <p>Builds the index-server tables from local configuration files. Each
     * non-comment line of a file becomes one entry (an array of addresses) of
     * whitespace-separated {@code host:port} tokens. A table is registered under
     * its mode only if it ends up with at least one entry.
     *
     * @param regIndexFile configuration for the regular ("long bucket") index
     *                     servers, registered under {@code MODE_REG}; ignored when
     *                     null or nonexistent
     * @param ramIndexFile configuration for the RAM ("short bucket") index servers,
     *                     registered under {@code MODE_RAM}; ignored when null or
     *                     nonexistent
     * @param summayFile   currently unused: loading of summary servers is disabled
     * @throws IOException if a configuration file cannot be read
     */
    public Client(File regIndexFile, File ramIndexFile, File summayFile) throws IOException {
      super(Result.class);
      addIndexServersFromFile(regIndexFile, MODE_REG, "REG");
      addIndexServersFromFile(ramIndexFile, MODE_RAM, "RAM");
      // summayFile is intentionally ignored; summary-server loading is disabled.
    }

    /**
     * Loads one index-server configuration file and registers the resulting
     * table under {@code mode}. Skips null/missing files and registers nothing
     * when the file yields no entries.
     *
     * @param label human-readable mode name used only in the log message
     */
    private void addIndexServersFromFile(File file, String mode, String label) throws IOException {
      if (file == null || !file.exists()) {
        return;
      }
      LOG.info("init " + label + " index server");
      IndexServer server = new IndexServer();
      initIndexServer(file, server);
      server.setModeFlag(mode);
      if (server.getLength() > 0) {
        servers.put(mode, server);
      }
    }
	/**
	 * For 搜索客户端 Only
	 * 
	 * Modified by DingZhenbo 2007-09-17
	 */
    public Client(File searchConfigFile) throws IOException {
    	super(Result.class);
    	init(readConfig(searchConfigFile)); //搜索客户端
    }
    
    /**
     * Index-creator constructor: the client talks directly to the given
     * controller, whose address is recorded via {@code init(InetSocketAddress)}.
     *
     * @param address the controller address
     * @throws IOException as declared; propagated from the superclass constructor, if any
     */
    public Client(InetSocketAddress address) throws IOException {
        super(Result.class);
        this.init(address);
    }
    
    /**
     * Records the controller address that later controller requests are sent to.
     *
     * @param address the controller address, stored as given
     */
    public void init(InetSocketAddress address) {
        controller = address;
    }
    
//    /** 
//     * for control center only   
//     */
//      
//    public void init(File idxFileReg, File idxFileOpt) throws IOException {
//    	
//    	BufferedReader reader; 
//    	
//    	reader = new BufferedReader(new FileReader(idxFileReg));    	
//    	try {
//    		initIndexServer(reader, idxServerReg);
//    	} finally {
//    		reader.close();
//    	}
//    	
//    	// 设置优化搜索模式 DingZhenbo 2007-09-17
//    	reader = new BufferedReader(new FileReader(idxFileOpt));
//    	try {
//    		initIndexServer(reader, idxServerRam);
//    	} finally {
//    		reader.close();
//    	}
//    	//buildSegToAddrMap();
//    }
    
    /**
     * Loads index-server addresses from a local configuration file into {@code server}.
     *
     * <p>Every line that is neither blank nor a comment (first non-space character
     * {@code #}) becomes one entry, an array of addresses passed to
     * {@code server.add(...)}. A line holds whitespace-separated {@code host:port}
     * tokens. Tokens without a port are skipped with a warning, and a line with no
     * usable address adds nothing. The complete table is logged afterwards.
     *
     * @param file   configuration file to read
     * @param server index-server table to append to
     * @throws IOException if the file cannot be read
     * @throws NumberFormatException if a port is not a valid integer
     */
    private void initIndexServer(File file, IndexServer server) throws IOException {
        BufferedReader reader = new BufferedReader(new FileReader(file));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String trimmed = line.trim();
                if (trimmed.length() == 0 || trimmed.startsWith("#")) {
                    continue;
                }
                // Collect into a list: sizing an array by token count left null holes
                // whenever a token had no port.
                ArrayList<InetSocketAddress> group = new ArrayList<InetSocketAddress>();
                StringTokenizer tokens = new StringTokenizer(trimmed);
                while (tokens.hasMoreTokens()) {
                    String entry = tokens.nextToken();
                    StringTokenizer hostPort = new StringTokenizer(entry, ":");
                    if (hostPort.countTokens() < 2) {
                        LOG.warn("Ignoring malformed index server entry '" + entry + "' in " + file);
                        continue;
                    }
                    String host = hostPort.nextToken();
                    String port = hostPort.nextToken();
                    group.add(new InetSocketAddress(host, Integer.parseInt(port)));
                    LOG.info("Client adding server "  + host + ":" + port);
                }
                if (group.isEmpty()) {
                    LOG.warn("No valid index server address on line: " + line);
                } else {
                    server.add(group.toArray(new InetSocketAddress[group.size()]));
                }
            }
        } finally {
            reader.close(); // close even when reading or parsing fails
        }

        for (int i = 0; i < server.getLength(); i++) {
            LOG.info("************" + i);
            InetSocketAddress[] addresses = server.get(i);
            for (int j = 0; j < addresses.length; j++) {
                LOG.info(addresses[j] + ":" + server.getOnlineStat(addresses[j]));
            }
        }
    }
	
	/**
	 * Asks the controller for the index servers registered under {@code mode},
	 * appends them to {@code indexServer}, then tags it with {@code mode}.
	 *
	 * <p>The controller answers with an array of strings. Each string becomes one
	 * entry (kept even if empty, so positions match the controller's list) of
	 * space-separated {@code host:port[:online]} tokens. The online flag is only
	 * logged and defaults to {@code false} when absent. Tokens without a port are
	 * skipped with a warning.
	 *
	 * <p>If the controller returns no result, a warning is logged and
	 * {@code indexServer} is left untouched (its mode flag is not set).
	 *
	 * @param indexServer table to append to
	 * @param mode        server mode to request; must not be null
	 * @throws IOException if the controller call fails
	 * @throws NumberFormatException if a port is not a valid integer
	 */
	private void initIndexServer(IndexServer indexServer, final String mode) throws IOException {
		assert mode != null;
		LOG.info(">>> initIndexServer, mode :" + mode);
		Param param = new Param(INDEXS, new BytesWritable(mode.getBytes()));
		Result result = (Result) call(param, controller);
		if (result == null) {
			LOG.warn("Client: no index server from: " + controller);
			return;
		}
		String[] entries = ((ArrayWritable) result.value).toStrings();

		for (int i = 0; i < entries.length; i++) {
			LOG.info("Client: Index server " + entries[i]);
			ArrayList<InetSocketAddress> group = new ArrayList<InetSocketAddress>();
			StringTokenizer st = new StringTokenizer(entries[i], " ");
			while (st.hasMoreTokens()) {
				String token = st.nextToken();
				StringTokenizer fields = new StringTokenizer(token, ":");
				if (fields.countTokens() < 2) {
					LOG.warn("Client: ignoring malformed index server entry '" + token + "'");
					continue;
				}
				String host = fields.nextToken();
				int port = Integer.parseInt(fields.nextToken());
				// The online flag is optional; it is only logged here.
				boolean onLine = fields.hasMoreTokens()
						&& Boolean.valueOf(fields.nextToken()).booleanValue();
				LOG.info("on line?" + onLine);
				group.add(new InetSocketAddress(host, port));
			}
			indexServer.add(group.toArray(new InetSocketAddress[group.size()]));
		}

		indexServer.setModeFlag(mode);
	}
	
	/**
	 * Reads controller addresses from a configuration file.
	 *
	 * <p>Lines starting with {@code #} are comments. Every other line is expected to
	 * hold a host name and a port separated by whitespace. Further tokens are
	 * ignored, and lines with fewer than two tokens are skipped.
	 *
	 * @param config the configuration file
	 * @return the addresses in file order; empty if the file lists none
	 * @throws IOException if the file cannot be read
	 * @throws NumberFormatException if a port is not a valid integer
	 */
	private static InetSocketAddress[] readConfig(File config) throws IOException {
		ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
		BufferedReader reader = new BufferedReader(new FileReader(config));
		try {
			String line;
			while ((line = reader.readLine()) != null) {
				if (line.startsWith("#")) {
					continue;
				}
				StringTokenizer tokens = new StringTokenizer(line);
				if (tokens.countTokens() < 2) {
					continue;
				}
				String host = tokens.nextToken();
				String port = tokens.nextToken();
				addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
				LOG.info("Client adding server "  + host + ":" + port);
			}
		} finally {
			reader.close(); // previously never closed
		}
		return addrs.toArray(new InetSocketAddress[addrs.size()]);
	}

    /** Construct a client talking to the named servers. */
    public void init(InetSocketAddress[] addresses) throws IOException {
    	// 首先设置controller 在initIndexServer中会使用到的
    	this.controller = addresses[0];
    	// Modify by xie shuqiang. 2005-9-20
    	// 查询控制器得到所有Index Server
    	// Modified by DingZhenbo. 2007-09-18
    	IndexServer server = null;
    	server = new IndexServer();
    	initIndexServer(server, MODE_REG);
    	if (server.getLength() > 0) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -