📄 jobfaulttoleranthandler.java
字号:
/*
* Created on 2005-10-8
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.Vector;
/**
 * Background thread that periodically scans the dispatcher's list of running
 * sub-jobs and appends those that have been running too long to the urgent
 * list.
 *
 * <p>A scan runs as soon as the thread starts and then once every
 * {@code INTERVAL} milliseconds, until {@link #quit()} is called or the thread
 * is interrupted.
 *
 * @author Administrator
 */
public class JobFaultTolerantHandler extends Thread {

    /** Pause between two scans, in milliseconds. */
    private static final int INTERVAL = 60 * 1000;

    /**
     * Queue of jobs (sub-jobs) applying for a worker, first come first served.
     * Held for reference only; this class never reads it.
     */
    private final LinkedList jobApply;

    /** Queue of sub-jobs that have been dispatched but have not finished yet. */
    private final LinkedList runningJobList;

    /** Queue of sub-jobs that are still not finished after the allowed time (T). */
    private final LinkedList urgentJobList;

    /** Jobs whose {@code averageComputingTime} is used as the per-job time base. */
    private final Vector jobList;

    /**
     * Loop flag. Volatile because {@link #quit()} is normally called from a
     * different thread than the one executing {@link #run()}.
     */
    private volatile boolean isActive = true;

    /**
     * Creates the handler. The three sub-job queues are taken from the static
     * fields of {@code DispatcherJobManagement}.
     *
     * @param jobL list of {@code Job} objects to read average computing times from
     */
    public JobFaultTolerantHandler(Vector jobL) {
        jobApply = DispatcherJobManagement.jobApply;
        runningJobList = DispatcherJobManagement.runningJobList;
        urgentJobList = DispatcherJobManagement.urgentJobList;
        jobList = jobL;
    }

    /**
     * Runs one scan, sleeps for {@code INTERVAL} milliseconds, and repeats
     * until the handler is deactivated or the thread is interrupted.
     */
    @Override
    public void run() {
        while (isActive) {
            handler();
            try {
                // sleep() throws immediately if the interrupt flag is already
                // set, so no separate interrupted() check is needed.
                Thread.sleep(INTERVAL);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop; leave the loop.
                isActive = false;
            }
        }
    }

    /**
     * Copies the average computing time of every job in {@code jobList} into a
     * table keyed by job id, holding the {@code jobList} lock while copying.
     *
     * @return table mapping job id to its average computing time as a {@code Long}
     */
    private Hashtable snapshotAverageTimes() {
        Hashtable averageTimes = new Hashtable();
        synchronized (jobList) {
            for (int i = 0; i < jobList.size(); i++) {
                Job job = (Job) jobList.elementAt(i);
                averageTimes.put(job.getJobId(),
                        Long.valueOf(job.averageComputingTime));
            }
        }
        return averageTimes;
    }

    /**
     * Performs one scan of {@code runningJobList}.
     *
     * <p>The time base of a sub-job is {@code Parameter.jobFaultTolerantTime}
     * when that value is non-zero, otherwise the average computing time of its
     * job. Sub-jobs with a time base of zero, or whose job is not present in
     * {@code jobList}, are skipped.
     *
     * <p>With {@code overrun = runtime - base}:
     * <ul>
     * <li>{@code redun_num == 0} and {@code overrun >= base}: the sub-job is
     * appended to the tail of the urgent list and {@code redun_num} becomes 1;</li>
     * <li>{@code redun_num == 1} and {@code overrun >= 2 * base}: the sub-job is
     * inserted at the head of the urgent list and {@code redun_num} becomes 2;</li>
     * <li>any other {@code redun_num}: nothing happens.</li>
     * </ul>
     */
    private void handler() {
        Hashtable averageTimes = snapshotAverageTimes();

        // NOTE(review): urgentJobList is modified below while holding only the
        // runningJobList lock. Confirm that every other user of urgentJobList
        // is covered by the same lock (or add a dedicated one).
        synchronized (runningJobList) {
            long now = System.currentTimeMillis();
            for (int i = 0; i < runningJobList.size(); i++) {
                SubJob sj = (SubJob) runningJobList.get(i);
                String jobId = sj.getJobId();
                long runtime = now - sj.distributeTime;

                long base;
                if (Parameter.jobFaultTolerantTime == 0) {
                    // Hashtable.get(null) throws, and a sub-job whose parent
                    // job is no longer in jobList has no entry: skip both
                    // cases instead of killing this thread with an NPE.
                    Long recorded =
                            (jobId == null) ? null : (Long) averageTimes.get(jobId);
                    if (recorded == null) {
                        continue;
                    }
                    base = recorded.longValue();
                } else {
                    base = Parameter.jobFaultTolerantTime;
                }
                if (base == 0) {
                    continue; // no time base available for this sub-job
                }

                long overrun = runtime - base;
                switch (sj.redun_num) {
                case 0:
                    if (overrun >= base) {
                        urgentJobList.add(sj);
                        sj.redun_num++;
                    }
                    break;
                case 1:
                    if (overrun >= 2 * base) {
                        urgentJobList.addFirst(sj);
                        sj.redun_num++;
                    }
                    break;
                default:
                    break;
                }
            }
        }
    }

    /**
     * Asks the handler to stop. The loop flag is cleared and the thread is
     * interrupted so that a pending sleep ends immediately instead of running
     * out the rest of the interval.
     */
    public void quit() {
        System.out.println("JobFaultTolerantHandle quitting...");
        isActive = false;
        interrupt();
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -