⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 remoteexperiment.java

📁 wekaUT 是 University of Texas at Austin 开发的基于 Weka 的半监督学习(semi-supervised learning)分类器
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   */  private synchronized void notifyListeners(boolean status, 					    boolean log, 					    boolean finished,					    String message) {    if (m_listeners.size() > 0) {      for (int i=0;i<m_listeners.size();i++) {	RemoteExperimentListener r = 	  (RemoteExperimentListener)(m_listeners.elementAt(i));	r.remoteExperimentStatus(new RemoteExperimentEvent(status,							   log,							   finished,							   message));      }    } else {      System.err.println(message);    }  }  /**   * Set the abort flag   */  public void abortExperiment() {    m_experimentAborted = true;  }  /**   * Increment the number of successfully completed sub experiments   */  protected synchronized void incrementFinished() {    m_finishedCount++;  }  /**   * Increment the overall number of failures and the number of failures for   * a particular host   * @param hostNum the index of the host to increment failure count   */  protected synchronized void incrementFailed(int hostNum) {    m_failedCount++;    m_remoteHostFailureCounts[hostNum]++;  }  /**   * Push an experiment back on the queue of waiting experiments   * @param expNum the index of the experiment to push onto the queue   */  protected synchronized void waitingExperiment(int expNum) {    m_subExpQueue.push(new Integer(expNum));  }  /**   * Check to see if we have failed to connect to all hosts   */  private boolean checkForAllFailedHosts() {    boolean allbad = true;    for (int i = 0; i < m_remoteHostsStatus.length; i++) {      if (m_remoteHostsStatus[i] != CONNECTION_FAILED) {	allbad = false;	break;      }    }    if (allbad) {      abortExperiment();      notifyListeners(false,true,true,"Experiment aborted! All connections "		      +"to remote hosts failed.");    }    return allbad;  }  /**   * Returns some post experiment information.   
* @return a String containing some post experiment info   */  private String postExperimentInfo() {    StringBuffer text = new StringBuffer();    text.append(m_finishedCount+(m_splitByDataSet 				 ? " data sets" 				 : " runs") + " completed successfully. "		+m_failedCount+" failures during running.\n");    System.err.print(text.toString());    return text.toString();  }  /**   * Pushes a host back onto the queue of available hosts and attempts to   * launch a waiting experiment (if any).   * @param hostNum the index of the host to push back onto the queue of   * available hosts   */  protected synchronized void availableHost(int hostNum) {    if (hostNum >= 0) {       if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) {	m_remoteHostsQueue.push(new Integer(hostNum));      } else {	notifyListeners(false,true,false,"Max failures exceeded for host "			+((String)m_remoteHosts.elementAt(hostNum))			+". Removed from host list.");	m_removedHosts++;      }    }    // check for all sub exp complete or all hosts failed or failed count    // exceeded    if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) {      abortExperiment();      notifyListeners(false,true,true,"Experiment aborted! Max failures "		      +"exceeded on all remote hosts.");      return;    }    if ((getSplitByDataSet() && 	 (m_baseExperiment.getDatasets().size() == m_finishedCount)) ||	(!getSplitByDataSet() && 	 ((getRunUpper() - getRunLower() + 1) == m_finishedCount))) {      notifyListeners(false,true,false,"Experiment completed successfully.");      notifyListeners(false,true,true,postExperimentInfo());      return;    }        if (checkForAllFailedHosts()) {      return;    }    if (m_experimentAborted && 	(m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) {      notifyListeners(false,true,true,"Experiment aborted. 
All remote tasks "		      +"finished.");    }            if (!m_subExpQueue.empty() && !m_experimentAborted) {      if (!m_remoteHostsQueue.empty()) {	int availHost, waitingExp;	try {	  availHost = ((Integer)m_remoteHostsQueue.pop()).intValue();	  waitingExp = ((Integer)m_subExpQueue.pop()).intValue();	  launchNext(waitingExp, availHost);	} catch (Exception ex) {	  ex.printStackTrace();	}      }    }      }  /**   * Launch a sub experiment on a remote host   * @param wexp the index of the sub experiment to launch   * @param ah the index of the available host to launch on   */  public void launchNext(final int wexp, final int ah) {        Thread subExpThread;    subExpThread = new Thread() {	public void run() {	      	  m_remoteHostsStatus[ah] = IN_USE;	  m_subExpComplete[wexp] = TaskStatusInfo.PROCESSING;	  RemoteExperimentSubTask expSubTsk = new RemoteExperimentSubTask();	  expSubTsk.setExperiment(m_subExperiments[wexp]);	  String subTaskType = (getSplitByDataSet())	    ? "dataset :" + ((File)m_subExperiments[wexp].getDatasets().			     
elementAt(0)).getName()	    : "run :" + m_subExperiments[wexp].getRunLower();	  try {	      /**  BEGIN EDIT - Melville **/	      String hostname = ((String)m_remoteHosts.elementAt(ah));	      String name=null;	      int splitIndex = hostname.indexOf(":");	      if(splitIndex>-1){		  name = "//"		      +hostname.substring(0,splitIndex)		      +"/RemoteEngine"+hostname.substring(splitIndex+1);	      }	      else {		  name = "//"			+((String)m_remoteHosts.elementAt(ah))			+"/RemoteEngine";	      }	      /**  END EDIT - Melville **/	      Compute comp = (Compute) Naming.lookup(name);	    // assess the status of the sub-exp	    notifyListeners(false,true,false,"Starting "			    +subTaskType			    +" on host "			    +((String)m_remoteHosts.elementAt(ah)));	    Object subTaskId = comp.executeTask(expSubTsk);	    boolean finished = false;	    TaskStatusInfo is = null;	    while (!finished) {	      try {		Thread.sleep(2000);				TaskStatusInfo cs = (TaskStatusInfo)comp.		  checkStatus(subTaskId);		if (cs.getExecutionStatus() == TaskStatusInfo.FINISHED) {		  // push host back onto queue and try launching any waiting 		  // sub-experiments		  notifyListeners(false, true, false,  cs.getStatusMessage());		  m_remoteHostsStatus[ah] = AVAILABLE;		  incrementFinished();		  availableHost(ah);		  finished = true;		} else if (cs.getExecutionStatus() == TaskStatusInfo.FAILED) {		  // a non connection related error---possibly host doesn't have		  // access to data sets or security policy is not set up		  // correctly or classifier(s) failed for some reason		  notifyListeners(false, true, false,  cs.getStatusMessage());		  m_remoteHostsStatus[ah] = SOME_OTHER_FAILURE;		  m_subExpComplete[wexp] = TaskStatusInfo.FAILED;		  notifyListeners(false,true,false,subTaskType				  +" "+cs.getStatusMessage()				  +". 
Scheduling for execution on another host.");		  incrementFailed(ah);		  // push experiment back onto queue		  waitingExperiment(wexp);			  // push host back onto queue and try launching any waiting 		  // sub-experiments. Host is pushed back on the queue as the		  // failure may be temporary---eg. with InstantDB using the		  // RMI bridge, two or more threads may try to create the		  // experiment index or results table simultaneously; all but		  // one will throw an exception. These hosts are still usable		  // however.		  availableHost(ah);		  finished = true;		} else {		  if (is == null) {		    is = cs;		    notifyListeners(false, true, false, cs.getStatusMessage());		  } else {		    if (cs.getStatusMessage().			compareTo(is.getStatusMessage()) != 0) {		     		      notifyListeners(false, true, false,  				      cs.getStatusMessage());		    }		    is = cs;		  }  		}	      } catch (InterruptedException ie) {	      }	    }	      	  } catch (Exception ce) {	    m_remoteHostsStatus[ah] = CONNECTION_FAILED;	    m_subExpComplete[wexp] = TaskStatusInfo.TO_BE_RUN;	    System.err.println(ce);	    ce.printStackTrace();	    notifyListeners(false,true,false,"Connection to "			    +((String)m_remoteHosts.elementAt(ah))			    +" failed. 
Scheduling "			    +subTaskType			    +" for execution on another host.");	    checkForAllFailedHosts();	    waitingExperiment(wexp);	  } finally {	    if (isInterrupted()) {	      System.err.println("Sub exp Interupted!");	    }	  }	}	         };    subExpThread.setPriority(Thread.MIN_PRIORITY);    subExpThread.start();  }  /**   * Overides the one in Experiment   * @exception Exception   */  public void nextIteration() throws Exception {  }  /**    * overides the one in Experiment   */  public void advanceCounters() {  }  /**    * overides the one in Experiment   */  public void postProcess() {     }  /**   * Add a host name to the list of remote hosts   * @param hostname the host name to add to the list   */  public void addRemoteHost(String hostname) {    m_remoteHosts.addElement(hostname);  }  /**   * Get the list of remote host names   * @return the list of remote host names   */  public DefaultListModel getRemoteHosts() {    return m_remoteHosts;  }  /**   * Overides toString in Experiment   * @return a description of this remote experiment   */  public String toString() {    String result = m_baseExperiment.toString();    result += "\nRemote Hosts:\n";    for (int i=0;i<m_remoteHosts.size();i++) {      result += ((String)m_remoteHosts.elementAt(i)) +'\n';    }    return result;  }  /**   * Overides runExperiment in Experiment   */  public void runExperiment() {    int totalHosts = m_remoteHostsQueue.size();    // Try to launch sub experiments on all available hosts    for (int i = 0; i < totalHosts; i++) {      availableHost(-1);    }  }  /**   * Configures/Runs the Experiment from the command line.   *   * @param args command line arguments to the Experiment.   
*/  public static void main(String[] args) {    try {      RemoteExperiment exp = null;      Experiment base = null;      String expFile = Utils.getOption('l', args);      String saveFile = Utils.getOption('s', args);      boolean runExp = Utils.getFlag('r', args);      FastVector remoteHosts = new FastVector();      String runHost = " ";      while (runHost.length() != 0) {	runHost = Utils.getOption('h', args);	if (runHost.length() != 0) {	  remoteHosts.addElement(runHost);	}      }      if (expFile.length() == 0) {	base = new Experiment();	try {	  base.setOptions(args);	  Utils.checkForRemainingOptions(args);	} catch (Exception ex) {	  ex.printStackTrace();	  String result = "Usage:\n\n"	    + "-l <exp file>\n"	    + "\tLoad experiment from file (default use cli options)\n"	    + "-s <exp file>\n"	    + "\tSave experiment to file after setting other options\n"	    + "\t(default don't save)\n"	    + "-h <remote host name>\n"	    +"\tHost to run experiment on (may be specified more than once\n"	    +"\tfor multiple remote hosts)\n"	    + "-r \n"	    + "\tRun experiment on (default don't run)\n\n";	  Enumeration enum = ((OptionHandler)base).listOptions();	  while (enum.hasMoreElements()) {	    Option option = (Option) enum.nextElement();	    result += option.synopsis() + "\n";	    result += option.description() + "\n";	  }	  throw new Exception(result + "\n" + ex.getMessage());	}      } else {	FileInputStream fi = new FileInputStream(expFile);	ObjectInputStream oi = new ObjectInputStream(			       new BufferedInputStream(fi));	Object tmp = oi.readObject();	if (tmp instanceof RemoteExperiment) {	  exp = (RemoteExperiment)tmp;	} else {	  base = (Experiment)tmp;	}	oi.close();      }      if (base != null) {	exp = new RemoteExperiment(base);      }      for (int i=0;i<remoteHosts.size();i++) {	exp.addRemoteHost((String)remoteHosts.elementAt(i));      }      System.err.println("Experiment:\n" + exp.toString());      if (saveFile.length() != 0) {	FileOutputStream fo = new 
FileOutputStream(saveFile);	ObjectOutputStream oo = new ObjectOutputStream(				new BufferedOutputStream(fo));	oo.writeObject(exp);	oo.close();      }            if (runExp) {	System.err.println("Initializing...");	exp.initialize();	System.err.println("Iterating...");	exp.runExperiment();	System.err.println("Postprocessing...");	exp.postProcess();      }          } catch (Exception ex) {      ex.printStackTrace();      System.err.println(ex.getMessage());    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -