📄 remoteexperiment.java
字号:
*/ private synchronized void notifyListeners(boolean status, boolean log, boolean finished, String message) { if (m_listeners.size() > 0) { for (int i=0;i<m_listeners.size();i++) { RemoteExperimentListener r = (RemoteExperimentListener)(m_listeners.elementAt(i)); r.remoteExperimentStatus(new RemoteExperimentEvent(status, log, finished, message)); } } else { System.err.println(message); } } /** * Set the abort flag */ public void abortExperiment() { m_experimentAborted = true; } /** * Increment the number of successfully completed sub experiments */ protected synchronized void incrementFinished() { m_finishedCount++; } /** * Increment the overall number of failures and the number of failures for * a particular host * @param hostNum the index of the host to increment failure count */ protected synchronized void incrementFailed(int hostNum) { m_failedCount++; m_remoteHostFailureCounts[hostNum]++; } /** * Push an experiment back on the queue of waiting experiments * @param expNum the index of the experiment to push onto the queue */ protected synchronized void waitingExperiment(int expNum) { m_subExpQueue.push(new Integer(expNum)); } /** * Check to see if we have failed to connect to all hosts */ private boolean checkForAllFailedHosts() { boolean allbad = true; for (int i = 0; i < m_remoteHostsStatus.length; i++) { if (m_remoteHostsStatus[i] != CONNECTION_FAILED) { allbad = false; break; } } if (allbad) { abortExperiment(); notifyListeners(false,true,true,"Experiment aborted! All connections " +"to remote hosts failed."); } return allbad; } /** * Returns some post experiment information. * @return a String containing some post experiment info */ private String postExperimentInfo() { StringBuffer text = new StringBuffer(); text.append(m_finishedCount+(m_splitByDataSet ? " data sets" : " runs") + " completed successfully. " +m_failedCount+" failures during running.\n"); System.err.print(text.toString()); return text.toString(); } /** * Pushes a host back onto the queue of available hosts and attempts to * launch a waiting experiment (if any). * @param hostNum the index of the host to push back onto the queue of * available hosts */ protected synchronized void availableHost(int hostNum) { if (hostNum >= 0) { if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) { m_remoteHostsQueue.push(new Integer(hostNum)); } else { notifyListeners(false,true,false,"Max failures exceeded for host " +((String)m_remoteHosts.elementAt(hostNum)) +". Removed from host list."); m_removedHosts++; } } // check for all sub exp complete or all hosts failed or failed count // exceeded if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) { abortExperiment(); notifyListeners(false,true,true,"Experiment aborted! Max failures " +"exceeded on all remote hosts."); return; } if ((getSplitByDataSet() && (m_baseExperiment.getDatasets().size() == m_finishedCount)) || (!getSplitByDataSet() && ((getRunUpper() - getRunLower() + 1) == m_finishedCount))) { notifyListeners(false,true,false,"Experiment completed successfully."); notifyListeners(false,true,true,postExperimentInfo()); return; } if (checkForAllFailedHosts()) { return; } if (m_experimentAborted && (m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) { notifyListeners(false,true,true,"Experiment aborted. All remote tasks " +"finished."); } if (!m_subExpQueue.empty() && !m_experimentAborted) { if (!m_remoteHostsQueue.empty()) { int availHost, waitingExp; try { availHost = ((Integer)m_remoteHostsQueue.pop()).intValue(); waitingExp = ((Integer)m_subExpQueue.pop()).intValue(); launchNext(waitingExp, availHost); } catch (Exception ex) { ex.printStackTrace(); } } } } /** * Launch a sub experiment on a remote host * @param wexp the index of the sub experiment to launch * @param ah the index of the available host to launch on */ public void launchNext(final int wexp, final int ah) { Thread subExpThread; subExpThread = new Thread() { public void run() { m_remoteHostsStatus[ah] = IN_USE; m_subExpComplete[wexp] = TaskStatusInfo.PROCESSING; RemoteExperimentSubTask expSubTsk = new RemoteExperimentSubTask(); expSubTsk.setExperiment(m_subExperiments[wexp]); String subTaskType = (getSplitByDataSet()) ? "dataset :" + ((File)m_subExperiments[wexp].getDatasets(). elementAt(0)).getName() : "run :" + m_subExperiments[wexp].getRunLower(); try { /** BEGIN EDIT - Melville **/ String hostname = ((String)m_remoteHosts.elementAt(ah)); String name=null; int splitIndex = hostname.indexOf(":"); if(splitIndex>-1){ name = "//" +hostname.substring(0,splitIndex) +"/RemoteEngine"+hostname.substring(splitIndex+1); } else { name = "//" +((String)m_remoteHosts.elementAt(ah)) +"/RemoteEngine"; } /** END EDIT - Melville **/ Compute comp = (Compute) Naming.lookup(name); // assess the status of the sub-exp notifyListeners(false,true,false,"Starting " +subTaskType +" on host " +((String)m_remoteHosts.elementAt(ah))); Object subTaskId = comp.executeTask(expSubTsk); boolean finished = false; TaskStatusInfo is = null; while (!finished) { try { Thread.sleep(2000); TaskStatusInfo cs = (TaskStatusInfo)comp. checkStatus(subTaskId); if (cs.getExecutionStatus() == TaskStatusInfo.FINISHED) { // push host back onto queue and try launching any waiting // sub-experiments notifyListeners(false, true, false, cs.getStatusMessage()); m_remoteHostsStatus[ah] = AVAILABLE; incrementFinished(); availableHost(ah); finished = true; } else if (cs.getExecutionStatus() == TaskStatusInfo.FAILED) { // a non connection related error---possibly host doesn't have // access to data sets or security policy is not set up // correctly or classifier(s) failed for some reason notifyListeners(false, true, false, cs.getStatusMessage()); m_remoteHostsStatus[ah] = SOME_OTHER_FAILURE; m_subExpComplete[wexp] = TaskStatusInfo.FAILED; notifyListeners(false,true,false,subTaskType +" "+cs.getStatusMessage() +". Scheduling for execution on another host."); incrementFailed(ah); // push experiment back onto queue waitingExperiment(wexp); // push host back onto queue and try launching any waiting // sub-experiments. Host is pushed back on the queue as the // failure may be temporary---eg. with InstantDB using the // RMI bridge, two or more threads may try to create the // experiment index or results table simultaneously; all but // one will throw an exception. These hosts are still usable // however. availableHost(ah); finished = true; } else { if (is == null) { is = cs; notifyListeners(false, true, false, cs.getStatusMessage()); } else { if (cs.getStatusMessage(). compareTo(is.getStatusMessage()) != 0) { notifyListeners(false, true, false, cs.getStatusMessage()); } is = cs; } } } catch (InterruptedException ie) { } } } catch (Exception ce) { m_remoteHostsStatus[ah] = CONNECTION_FAILED; m_subExpComplete[wexp] = TaskStatusInfo.TO_BE_RUN; System.err.println(ce); ce.printStackTrace(); notifyListeners(false,true,false,"Connection to " +((String)m_remoteHosts.elementAt(ah)) +" failed. Scheduling " +subTaskType +" for execution on another host."); checkForAllFailedHosts(); waitingExperiment(wexp); } finally { if (isInterrupted()) { System.err.println("Sub exp Interupted!"); } } } }; subExpThread.setPriority(Thread.MIN_PRIORITY); subExpThread.start(); } /** * Overides the one in Experiment * @exception Exception */ public void nextIteration() throws Exception { } /** * overides the one in Experiment */ public void advanceCounters() { } /** * overides the one in Experiment */ public void postProcess() { } /** * Add a host name to the list of remote hosts * @param hostname the host name to add to the list */ public void addRemoteHost(String hostname) { m_remoteHosts.addElement(hostname); } /** * Get the list of remote host names * @return the list of remote host names */ public DefaultListModel getRemoteHosts() { return m_remoteHosts; } /** * Overides toString in Experiment * @return a description of this remote experiment */ public String toString() { String result = m_baseExperiment.toString(); result += "\nRemote Hosts:\n"; for (int i=0;i<m_remoteHosts.size();i++) { result += ((String)m_remoteHosts.elementAt(i)) +'\n'; } return result; } /** * Overides runExperiment in Experiment */ public void runExperiment() { int totalHosts = m_remoteHostsQueue.size(); // Try to launch sub experiments on all available hosts for (int i = 0; i < totalHosts; i++) { availableHost(-1); } } /** * Configures/Runs the Experiment from the command line. * * @param args command line arguments to the Experiment. */ public static void main(String[] args) { try { RemoteExperiment exp = null; Experiment base = null; String expFile = Utils.getOption('l', args); String saveFile = Utils.getOption('s', args); boolean runExp = Utils.getFlag('r', args); FastVector remoteHosts = new FastVector(); String runHost = " "; while (runHost.length() != 0) { runHost = Utils.getOption('h', args); if (runHost.length() != 0) { remoteHosts.addElement(runHost); } } if (expFile.length() == 0) { base = new Experiment(); try { base.setOptions(args); Utils.checkForRemainingOptions(args); } catch (Exception ex) { ex.printStackTrace(); String result = "Usage:\n\n" + "-l <exp file>\n" + "\tLoad experiment from file (default use cli options)\n" + "-s <exp file>\n" + "\tSave experiment to file after setting other options\n" + "\t(default don't save)\n" + "-h <remote host name>\n" +"\tHost to run experiment on (may be specified more than once\n" +"\tfor multiple remote hosts)\n" + "-r \n" + "\tRun experiment on (default don't run)\n\n"; Enumeration enum = ((OptionHandler)base).listOptions(); while (enum.hasMoreElements()) { Option option = (Option) enum.nextElement(); result += option.synopsis() + "\n"; result += option.description() + "\n"; } throw new Exception(result + "\n" + ex.getMessage()); } } else { FileInputStream fi = new FileInputStream(expFile); ObjectInputStream oi = new ObjectInputStream( new BufferedInputStream(fi)); Object tmp = oi.readObject(); if (tmp instanceof RemoteExperiment) { exp = (RemoteExperiment)tmp; } else { base = (Experiment)tmp; } oi.close(); } if (base != null) { exp = new RemoteExperiment(base); } for (int i=0;i<remoteHosts.size();i++) { exp.addRemoteHost((String)remoteHosts.elementAt(i)); } System.err.println("Experiment:\n" + exp.toString()); if (saveFile.length() != 0) { FileOutputStream fo = new FileOutputStream(saveFile); ObjectOutputStream oo = new ObjectOutputStream( new BufferedOutputStream(fo)); oo.writeObject(exp); oo.close(); } if (runExp) { System.err.println("Initializing..."); exp.initialize(); System.err.println("Iterating..."); exp.runExperiment(); System.err.println("Postprocessing..."); exp.postProcess(); } } catch (Exception ex) { ex.printStackTrace(); System.err.println(ex.getMessage()); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -