⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatcherjobmanagement.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;


import java.util.LinkedList;
import java.util.Vector;

/**
 * <p>Title: </p>
 * <p>Description: 对worker进行调度和管理。有个空闲worker队列,把空闲worker交给需要job的job类。</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class DispatcherJobManagement
{
    //private int userPort; //与用户的应用主程序通信的服务器端口
    private Vector jobList; //job队列
    private Vector workerNodeList; //worker队列
    private Vector idleWorkerNodeList; //空闲worker队列
    public static LinkedList jobApply; //申请woker的job(subjob)队列,先来先服务
    public static LinkedList runningJobList;//已经分配过,还未运行完毕的(subJob)队列。
    public static LinkedList urgentJobList;//超过指定时间(T)仍未计算完毕的子任务队列。
    //private DispatcherNode dispatcherNode;
    //private DispatcherNodeManagement dispatcherNodeManagement;
    private boolean isActive = true;
    public DispatcherJobServerHandler handler;
    private UserCommunication uc;       
    
    DispatcherInfo nodeInfo;
    
    private Integer dummy;   
 

    public DispatcherJobManagement( Vector jobList,
                                   Vector workerNodeList,
                                   Vector idleWorkerNodeList,DispatcherInfo dispatcherInfo,Integer dum)
    {
        jobApply = new LinkedList();
        runningJobList=new LinkedList();
        urgentJobList=new LinkedList();
        //this.userPort = userPort;
        this.jobList = jobList;
        this.workerNodeList = workerNodeList;
        this.idleWorkerNodeList = idleWorkerNodeList;
        this.nodeInfo=dispatcherInfo;
        this.dummy=dum;    
    }

    public void startJobServer()
    {
        handler = new DispatcherJobServerHandler();
        handler.start();
        //uc = new UserCommunication(userPort,jobList,jobApply);
        //uc.start();
    }

    public void quit()
    {
    	System.out.println("jobmanagerment quitting...");
    	if(handler!=null&&handler.isAlive())
           {handler.interrupt();
    		handler.terminate();
           }
        System.out.println("jobmanagement quit!");
    }

    /**
     * 取消作业的处理(默认为只有还未被应用主程序申请的job才会被取消)
     * @param jobId
     * @return
     */
    public boolean cancelJob(String jobId)
    {
        synchronized (jobList)
        {
            int n = jobList.size();
            for (int i = 0; i < n; i++)
            {
                Job tmpjob = (Job) jobList.elementAt(i);
                if (jobId.equals(tmpjob.getJobId()))
                {
                    jobList.removeElement(tmpjob);                    
                    return true;
                }
            }
        }
        return false;
    }

    public boolean jobReceiving(Job tmpJob)
    {
        //Job tmpJob = new Job(tmpMonitorJob,dispatcherId);
        synchronized (jobList)
        {
            jobList.addElement(tmpJob);
        }        
        return true;
    }
    
     
    /**
     *
     * <p>Title: </p>
     * <p>Description:向subjob分配worker </p>
     * <p>Copyright: Copyright (c) 2004</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    class DispatcherJobServerHandler
        extends Thread
    {
        private boolean djsIsActive = true;
        private boolean runflag=true;
        private static final int POLL_INTERVAL = 1000;
        public DispatcherJobServerHandler()
        {
        }
        
        public void run()
        {
            while (djsIsActive)
            {
            	this.workerDistribute();
            }
            System.out.println("jobserverhandler quit!");            
            return;
        } //run
        
        /**
         * 将空闲worker分配给一个job线程(预分配)
         */
        public synchronized void workerDistribute() {
        	int whichJobList=0;
			try {
				if(Parameter.jobDebugIsActive)
				{
					System.out.println("workerDistributing starting!");
					System.err.println("the jobApply size is: "+jobApply.size());
					System.err.println("the urgentjoblist size is: "+urgentJobList.size());
				}
				synchronized (dummy) {
					while (runflag && ((jobApply.size()) == 0)
							&& (urgentJobList.size() == 0)
							|| (idleWorkerNodeList.size()) == 0)
						dummy.wait();
				}
				if(Parameter.jobDebugIsActive)
					System.out.println("workerDistributing start!");
				if (jobApply.size() == 0&&urgentJobList.size() == 0)
					return;
				SubJob tmpjob=null;
				if(urgentJobList.size() != 0)
					tmpjob=(SubJob)urgentJobList.getFirst();
				else if(jobApply.size()!=0)
					{
					   whichJobList=1;
					   tmpjob = (SubJob) jobApply.getFirst();
					}
				else return;
				String tmpworkerid="";
				if(idleWorkerNodeList.size()!=0)
				    tmpworkerid = (String) idleWorkerNodeList.firstElement();
				else return;
				int i = 0;
				WorkerNode tmpwn=null;
				while(i<workerNodeList.size())
				{
					tmpwn=(WorkerNode) (workerNodeList.elementAt(i));
					if(tmpworkerid.compareTo(tmpwn.getWorkerId())==0)
						break;
					i++;
				}
				if (i >= workerNodeList.size()) {
					System.out
							.println("Haven't found this worker in idleWorkerNodeList!");
				} else {
					if(whichJobList==0)
						tmpwn.isReduncyWorker=true;
					else tmpwn.isReduncyWorker=false;
					tmpwn.setJob(tmpjob); //将这个subjob预分配给这个worker
					tmpwn.setJobApply(jobApply);
					tmpjob.setWorkerNode(tmpwn.getWorkerId());//同时将这个worker分配给subjob
					if(whichJobList==0)
						urgentJobList.removeFirst();
					else jobApply.removeFirst();
					idleWorkerNodeList.removeElementAt(0);
					System.out.println("idleWorkerNodeList.size():  "
							+ idleWorkerNodeList.size());
					nodeInfo.setIdleWorkerCount(idleWorkerNodeList.size());
					tmpjob.distributeTime = System.currentTimeMillis();//记录分配子任务的时间。
					runningJobList.add(tmpjob);
				}		

			} catch (Exception e) {
				//e.printStackTrace();
				djsIsActive = false;
				runflag = false;
				System.out.println(e.toString());
			}
		}       
       

        public void terminate()
        {
        	System.out.println("DispatcherJobServer handler quitting...");
            djsIsActive = false;
            runflag=false;
        }
    } //DispatcherJobServerHandler
    
    class TestDeadSubJob extends Thread
	{
    	
	}

        public void log(Exception e)
        {
        e.printStackTrace();
        }

       
    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -