⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tasktransfer.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
/*
 * Created on 2004-12-4
 *
 * TODO To change the template for this generated file go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Vector;

import cn.edu.hust.cgcl.biogrid.monitor.MonitorJob;


/**
 * @author Administrator
 *
 * TODO To change the template for this generated type comment go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
public class TaskTransfer extends Thread{
    private DispatcherInfo dispatcherInfo;
    private Vector jobList;
    DispatcherJobManagement jobManager;
	
	private int taskPort;
	private ServerSocket serverSocket;
	
	private boolean isActive=true;
	
	private Integer dummy;
	
	public TaskTransfer(int port,DispatcherInfo info,Vector joblist,
            DispatcherJobManagement jobmanager,Integer dum)
	{
		this.taskPort=port;
		this.dispatcherInfo=info;
		this.jobList=joblist;
		this.jobManager=jobmanager;
		this.dummy=dum;
	}
	
	public void run()
	{
		try
        {
            serverSocket = new ServerSocket(this.taskPort);
            while (isActive)
            {try{
                Socket socket = null;
                socket = serverSocket.accept();
                this.process(socket); //连接后处理
            }catch(Exception e1)
			{
            	System.out.println(e1);
			}
            }//while
        }catch(Exception e)
		{
        	e.printStackTrace();
		}
        System.out.println("TaskTransfer handler quit!");
	}//run
	
	public void process(Socket sock)
	{  BufferedReader is=null;
	   PrintWriter os=null;
	   ObjectInputStream iob=null;

       try
       {
        is = new BufferedReader(new InputStreamReader(sock.
            getInputStream()));
        os = new PrintWriter(sock.getOutputStream());
        iob = new ObjectInputStream(sock.getInputStream());
        
        
//        System.out.println("Task transferance start");
        
            //transmit data based the protocol
            String lineStr = is.readLine(); 
//            System.out.println(lineStr);
            /*******************************
             * To communicate with monitor, and receive job and count
             ********************************/
            if (lineStr.equals("<Dispatcher cancelJob>"))
            {
                String tmpJobId = is.readLine(); //the jobid of the job that will be canceled
                //*注意更新dispatchernode的信息(jobcount、)以及idleworker队列等
                 //要返回成功与否的标志flag1!
                JobCanceling jc = new JobCanceling(tmpJobId);
                jc.start();

                os.println("<cancelJob finish>");
                os.flush();
            }
            else if (lineStr.equals("<Job transmission>"))
            {
            	System.out.println("Job transmission start:");
                int tmpJobCount=Integer.parseInt(is.readLine());
                if(Parameter.jobDebugIsActive)
                  System.out.println(tmpJobCount);
                os.println("<Send Object>");
                os.flush();
                MonitorJob tmpMonitorJob = (MonitorJob) iob.readObject(); //接收到monitorjob的信息
                if(Parameter.jobDebugIsActive)
                   System.out.println("Read JobDesc Object OK "+tmpMonitorJob.getJobID());
                //用线程处理
                Job tmpJob = new Job(tmpMonitorJob,dispatcherInfo.getDispatcherId());
                JobReceiving jr = new JobReceiving(tmpJob);
//                jr.start();      
                //os.println("<tansmission finish>");
                //os.flush();
                System.out.println("Job transmission finished");
            }  
            
            
            /********************************
             * communicate with user's main program(job) , and send the subtask's id that will be done under this dispatcher
             *********************************/
            else if (lineStr.equals("<new task>"))
            {
            	if(Parameter.jobDebugIsActive)
            	   System.out.println("<new task>");
                String tmpJobId = is.readLine(); //申请执行的任务ID
                int n = Integer.parseInt(is.readLine()); //申请执行子任务的个数
                if(Parameter.jobDebugIsActive)
                   System.out.println("task number: "+n);
                String tmpBackupIp=is.readLine();
                int tmpBackupPort=Integer.parseInt(is.readLine());
                int j = 0;
                Job tmpjob=null;
                synchronized(jobList){
                    for (; j < jobList.size(); j++)
                    {
                        if (tmpJobId.equals( ( (Job) (jobList.
                            elementAt(
                            j))).getJobId()))
                            break;
                    }
                    if(j>=jobList.size())
                    {
                    System.out.println("Don't find this job in jobList!");
                    return;
                    }
                   else {tmpjob=(Job) (jobList.elementAt(j));
                   }
                }
                String[] s=tmpjob.newSubJob(n,dummy,tmpBackupIp,tmpBackupPort);
                
                for (int i = 0; i < n; i++)
                {
                    os.println(s[i]);
                }
                os.flush();
            }
            else if (lineStr.equals("<get subtask status>"))
            {
//            	if(Parameter.jobDebugIsActive)
//            	   System.out.println("<get subtask status>");
            	String jobId=is.readLine();
                String subtaskId = is.readLine();
                int j = 0;
                Job tmpjob=null;
//                if(Parameter.jobDebugIsActive)
//                   System.out.println("size of jobList:"+jobList.size());
                synchronized(jobList){
                    for (; j < jobList.size(); j++)
                    {
                    	tmpjob=(Job) (jobList.elementAt(j));
                    	//System.out.println(jobId+"  "+tmpjob.getJobId());
                        if (jobId.equals( tmpjob.getJobId()))
                            {
//                        	if(Parameter.jobDebugIsActive)
//                        		System.out.println(jobId+"  "+tmpjob.getJobId());
                        	break;
                            }
                    }
                    if(j>=jobList.size())
                    {
                    System.out.println("Don't find this job in jobList!");
                    return;
                    }
                    //else tmpjob=(Job) (jobList.elementAt(j));
                }
                Vector subjoblist=tmpjob.getSubJobList();
                synchronized(subjoblist)
				{
                	int i=subjoblist.size();
                for(j=0;j<i;j++)
                {
                	
                	if (subtaskId.equals(((SubJob)subjoblist.elementAt(j)).getSubJobId()))
                		{
//                		  if(Parameter.jobDebugIsActive)
//                		  	System.out.println(subtaskId+"   equal  "+((SubJob)subjoblist.elementAt(j)).getSubJobId());
                		  break;
                		}
                }
                if(j>=i)
                {
                	System.out.println("Don't find this subTask in subTasklist!");
                	return;
                }
                os.println(((SubJob)(subjoblist.elementAt(j))).getState());
                os.flush();
				}
            }
            //fault tolerant .......
            else if (lineStr.equals("fault tolerance"))
            {
            	is.readLine();//"<get subtask status>" is disposed.
            	String jobId=is.readLine();
                String subtaskId = is.readLine();
                int j = 0;
                Job tmpjob=null;
                System.out.println("size of jobList:"+jobList.size());
                synchronized(jobList){
                    for (; j < jobList.size(); j++)
                    {
                    	tmpjob=(Job) (jobList.elementAt(j));
                    	System.out.println(jobId+"  "+tmpjob.getJobId());
                        if (jobId.equals( tmpjob.getJobId()))
                            break;
                    }
                    if(j>=jobList.size())
                    {
                    System.out.println("Don't find this job in jobList!");
                    return;
                    }
                    //else tmpjob=(Job) (jobList.elementAt(j));
                }
                tmpjob.newSubJob(subtaskId,dummy);//fault tolerant, apply an new subjob
                tmpjob.state=4;//fault tolerant
                os.println(tmpjob.state);
                os.flush();
				}
	   }catch(Exception e)
	   {
	   	e.printStackTrace();
	   	//System.out.println(e.toString());
	   }
	   finally
	   {
	   	try{
	   	sock.close();
	   	}catch(Exception e)
		{
	   		e.printStackTrace();
		}
	   }
	}//process
	
	  //do with the job receving
    class JobReceiving
        extends Thread
    {
    	private Job job;
        public JobReceiving(Job tmpJob)
        {
        	job=tmpJob;
        	
//        }
//
//        public void run()
//        {
            if (! (jobManager.jobReceiving(job)))
            {
                System.out.println("job receiviving error!");
                return;
            }
            synchronized (dispatcherInfo)
            {
//            	System.out.println(job.getJobId());
                dispatcherInfo.insertJob(job.getJobId()); //更新dispacherInfo的信息。
            }
        }
    }

    //do with the job canceling
    class JobCanceling
        extends Thread
    {
        String tjId;
        public JobCanceling(String tmpJobId)
        {
        tjId=tmpJobId;
        }

        public void run()
        {
            if (! (jobManager.cancelJob(tjId)))
            {
            	System.out.println("Fail to cancel job from jobManager ");
                return;
            }
            //synchronized (dispatcherInfo)
            //{
                dispatcherInfo.cancelJob(tjId); //update dispatcherInfo
            //}
            return;
        }
    }
    
    public void quit()
    {
    	System.out.println("Task transfer handler quitting...");
    	isActive=false;
    	if(!this.serverSocket.isClosed())
    	{
    	try{
    	     this.serverSocket.close();
    	}catch(Exception e)
		{
    		e.printStackTrace();
		}
    	}
    }

}//TaskTransfer

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -