⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jobfaulttoleranthandler.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
/*
 * Created on 2005-10-8
 *
 * TODO To change the template for this generated file go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.util.Hashtable;
import java.util.LinkedList;
import java.util.Vector;



/**
 * @author Administrator
 * 
 * TODO To change the template for this generated type comment go to Window -
 * Preferences - Java - Code Style - Code Templates
 */
public class JobFaultTolerantHandler extends Thread {

	private LinkedList jobApply; //申请woker的job(subjob)队列,先来先服务

	private LinkedList runningJobList;//已经分配过,还未运行完毕的(subJob)队列。

	private LinkedList urgentJobList;//超过指定时间(T)仍未计算完毕的子任务队列。

	private Vector jobList;

	private boolean isActive = true;
	
	private static int INTERVAL=60*1000;

	public JobFaultTolerantHandler(Vector jobL) {
		jobApply = DispatcherJobManagement.jobApply;
		runningJobList = DispatcherJobManagement.runningJobList;
		urgentJobList = DispatcherJobManagement.urgentJobList;
		jobList = jobL;
	}

	public void run() {
		while (isActive)
        {
            //System.out.println(dIpAddr+workerHeartbeatPort);
            handler();
            try
            {
            	if(!interrupted())
        		{
                Thread.sleep(INTERVAL);
        		}
            	else {
            	 isActive=false;
            	}
            }
            catch (InterruptedException e)
            {
                isActive=false;//get back to work
            }
        }

			
	}
	
	
	private void handler()
	{
		Job tmpJob = null;
		Hashtable ht = new Hashtable();
		synchronized (jobList) {
			for (int i = 0; i < jobList.size(); i++) {
				tmpJob = (Job) jobList.elementAt(i);
				ht.put(tmpJob.getJobId(), new Long(
						tmpJob.averageComputingTime));
			}
		}
		synchronized (runningJobList) {
			long currenttime = System.currentTimeMillis();
			for (int i = 0; i < runningJobList.size(); i++) {
				SubJob sj = (SubJob) runningJobList.get(i);
				String jobId = sj.getJobId();
				long runtime = currenttime - sj.distributeTime;
				Long avetime;
				if(Parameter.jobFaultTolerantTime==0)
				  avetime = (Long) ht.get(jobId);
				else avetime=new Long(Parameter.jobFaultTolerantTime);
				if (avetime.longValue() == 0)
					continue;
				long temptime = runtime - avetime.longValue();
				switch (sj.redun_num) {
				case 1:
					if (temptime >= 2 * avetime.longValue()) {
						this.urgentJobList.addFirst(sj);
						sj.redun_num++;
					}
					break;
				case 0:
					if (temptime >= avetime.longValue()) {
						this.urgentJobList.add(sj);
						sj.redun_num++;
					}
					break;
				default:
					break;
				}
			}//for
		}//synchronized
	}//handler
	
	public void quit()
    {
    	System.out.println("JobFaultTolerantHandle quitting...");
        isActive = false;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -