// dispatcherjobmanagement.java
// (Code-viewer page residue: font-size control; not part of the program.)
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.util.LinkedList;
import java.util.Vector;
/**
 * Schedules and manages the workers of a dispatcher node.
 *
 * <p>An idle-worker queue is kept, and idle workers are handed to the sub-jobs
 * that are waiting for one. The assignment itself runs on the
 * {@link DispatcherJobServerHandler} thread created by {@link #startJobServer()}.</p>
 *
 * <p>Copyright: Copyright (c) 2004</p>
 *
 * @author not attributable
 * @version 1.0
 */
public class DispatcherJobManagement
{
    //private int userPort; // server port used to communicate with the user's main application
    private final Vector jobList; // job queue
    private final Vector workerNodeList; // worker queue (WorkerNode entries)
    private final Vector idleWorkerNodeList; // idle-worker queue (worker ids as String)

    // NOTE(review): these queues are static but are re-created by every
    // constructor call, so building a second instance resets them for everyone.
    public static LinkedList jobApply; // sub-jobs applying for a worker, first come first served
    public static LinkedList runningJobList; // sub-jobs already assigned but not yet finished
    public static LinkedList urgentJobList; // sub-jobs still unfinished after the specified time (T)

    //private DispatcherNode dispatcherNode;
    //private DispatcherNodeManagement dispatcherNodeManagement;
    private boolean isActive = true;
    public DispatcherJobServerHandler handler;
    private UserCommunication uc;
    DispatcherInfo nodeInfo;

    // Monitor the handler thread waits on until there is work to do.
    // NOTE(review): presumably whoever enqueues a sub-job or an idle worker
    // notifies this object -- confirm against the callers. Locking on a boxed
    // Integer is fragile (boxed values may be shared), but the object is
    // supplied by the caller and cannot be replaced here.
    private final Integer dummy;

    /**
     * Creates the manager and (re)initialises the static sub-job queues.
     *
     * @param jobList            shared job queue
     * @param workerNodeList     shared list of all known workers
     * @param idleWorkerNodeList shared list of the ids of idle workers
     * @param dispatcherInfo     dispatcher status object; receives the idle-worker count
     * @param dum                monitor object the handler thread waits on
     */
    public DispatcherJobManagement(Vector jobList,
                                   Vector workerNodeList,
                                   Vector idleWorkerNodeList,
                                   DispatcherInfo dispatcherInfo,
                                   Integer dum)
    {
        jobApply = new LinkedList();
        runningJobList = new LinkedList();
        urgentJobList = new LinkedList();
        //this.userPort = userPort;
        this.jobList = jobList;
        this.workerNodeList = workerNodeList;
        this.idleWorkerNodeList = idleWorkerNodeList;
        this.nodeInfo = dispatcherInfo;
        this.dummy = dum;
    }

    /**
     * Creates a new handler thread and starts it. Each call replaces
     * {@link #handler} with a fresh thread.
     */
    public void startJobServer()
    {
        handler = new DispatcherJobServerHandler();
        handler.start();
        //uc = new UserCommunication(userPort,jobList,jobApply);
        //uc.start();
    }

    /**
     * Stops the handler thread if it is running: interrupts it (so that a
     * pending wait is abandoned) and clears its run flags.
     */
    public void quit()
    {
        System.out.println("jobmanagerment quitting...");
        if (handler != null && handler.isAlive())
        {
            handler.interrupt();
            handler.terminate();
        }
        System.out.println("jobmanagement quit!");
    }

    /**
     * Cancels a job by removing it from the job queue. Only jobs that are
     * still in the job queue (by default: not yet requested by the main
     * application) can be cancelled.
     *
     * @param jobId id of the job to cancel; {@code null} never matches
     * @return {@code true} if a job with that id was found and removed,
     *         {@code false} otherwise
     */
    public boolean cancelJob(String jobId)
    {
        if (jobId == null)
        {
            return false;
        }
        synchronized (jobList)
        {
            int n = jobList.size();
            for (int i = 0; i < n; i++)
            {
                Job tmpjob = (Job) jobList.elementAt(i);
                if (jobId.equals(tmpjob.getJobId()))
                {
                    // Remove by position: this is exactly the element that was
                    // matched, and it avoids a second equals-based search.
                    jobList.removeElementAt(i);
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Appends a received job to the job queue.
     *
     * @param tmpJob the job to enqueue
     * @return {@code true} if the job was queued; {@code false} if
     *         {@code tmpJob} is {@code null} (a null entry would later break
     *         {@link #cancelJob(String)})
     */
    public boolean jobReceiving(Job tmpJob)
    {
        //Job tmpJob = new Job(tmpMonitorJob,dispatcherId);
        if (tmpJob == null)
        {
            return false;
        }
        synchronized (jobList)
        {
            jobList.addElement(tmpJob);
        }
        return true;
    }

    /**
     * Thread that assigns idle workers to waiting sub-jobs.
     *
     * <p>Copyright: Copyright (c) 2004</p>
     *
     * @author not attributable
     * @version 1.0
     */
    class DispatcherJobServerHandler
        extends Thread
    {
        // Both flags are written by terminate() from another thread, so they
        // must be volatile for the handler thread to see the change.
        private volatile boolean djsIsActive = true;
        private volatile boolean runflag = true;
        private static final int POLL_INTERVAL = 1000;

        public DispatcherJobServerHandler()
        {
        }

        /**
         * Repeats {@link #workerDistribute()} until the handler is
         * deactivated by {@link #terminate()} or by an exception.
         */
        public void run()
        {
            while (djsIsActive)
            {
                this.workerDistribute();
            }
            System.out.println("jobserverhandler quit!");
            return;
        } //run

        /**
         * Assigns one idle worker to one waiting sub-job (pre-assignment).
         *
         * <p>Blocks on the shared monitor until there is at least one waiting
         * sub-job and at least one idle worker, or until the handler has been
         * asked to stop. Sub-jobs in {@code urgentJobList} take precedence
         * over those in {@code jobApply}. Any exception, including an
         * interrupt, deactivates the handler.</p>
         */
        public synchronized void workerDistribute()
        {
            try
            {
                if (Parameter.jobDebugIsActive)
                {
                    System.out.println("workerDistributing starting!");
                    System.err.println("the jobApply size is: " + jobApply.size());
                    System.err.println("the urgentjoblist size is: " + urgentJobList.size());
                }
                synchronized (dummy)
                {
                    // Wait while running AND (nothing to assign OR nobody to
                    // assign it to). The explicit parentheses matter: without
                    // them "&&" binds tighter than "||", and an empty idle
                    // list would keep the thread waiting even after runflag
                    // was cleared.
                    while (runflag
                           && ((jobApply.size() == 0 && urgentJobList.size() == 0)
                               || idleWorkerNodeList.size() == 0))
                    {
                        dummy.wait();
                    }
                }
                if (Parameter.jobDebugIsActive)
                {
                    System.out.println("workerDistributing start!");
                }
                // NOTE(review): from here on the queues are accessed outside
                // the dummy monitor; the emptiness checks are repeated because
                // the state may differ from what the wait loop saw.
                if (jobApply.size() == 0 && urgentJobList.size() == 0)
                {
                    return;
                }

                // Urgent (overdue) sub-jobs are served before ordinary applicants.
                boolean fromUrgentList;
                SubJob tmpjob;
                if (urgentJobList.size() != 0)
                {
                    fromUrgentList = true;
                    tmpjob = (SubJob) urgentJobList.getFirst();
                }
                else if (jobApply.size() != 0)
                {
                    fromUrgentList = false;
                    tmpjob = (SubJob) jobApply.getFirst();
                }
                else
                {
                    return;
                }

                if (idleWorkerNodeList.size() == 0)
                {
                    return;
                }
                String tmpworkerid = (String) idleWorkerNodeList.firstElement();

                WorkerNode tmpwn = findWorkerNode(tmpworkerid);
                if (tmpwn == null)
                {
                    // The idle id has no matching entry in workerNodeList.
                    // NOTE(review): the id stays at the head of the idle list,
                    // so the next call hits the same entry again.
                    System.out
                        .println("Haven't found this worker in idleWorkerNodeList!");
                }
                else
                {
                    // A worker serving an urgent sub-job is flagged as a redundancy worker.
                    tmpwn.isReduncyWorker = fromUrgentList;
                    tmpwn.setJob(tmpjob); // pre-assign this sub-job to the worker
                    tmpwn.setJobApply(jobApply);
                    tmpjob.setWorkerNode(tmpwn.getWorkerId()); // and record the worker on the sub-job
                    if (fromUrgentList)
                    {
                        urgentJobList.removeFirst();
                    }
                    else
                    {
                        jobApply.removeFirst();
                    }
                    idleWorkerNodeList.removeElementAt(0);
                    System.out.println("idleWorkerNodeList.size(): "
                                       + idleWorkerNodeList.size());
                    nodeInfo.setIdleWorkerCount(idleWorkerNodeList.size());
                    tmpjob.distributeTime = System.currentTimeMillis(); // time the sub-job was assigned
                    runningJobList.add(tmpjob);
                }
            }
            catch (InterruptedException e)
            {
                // Interrupted while waiting (quit() does this): stop the
                // handler and keep the interrupt status instead of losing it.
                djsIsActive = false;
                runflag = false;
                Thread.currentThread().interrupt();
                System.out.println(e.toString());
            }
            catch (Exception e)
            {
                //e.printStackTrace();
                djsIsActive = false;
                runflag = false;
                System.out.println(e.toString());
            }
        }

        /**
         * Looks up a worker by id in the worker list.
         *
         * @param workerId id taken from the idle-worker list
         * @return the matching worker, or {@code null} if none has that id
         */
        private WorkerNode findWorkerNode(String workerId)
        {
            for (int i = 0; i < workerNodeList.size(); i++)
            {
                WorkerNode candidate = (WorkerNode) workerNodeList.elementAt(i);
                if (workerId.equals(candidate.getWorkerId()))
                {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Asks the handler to stop by clearing its run flags. A handler that
         * is currently waiting only notices once it is woken (by an interrupt
         * or a notification on the shared monitor).
         */
        public void terminate()
        {
            System.out.println("DispatcherJobServer handler quitting...");
            djsIsActive = false;
            runflag = false;
        }
    } //DispatcherJobServerHandler

    /** Empty placeholder thread class; it has no behaviour of its own. */
    class TestDeadSubJob extends Thread
    {
    }

    /**
     * Prints the stack trace of the given exception.
     *
     * @param e the exception to report
     */
    public void log(Exception e)
    {
        e.printStackTrace();
    }
}
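// Code-viewer page residue (keyboard shortcut help); not part of the program:
//   Copy code:            Ctrl + C
//   Search code:          Ctrl + F
//   Full-screen mode:     F11
//   Toggle theme:         Ctrl + Shift + D
//   Show shortcuts:       ?
//   Increase font size:   Ctrl + =
//   Decrease font size:   Ctrl + -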