⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jscheduler.java

📁 JAMPACK Grid programming
💻 JAVA
字号:
/**
 *
 * Program         : JScheduler.java
 *
 * Author          : Vijayakrishnan Menon
 *
 * Date            : 22nd Dec 2005
 *
 * Organization    : Centre for Excellence in 
 *						Computational Engineering and Networking (CEN),
 *                   		Amrita Viswa Vidyapeetham
 **/
 
package JAMPack;


/** The Scheduler is an essential part of any adaptive multiprocess environment.
  * The scheduler manages and schedules jobs seeded on the environment. This
  * scheduler works by fetching idle resources (namely remote processing
  * threads, in this context). The resources are distributed over a peer network
  * environment, which can be thought of as a pool. What the scheduler does is to
  * borrow something from the pool and then put it back when it is done. This way
  * the scheduler can manage the entire set of resources, both local and remote.
  * A new local scheduler in want of a resource is automatically redirected.
  **/

public class JScheduler {
	/** Grid built from the host file; assigned by the constructor. */
	protected static JGrid _gridWorld;

	/** True while no scheduler has been handed out yet; cleared by getInstance. */
	private static boolean _singleton = true;

	/** Value of _gridWorld.getSize() at construction time. */
	protected static int _totalNoOfCPUs;

	/** Timeout, in milliseconds, applied to both the connect and the read of a ping. */
	private static final int PING_TIMEOUT_MS = 1000;



	/** Constructor for the Scheduler. It only initialises the static scheduler
	  * state: it sets the "java.security.policy" and "java.rmi.server.codebase"
	  * system properties (the codebase URL is built from the local host
	  * address and JEnvironment.WEB_PORT), installs an RMISecurityManager,
	  * reads the host file and builds the grid world from it.
	  *
	  * @param hostFile path of the text file listing the grid hosts
	  **/
	protected JScheduler(String hostFile) {
		System.setProperty("java.security.policy","java.policy");

		try { System.setProperty("java.rmi.server.codebase","http://"
			   +java.net.InetAddress.getLocalHost().getHostAddress()+":"+JEnvironment.WEB_PORT+"/");
		}catch(java.net.UnknownHostException e) { e.printStackTrace();}

		System.setSecurityManager(new java.rmi.RMISecurityManager());

		String []hosts = JScheduler.readHostFile(hostFile);
		JScheduler._gridWorld = new JGrid(hosts);
		JScheduler._totalNoOfCPUs = JScheduler._gridWorld.getSize();
	}


	/** The constructor is protected, so this factory method is the way to
	  * obtain the scheduler. It hands out exactly one instance: the first
	  * successful call creates the scheduler, every later call fails.
	  *
	  * @param hostFile path of the text file listing the grid hosts
	  * @return the newly created scheduler
	  * @throws SingletonException if a scheduler has already been created
	  **/
	public static synchronized JScheduler getInstance(String hostFile) throws SingletonException {
		if (!JScheduler._singleton) throw new SingletonException();

		JScheduler scheduler = new JScheduler(hostFile);
		// Clear the flag only after construction succeeded, so that a failed
		// attempt does not lock out a retry.
		JScheduler._singleton = false;
		return scheduler;
	}


	/** Reads a host list from a text file and returns it as an array of
	  * strings. The lines are joined with a single space and the result is
	  * split on single spaces, so one line may carry several hosts.
	  *
	  * @param fileName path of the host file
	  * @return the hosts read so far; an empty array when the file is empty
	  *         or could not be read (the I/O error is printed, not thrown)
	  **/
	protected static String[] readHostFile(String fileName) {
		StringBuffer ipList = new StringBuffer();
		boolean sawLine = false;
		java.io.BufferedReader in = null;
		try {
			in = new java.io.BufferedReader( new java.io.FileReader(fileName));

			String ip;
			while ((ip = in.readLine()) != null) {
				if (sawLine) ipList.append(' ');
				ipList.append(ip);
				sawLine = true;
			}
		} catch (java.io.IOException e) {
			e.printStackTrace();
		} finally {
			// Close on every path so a failed read does not leak the file handle.
			if (in != null) {
				try { in.close(); }
				catch (java.io.IOException e) { e.printStackTrace(); }
			}
		}

		// Nothing was read: report "no hosts" instead of failing on a null list.
		if (!sawLine) return new String[0];
		return ipList.toString().split(" ");
	}


	/** Post is called by a user task as the single point of entry into the
	  * grid runtime. It builds a process for the task and starts it.
	  *
	  * @param task the task to run
	  * @param cpuThreshold the number of splits the task would like to get
	  * @throws IllegalStateException if no process could be created for the task
	  **/
	public static void post(Task task, int cpuThreshold)	{
		JProcess p = null;
		try	{	p = makeProcess(task,cpuThreshold); }
		catch(UnknownRankException e) { e.printStackTrace(); }

		// makeProcess returns null when the request cannot be served; fail
		// with a readable message rather than a NullPointerException.
		if (p == null)
			throw new IllegalStateException(
				"Could not create a process for the posted task (cpuThreshold="
				+ cpuThreshold + ", total CPUs=" + _totalNoOfCPUs + ")");

		p.start();
	}


	/** Internal method to test the liveness of a node over the runtime. It
	  * opens a TCP connection to the node's JEnvironment.PING_PORT with
	  * traffic class 0x10 (IPTOS_LOWDELAY in the java.net.Socket
	  * documentation) and reads from it; the bytes read are not inspected.
	  * Both the connect and the read are bounded by PING_TIMEOUT_MS so an
	  * unreachable node does not stall the caller.
	  *
	  * @param rank rank of the node inside the grid world
	  * @return true if the connect and read completed without an exception,
	  *         false otherwise (including when no grid has been set up yet)
	  **/
	protected static boolean ping (int rank){
		if (_gridWorld == null) return false;

		// Reply buffer; only the fact that the read completes matters.
		byte []pingData = new byte[4];
		java.net.Socket pingComm = new java.net.Socket();
		try {
			String ip = _gridWorld.getIPForRank(rank);
			pingComm.setSoTimeout(PING_TIMEOUT_MS);
			pingComm.setTcpNoDelay(true);
			pingComm.setTrafficClass(0x10);
			pingComm.connect(new java.net.InetSocketAddress(ip,JEnvironment.PING_PORT), PING_TIMEOUT_MS);
			pingComm.getInputStream().read(pingData);
			return true;
		}
		catch (UnknownRankException e) { e.printStackTrace(); return false; }
		catch (java.net.UnknownHostException e) { e.printStackTrace(); return false;}
		catch (java.io.IOException e) { e.printStackTrace(); return false; }
		finally {
			// Always release the socket; a failure to close does not change the verdict.
			try { pingComm.close(); }
			catch (java.io.IOException ignored) { /* nothing useful to do here */ }
		}
	}

	/** The internal method called by post that actually synthesises a sub
	  * grid and creates a Process object. Ranks are probed in ascending order;
	  * a rank is taken when it answers the ping and the grid world returns a
	  * non-null thread for it. If fewer than cpuThreshold ranks qualify, the
	  * process is built from the ones that were found.
	  *
	  * @param task the task to run
	  * @param cpuThreshold the number of ranks wanted
	  * @return the new process, or null when cpuThreshold is not in the range
	  *         1.._totalNoOfCPUs or no rank qualified
	  * @throws UnknownRankException propagated from the grid calls
	  **/
	protected static JProcess makeProcess(Task task, int cpuThreshold) throws UnknownRankException {
		if (cpuThreshold <= 0 || cpuThreshold > _totalNoOfCPUs) return null;

		int []rankList = new int[cpuThreshold];
		RemoteThread []remoteThreads = new RemoteThread[cpuThreshold];
		int found = 0;

		for (int rank = 0; (found < cpuThreshold) && (rank < _totalNoOfCPUs); rank++) {
			if (!ping(rank)) continue;

			RemoteThread candidate = _gridWorld.getValidThreadForRank(rank);
			if (candidate != null) {
				remoteThreads[found] = candidate;
				rankList[found] = rank;
				found++;
			}
		}

		if (found == 0) return null;

		// Trim both arrays to the number of ranks actually acquired, so the
		// process never sees empty slots and reports the true worker count.
		int []actualRankList = new int[found];
		System.arraycopy(rankList, 0, actualRankList, 0, found);
		RemoteThread []actualThreads = new RemoteThread[found];
		System.arraycopy(remoteThreads, 0, actualThreads, 0, found);

		JGrid subGrid = new JGrid(_gridWorld,actual(actualRankList));
		return new JProcess(subGrid,task,actualThreads);
	}

	/** Identity helper kept private; returns the rank list it is given. */
	private static int[] actual(int []ranks) {
		return ranks;
	}

}



/** A Process represents a fully distributed process that has been posted and
  * is running. It owns a thread that initiates the distribution and start-up
  * of the task. The scheduler creates a new process for every single task
  * that is posted over the local seed.
  **/
class JProcess implements Runnable{
	/** Sub grid handed to every remote thread on initialisation. */
	protected JGrid _subGrid;
	/** The task handed to every remote thread on initialisation. */
	protected Task _task;
	/** Remote threads driven by this process; the array index is used as the rank. */
	protected RemoteThread _remoteThreads[];
	/** Local thread that executes run(). */
	protected Thread _pThread;
	/** Not assigned or read anywhere in this class. */
	protected Thread _pFailMonitor;
	/** Not assigned or read anywhere in this class. */
	protected Thread _pWaitAll;

	/** Not read anywhere in this class. */
	protected boolean playSafe = true;


	/** Stores the sub grid, task and remote threads and prepares (but does not
	  * start) the local thread that will run this process.
	  *
	  * @param grid the sub grid the task runs on
	  * @param task the task to distribute
	  * @param rThreads the remote threads, one per rank
	  **/
	public JProcess(JGrid grid,Task task,RemoteThread []rThreads) {
		this._subGrid = grid;
		this._task = task;
		this._remoteThreads = rThreads;
		_pThread = new Thread(this);
	}

	/** Starts the local thread, which in turn executes run(). */
	public void start() {
		this._pThread.start();
	}

	/** Registers this process with the local task queue, initialises and
	  * starts every remote thread, then polls every two seconds until none of
	  * them is alive. The process is removed from the local task queue on
	  * every exit path, including failures.
	  **/
	public void run() {
		// NOTE(review): this spins for as long as addTask returns true; the
		// meaning of that return value is defined in JLocalTaskQueue - confirm.
		while (JLocalTaskQueue.addTask(this));

		boolean interrupted = false;
		try{
			final int workers = this._remoteThreads.length;

			for(int i=0;i<workers;i++)
				this._remoteThreads[i].initialise(i, workers, this._task, this._subGrid);

			for(int i=0;i<workers;i++)
				this._remoteThreads[i].start();

			for(int i=0;i<workers;i++)
				while(this._remoteThreads[i].isAlive()) { Thread.sleep(2000);}

		}catch(java.rmi.RemoteException e) { e.printStackTrace(); }
		catch(InterruptedException e) { e.printStackTrace(); interrupted = true; }
		finally {
			// Deregister even when an unchecked exception escapes the block
			// above, so a failed process does not stay in the queue.
			JLocalTaskQueue.removeTask(this);
			// Restore the interrupt status only after the clean-up has run.
			if (interrupted) Thread.currentThread().interrupt();
		}
	}

	/** @return the host list of the sub grid this process runs on */
	public String[] getWorkers() {
		return this._subGrid.getHostList();
	}

	// Disabled fail-over routine, kept for reference.
	/*protected void failSafe(int failedRank, String replaceIP) { 
		this.suspendAll(failedRank);
		String []hosts = this.getWorkers();
		
		hosts[failedRank] = replaceIP;
		
		this._subGrid = new JGrid(hosts);
		
		this._remoteThreads[failedRank] = this._subGrid.getValidThreadForRank(failedRank);
		
		this._remoteThreads[failedRank].initialise(failedRank,this._subGrid.getSize(),this._task,this._subGrid);
		this._remoteThreads[failedRank].start();
		
		this._remoteThreads[failedRank].setGrid(this._subGrid);
		this.resumeAll();
	}*/


	/** Calls suspend() on every remote thread.
	  *
	  * @throws java.rmi.RemoteException if a remote call fails
	  **/
	public void suspendAll() throws java.rmi.RemoteException {
		for(int i=0;i<this._remoteThreads.length;i++){
			this._remoteThreads[i].suspend();
		}
	}


	/** Calls suspend() on every remote thread except the one at the given index.
	  *
	  * @param failedRank index of the remote thread to leave untouched
	  * @throws java.rmi.RemoteException if a remote call fails
	  **/
	public void suspendAll(int failedRank) throws java.rmi.RemoteException {
		for(int i=0;i<this._remoteThreads.length;i++){
			if(i != failedRank)
				this._remoteThreads[i].suspend();
		}
	}

	/** Calls resume() on every remote thread.
	  *
	  * @throws java.rmi.RemoteException if a remote call fails
	  **/
	public void resumeAll() throws java.rmi.RemoteException {
		for(int i=0;i<this._remoteThreads.length;i++){
			this._remoteThreads[i].resume();
		}
	}

	/** Currently calls resume() on every remote thread.
	  *
	  * NOTE(review): the body is identical to resumeAll(), which looks like a
	  * copy-paste slip for a method named stopAll. Left unchanged because the
	  * RemoteThread interface is not visible here - confirm whether it offers
	  * a stop operation and call that instead.
	  *
	  * @throws java.rmi.RemoteException if a remote call fails
	  **/
	protected void stopAll() throws java.rmi.RemoteException {
		for(int i=0;i<this._remoteThreads.length;i++){
			this._remoteThreads[i].resume();
		}
	}
}




⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -