⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 job.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Vector;

import cn.edu.hust.cgcl.biogrid.monitor.MonitorJob;

/**
 * <p>Title: Job</p>
 * <p>Description: Dispatcher-side bookkeeping for one job. A {@code Job} object
 * is created for every job that is to be run; it builds the job's
 * {@code SubJob}s, assigns their ids and hands them to
 * {@code DispatcherJobManagement.jobApply}. Sub-jobs created for fault
 * tolerance are first parked in a backup list and only queued after a grace
 * period (see {@link BackupDeal}).</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 *
 * <p>Open design questions left by the original author:</p>
 * <ol>
 *   <li>How should sub-job ids be generated?</li>
 *   <li>How is the mapping between worker ids and sub-job ids established?</li>
 *   <li>When a worker finishes, does it leave this job's thread, or does the
 *       job thread hand it new work directly?</li>
 *   <li>Should every sub-job get its own thread?</li>
 *   <li>What exactly is the responsibility of the Job class?</li>
 * </ol>
 *
 * @author not attributable
 * @version 1.0
 */
public class Job
{
    private String jobId;
    private String dispatcherId;
    /** Every sub-job ever created for this job (both regular and backup ones). */
    public Vector subJobList;
    private ArrayList finished_sj_idList;
    /** Queue of the workers that carry out this job. */
    private Vector workerList;
    /** Job description (a MonitorJob) that the sub-jobs are built from. */
    private MonitorJob job;
    /** Backup sub-jobs still waiting out their grace period; guarded by its own monitor. */
    private Vector backupJobList;

    public int state=0;
    /** True once the {@link BackupDeal} thread has been started for this job. */
    public boolean backupThreadFlag=false;

    public long averageComputingTime=0;
    private long totalTime=0;
    /** Number of sub-jobs whose computing time has been recorded via calAveTime. */
    private int count=0;

    /**
     * Monitor object supplied by the caller of newSubJob; it is notified
     * whenever sub-jobs are appended to the apply queue. Volatile because it
     * is written by the newSubJob callers and read by the backup thread.
     */
    private volatile Integer dummy;

    /** Sequence number shared by ALL Job instances; guarded by the Job.class lock. */
    private static int num = 0;
    /** Formats the sequence number as at least six zero-padded digits. */
    private DecimalFormat df=new DecimalFormat("000000");


    /**
     * Creates the bookkeeping object for one job.
     *
     * @param job          job description; its id becomes this job's id
     * @param dispatcherId id of the dispatcher that owns this job
     */
    public Job(MonitorJob job, String dispatcherId)
    {
        this.job = job;
        this.dispatcherId = dispatcherId;
        this.jobId = job.getJobID();
        this.subJobList = new Vector();
        this.workerList = new Vector();
        this.finished_sj_idList=new ArrayList();
        this.backupJobList=new Vector();
    }

    /**
     * Creates {@code n} new sub-jobs, records them in {@link #subJobList},
     * appends them to {@code DispatcherJobManagement.jobApply} and wakes up
     * every thread waiting on {@code djm}.
     *
     * <p>Each id has the form {@code "SJ" + dispatcherId (without its first
     * character) + jobId (without its first character) + sequence number},
     * where the sequence number is zero-padded to six digits and shared by
     * all jobs.</p>
     *
     * @param n          number of sub-jobs to create
     * @param djm        monitor object to notify once the sub-jobs are queued;
     *                   if {@code null} nobody is notified
     * @param backupIp   backup address passed through to every sub-job
     * @param backupPort backup port passed through to every sub-job
     * @return the ids of the newly created sub-jobs, in creation order
     */
    public String[] newSubJob(int n,Integer djm,String backupIp,int backupPort)
    {
        this.dummy=djm;
        System.out.println(n);
        String[] sjId=new String[n];
        SubJob[] sj=new SubJob[n];

        String idPrefix="SJ" + dispatcherId.substring(1, dispatcherId.length()) +
            jobId.substring(1, jobId.length());
        for (int i = 0; i < n; i++)
        {
            sjId[i]=idPrefix + nextSequenceTag();
            System.out.println((i+1)+" "+sjId[i]);
            sj[i] = new SubJob(jobId, sjId[i], job.getfileName(),
                               job.getdataServerIp(), job.getdataServerPort(),
                               job.getprojectName(),backupIp,backupPort,job.getUserId(),job.getUserPwd());
        }
        synchronized (subJobList)
        {
            for (int i = 0; i < n; i++)
            {
                subJobList.addElement(sj[i]);
            }
        }
        synchronized (DispatcherJobManagement.jobApply)
        {
            for (int i = 0; i < n; i++)
            {
                DispatcherJobManagement.jobApply.addLast(sj[i]);
            }
        }
        // Notify on the parameter rather than re-reading the field: a concurrent
        // newSubJob call may replace the field between lock and notify.
        wakeWaiters(djm);
        return sjId;
    }

    /**
     * Creates one sub-job for fault tolerance. Unlike the regular variant it
     * is NOT put on the apply queue right away: it is parked in the backup
     * list and the {@link BackupDeal} thread queues it once its
     * {@code intervalTime} has counted down to zero (three periods according
     * to the original author's note; the initial value is set in SubJob and
     * is not visible here).
     *
     * @param subjobid id to give the new sub-job
     * @param djm      monitor object to notify when the sub-job is eventually queued
     * @return always 1
     */
    public int newSubJob(String subjobid,Integer djm)
    {
        this.dummy=djm;
        SubJob tmpsj= new SubJob(jobId, subjobid, job.getfileName(),
                                 job.getdataServerIp(), job.getdataServerPort(),
                                 job.getprojectName(),null,0,job.getUserId(),job.getUserPwd());
        synchronized (subJobList)
        {
            subJobList.addElement(tmpsj);
        }
        synchronized(backupJobList)
        {
            backupJobList.addElement(tmpsj);
            // Check-and-set under the lock so that two concurrent callers
            // cannot both start a backup thread (which would make the grace
            // period tick twice as fast).
            if (!backupThreadFlag)
            {
                backupThreadFlag=true;
                Thread bd=new BackupDeal();
                bd.start();
            }
        }
        return 1;
    }

    /**
     * Returns the next shared sequence number, formatted as six or more digits.
     * The class-wide lock protects both the static counter and the
     * DecimalFormat instance, which is not safe for concurrent use.
     */
    private String nextSequenceTag()
    {
        synchronized (Job.class)
        {
            return df.format(++num);
        }
    }

    /**
     * Wakes every thread waiting on the given monitor. Lock and notify use the
     * same reference, so no IllegalMonitorStateException can occur.
     */
    private static void wakeWaiters(Integer monitor)
    {
        if (monitor == null)
        {
            return;
        }
        synchronized (monitor)
        {
            monitor.notifyAll();
        }
    }

    /**
     * Background thread that periodically ages the backup sub-jobs of the
     * enclosing job and moves expired ones to the apply queue.
     */
    public class BackupDeal
	extends Thread
	{
    	// Never cleared anywhere in this class, so the thread runs until the JVM exits.
    	private boolean isActive=true;
    	/** Pause between two heartbeats, in milliseconds. */
    	private static final int INTERVAL= 5 * 1000;

    	public BackupDeal()
    	{

    	}

    	/** Runs one heartbeat immediately and then one every INTERVAL milliseconds. */
    	public void run()
        {
            while(isActive)
            {
                heartbeat();
                try
                {
                    Thread.sleep(INTERVAL);
                }
                catch (InterruptedException ignored)
                {
                    // Deliberately ignored, as in the original: an interrupt
                    // only cuts the pause short and the loop gets back to work.
                }
            }
        }

    	/**
    	 * Performs one pass over the backup list:
    	 * <ul>
    	 *   <li>an entry whose state is no longer 4 is dropped from the list
    	 *       (NOTE(review): presumably 4 means "still waiting for its
    	 *       backup" - confirm against SubJob);</li>
    	 *   <li>every remaining entry has its intervalTime decremented; when it
    	 *       reaches 0 the entry gets worker state 0, is appended to the
    	 *       apply queue, is removed from the backup list, and the waiters
    	 *       on the job's monitor are notified.</li>
    	 * </ul>
    	 */
    	public void heartbeat()
    	{
    		synchronized(backupJobList)
			{
    			// The index advances only when the current entry stays in the
    			// list: Vector.remove(int) shifts the following entries down,
    			// so stepping forward after a removal would skip one entry and
    			// a size cached before the loop would run past the end.
    			int i=0;
    			while(i<backupJobList.size())
    			{
    				SubJob tmpsubjob=(SubJob)backupJobList.elementAt(i);
    				if (tmpsubjob.getState()!=4)
    				{
    					backupJobList.remove(i);
    					continue;
    				}
    				tmpsubjob.intervalTime--;
    				if(tmpsubjob.intervalTime==0)
    				{
    					tmpsubjob.setWorkerState(0);
    					synchronized (DispatcherJobManagement.jobApply)
				        {
				            DispatcherJobManagement.jobApply.addLast(tmpsubjob);
				        }
    					backupJobList.remove(i);
    					// The field is read once; the helper locks and
    					// notifies that same reference.
    					wakeWaiters(dummy);
    					continue;
    				}
    				i++;
    			}
			}
    	}
	}

    /**
     * @return the id of this job
     */
    public String getJobId()
    {
        return this.jobId;
    }

    /**
     * Returns the live worker list (not a copy). The misspelled name is kept
     * because callers outside this file may depend on it.
     *
     * @return the workers attached to this job
     */
    public Vector getWorketList()
    {
        return this.workerList;
    }

    /**
     * @return the live list (not a copy) of all sub-jobs created for this job
     */
    public Vector getSubJobList()
    {
    	return this.subJobList;
    }

    /**
     * Adds the completion time of one more sub-job to the running total and
     * recomputes {@link #averageComputingTime} (integer division).
     *
     * @param t computing time of the sub-job that just finished
     */
    public void calAveTime(long t)
    {
    	this.totalTime+=t;
    	this.count++;
    	this.averageComputingTime=totalTime/count;
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -