⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jenvironment.java

📁 JAMPACK Grid programming
💻 JAVA
字号:
/**
 *
 * Program         : JEnvironment.java
 *
 * Author          : Vijayakrishnan Menon
 *
 * Date            : 17th Dec 2005
 *
 * Organization    : Centre for Excellence in 
 *						Computational Engineering and Networking (CEN),
 *                   		Amrita Viswa Vidyapeetham
 *
 **/

package JAMPack;

/** Signals that the framework tried to create a second environment. The
  * environment is meant to exist exactly once, so an accidental attempt to
  * duplicate it is reported with this exception instead of being carried out.
  * It is thrown by JEnvironment.getInstance() when the singleton flag shows
  * that an environment has already been set up. */

class SingletonException extends Exception {

    /** Fixed detail message carried by every instance of this exception. */
    private static final String MESSAGE =
            "Attempt to re-instantiate a singleton environment";

    /** Creates the exception with its fixed detail message and no cause. */
    public SingletonException() {
        super(MESSAGE);
    }
}

/** The Environment is the centre bolt of the framework: the kernel of the grid
  * system starts here and continues with the supplied user code. Constructing
  * it obtains and starts the peer server, starts the ping monitor, the web
  * server and the heartbeat responder in dedicated threads, creates the RMI
  * registry and binds the remote threads to that registry.
  *
  * NOTE(review): the service threads are started without setDaemon(true), so
  * they are ordinary (non-daemon) threads - confirm that this is intended. */

public final class JEnvironment  {

    /** The port ids and other environment constants */
    public static final int MAX_NO_OF_REMOTE_PROCESSES  = 5;
    public static final int PING_PORT = 5000;
    public static final int HEARTBEAT_PORT = 5002;
    public static final int PEER_DATA_PORT = 5004;
    public static final int PEER_OBJECT_PORT = 5010;
    public static final int REGISTRY_PORT = 5006;
    public static final int WEB_PORT = 5008;

    /** The environmental services and attributes */
    protected static java.rmi.registry.Registry _registry;
    /** True while no environment has completed its start-up; cleared by the
      * constructor once the RMI registry has been created. */
    protected static boolean _singleton = true;
    protected static Thread _pingMonitor;
    protected static Thread _webServer;
    protected static JPeerServerThreads _peerServer;
    protected static Thread _heatbeat;
    protected static int _pulseRate = 20;

    protected static RemoteThread[] _rmtThreads = new RemoteThread[MAX_NO_OF_REMOTE_PROCESSES];

    /** Builds the environment and starts all of its services. The constructor
      * is not public, so user code obtains the instance through
      * {@link #getInstance()}, which is guarded by the static singleton flag.
      *
      * RemoteException and IOException raised during start-up are printed and
      * not rethrown. In that case the singleton flag is only cleared if the
      * registry had already been created. */

    protected JEnvironment() {
        try {
            // NOTE(review): RMISecurityManager and System.setSecurityManager
            // are deprecated on recent JDKs - confirm the target Java version.
            System.setProperty("java.security.policy","java.policy");
            System.setSecurityManager(new java.rmi.RMISecurityManager());

            /*-----------------Peer to Peer Communication listener------------*/

            JEnvironment._peerServer = JPeerServer.getPeerServerInstance(
                    JEnvironment.PEER_DATA_PORT, JEnvironment.PEER_OBJECT_PORT);

            /*---------------------------Ping Monitor---------------------------
             * Listens on PING_PORT and answers every incoming connection with
             * the bytes of "aum@", then closes the connection. */

            JEnvironment._pingMonitor = new Thread("Ping Monitor") {
                public void run() {
                    JEnvironment.servePings();
                }
            };

            /*----------------------------Web Server----------------------------
             * Runs JWebserver.startServer(). The startup message reports
             * HTTP v1.1 on WEB_PORT. Per the original notes this server is
             * needed for dynamic class loading of Task classes marshalled into
             * the environment by this local host, because the class files for
             * those tasks exist only on this host. */

            JEnvironment._webServer = new Thread("Web Server") {
                public void run() {
                    System.out.println("Webserver initiated......Running on HTTP v1.1 on port "+JEnvironment.WEB_PORT);
                    try { JWebserver.startServer(); }
                    catch(java.io.IOException e) { e.printStackTrace(); }
                }
            };

            /*------------------------Heartbeat Server--------------------------
             * Answers every UDP datagram received on HEARTBEAT_PORT with an
             * "aum@" datagram. Per the original notes the schedulers capture
             * this pulse to keep their host files up to date. */

            JEnvironment._heatbeat = new Thread("Heartbeat") {
                public void run() {
                    JEnvironment.serveHeartbeats();
                }
            };

            /*-----------------------------------------------------------------*/

            JEnvironment._pingMonitor.start();
            JEnvironment._webServer.start();
            JEnvironment._peerServer.start();
            JEnvironment._heatbeat.start();

            JEnvironment._registry = java.rmi.registry.LocateRegistry.createRegistry(JEnvironment.REGISTRY_PORT);
            JEnvironment._singleton = false;
            System.out.println("Waiting for the RMI Registry to initialise.......Done\n"+
                                    "Running on port "+JEnvironment.REGISTRY_PORT);
            JEnvironment.createRemoteThreads();
        }
        catch(java.rmi.RemoteException e) { e.printStackTrace(); }
        catch(java.io.IOException e) { e.printStackTrace(); }
    }

    /** Body of the ping monitor thread: accepts connections on PING_PORT and
      * writes the bytes of "aum@" to each one before closing it. A failure
      * while answering a single client is printed and the monitor keeps
      * accepting; a failure to open the listener or to accept ends it. */

    private static void servePings() {
        System.out.println("Ping Monitor initiated......on port "+JEnvironment.PING_PORT );
        byte[] pingData = "aum@".getBytes();
        java.net.ServerSocket listener = null;
        try {
            listener = new java.net.ServerSocket(JEnvironment.PING_PORT);
            while (true) {
                java.net.Socket reply = listener.accept();
                try {
                    // Always release the client socket, even if the write fails.
                    try { reply.getOutputStream().write(pingData); }
                    finally { reply.close(); }
                }
                catch (java.io.IOException e) {
                    // One misbehaving client must not take the monitor down.
                    e.printStackTrace();
                }
            }
        }
        catch (java.io.IOException e) { e.printStackTrace(); }
        finally {
            if (listener != null) {
                try { listener.close(); }
                catch (java.io.IOException e) { e.printStackTrace(); }
            }
        }
    }

    /** Body of the heartbeat thread: binds a UDP socket to HEARTBEAT_PORT and
      * answers each received datagram with an "aum@" datagram sent to the
      * sender's address on HEARTBEAT_PORT. If the socket cannot be bound the
      * error is printed and the thread ends, since there is nothing to
      * receive on.
      *
      * NOTE(review): the reply goes to HEARTBEAT_PORT rather than to the
      * sender's source port - confirm that senders listen on that port and
      * that two responders cannot end up echoing each other. */

    private static void serveHeartbeats() {
        byte[] outBP = "aum@".getBytes();
        byte[] inBP = new byte[4];
        java.net.DatagramSocket heart;

        try {
            heart = new java.net.DatagramSocket(JEnvironment.HEARTBEAT_PORT);
        }
        catch (java.net.SocketException e) {
            e.printStackTrace();
            return;
        }

        java.net.DatagramPacket inPulse = new java.net.DatagramPacket(inBP, inBP.length);
        System.out.println("Starting Heartbeat System.......");

        try {
            while (true) {
                try {
                    heart.receive(inPulse);
                    java.net.DatagramPacket outPulse = new java.net.DatagramPacket(
                            outBP, outBP.length, inPulse.getAddress(), JEnvironment.HEARTBEAT_PORT);
                    heart.send(outPulse);
                }
                catch (java.io.IOException e) { e.printStackTrace(); }
            }
        }
        finally {
            // Only reached if an unchecked exception ends the loop.
            heart.close();
        }
    }

    /** This is the only way for user code to instantiate an Environment. Once
      * an environment has been set up, any further call throws a
      * SingletonException. The method is synchronized so that two concurrent
      * callers cannot both pass the singleton check.
      *
      * @return the newly created environment
      * @throws SingletonException if an environment has already been set up */

    public static synchronized JEnvironment getInstance() throws SingletonException  {
        if (JEnvironment._singleton)  return new JEnvironment();
        else throw new SingletonException();
    }

    /** Returns a reference to the RMI registry, or null if it has not been
      * created. */

    public static java.rmi.registry.Registry getRegistry() {
        return JEnvironment._registry;
    }

    /** Creates MAX_NO_OF_REMOTE_PROCESSES remote thread objects and binds them
      * to the already created registry under the names "RemoteThread1",
      * "RemoteThread2", and so on. A failure for one slot is printed and the
      * remaining slots are still attempted. */

    protected static void createRemoteThreads() {
        for(int i=0; i<MAX_NO_OF_REMOTE_PROCESSES; i++) {
            try {
                JEnvironment._rmtThreads[i] = new JRemoteThread(i+1);
                JEnvironment._registry.bind("RemoteThread"+(i+1), JEnvironment._rmtThreads[i]);
            }
            catch(java.rmi.RemoteException e) { e.printStackTrace(); }
            catch(java.rmi.AlreadyBoundException e) { e.printStackTrace(); }
        }
    }

    /** Returns the web server thread, or null before the environment is set up. */
    public static Thread getWebServer() {
        return JEnvironment._webServer;
    }

    /** Returns the ping monitor thread, or null before the environment is set up. */
    public static Thread getPingMonitor() {
        return JEnvironment._pingMonitor;
    }

    /** Returns the peer server, or null before the environment is set up. */
    public static JPeerServerThreads getPeerServer() {
        return JEnvironment._peerServer;
    }

    /** Starts the environment when this class is run as a program. */
    public static void main(String args[]) throws Exception {
        JEnvironment.getInstance();
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -