📄 boundarypaneldistributed.java
字号:
/* * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. *//* * BoundaryPanelDistrubuted.java * Copyright (C) 2003 Mark Hall * */package weka.gui.boundaryvisualizer;import java.awt.*;import java.awt.event.*;import javax.swing.JPanel;import javax.swing.ToolTipManager;import java.util.Vector;import java.util.Random;import java.rmi.*;import com.sun.image.codec.jpeg.*;import java.awt.image.*;import java.io.*;import weka.core.*;import weka.classifiers.Classifier;import weka.classifiers.bayes.NaiveBayesSimple;import weka.classifiers.bayes.NaiveBayes;import weka.classifiers.trees.J48;import weka.classifiers.lazy.IBk;import weka.classifiers.functions.Logistic;import weka.clusterers.EM;import weka.filters.Filter;import weka.filters.unsupervised.attribute.Remove;import weka.filters.unsupervised.attribute.Add;import weka.experiment.RemoteExperimentListener;import weka.experiment.RemoteExperimentEvent;import weka.experiment.Compute;import weka.experiment.TaskStatusInfo;/** * This class extends BoundaryPanel with code for distributing the * processing necessary to create a visualization among a list of * remote machines. Specifically, a visualization is broken down and * processed row by row using the available remote computers. * * @author <a href="mailto:mhall@cs.waikato.ac.nz">Mark Hall</a> * @version $Revision: 1.5 $ * @since 1.0 * @see BoundaryPanel */public class BoundaryPanelDistributed extends BoundaryPanel { /** a list of RemoteExperimentListeners */ protected Vector m_listeners = new Vector(); /** Holds the names of machines with remoteEngine servers running */ protected Vector m_remoteHosts = new Vector(); /** The queue of available hosts */ private weka.core.Queue m_remoteHostsQueue = new weka.core.Queue(); /** The status of each of the remote hosts */ private int [] m_remoteHostsStatus; /** The number of times tasks have failed on each remote host */ private int [] m_remoteHostFailureCounts; protected static final int AVAILABLE=0; protected static final int IN_USE=1; protected static final int CONNECTION_FAILED=2; protected static final int SOME_OTHER_FAILURE=3; protected static final int MAX_FAILURES=3; /** Set to true if MAX_FAILURES exceeded on all hosts or connections fail on all hosts or user aborts plotting */ private boolean m_plottingAborted = false; /** The number of hosts removed due to exceeding max failures */ private int m_removedHosts; /** The count of failed sub-tasks */ private int m_failedCount; /** The count of successfully completed sub-tasks */ private int m_finishedCount; /** The queue of sub-tasks waiting to be processed */ private weka.core.Queue m_subExpQueue = new weka.core.Queue(); // number of seconds between polling server private int m_minTaskPollTime = 1000; private int [] m_hostPollingTime; /** * Creates a new <code>BoundaryPanelDistributed</code> instance. * * @param panelWidth width of the display * @param panelHeight height of the display */ public BoundaryPanelDistributed(int panelWidth, int panelHeight) { super(panelWidth, panelHeight); } /** * Set a list of host names of machines to distribute processing to * * @param remHosts a Vector of host names (Strings) */ public void setRemoteHosts(Vector remHosts) { m_remoteHosts = remHosts; } /** * Add an object to the list of those interested in recieving update * information from the RemoteExperiment * @param r a listener */ public void addRemoteExperimentListener(RemoteExperimentListener r) { m_listeners.addElement(r); } protected void initialize() { super.initialize(); m_plottingAborted = false; m_finishedCount = 0; m_failedCount = 0; // initialize all remote hosts to available m_remoteHostsStatus = new int [m_remoteHosts.size()]; m_remoteHostFailureCounts = new int [m_remoteHosts.size()]; m_remoteHostsQueue = new weka.core.Queue(); if (m_remoteHosts.size() == 0) { System.err.println("No hosts specified!"); System.exit(1); } // prime the hosts queue m_hostPollingTime = new int [m_remoteHosts.size()]; for (int i=0;i<m_remoteHosts.size();i++) { m_remoteHostsQueue.push(new Integer(i)); m_hostPollingTime[i] = m_minTaskPollTime; } // set up sub taskss (just holds the row numbers to be processed m_subExpQueue = new weka.core.Queue(); for (int i = 0; i < m_panelHeight; i++) { m_subExpQueue.push(new Integer(i)); } try { // need to build classifier and data generator m_classifier.buildClassifier(m_trainingData); } catch (Exception ex) { ex.printStackTrace(); System.exit(1); } boolean [] attsToWeightOn; // build DataGenerator attsToWeightOn = new boolean[m_trainingData.numAttributes()]; attsToWeightOn[m_xAttribute] = true; attsToWeightOn[m_yAttribute] = true; m_dataGenerator.setWeightingDimensions(attsToWeightOn); try { m_dataGenerator.buildGenerator(m_trainingData); } catch (Exception ex) { ex.printStackTrace(); System.exit(1); } } /** * Start processing * * @exception Exception if an error occurs */ public void start() throws Exception { // done in the sub task /* m_numOfSamplesPerGenerator = (int)Math.pow(m_samplesBase, m_trainingData.numAttributes()-3); */ m_stopReplotting = true; if (m_trainingData == null) { throw new Exception("No training data set (BoundaryPanel)"); } if (m_classifier == null) { throw new Exception("No classifier set (BoundaryPanel)"); } if (m_dataGenerator == null) { throw new Exception("No data generator set (BoundaryPanel)"); } if (m_trainingData.attribute(m_xAttribute).isNominal() || m_trainingData.attribute(m_yAttribute).isNominal()) { throw new Exception("Visualization dimensions must be numeric " +"(BoundaryPanel)"); } computeMinMaxAtts(); initialize(); // launch tasks on all available hosts int totalHosts = m_remoteHostsQueue.size(); for (int i = 0; i < totalHosts; i++) { availableHost(-1); Thread.sleep(70); } } /** * Push a host back onto the list of available hosts and launch a waiting * Task (if any). * * @param hostNum the number of the host to return to the queue. -1 * if no host to return. */ protected synchronized void availableHost(int hostNum) { if (hostNum >= 0) { if (m_remoteHostFailureCounts[hostNum] < MAX_FAILURES) { m_remoteHostsQueue.push(new Integer(hostNum)); } else { notifyListeners(false,true,false,"Max failures exceeded for host " +((String)m_remoteHosts.elementAt(hostNum)) +". Removed from host list."); m_removedHosts++; } } // check for all sub exp complete or all hosts failed or failed count // exceeded if (m_failedCount == (MAX_FAILURES * m_remoteHosts.size())) { m_plottingAborted = true; notifyListeners(false,true,true,"Plotting aborted! Max failures " +"exceeded on all remote hosts."); return; } /* System.err.println("--------------"); System.err.println("exp q :"+m_subExpQueue.size()); System.err.println("host list size "+m_remoteHosts.size()); System.err.println("actual host list size "+m_remoteHostsQueue.size()); System.err.println("removed hosts "+m_removedHosts); */ if (m_subExpQueue.size() == 0 && (m_remoteHosts.size() == (m_remoteHostsQueue.size() + m_removedHosts))) { if (m_plotTrainingData) { plotTrainingData(); } notifyListeners(false,true,true,"Plotting completed successfully."); return; } if (checkForAllFailedHosts()) { return; } if (m_plottingAborted && (m_remoteHostsQueue.size() + m_removedHosts) == m_remoteHosts.size()) { notifyListeners(false,true,true,"Plotting aborted. All remote tasks " +"finished."); } if (!m_subExpQueue.empty() && !m_plottingAborted) { if (!m_remoteHostsQueue.empty()) { int availHost, waitingTask; try { availHost = ((Integer)m_remoteHostsQueue.pop()).intValue(); waitingTask = ((Integer)m_subExpQueue.pop()).intValue(); launchNext(waitingTask, availHost); } catch (Exception ex) { ex.printStackTrace(); } } } } /** * Inform all listeners of progress * @param status true if this is a status type of message * @param log true if this is a log type of message * @param finished true if the remote task has finished * @param message the message. */ private synchronized void notifyListeners(boolean status, boolean log, boolean finished, String message) { if (m_listeners.size() > 0) { for (int i=0;i<m_listeners.size();i++) { RemoteExperimentListener r = (RemoteExperimentListener)(m_listeners.elementAt(i)); r.remoteExperimentStatus(new RemoteExperimentEvent(status, log, finished, message)); } } else { System.err.println(message); } } /** * Check to see if we have failed to connect to all hosts */ private boolean checkForAllFailedHosts() { boolean allbad = true; for (int i = 0; i < m_remoteHostsStatus.length; i++) { if (m_remoteHostsStatus[i] != CONNECTION_FAILED) { allbad = false; break; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -