⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 remoteexperiment.java

📁 一个数据挖掘软件ALPHAMINERR的整个过程的JAVA版源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   * @param message the message.
   */
  private synchronized void notifyListeners(boolean status, 
					    boolean log, 
					    boolean finished,
					    String message) {
    if (m_listeners.size() > 0) {
      for (int i=0;i<m_listeners.size();i++) {
	RemoteExperimentListener r = 
	  (RemoteExperimentListener)(m_listeners.elementAt(i));
	r.remoteExperimentStatus(new RemoteExperimentEvent(status,
							   log,
							   finished,
							   message));
      }
    } else {
      System.err.println(message);
    }
  }

  /**
   * Set the abort flag
   */
  public void abortExperiment() {
    m_experimentAborted = true;
  }

  /**
   * Increment the number of successfully completed sub experiments
   */
  protected synchronized void incrementFinished() {
    m_finishedCount++;
  }

  /**
   * Increment the overall number of failures and the number of failures for
   * a particular host
   * @param hostNum the index of the host to increment failure count
   */
  protected synchronized void incrementFailed(int hostNum) {
    m_failedCount++;
    m_remoteHostFailureCounts[hostNum]++;
  }

  /**
   * Push an experiment back on the queue of waiting experiments
   * @param expNum the index of the experiment to push onto the queue
   */
  protected synchronized void waitingExperiment(int expNum) {
    m_subExpQueue.push(new Integer(expNum));
  }

  /**
   * Check to see if we have failed to connect to all hosts
   */
  private boolean checkForAllFailedHosts() {
    boolean allbad = true;
    for (int i = 0; i < m_remoteHostsStatus.length; i++) {
      if (m_remoteHostsStatus[i] != CONNECTION_FAILED) {
	allbad = false;
	break;
      }
    }
    if (allbad) {
      abortExperiment();
      notifyListeners(false,true,true,"Experiment aborted! All connections "
		      +"to remote hosts failed.");
    }
    return allbad;
  }

  /**
   * Returns some post experiment information.
   * @return a String containing some post experiment info
   */
  private String postExperimentInfo() {
    StringBuffer text = new StringBuffer();
    text.append(m_finishedCount+(m_splitByDataSet 
				 ? " data sets" 
				 : " runs") + " completed successfully. "
		+m_failedCount+" failures during running.\n");
    System.err.print(text.toString());
    return text.toString();
  }

  /**
   * Pushes a host back onto the queue of available hosts and attempts to
   * launch a waiting experiment (if any).
   * @param hostNum the index of the host to push back onto the queue of
   * available hosts
   */
  protected synchronized void availableHost(int hostNum) {
    if (hostNum >= 0) { 
      if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) {
	m_remoteHostsQueue.push(new Integer(hostNum));
      } else {
	notifyListeners(false,true,false,"Max failures exceeded for host "
			+((String)m_remoteHosts.elementAt(hostNum))
			+". Removed from host list.");
	m_removedHosts++;
      }
    }

    // check for all sub exp complete or all hosts failed or failed count
    // exceeded
    if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) {
      abortExperiment();
      notifyListeners(false,true,true,"Experiment aborted! Max failures "
		      +"exceeded on all remote hosts.");
      return;
    }

    if ((getSplitByDataSet() && 
	 (m_baseExperiment.getDatasets().size() == m_finishedCount)) ||
	(!getSplitByDataSet() && 
	 ((getRunUpper() - getRunLower() + 1) == m_finishedCount))) {
      notifyListeners(false,true,false,"Experiment completed successfully.");
      notifyListeners(false,true,true,postExperimentInfo());
      return;
    }
    
    if (checkForAllFailedHosts()) {
      return;
    }

    if (m_experimentAborted && 
	(m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) {
      notifyListeners(false,true,true,"Experiment aborted. All remote tasks "
		      +"finished.");
    }
        
    if (!m_subExpQueue.empty() && !m_experimentAborted) {
      if (!m_remoteHostsQueue.empty()) {
	int availHost, waitingExp;
	try {
	  availHost = ((Integer)m_remoteHostsQueue.pop()).intValue();
	  waitingExp = ((Integer)m_subExpQueue.pop()).intValue();
	  launchNext(waitingExp, availHost);
	} catch (Exception ex) {
	  ex.printStackTrace();
	}
      }
    }    
  }

  /**
   * Launch a sub experiment on a remote host
   * @param wexp the index of the sub experiment to launch
   * @param ah the index of the available host to launch on
   */
  public void launchNext(final int wexp, final int ah) {
    
    Thread subExpThread;
    subExpThread = new Thread() {
	public void run() {	      
	  m_remoteHostsStatus[ah] = IN_USE;
	  m_subExpComplete[wexp] = TaskStatusInfo.PROCESSING;
	  RemoteExperimentSubTask expSubTsk = new RemoteExperimentSubTask();
	  expSubTsk.setExperiment(m_subExperiments[wexp]);
	  String subTaskType = (getSplitByDataSet())
	    ? "dataset :" + ((File)m_subExperiments[wexp].getDatasets().
			     elementAt(0)).getName()
	    : "run :" + m_subExperiments[wexp].getRunLower();
	  try {
	    String name = "//"
	      +((String)m_remoteHosts.elementAt(ah))
	      +"/RemoteEngine";
	    Compute comp = (Compute) Naming.lookup(name);
	    // assess the status of the sub-exp
	    notifyListeners(false,true,false,"Starting "
			    +subTaskType
			    +" on host "
			    +((String)m_remoteHosts.elementAt(ah)));
	    Object subTaskId = comp.executeTask(expSubTsk);
	    boolean finished = false;
	    TaskStatusInfo is = null;
	    while (!finished) {
	      try {
		Thread.sleep(2000);
		
		TaskStatusInfo cs = (TaskStatusInfo)comp.
		  checkStatus(subTaskId);
		if (cs.getExecutionStatus() == TaskStatusInfo.FINISHED) {
		  // push host back onto queue and try launching any waiting 
		  // sub-experiments
		  notifyListeners(false, true, false,  cs.getStatusMessage());
		  m_remoteHostsStatus[ah] = AVAILABLE;
		  incrementFinished();
		  availableHost(ah);
		  finished = true;
		} else if (cs.getExecutionStatus() == TaskStatusInfo.FAILED) {
		  // a non connection related error---possibly host doesn't have
		  // access to data sets or security policy is not set up
		  // correctly or classifier(s) failed for some reason
		  notifyListeners(false, true, false,  cs.getStatusMessage());
		  m_remoteHostsStatus[ah] = SOME_OTHER_FAILURE;
		  m_subExpComplete[wexp] = TaskStatusInfo.FAILED;
		  notifyListeners(false,true,false,subTaskType
				  +" "+cs.getStatusMessage()
				  +". Scheduling for execution on another host.");
		  incrementFailed(ah);
		  // push experiment back onto queue
		  waitingExperiment(wexp);	
		  // push host back onto queue and try launching any waiting 
		  // sub-experiments. Host is pushed back on the queue as the
		  // failure may be temporary---eg. with InstantDB using the
		  // RMI bridge, two or more threads may try to create the
		  // experiment index or results table simultaneously; all but
		  // one will throw an exception. These hosts are still usable
		  // however.
		  availableHost(ah);
		  finished = true;
		} else {
		  if (is == null) {
		    is = cs;
		    notifyListeners(false, true, false, cs.getStatusMessage());
		  } else {
		    if (cs.getStatusMessage().
			compareTo(is.getStatusMessage()) != 0) {
		     
		      notifyListeners(false, true, false,  
				      cs.getStatusMessage());
		    }
		    is = cs;
		  }  
		}
	      } catch (InterruptedException ie) {
	      }
	    }	      

	  } catch (Exception ce) {
	    m_remoteHostsStatus[ah] = CONNECTION_FAILED;
	    m_subExpComplete[wexp] = TaskStatusInfo.TO_BE_RUN;
	    System.err.println(ce);
	    ce.printStackTrace();
	    notifyListeners(false,true,false,"Connection to "
			    +((String)m_remoteHosts.elementAt(ah))
			    +" failed. Scheduling "
			    +subTaskType
			    +" for execution on another host.");
	    checkForAllFailedHosts();
	    waitingExperiment(wexp);
	  } finally {
	    if (isInterrupted()) {
	      System.err.println("Sub exp Interupted!");
	    }
	  }
	}	   
      };
    subExpThread.setPriority(Thread.MIN_PRIORITY);
    subExpThread.start();
  }

  /**
   * Overides the one in Experiment
   * @exception Exception
   */
  public void nextIteration() throws Exception {

  }

  /** 
   * overides the one in Experiment
   */
  public void advanceCounters() {

  }

  /** 
   * overides the one in Experiment
   */
  public void postProcess() {
   
  }

  /**
   * Add a host name to the list of remote hosts
   * @param hostname the host name to add to the list
   */
  public void addRemoteHost(String hostname) {
    m_remoteHosts.addElement(hostname);
  }

  /**
   * Get the list of remote host names
   * @return the list of remote host names
   */
  public DefaultListModel getRemoteHosts() {
    return m_remoteHosts;
  }

  /**
   * Overides toString in Experiment
   * @return a description of this remote experiment
   */
  public String toString() {
    String result = m_baseExperiment.toString();

    result += "\nRemote Hosts:\n";
    for (int i=0;i<m_remoteHosts.size();i++) {
      result += ((String)m_remoteHosts.elementAt(i)) +'\n';
    }
    return result;
  }

  /**
   * Overides runExperiment in Experiment
   */
  public void runExperiment() {
    int totalHosts = m_remoteHostsQueue.size();
    // Try to launch sub experiments on all available hosts
    for (int i = 0; i < totalHosts; i++) {
      availableHost(-1);
    }
  }

  /**
   * Configures/Runs the Experiment from the command line.
   *
   * @param args command line arguments to the Experiment.
   */
  public static void main(String[] args) {

    try {
      RemoteExperiment exp = null;
      Experiment base = null;
      String expFile = Utils.getOption('l', args);
      String saveFile = Utils.getOption('s', args);
      boolean runExp = Utils.getFlag('r', args);
      FastVector remoteHosts = new FastVector();
      String runHost = " ";
      while (runHost.length() != 0) {
	runHost = Utils.getOption('h', args);
	if (runHost.length() != 0) {
	  remoteHosts.addElement(runHost);
	}
      }
      if (expFile.length() == 0) {
	base = new Experiment();
	try {
	  base.setOptions(args);
	  Utils.checkForRemainingOptions(args);
	} catch (Exception ex) {
	  ex.printStackTrace();
	  String result = "Usage:\n\n"
	    + "-l <exp file>\n"
	    + "\tLoad experiment from file (default use cli options)\n"
	    + "-s <exp file>\n"
	    + "\tSave experiment to file after setting other options\n"
	    + "\t(default don't save)\n"
	    + "-h <remote host name>\n"
	    +"\tHost to run experiment on (may be specified more than once\n"
	    +"\tfor multiple remote hosts)\n"
	    + "-r \n"
	    + "\tRun experiment on (default don't run)\n\n";
	  Enumeration em = ((OptionHandler)base).listOptions();
	  while (em.hasMoreElements()) {
	    Option option = (Option) em.nextElement();
	    result += option.synopsis() + "\n";
	    result += option.description() + "\n";
	  }
	  throw new Exception(result + "\n" + ex.getMessage());
	}
      } else {
	FileInputStream fi = new FileInputStream(expFile);
	ObjectInputStream oi = new ObjectInputStream(
			       new BufferedInputStream(fi));
	Object tmp = oi.readObject();
	if (tmp instanceof RemoteExperiment) {
	  exp = (RemoteExperiment)tmp;
	} else {
	  base = (Experiment)tmp;
	}
	oi.close();
      }
      if (base != null) {
	exp = new RemoteExperiment(base);
      }
      for (int i=0;i<remoteHosts.size();i++) {
	exp.addRemoteHost((String)remoteHosts.elementAt(i));
      }
      System.err.println("Experiment:\n" + exp.toString());

      if (saveFile.length() != 0) {
	FileOutputStream fo = new FileOutputStream(saveFile);
	ObjectOutputStream oo = new ObjectOutputStream(
				new BufferedOutputStream(fo));
	oo.writeObject(exp);
	oo.close();
      }
      
      if (runExp) {
	System.err.println("Initializing...");
	exp.initialize();
	System.err.println("Iterating...");
	exp.runExperiment();
	System.err.println("Postprocessing...");
	exp.postProcess();
      }      
    } catch (Exception ex) {
      ex.printStackTrace();
      System.err.println(ex.getMessage());
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -