📄 remoteexperiment.java
字号:
* @param message the message.
*/
private synchronized void notifyListeners(boolean status,
                                          boolean log,
                                          boolean finished,
                                          String message) {
  // Nobody is listening: fall back to stderr so the message is not lost.
  if (m_listeners.size() == 0) {
    System.err.println(message);
    return;
  }
  // Each registered listener is handed its own freshly built event.
  for (int idx = 0; idx < m_listeners.size(); idx++) {
    RemoteExperimentListener target =
      (RemoteExperimentListener) m_listeners.elementAt(idx);
    target.remoteExperimentStatus(
      new RemoteExperimentEvent(status, log, finished, message));
  }
}
/**
 * Requests that the experiment stop by raising the abort flag. Once the
 * flag is set no further waiting sub-experiments are launched; this call
 * itself does not touch sub-experiments that are already running.
 */
public void abortExperiment() {
  this.m_experimentAborted = true;
}
/**
 * Records one more successfully completed sub-experiment. Synchronized
 * because the per-sub-experiment threads report completion independently.
 */
protected synchronized void incrementFinished() {
  m_finishedCount += 1;
}
/**
 * Records a failed sub-experiment: bumps the overall failure counter and
 * the failure counter of the host it ran on (the latter is compared with
 * MAX_FAILURES when deciding whether to keep using that host).
 * @param hostNum index of the host on which the failure occurred
 */
protected synchronized void incrementFailed(int hostNum) {
  m_failedCount += 1;
  m_remoteHostFailureCounts[hostNum] += 1;
}
/**
 * Push a sub-experiment back onto the queue of waiting sub-experiments so
 * that it is (re)launched the next time a host becomes available.
 * @param expNum the index of the experiment to push onto the queue
 */
protected synchronized void waitingExperiment(int expNum) {
  // Integer.valueOf instead of the deprecated Integer(int) constructor;
  // the queue's consumer unboxes with intValue(), so identity is irrelevant.
  m_subExpQueue.push(Integer.valueOf(expNum));
}
/**
 * Determines whether every remote host is marked CONNECTION_FAILED. When
 * that is the case the experiment is aborted and listeners receive a
 * final "aborted" notification.
 * @return true if every host's status is CONNECTION_FAILED
 */
private boolean checkForAllFailedHosts() {
  int idx = 0;
  while (idx < m_remoteHostsStatus.length
         && m_remoteHostsStatus[idx] == CONNECTION_FAILED) {
    idx++;
  }
  // Scanning off the end means no host escaped the failed state.
  boolean everyHostFailed = (idx == m_remoteHostsStatus.length);
  if (everyHostFailed) {
    abortExperiment();
    notifyListeners(false,true,true,"Experiment aborted! All connections "
                    +"to remote hosts failed.");
  }
  return everyHostFailed;
}
/**
 * Builds a one-line summary of how many sub-experiments (data sets or
 * runs, depending on the split mode) completed and how many failures
 * occurred. The summary is also echoed to stderr.
 * @return the summary text, terminated by a newline
 */
private String postExperimentInfo() {
  String unit;
  if (m_splitByDataSet) {
    unit = " data sets";
  } else {
    unit = " runs";
  }
  String summary = m_finishedCount + unit + " completed successfully. "
    + m_failedCount + " failures during running.\n";
  System.err.print(summary);
  return summary;
}
/**
 * Pushes a host back onto the queue of available hosts and attempts to
 * launch a waiting sub-experiment (if any). Before launching it checks, in
 * order: whether every host has used up its failure allowance (abort),
 * whether all sub-experiments have completed (success), and whether every
 * host has failed to connect (abort).
 * @param hostNum the index of the host to push back onto the queue of
 * available hosts; a negative value skips the push and only attempts a
 * launch (runExperiment uses this to prime the hosts)
 */
protected synchronized void availableHost(int hostNum) {
  if (hostNum >= 0) {
    if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) {
      m_remoteHostsQueue.push(Integer.valueOf(hostNum));
    } else {
      // The host has failed too often: retire it instead of re-queueing.
      notifyListeners(false,true,false,"Max failures exceeded for host "
                      +((String)m_remoteHosts.elementAt(hostNum))
                      +". Removed from host list.");
      m_removedHosts++;
    }
  }

  // Every host has exhausted its failure allowance. Uses >= rather than ==
  // so the abort cannot be skipped should the count ever overshoot.
  if (m_failedCount >= (MAX_FAILURES * m_remoteHosts.size())) {
    abortExperiment();
    notifyListeners(false,true,true,"Experiment aborted! Max failures "
                    +"exceeded on all remote hosts.");
    return;
  }

  // All sub-experiments (data sets or runs, depending on the split mode)
  // have completed successfully.
  if ((getSplitByDataSet() &&
       (m_baseExperiment.getDatasets().size() == m_finishedCount)) ||
      (!getSplitByDataSet() &&
       ((getRunUpper() - getRunLower() + 1) == m_finishedCount))) {
    notifyListeners(false,true,false,"Experiment completed successfully.");
    notifyListeners(false,true,true,postExperimentInfo());
    return;
  }

  if (checkForAllFailedHosts()) {
    return;
  }

  // Once aborted, report completion when every host is either back in the
  // queue or retired. Execution falls through, but nothing new is launched
  // below because the abort flag is set.
  // NOTE(review): hosts whose connection failed are neither re-queued nor
  // counted in m_removedHosts, so this condition may never become true if
  // such a host exists -- confirm whether that is intended.
  if (m_experimentAborted &&
      (m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) {
    notifyListeners(false,true,true,"Experiment aborted. All remote tasks "
                    +"finished.");
  }

  // Pair the next free host with the next waiting sub-experiment.
  if (!m_subExpQueue.empty() && !m_experimentAborted) {
    if (!m_remoteHostsQueue.empty()) {
      int availHost, waitingExp;
      try {
        availHost = ((Integer)m_remoteHostsQueue.pop()).intValue();
        waitingExp = ((Integer)m_subExpQueue.pop()).intValue();
        launchNext(waitingExp, availHost);
      } catch (Exception ex) {
        ex.printStackTrace();
      }
    }
  }
}
/**
 * Launch a sub experiment on a remote host in a new minimum-priority
 * thread. The thread looks up the host's RemoteEngine via RMI, submits the
 * sub experiment, then polls its status every two seconds until it either
 * finishes or fails, forwarding status messages to the listeners.
 * <ul>
 * <li>Finished: the host is returned to the pool via availableHost.</li>
 * <li>Failed: the sub experiment is re-queued and the host is returned to
 * the pool, since the failure may be temporary.</li>
 * <li>Any exception (e.g. the connection failing): the host is marked
 * CONNECTION_FAILED and not re-queued; the sub experiment is re-queued and,
 * unless every host has now failed, an immediate launch attempt is made so
 * an idle host can pick it up.</li>
 * </ul>
 * @param wexp the index of the sub experiment to launch
 * @param ah the index of the available host to launch on
 */
public void launchNext(final int wexp, final int ah) {
  Thread subExpThread;
  subExpThread = new Thread() {
    public void run() {
      m_remoteHostsStatus[ah] = IN_USE;
      m_subExpComplete[wexp] = TaskStatusInfo.PROCESSING;
      RemoteExperimentSubTask expSubTsk = new RemoteExperimentSubTask();
      expSubTsk.setExperiment(m_subExperiments[wexp]);
      // Human-readable label for this unit of work, used in messages.
      String subTaskType = (getSplitByDataSet())
        ? "dataset :" + ((File)m_subExperiments[wexp].getDatasets().
                         elementAt(0)).getName()
        : "run :" + m_subExperiments[wexp].getRunLower();
      try {
        String name = "//"
          +((String)m_remoteHosts.elementAt(ah))
          +"/RemoteEngine";
        Compute comp = (Compute) Naming.lookup(name);
        // assess the status of the sub-exp
        notifyListeners(false,true,false,"Starting "
                        +subTaskType
                        +" on host "
                        +((String)m_remoteHosts.elementAt(ah)));
        Object subTaskId = comp.executeTask(expSubTsk);
        boolean finished = false;
        // Last status seen; used to report only changed status messages.
        TaskStatusInfo is = null;
        while (!finished) {
          try {
            Thread.sleep(2000);

            TaskStatusInfo cs = (TaskStatusInfo)comp.
              checkStatus(subTaskId);
            if (cs.getExecutionStatus() == TaskStatusInfo.FINISHED) {
              // push host back onto queue and try launching any waiting
              // sub-experiments
              notifyListeners(false, true, false, cs.getStatusMessage());
              m_remoteHostsStatus[ah] = AVAILABLE;
              incrementFinished();
              availableHost(ah);
              finished = true;
            } else if (cs.getExecutionStatus() == TaskStatusInfo.FAILED) {
              // a non connection related error---possibly host doesn't have
              // access to data sets or security policy is not set up
              // correctly or classifier(s) failed for some reason
              notifyListeners(false, true, false, cs.getStatusMessage());
              m_remoteHostsStatus[ah] = SOME_OTHER_FAILURE;
              m_subExpComplete[wexp] = TaskStatusInfo.FAILED;
              notifyListeners(false,true,false,subTaskType
                              +" "+cs.getStatusMessage()
                              +". Scheduling for execution on another host.");
              incrementFailed(ah);
              // push experiment back onto queue
              waitingExperiment(wexp);
              // push host back onto queue and try launching any waiting
              // sub-experiments. Host is pushed back on the queue as the
              // failure may be temporary---eg. with InstantDB using the
              // RMI bridge, two or more threads may try to create the
              // experiment index or results table simultaneously; all but
              // one will throw an exception. These hosts are still usable
              // however.
              availableHost(ah);
              finished = true;
            } else {
              // Any other status (presumably still processing): report the
              // message only the first time and whenever it changes.
              if (is == null) {
                is = cs;
                notifyListeners(false, true, false, cs.getStatusMessage());
              } else {
                if (cs.getStatusMessage().
                    compareTo(is.getStatusMessage()) != 0) {
                  notifyListeners(false, true, false,
                                  cs.getStatusMessage());
                }
                is = cs;
              }
            }
          } catch (InterruptedException ie) {
            // NOTE(review): the interrupt is swallowed and polling goes on.
            // Throwing InterruptedException clears the interrupt flag, so
            // the isInterrupted() check in the finally block will not see
            // it. Re-interrupting here without also leaving the loop would
            // make every later sleep() throw at once and spin the loop.
          }
        }
      } catch (Exception ce) {
        m_remoteHostsStatus[ah] = CONNECTION_FAILED;
        m_subExpComplete[wexp] = TaskStatusInfo.TO_BE_RUN;
        System.err.println(ce);
        ce.printStackTrace();
        notifyListeners(false,true,false,"Connection to "
                        +((String)m_remoteHosts.elementAt(ah))
                        +" failed. Scheduling "
                        +subTaskType
                        +" for execution on another host.");
        boolean allHostsFailed = checkForAllFailedHosts();
        waitingExperiment(wexp);
        // This host is never re-queued, so if the remaining hosts are
        // already idle in the queue nobody would pick up the re-queued sub
        // experiment and the experiment would hang. Try to launch it now
        // (unless the experiment was just aborted above).
        if (!allHostsFailed) {
          availableHost(-1);
        }
      } finally {
        if (isInterrupted()) {
          System.err.println("Sub exp Interupted!");
        }
      }
    }
  };
  subExpThread.setPriority(Thread.MIN_PRIORITY);
  subExpThread.start();
}
/**
 * Overrides the version in Experiment with a no-op; this remote variant
 * dispatches its work from runExperiment instead.
 * @exception Exception never thrown by this implementation
 */
public void nextIteration() throws Exception {
  // intentionally empty
}
/**
 * Overrides the version in Experiment with a no-op; this remote variant
 * keeps no local run/dataset counters to advance.
 */
public void advanceCounters() {
  // intentionally empty
}
/**
 * Overrides the version in Experiment with a no-op.
 */
public void postProcess() {
  // intentionally empty
}
/**
 * Appends a host name to this experiment's list of remote hosts.
 * @param hostname the host name to add to the list
 */
public void addRemoteHost(String hostname) {
  this.m_remoteHosts.addElement(hostname);
}
/**
 * Returns the live list model holding the remote host names (not a copy,
 * so changes made through it are seen by this experiment).
 * @return the list of remote host names
 */
public DefaultListModel getRemoteHosts() {
  return this.m_remoteHosts;
}
/**
 * Overrides toString in Experiment: the base experiment's description
 * followed by a "Remote Hosts:" section listing one host per line.
 * @return a description of this remote experiment
 */
public String toString() {
  StringBuffer description = new StringBuffer();
  description.append(m_baseExperiment.toString());
  description.append("\nRemote Hosts:\n");
  int hostCount = m_remoteHosts.size();
  for (int h = 0; h < hostCount; h++) {
    description.append((String) m_remoteHosts.elementAt(h)).append('\n');
  }
  return description.toString();
}
/**
 * Overrides runExperiment in Experiment: makes one launch attempt per host
 * currently in the available-host queue, so every host starts working.
 */
public void runExperiment() {
  // Snapshot the queue size up front: each launch attempt may pop a host.
  int remaining = m_remoteHostsQueue.size();
  while (remaining > 0) {
    availableHost(-1);
    remaining--;
  }
}
/**
 * Configures/Runs the Experiment from the command line.
 * Options: -l &lt;exp file&gt; load an experiment, -s &lt;exp file&gt; save
 * it, -h &lt;host&gt; add a remote host (repeatable), -r run it. Without -l
 * the remaining options configure a new base Experiment.
 *
 * @param args command line arguments to the Experiment.
 */
public static void main(String[] args) {
  try {
    RemoteExperiment exp = null;
    Experiment base = null;
    String expFile = Utils.getOption('l', args);
    String saveFile = Utils.getOption('s', args);
    boolean runExp = Utils.getFlag('r', args);
    FastVector remoteHosts = new FastVector();
    // -h may appear several times; keep consuming it until none is left.
    String runHost = " ";
    while (runHost.length() != 0) {
      runHost = Utils.getOption('h', args);
      if (runHost.length() != 0) {
        remoteHosts.addElement(runHost);
      }
    }
    if (expFile.length() == 0) {
      base = new Experiment();
      try {
        base.setOptions(args);
        Utils.checkForRemainingOptions(args);
      } catch (Exception ex) {
        ex.printStackTrace();
        String result = "Usage:\n\n"
          + "-l <exp file>\n"
          + "\tLoad experiment from file (default use cli options)\n"
          + "-s <exp file>\n"
          + "\tSave experiment to file after setting other options\n"
          + "\t(default don't save)\n"
          + "-h <remote host name>\n"
          +"\tHost to run experiment on (may be specified more than once\n"
          +"\tfor multiple remote hosts)\n"
          + "-r \n"
          + "\tRun experiment on (default don't run)\n\n";
        Enumeration em = ((OptionHandler)base).listOptions();
        while (em.hasMoreElements()) {
          Option option = (Option) em.nextElement();
          result += option.synopsis() + "\n";
          result += option.description() + "\n";
        }
        throw new Exception(result + "\n" + ex.getMessage());
      }
    } else {
      // NOTE(review): Java native deserialization -- only load experiment
      // files from trusted sources.
      FileInputStream fi = new FileInputStream(expFile);
      try {
        ObjectInputStream oi = new ObjectInputStream(
                                 new BufferedInputStream(fi));
        Object tmp = oi.readObject();
        if (tmp instanceof RemoteExperiment) {
          exp = (RemoteExperiment)tmp;
        } else {
          base = (Experiment)tmp;
        }
        oi.close();
      } finally {
        // Release the file even if reading fails; closing an already
        // closed FileInputStream has no effect.
        fi.close();
      }
    }
    if (base != null) {
      exp = new RemoteExperiment(base);
    }
    for (int i=0;i<remoteHosts.size();i++) {
      exp.addRemoteHost((String)remoteHosts.elementAt(i));
    }
    System.err.println("Experiment:\n" + exp.toString());

    if (saveFile.length() != 0) {
      FileOutputStream fo = new FileOutputStream(saveFile);
      try {
        ObjectOutputStream oo = new ObjectOutputStream(
                                  new BufferedOutputStream(fo));
        oo.writeObject(exp);
        oo.close();
      } finally {
        // Release the file even if writing fails; closing an already
        // closed FileOutputStream has no effect.
        fo.close();
      }
    }

    if (runExp) {
      System.err.println("Initializing...");
      exp.initialize();
      System.err.println("Iterating...");
      exp.runExperiment();
      System.err.println("Postprocessing...");
      exp.postProcess();
    }
  } catch (Exception ex) {
    ex.printStackTrace();
    System.err.println(ex.getMessage());
  }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -