📄 job.java
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Vector;
import cn.edu.hust.cgcl.biogrid.monitor.MonitorJob;
/**
* <p>Title: </p>
* <p>Description: 每一个job要运行就生成一个job类,然后要执行这个job的worker加入进来,再分配子任务ID,</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
//问题:1、subJobID如何生成?2、怎么在workerid和subjobid之间形成对应?3、worker完成工作后是退出这个job线程,还是由job线程直接再分配?
//每一个subjob都用一个线程??
//job类的功能?????
public class Job
{
private String jobId;
private String dispatcherId;
public Vector subJobList; //
private ArrayList finished_sj_idList;
private Vector workerList; //完成这个job的worker队列
private MonitorJob job;
private Vector backupJobList;
public int state=0;
public boolean backupThreadFlag=false;//the flag of the backup thread;
public long averageComputingTime=0;
private long totalTime=0;
private int count=0;//已经完成的子任务个数
// DispatcherJobManagement jobManager;
private Integer dummy;
private static int num = 0;
private DecimalFormat df=new DecimalFormat("######");
public Job(MonitorJob job, String dispatcherId)
{
this.job = job; //任务说明文件(monitorjob类型)
this.dispatcherId = dispatcherId;
this.jobId = job.getJobID();
this.subJobList = new Vector();
this.workerList = new Vector();
this.finished_sj_idList=new ArrayList();
this.backupJobList=new Vector();
df.applyPattern("000000");
}
/**
* 申请新的子任务
*/
public String[] newSubJob(int n,Integer djm,String backupIp,int backupPort)
{
// this.jobManager=djm;
this.dummy=djm;
System.out.println(n);
String[] sjId=new String[n];
SubJob[] sj=new SubJob[n];
/*
if (num == 0)
{
int m=0;
while(m++<n)
{
sjId[m-1]="SJ" + dispatcherId.substring(1, dispatcherId.length()) +
jobId.substring(1, jobId.length()) + df.format(m);//"000001";
sj[m-1] = new SubJob(jobId, sjId[m-1], job.getfileName(),
job.getdataServerIp(), job.getdataServerPort(),
job.getprojectName());
}
num+=n;
}
else
{*/
int m=0;
while(m++<n)
{
sjId[m-1]="SJ" + dispatcherId.substring(1, dispatcherId.length()) +
jobId.substring(1, jobId.length()) + df.format(++num);//"000001";
System.out.println(m+" "+sjId[m-1]);
sj[m-1] = new SubJob(jobId, sjId[m-1], job.getfileName(),
job.getdataServerIp(), job.getdataServerPort(),
job.getprojectName(),backupIp,backupPort,job.getUserId(),job.getUserPwd());
}
synchronized (subJobList)
{for(int i=0;i<n;i++)
{
subJobList.addElement(sj[i]);
}
}
synchronized (DispatcherJobManagement.jobApply)
{
for (int i = 0; i < n; i++)
{
DispatcherJobManagement.jobApply.addLast(sj[i]);
}
}
synchronized(dummy)
{
try
{
dummy.notifyAll();
}catch(IllegalMonitorStateException e)
{
e.printStackTrace();
}
}
return sjId;
}
/**
* 申请新的子任务,用于容错,在三个周期内不入jobapply队列,而是先入backupJobList。
*/
public int newSubJob(String subjobid,Integer djm)
{
SubJob tmpsj;
this.dummy=djm;
tmpsj= new SubJob(jobId, subjobid, job.getfileName(),
job.getdataServerIp(), job.getdataServerPort(),
job.getprojectName(),null,0,job.getUserId(),job.getUserPwd());
synchronized (subJobList)
{
subJobList.addElement(tmpsj);
}
synchronized(backupJobList)
{
backupJobList.addElement(tmpsj);
}
//start the backup dealing thread
if (!backupThreadFlag)
{
Thread bd=new BackupDeal();
bd.start();
backupThreadFlag=true;
}
return 1;
}
public class BackupDeal
extends Thread
{
private boolean isActive=true;
private static final int INTERVAL= 5 * 1000;
public BackupDeal()
{
}
public void run()
{
while(isActive)
{
heartbeat();
try
{
Thread.sleep(INTERVAL);
}
catch (InterruptedException e)
{
//get back to work
}
}
}
public void heartbeat()
{
synchronized(backupJobList)
{
int n=backupJobList.size();
for(int i=0;i<n;i++)
{
SubJob tmpsubjob=(SubJob)backupJobList.elementAt(i);
if (tmpsubjob.getState()!=4)
{backupJobList.remove(i);
continue;
}
tmpsubjob.intervalTime--;
if(tmpsubjob.intervalTime==0)
{
tmpsubjob.setWorkerState(0);
synchronized (DispatcherJobManagement.jobApply)
{
DispatcherJobManagement.jobApply.addLast(tmpsubjob);
}
backupJobList.remove(i);
synchronized(dummy)
{
try
{
dummy.notifyAll();
}catch(IllegalMonitorStateException e)
{
e.printStackTrace();
}
}
}
}
}
}
}
public String getJobId()
{
return this.jobId;
}
/**
*
* @return
*/
public Vector getWorketList()
{
return this.workerList;
}
public Vector getSubJobList()
{
return this.subJobList;
}
/*
* 将一个新的子任务的完成时间加入总时间,再计算平均计算时间
*/
public void calAveTime(long t)
{
this.totalTime+=t;
this.count++;
this.averageComputingTime=totalTime/count;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -