📄 jscheduler.java
字号:
/**
*
* Program : JScheduler.java
*
* Author : Vijayakrishnan Menon
*
* Date : 22nd Dec 2005
*
* Organization : Centre for Excellence in
* Computational Engineering and Networking (CEN),
* Amrita Viswa Vidyapeetham
**/
package JAMPack;
/** The Scheduler is an indispensable part of any adaptive multiprocess environment.
* The scheduler manages and schedules jobs seeded on the environment. This
* scheduler works by fetching idle resources (namely remote processing
* threads, in this context). The resources are distributed over a peer network
* environment, which can be thought of as a pool. What the scheduler does is to
* borrow something from the pool and then put it back when it is done. This way
* the scheduler can manage the entire set of resources, both local and remote. A new
* local scheduler in want of a resource is automatically redirected.
**/
public class JScheduler {
    /** Connect and read timeout, in milliseconds, applied to one liveness ping. */
    private static final int PING_TIMEOUT_MS = 1000;

    /** Grid built from every host listed in the host file; assigned by the constructor. */
    protected static JGrid _gridWorld;
    /** True until getInstance has successfully handed out a scheduler. */
    private static boolean _singleton = true;
    /** Value of _gridWorld.getSize() captured by the constructor. */
    protected static int _totalNoOfCPUs;

    /** Constructor for the Scheduler. It only initialises the static Scheduler
     * state: it sets the "java.security.policy" and "java.rmi.server.codebase"
     * system properties, installs an RMISecurityManager, reads the host file
     * and builds the grid world from it. Each task posted will be scheduled
     * as per availability of the grid resources.
     *
     * @param hostFile path of the text file listing the grid hosts
     **/
    protected JScheduler(String hostFile) {
        System.setProperty("java.security.policy","java.policy");
        try {
            System.setProperty("java.rmi.server.codebase","http://"
                +java.net.InetAddress.getLocalHost().getHostAddress()+":"+JEnvironment.WEB_PORT+"/");
        } catch(java.net.UnknownHostException e) {
            // The codebase property is simply left unset when the local
            // address cannot be resolved; construction carries on.
            e.printStackTrace();
        }
        System.setSecurityManager(new java.rmi.RMISecurityManager());
        String []hosts = JScheduler.readHostFile(hostFile);
        JScheduler._gridWorld = new JGrid(hosts);
        JScheduler._totalNoOfCPUs = JScheduler._gridWorld.getSize();
    }

    /** The constructor is protected, so this factory method is the way to
     * obtain a scheduler. It keeps the scheduler a singleton: the first
     * successful call creates it, every later call is rejected.
     *
     * @param hostFile path of the text file listing the grid hosts
     * @return the newly created scheduler
     * @throws SingletonException if a scheduler has already been created
     **/
    public static synchronized JScheduler getInstance(String hostFile) throws SingletonException {
        if (!JScheduler._singleton) throw new SingletonException();
        JScheduler scheduler = new JScheduler(hostFile);
        // Clear the flag only after construction succeeded, so a failed
        // attempt does not lock out a retry.
        JScheduler._singleton = false;
        return scheduler;
    }

    /** Read a host list from a text file and return an array of ip strings.
     * The lines are joined with single spaces and the result is split on
     * single spaces, so a line holding several space-separated entries
     * contributes several hosts.
     *
     * @param fileName path of the host file
     * @return the entries read; an empty array when the file cannot be opened
     *         or contains no lines. An I/O error part-way through yields the
     *         entries read up to that point.
     **/
    protected static String[] readHostFile(String fileName) {
        StringBuilder joined = new StringBuilder();
        boolean readAny = false;
        java.io.BufferedReader in = null;
        try {
            in = new java.io.BufferedReader(new java.io.FileReader(fileName));
            String line;
            while ((line = in.readLine()) != null) {
                if (readAny) joined.append(' ');
                joined.append(line);
                readAny = true;
            }
        } catch (java.io.IOException e) {
            e.printStackTrace();
        } finally {
            // Close on every path; the reader used to leak when readLine failed.
            if (in != null) {
                try { in.close(); }
                catch (java.io.IOException e) { e.printStackTrace(); }
            }
        }
        // Nothing read (missing or empty file): return no hosts instead of
        // dereferencing a null list.
        if (!readAny) return new String[0];
        return joined.toString().split(" ");
    }

    /** Post is called by a user task as a single point of entry into the grid
     * runtime. It builds a process for the task via makeProcess and starts it.
     *
     * @param task the task to run
     * @param cpuThreshold the number of splits (remote threads) asked for
     * @throws IllegalStateException if no process could be created for the
     *         task (makeProcess returned null or reported an unknown rank)
     **/
    public static void post(Task task, int cpuThreshold) {
        JProcess p;
        try {
            p = makeProcess(task, cpuThreshold);
        } catch (UnknownRankException e) {
            throw new IllegalStateException(
                "Could not schedule task: unknown rank while building the sub grid", e);
        }
        if (p == null) {
            // Previously this fell through to p.start() and died with a bare
            // NullPointerException.
            throw new IllegalStateException(
                "Could not schedule task: cpuThreshold=" + cpuThreshold
                + ", total CPUs=" + _totalNoOfCPUs
                + ", and no remote thread could be acquired");
        }
        p.start();
    }

    /** Internal method to test the liveliness of a node over the runtime. It
     * opens a TCP connection to the node's PING_PORT with traffic class 0x10
     * (IPTOS_LOWDELAY per the java.net.Socket documentation) and reads at
     * most one byte; the byte itself is not inspected, and an end-of-stream
     * read still counts as success. Both the connect and the read are bounded
     * by PING_TIMEOUT_MS so that a faulty node cannot stall the scheduler.
     *
     * @param rank rank of the node inside the grid world
     * @return true if the connect and the read completed without an exception
     **/
    protected static boolean ping (int rank){
        byte []pingData = new byte[1];
        java.net.Socket pingComm = new java.net.Socket();
        try {
            String ip = _gridWorld.getIPForRank(rank);
            pingComm.setSoTimeout(PING_TIMEOUT_MS);
            pingComm.setTcpNoDelay(true);
            pingComm.setTrafficClass(0x10);
            // setSoTimeout only bounds reads; the connect needs its own timeout.
            pingComm.connect(new java.net.InetSocketAddress(ip,JEnvironment.PING_PORT), PING_TIMEOUT_MS);
            pingComm.getInputStream().read(pingData);
            return true;
        }
        catch (UnknownRankException e) { e.printStackTrace(); return false; }
        catch (java.net.UnknownHostException e) { e.printStackTrace(); return false; }
        catch (java.io.IOException e) { e.printStackTrace(); return false; }
        finally {
            // The socket used to be left open after every ping.
            try { pingComm.close(); }
            catch (java.io.IOException ignored) { /* nothing useful to do on close failure */ }
        }
    }

    /** The internal method called by post that actually synthesises a sub grid
     * and creates a Process object. Ranks are probed in ascending order; a
     * rank is taken when it answers the ping and getValidThreadForRank gives
     * back a non-null thread. Probing stops once cpuThreshold threads have
     * been collected or every rank has been tried. If the threshold is not
     * reached, the process is built from whatever was collected.
     *
     * @param task the task to run
     * @param cpuThreshold the number of remote threads asked for
     * @return the new process, or null when cpuThreshold is not in the range
     *         1.._totalNoOfCPUs or when no thread could be acquired
     * @throws UnknownRankException propagated from the grid lookups
     **/
    protected static JProcess makeProcess(Task task, int cpuThreshold) throws UnknownRankException {
        if (cpuThreshold <= 0 || cpuThreshold > _totalNoOfCPUs) return null;
        int []rankList = new int[cpuThreshold];
        RemoteThread []remoteThreads = new RemoteThread[cpuThreshold];
        int found = 0;
        for (int rank = 0; found < cpuThreshold && rank < _totalNoOfCPUs; rank++) {
            if (!ping(rank)) continue;
            RemoteThread candidate = _gridWorld.getValidThreadForRank(rank);
            if (candidate != null) {
                remoteThreads[found] = candidate;
                rankList[found] = rank;
                found++;
            }
        }
        if (found == 0) return null;
        // Trim both arrays to the number of threads actually acquired. The
        // thread array used to keep its full size, leaving null slots that
        // JProcess dereferenced and a length that disagreed with the sub grid.
        int []actualRankList = new int[found];
        System.arraycopy(rankList, 0, actualRankList, 0, found);
        RemoteThread []actualThreads = new RemoteThread[found];
        System.arraycopy(remoteThreads, 0, actualThreads, 0, found);
        JGrid subGrid = new JGrid(_gridWorld, actualRankList);
        return new JProcess(subGrid, task, actualThreads);
    }
}
/** A Process represents a fully distributed process that has been posted and
* is running. It owns a thread that initiates the distribution and start-up of
* the task. The scheduler creates a new process for every single task that is
* posted over the local seed.
**/
class JProcess implements Runnable{
    /** Sub grid holding the nodes this process runs on. */
    protected JGrid _subGrid;
    /** The task handed to every remote thread. */
    protected Task _task;
    /** Remote threads executing the task; the index is used as the rank passed to initialise. */
    protected RemoteThread _remoteThreads[];
    /** Local thread that executes run(). */
    protected Thread _pThread;
    /** Not assigned or read anywhere in this class. */
    protected Thread _pFailMonitor;
    /** Not assigned or read anywhere in this class. */
    protected Thread _pWaitAll;
    /** Not read anywhere in this class. */
    protected boolean playSafe = true;

    /** Creates the process and its driver thread; the thread is not started
     * until start() is called.
     *
     * @param grid sub grid the task is distributed over
     * @param task the task to run
     * @param rThreads one remote thread per rank of the sub grid
     **/
    public JProcess(JGrid grid,Task task,RemoteThread []rThreads) {
        this._subGrid = grid;
        this._task = task;
        this._remoteThreads = rThreads;
        this._pThread = new Thread(this);
    }

    /** Starts the local driver thread, which executes run(). */
    public void start() {
        this._pThread.start();
    }

    /** Registers this process with the local task queue, initialises and
     * starts every remote thread, then polls every two seconds until none of
     * them is alive any more. The process is removed from the local task
     * queue on every exit path.
     **/
    public void run() {
        // NOTE(review): this spins for as long as addTask returns true; the
        // meaning of that return value is not visible here - confirm.
        while (JLocalTaskQueue.addTask(this));
        boolean interrupted = false;
        try{
            final int count = this._remoteThreads.length;
            for(int i=0;i<count;i++)
                this._remoteThreads[i].initialise(i, count, this._task, this._subGrid);
            for(int i=0;i<count;i++)
                this._remoteThreads[i].start();
            for(int i=0;i<count;i++)
                while(this._remoteThreads[i].isAlive()) { Thread.sleep(2000);}
        }catch(java.rmi.RemoteException e) { e.printStackTrace(); }
        catch(InterruptedException e) {
            e.printStackTrace();
            interrupted = true;
        }
        finally {
            // Also runs when an unchecked exception escapes, so the queue
            // entry is not left behind.
            JLocalTaskQueue.removeTask(this);
            // Restore the interrupt status that catching InterruptedException
            // cleared, once the clean-up is done.
            if (interrupted) Thread.currentThread().interrupt();
        }
    }

    /** @return the host list of the sub grid this process runs on */
    public String[] getWorkers() {
        return this._subGrid.getHostList();
    }

    /*protected void failSafe(int failedRank, String replaceIP) {
    this.suspendAll(failedRank);
    String []hosts = this.getWorkers();
    hosts[failedRank] = replaceIP;
    this._subGrid = new JGrid(hosts);
    this._remoteThreads[failedRank] = this._subGrid.getValidThreadForRank(failedRank);
    this._remoteThreads[failedRank].initialise(failedRank,this._subGrid.getSize(),this._task,this._subGrid);
    this._remoteThreads[failedRank].start();
    this._remoteThreads[failedRank].setGrid(this._subGrid);
    this.resumeAll();
    }*/

    /** Suspends every remote thread of this process.
     *
     * @throws java.rmi.RemoteException if a remote call fails; threads after
     *         the failing one are not reached
     **/
    public void suspendAll() throws java.rmi.RemoteException {
        for(int i=0;i<this._remoteThreads.length;i++){
            this._remoteThreads[i].suspend();
        }
    }

    /** Suspends every remote thread except the one at the given rank.
     *
     * @param failedRank index of the thread to leave untouched
     * @throws java.rmi.RemoteException if a remote call fails; threads after
     *         the failing one are not reached
     **/
    public void suspendAll(int failedRank) throws java.rmi.RemoteException {
        for(int i=0;i<this._remoteThreads.length;i++){
            if(i != failedRank)
                this._remoteThreads[i].suspend();
        }
    }

    /** Resumes every remote thread of this process.
     *
     * @throws java.rmi.RemoteException if a remote call fails; threads after
     *         the failing one are not reached
     **/
    public void resumeAll() throws java.rmi.RemoteException {
        for(int i=0;i<this._remoteThreads.length;i++){
            this._remoteThreads[i].resume();
        }
    }

    /** Calls resume() on every remote thread.
     *
     * NOTE(review): despite its name this method resumes the threads rather
     * than stopping them; it is identical to resumeAll() and looks like a
     * copy-paste slip. RemoteThread's API is not visible here, so the call is
     * left as it was - confirm whether a stop operation exists and should be
     * used instead.
     *
     * @throws java.rmi.RemoteException if a remote call fails
     **/
    protected void stopAll() throws java.rmi.RemoteException {
        for(int i=0;i<this._remoteThreads.length;i++){
            this._remoteThreads[i].resume();
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -