📄 boundarypaneldistributed.java
字号:
/*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*
* BoundaryPanelDistributed.java
* Copyright (C) 2003 Mark Hall
*
*/
package weka.gui.boundaryvisualizer;
import java.awt.BorderLayout;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.ObjectInputStream;
import java.rmi.Naming;
import java.util.Vector;
import weka.classifiers.Classifier;
import weka.core.FastVector;
import weka.core.Instances;
import weka.core.Utils;
import weka.experiment.Compute;
import weka.experiment.RemoteExperimentEvent;
import weka.experiment.RemoteExperimentListener;
import weka.experiment.TaskStatusInfo;
/**
* This class extends BoundaryPanel with code for distributing the
* processing necessary to create a visualization among a list of
* remote machines. Specifically, a visualization is broken down and
* processed row by row using the available remote computers.
*
* @author <a href="mailto:mhall@cs.waikato.ac.nz">Mark Hall</a>
* @version $Revision$
* @since 1.0
* @see BoundaryPanel
*/
public class BoundaryPanelDistributed extends BoundaryPanel {
/** The RemoteExperimentListeners that are sent progress messages */
protected Vector m_listeners = new Vector();
/** Holds the names of machines with remoteEngine servers running */
protected Vector m_remoteHosts = new Vector();
/** The queue of available hosts, held as Integer indices into
m_remoteHosts */
private weka.core.Queue m_remoteHostsQueue = new weka.core.Queue();
/** The status of each of the remote hosts (one entry per entry in
m_remoteHosts) */
private int [] m_remoteHostsStatus;
/** The number of times tasks have failed on each remote host */
private int [] m_remoteHostFailureCounts;
/** Host status code: the host is available */
protected static final int AVAILABLE=0;
/** Host status code; presumably marks a host that is running a task -
not used in the visible part of this file, confirm */
protected static final int IN_USE=1;
/** Host status code: the connection to the host failed. Plotting is
aborted once every host has this status */
protected static final int CONNECTION_FAILED=2;
/** Host status code; presumably marks a failure other than a failed
connection - not used in the visible part of this file, confirm */
protected static final int SOME_OTHER_FAILURE=3;
/** A host whose failure count reaches this value is not returned to the
queue of available hosts; plotting is aborted when the total failed
count equals this value times the number of hosts */
protected static final int MAX_FAILURES=3;
/** Set to true if MAX_FAILURES exceeded on all hosts or connections fail
on all hosts or user aborts plotting */
private boolean m_plottingAborted = false;
/** The number of hosts removed due to exceeding max failures */
private int m_removedHosts;
/** The count of failed sub-tasks */
private int m_failedCount;
/** The count of successfully completed sub-tasks */
private int m_finishedCount;
/** The queue of sub-tasks waiting to be processed, held as Integer row
numbers */
private weka.core.Queue m_subExpQueue = new weka.core.Queue();
// Minimum time between polls of a server; also the initial polling time
// of every host.
// NOTE(review): the original comment said "seconds", but a value of 1000
// looks like milliseconds - confirm against the code that uses it.
private int m_minTaskPollTime = 1000;
// Polling time for each host (one entry per entry in m_remoteHosts)
private int [] m_hostPollingTime;
/**
* Creates a new <code>BoundaryPanelDistributed</code> instance. Both
* arguments are passed unchanged to the <code>BoundaryPanel</code>
* constructor.
*
* @param panelWidth width of the display
* @param panelHeight height of the display
*/
public BoundaryPanelDistributed(int panelWidth, int panelHeight) {
super(panelWidth, panelHeight);
}
/**
* Set a list of host names of machines to distribute processing to.
* The Vector is stored by reference (it is not copied), so later changes
* made to it by the caller are visible to this panel.
*
* @param remHosts a Vector of host names (Strings)
*/
public void setRemoteHosts(Vector remHosts) {
m_remoteHosts = remHosts;
}
/**
 * Registers an object that wants to receive progress and log messages
 * produced while the visualization is being computed remotely.
 *
 * @param r the listener to register
 */
public void addRemoteExperimentListener(RemoteExperimentListener r) {
  m_listeners.add(r);
}
/**
 * Prepares this panel for a new distributed plotting run.
 * <p>
 * After calling the superclass initialization this resets the per-run
 * counters, marks every remote host as available and queues it, queues
 * one sub task per display row, builds the classifier on the training
 * data and builds the data generator weighted on the x and y attributes.
 * <p>
 * Note that the JVM is terminated with exit status 1 if no remote hosts
 * have been set, or if building the classifier or the data generator
 * throws an exception.
 */
protected void initialize() {
  super.initialize();

  // reset the per-run bookkeeping
  m_plottingAborted = false;
  m_finishedCount = 0;
  m_failedCount = 0;
  // Hosts dropped during an earlier run must not be carried over: every
  // host is queued again below, and availableHost() compares
  // (queued hosts + removed hosts) with the total number of hosts to
  // decide whether all hosts are idle. A stale count breaks that test.
  m_removedHosts = 0;

  // initialize all remote hosts to available (freshly allocated arrays
  // hold zeros, i.e. AVAILABLE and a failure count of 0)
  m_remoteHostsStatus = new int [m_remoteHosts.size()];
  m_remoteHostFailureCounts = new int [m_remoteHosts.size()];

  m_remoteHostsQueue = new weka.core.Queue();

  if (m_remoteHosts.size() == 0) {
    System.err.println("No hosts specified!");
    System.exit(1);
  }

  // prime the hosts queue with the index of every host
  m_hostPollingTime = new int [m_remoteHosts.size()];
  for (int i = 0; i < m_remoteHosts.size(); i++) {
    m_remoteHostsQueue.push(new Integer(i));
    m_hostPollingTime[i] = m_minTaskPollTime;
  }

  // set up sub tasks (just holds the row numbers to be processed)
  m_subExpQueue = new weka.core.Queue();
  for (int i = 0; i < m_panelHeight; i++) {
    m_subExpQueue.push(new Integer(i));
  }

  try {
    // need to build classifier and data generator
    m_classifier.buildClassifier(m_trainingData);
  } catch (Exception ex) {
    ex.printStackTrace();
    System.exit(1);
  }

  // build DataGenerator, weighting only on the two plotted attributes
  boolean [] attsToWeightOn = new boolean[m_trainingData.numAttributes()];
  attsToWeightOn[m_xAttribute] = true;
  attsToWeightOn[m_yAttribute] = true;
  m_dataGenerator.setWeightingDimensions(attsToWeightOn);
  try {
    m_dataGenerator.buildGenerator(m_trainingData);
  } catch (Exception ex) {
    ex.printStackTrace();
    System.exit(1);
  }
}
/**
 * Checks the configuration, initializes a new run and then asks each
 * queued remote host in turn to pick up a waiting row.
 *
 * @exception Exception if no training data, classifier or data generator
 * has been set, if either visualization attribute is nominal, or if an
 * error occurs while preparing or launching the run
 */
public void start() throws Exception {
  // The number of samples per generator is worked out in the sub task,
  // so it is not computed here.
  m_stopReplotting = true;

  // pick the first configuration problem, if there is one
  String problem = null;
  if (m_trainingData == null) {
    problem = "No training data set (BoundaryPanel)";
  } else if (m_classifier == null) {
    problem = "No classifier set (BoundaryPanel)";
  } else if (m_dataGenerator == null) {
    problem = "No data generator set (BoundaryPanel)";
  } else {
    final boolean xIsNominal =
      m_trainingData.attribute(m_xAttribute).isNominal();
    if (xIsNominal || m_trainingData.attribute(m_yAttribute).isNominal()) {
      problem = "Visualization dimensions must be numeric "
        + "(BoundaryPanel)";
    }
  }
  if (problem != null) {
    throw new Exception(problem);
  }

  computeMinMaxAtts();
  initialize();

  // launch tasks on all available hosts, pausing briefly between launches
  int hostsLeftToPrime = m_remoteHostsQueue.size();
  while (hostsLeftToPrime > 0) {
    availableHost(-1);
    Thread.sleep(70);
    hostsLeftToPrime--;
  }
}
/**
 * Hands a host back to the queue of available hosts and, if work remains,
 * launches the next waiting row on an available host.
 * <p>
 * The steps are carried out in this order: the given host is re-queued
 * (or dropped once it has reached MAX_FAILURES); plotting is aborted if
 * the failed count equals MAX_FAILURES times the number of hosts;
 * completion is reported if no rows are waiting and every host is either
 * queued or removed; plotting is aborted if every connection has failed;
 * a message is logged if plotting was aborted and every host is queued or
 * removed; finally one waiting row is launched if a host is available.
 *
 * @param hostNum the number of the host to return to the queue; a
 * negative value (callers pass -1) means there is no host to return
 */
protected synchronized void availableHost(int hostNum) {
  if (hostNum >= 0) {
    if (m_remoteHostFailureCounts[hostNum] >= MAX_FAILURES) {
      // too many failures on this host: drop it instead of re-queuing it
      notifyListeners(false, true, false, "Max failures exceeded for host "
                      + ((String)m_remoteHosts.elementAt(hostNum))
                      + ". Removed from host list.");
      m_removedHosts++;
    } else {
      m_remoteHostsQueue.push(new Integer(hostNum));
    }
  }

  // failed count has hit the limit across all hosts
  if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) {
    m_plottingAborted = true;
    notifyListeners(false, true, true, "Plotting aborted! Max failures "
                    + "exceeded on all remote hosts.");
    return;
  }

  // nothing left to hand out and every host is either queued or removed
  final boolean noRowsWaiting = (m_subExpQueue.size() == 0);
  if (noRowsWaiting
      && (m_remoteHosts.size()
          == (m_remoteHostsQueue.size() + m_removedHosts))) {
    if (m_plotTrainingData) {
      plotTrainingData();
    }
    notifyListeners(false, true, true, "Plotting completed successfully.");
    return;
  }

  if (checkForAllFailedHosts()) {
    return;
  }

  if (m_plottingAborted
      && (m_remoteHostsQueue.size() + m_removedHosts)
         == m_remoteHosts.size()) {
    notifyListeners(false, true, true, "Plotting aborted. All remote tasks "
                    + "finished.");
  }

  // a launch needs a waiting row, a run that is still live and a free host
  if (m_subExpQueue.empty() || m_plottingAborted
      || m_remoteHostsQueue.empty()) {
    return;
  }
  try {
    final int freeHost = ((Integer)m_remoteHostsQueue.pop()).intValue();
    final int nextRow = ((Integer)m_subExpQueue.pop()).intValue();
    launchNext(nextRow, freeHost);
  } catch (Exception ex) {
    ex.printStackTrace();
  }
}
/**
 * Sends a progress message to every registered listener, each receiving
 * its own RemoteExperimentEvent. When no listener is registered the
 * message is written to standard error instead.
 *
 * @param status true if this is a status type of message
 * @param log true if this is a log type of message
 * @param finished true if the remote task has finished
 * @param message the message.
 */
private synchronized void notifyListeners(boolean status,
                                          boolean log,
                                          boolean finished,
                                          String message) {
  // nobody is listening: fall back to the console
  if (m_listeners.size() <= 0) {
    System.err.println(message);
    return;
  }

  for (int idx = 0; idx < m_listeners.size(); idx++) {
    RemoteExperimentListener target =
      (RemoteExperimentListener) m_listeners.elementAt(idx);
    RemoteExperimentEvent event =
      new RemoteExperimentEvent(status, log, finished, message);
    target.remoteExperimentStatus(event);
  }
}
/**
* Check to see if we have failed to connect to all hosts
*/
private boolean checkForAllFailedHosts() {
boolean allbad = true;
for (int i = 0; i < m_remoteHostsStatus.length; i++) {
if (m_remoteHostsStatus[i] != CONNECTION_FAILED) {
allbad = false;
break;
}
}
if (allbad) {
m_plottingAborted = true;
notifyListeners(false,true,true,"Plotting aborted! All connections "
+"to remote hosts failed.");
}
return allbad;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -