📄 jenvironment.java
字号:
/**
*
* Program : JEnvironment.java
*
* Author : Vijayakrishnan Menon
*
* Date : 17th Dec 2005
*
* Organization : Centre for Excellence in
* Computational Engineering and Networking (CEN),
* Amrita Viswa Vidyapeetham
*
**/
package JAMPack;
/**
 * Signals an attempt to create a second environment.
 *
 * <p>The framework is designed around a single environment per JVM; each
 * machine keeps its own local references to it once it has been set up, so an
 * accidental duplicate could be catastrophic to the system. This exception is
 * what {@code JEnvironment.getInstance()} throws instead of creating that
 * duplicate.
 */
class SingletonException extends Exception {

    /** Fixed detail message carried by every instance. */
    private static final String MESSAGE =
            "Attempt to re-instantiate a singleton environment";

    /** Creates the exception with its fixed detail message and no cause. */
    public SingletonException() {
        super(MESSAGE);
    }
}
/**
 * The Environment is the centre bolt of the framework: the kernel of the grid
 * system starts here and then continues with the supplied user code.
 *
 * <p>Constructing the environment obtains the peer communication server,
 * creates and starts the ping monitor, web server and heartbeat threads,
 * creates the RMI registry and binds the remote threads to it. All of these
 * services live in static fields, so they are shared by the whole JVM.
 *
 * <p>The three threads created in this class are started as ordinary threads;
 * {@code setDaemon} is never called on them here.
 */
public final class JEnvironment {

    /** Number of remote threads created and bound to the registry. */
    public static final int MAX_NO_OF_REMOTE_PROCESSES = 5;
    /** TCP port the ping monitor listens on. */
    public static final int PING_PORT = 5000;
    /** UDP port the heartbeat thread listens on and replies to. */
    public static final int HEARTBEAT_PORT = 5002;
    /** First port handed to the peer server (data). */
    public static final int PEER_DATA_PORT = 5004;
    /** Second port handed to the peer server (objects). */
    public static final int PEER_OBJECT_PORT = 5010;
    /** Port the RMI registry is created on. */
    public static final int REGISTRY_PORT = 5006;
    /** Port reported in the web server start-up message. */
    public static final int WEB_PORT = 5008;

    /** The environmental services and attributes. */
    protected static java.rmi.registry.Registry _registry;
    /** {@code true} while no environment has started its services yet. */
    protected static boolean _singleton = true;
    protected static Thread _pingMonitor;
    protected static Thread _webServer;
    protected static JPeerServerThreads _peerServer;
    protected static Thread _heatbeat;
    /** Not read anywhere in this class. */
    protected static int _pulseRate = 20;
    protected static RemoteThread[] _rmtThreads = new RemoteThread[MAX_NO_OF_REMOTE_PROCESSES];

    /**
     * Starts every service of the environment.
     *
     * <p>The constructor is not public so that users cannot spawn environments
     * at will; {@link #getInstance()} is the intended entry point and guards
     * the call with the {@code _singleton} flag.
     *
     * <p>{@code RemoteException} and {@code IOException} raised during start-up
     * are printed and not rethrown, so the constructor returns normally even
     * when a service could not be brought up.
     */
    protected JEnvironment() {
        try {
            System.setProperty("java.security.policy", "java.policy");
            System.setSecurityManager(new java.rmi.RMISecurityManager());

            /*-----------------Peer to Peer Communication listener------------*/
            JEnvironment._peerServer = JPeerServer.getPeerServerInstance(
                    JEnvironment.PEER_DATA_PORT, JEnvironment.PEER_OBJECT_PORT);

            JEnvironment._pingMonitor = createPingMonitor();
            JEnvironment._webServer = createWebServer();
            JEnvironment._heatbeat = createHeartbeat();

            // Claim the singleton before anything is started: if a later step
            // (for example creating the registry) fails, a second call to
            // getInstance() must not start a duplicate set of services.
            JEnvironment._singleton = false;

            JEnvironment._pingMonitor.start();
            JEnvironment._webServer.start();
            JEnvironment._peerServer.start();
            JEnvironment._heatbeat.start();

            JEnvironment._registry =
                    java.rmi.registry.LocateRegistry.createRegistry(JEnvironment.REGISTRY_PORT);
            System.out.println("Waiting for the RMI Registry to initialise.......Done\n"+
                    "Running on port "+JEnvironment.REGISTRY_PORT);
            JEnvironment.createRemoteThreads();
        }
        catch (java.rmi.RemoteException e) { e.printStackTrace(); }
        catch (java.io.IOException e) { e.printStackTrace(); }
    }

    /**
     * Builds (but does not start) the ping monitor thread.
     *
     * <p>The thread accepts TCP connections on {@link #PING_PORT}, writes the
     * bytes of {@code "aum@"} to each one and closes it. A failure while
     * answering one client is reported and the listener keeps running; only a
     * failure of the listening socket itself ends the thread.
     */
    private static Thread createPingMonitor() {
        return new Thread("Ping Monitor") {
            public void run() {
                System.out.println("Ping Monitor initiated......on port "+JEnvironment.PING_PORT );
                byte[] pingData = "aum@".getBytes();
                java.net.ServerSocket listener = null;
                try {
                    listener = new java.net.ServerSocket(JEnvironment.PING_PORT);
                    while (true) {
                        java.net.Socket client = listener.accept();
                        try {
                            client.getOutputStream().write(pingData);
                        }
                        catch (java.io.IOException writeFailure) {
                            // One broken client must not take the monitor down.
                            writeFailure.printStackTrace();
                        }
                        finally {
                            try { client.close(); }
                            catch (java.io.IOException closeFailure) { closeFailure.printStackTrace(); }
                        }
                    }
                }
                catch (java.io.IOException e) { e.printStackTrace(); }
                finally {
                    if (listener != null) {
                        try { listener.close(); }
                        catch (java.io.IOException closeFailure) { closeFailure.printStackTrace(); }
                    }
                }
            }
        };
    }

    /**
     * Builds (but does not start) the web server thread.
     *
     * <p>The thread prints a start-up message naming {@link #WEB_PORT} and then
     * delegates to {@code JWebserver.startServer()}. According to the original
     * author the server exists so that Task classes marshalled into the
     * environment by this host can be loaded dynamically by other hosts.
     */
    private static Thread createWebServer() {
        return new Thread("Web Server") {
            public void run() {
                System.out.println("Webserver initiated......Running on HTTP v1.1 on port "+JEnvironment.WEB_PORT);
                try { JWebserver.startServer(); }
                catch (java.io.IOException e) { e.printStackTrace(); }
            }
        };
    }

    /**
     * Builds (but does not start) the heartbeat thread.
     *
     * <p>The thread binds a UDP socket to {@link #HEARTBEAT_PORT}. For every
     * datagram received it sends the bytes of {@code "aum@"} back to the
     * sender's address on {@link #HEARTBEAT_PORT}. If the socket cannot be
     * bound the failure is reported and the thread ends.
     */
    private static Thread createHeartbeat() {
        return new Thread("Heartbeat") {
            public void run() {
                byte[] outBP = "aum@".getBytes();
                byte[] inBP = new byte[4];
                java.net.DatagramSocket heart;
                try {
                    heart = new java.net.DatagramSocket(JEnvironment.HEARTBEAT_PORT);
                }
                catch (java.net.SocketException e) {
                    // Without a socket there is nothing to serve.
                    e.printStackTrace();
                    return;
                }
                java.net.DatagramPacket inPulse = new java.net.DatagramPacket(inBP, inBP.length);
                System.out.println("Starting Heartbeat System.......");
                try {
                    // Stop once the socket is closed instead of spinning on
                    // an endless stream of IOExceptions.
                    while (!heart.isClosed()) {
                        try {
                            heart.receive(inPulse);
                            java.net.DatagramPacket outPulse = new java.net.DatagramPacket(
                                    outBP, outBP.length,
                                    inPulse.getAddress(), JEnvironment.HEARTBEAT_PORT);
                            heart.send(outPulse);
                        }
                        catch (java.io.IOException e) { e.printStackTrace(); }
                    }
                }
                finally {
                    heart.close();
                }
            }
        };
    }

    /**
     * Creates the environment. This is the intended way to obtain one.
     *
     * <p>The method is synchronized so that two concurrent callers cannot both
     * pass the singleton check.
     *
     * @return the newly created environment
     * @throws SingletonException if an environment has already started its
     *         services
     */
    public static synchronized JEnvironment getInstance() throws SingletonException {
        if (!JEnvironment._singleton) {
            throw new SingletonException();
        }
        return new JEnvironment();
    }

    /**
     * Returns a reference to the RMI registry.
     *
     * @return the registry, or {@code null} if it has not been created
     */
    public static java.rmi.registry.Registry getRegistry() {
        return JEnvironment._registry;
    }

    /**
     * Creates {@link #MAX_NO_OF_REMOTE_PROCESSES} remote threads and binds
     * them to the registry under the names {@code RemoteThread1},
     * {@code RemoteThread2}, and so on.
     *
     * <p>A failure to create or bind one remote thread is printed and the loop
     * carries on with the next one. The registry must already exist when this
     * method is called.
     */
    protected static void createRemoteThreads() {
        for (int slot = 0; slot < MAX_NO_OF_REMOTE_PROCESSES; slot++) {
            int id = slot + 1;
            try {
                JEnvironment._rmtThreads[slot] = new JRemoteThread(id);
                JEnvironment._registry.bind("RemoteThread"+id, JEnvironment._rmtThreads[slot]);
            }
            catch (java.rmi.RemoteException e) { e.printStackTrace(); }
            catch (java.rmi.AlreadyBoundException e) { e.printStackTrace(); }
        }
    }

    /** Returns the web server thread, or {@code null} before start-up. */
    public static Thread getWebServer() {
        return JEnvironment._webServer;
    }

    /** Returns the ping monitor thread, or {@code null} before start-up. */
    public static Thread getPingMonitor() {
        return JEnvironment._pingMonitor;
    }

    /** Returns the peer server, or {@code null} before start-up. */
    public static JPeerServerThreads getPeerServer() {
        return JEnvironment._peerServer;
    }

    /** Starts the environment on this host. */
    public static void main(String args[]) throws Exception {
        JEnvironment.getInstance();
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -