⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 .#monitorcommunication.java.1.3

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 3
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
import java.util.Vector;

import cn.edu.hust.cgcl.biogrid.monitor.MonitorJob;

/**
 * <p>Title: </p>
 * <p>Description: 用于处理与monitor的通信,
 * 现此类不再使用,功能由Receive_MPool.java代替.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MonitorCommunication
    extends Thread
{
    private static final int TIME_OUT =3*60*1000;
    private boolean isActive = true; //用于控制accept线程的终止

    private int serverPort; //与monitor通信的服务器端口
    private DispatcherInfo dispatcherInfo;
    private Vector jobList;
    DispatcherJobManagement jobManager;
    private MonitorJob tmpMonitorJob;//分配来的新任务
    private int tmpJobCount=0;//任务数
    private String tmpJobId;//要取消的job的ID
    private boolean flag1 = false; //job取消工作完成标志
    private Integer dummy;

    private ServerSocket serverSocket; //与monitor通信的serversocket

    public MonitorCommunication(int localPort, DispatcherInfo nodeInfo,
                                Vector jobList,
                                DispatcherJobManagement jobManager)
    {
    	System.out.println("the monitorcommunication start");
        this.serverPort = localPort;
        this.dispatcherInfo = nodeInfo;
        this.jobList = jobList;
        this.jobManager = jobManager;
        tmpMonitorJob = null;
        serverSocket = null;
    }

    public void run()
    {
        try
        {
            serverSocket = new ServerSocket(this.serverPort);
            while (isActive)
            {
                Socket socket = null;
                try
                {
                    socket = serverSocket.accept();
                    Process p = new Process(socket); //连接后处理
                    p.start();
                }
                catch (java.io.InterruptedIOException e)
                {
                //等待连接超时,返回循环开头;(无处理)
                }
                catch (IOException e)
                {
                	System.out.println("I/O Error");
                	isActive=false;
                }
                catch (SecurityException e)
                {
                	System.out.println("An unauthorized client has attempted to connect");
                	isActive=false;
                }
                catch (Throwable e)
                {
                	System.out.println("Unexpected exception");
                	isActive=false;
                }
            } // while
            try
            {
                serverSocket.close();
            }
            catch (IOException e)
            {
                log(e);
                error("Error closing server socket", e);
            }
        } // try
        /*catch(SocketTimeoutException e)
		{
        	System.out.println("The monitor has downed now!");
		}*/
        catch (IOException e)
        {
            log(e);
            error("Error binding to port", e);
        }
        catch (SecurityException e)
        {
            log(e);
            error("The security manager refused permission to bind to port", e);
        }
    } // run

    /**
     *
     * <p>Title: </p>
     * <p>Description:连接处理线程:内联类 </p>
     * <p>Copyright: Copyright (c) 2004</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    public class Process
        extends Thread
    {
        Socket socket;
        private BufferedReader is;
        private PrintWriter os;
        private ObjectOutputStream oob;
        private ObjectInputStream iob;
        private boolean flag=true;

        public Process(Socket sock)
        {
            this.socket = sock;
            try{
            is = new BufferedReader(new InputStreamReader(socket.
                getInputStream()));
            os = new PrintWriter(socket.getOutputStream());
            iob = new ObjectInputStream(socket.getInputStream());
            //oob = new ObjectOutputStream(socket.getOutputStream());
            }
            catch(Exception e)
			{
            	e.printStackTrace();
			}
            System.out.println("Monitor onpoll start");
        }
        

        public void run()
        {
            //process the query from monitor
        	while(flag)
        	{
            if (socket == null || socket.isClosed())
            {
                return;
            }
            try
            {
                //transmit data based the protocol
                String lineStr = is.readLine();           
                /*******************************
                 * To communicate with monitor, and receive job and count
                 ********************************/
                if (lineStr.equals("<Dispatcher poll>"))
                {
                	System.out.println("<Dispatcher poll>");
                    //dispatcherInfo.dispatcherUpdate();
                    //the update cost about one second 
                    //System.out.println("idleworkercount: "+dispatcherInfo.getIdleWorkerCount());
                    //System.out.println("workercount: "+dispatcherInfo.getWorkerCount());
                    //oob.writeObject(dispatcherInfo); 
                    //oob.flush();
                    os.println(dispatcherInfo.getDispatcherId());
                    //os.println(dispatcherInfo.getFirstMonitorId());
                    //os.println(dispatcherInfo.getFirstMonitorIp());
                    //os.println(dispatcherInfo.getSecondMonitorId());
                    //os.println(dispatcherInfo.getSecondMonitorIp());
                    os.println(dispatcherInfo.getJobCount());
                    os.println(dispatcherInfo.getWorkerCount());
                    os.println(dispatcherInfo.getIdleWorkerCount());
                    os.println(dispatcherInfo.getCanAcceptJob());
                    os.println(dispatcherInfo.getDispatcherLoad());
                    //os.println(dispatcherInfo.getDispacherJob());
                    os.flush();
                    //System.out.println(lineStr);
                    lineStr = is.readLine();
                    if (lineStr.equals("<poll finish>"))
                    {
                        //System.out.println(lineStr);
                    //return; 
                    }
                }
                else if (lineStr.equals("<Dispatcher cancelJob>"))
                {
                    tmpJobId = is.readLine(); //the jobid of the job that will be canceled
                    //*注意更新dispatchernode的信息(jobcount、)以及idleworker队列等
                     //要返回成功与否的标志flag1!
                    JobCanceling jc = new JobCanceling(tmpJobId);
                    jc.start();

                    os.println("<cancelJob finish>");
                    os.flush();
                }
                else if (lineStr.equals("<Job transmission>"))
                {
                	System.out.println("Job transmission start:");
                    tmpJobCount=Integer.parseInt(is.readLine());
                    System.out.println(tmpJobCount);
                    os.println("<Send Object>");
                    os.flush();
                    tmpMonitorJob = (MonitorJob) iob.readObject(); //接收到monitorjob的信息
                    System.out.println("Read JobDesc Object OK"+tmpMonitorJob.getJobID());
                    //用线程处理
                    Job tmpJob = new Job(tmpMonitorJob,dispatcherInfo.getDispatcherId());
                    JobReceiving jr = new JobReceiving(tmpJob);
                    jr.start();
                    
                   
                    //os.println("<tansmission finish>");
                    //os.flush();
                    System.out.println("Job transmission finished");
                }
                
                
                
                
                /********************************
                 * communicate with user's main program(job) , and send the subtask's id that will be done under this dispatcher
                 *********************************/
                else if (lineStr.equals("<new task>"))
                {
                	System.out.println("<new task>");
                    String tmpJobId = is.readLine(); //申请执行的任务ID
                    int n = Integer.parseInt(is.readLine()); //申请执行子任务的个数
                    int j = 0;
                    Job tmpjob=null;
                    synchronized(jobList){
                        for (; j < jobList.size(); j++)
                        {
                            if (tmpJobId.equals( ( (Job) (jobList.
                                elementAt(
                                j))).getJobId()))
                                break;
                        }
                        if(j>=jobList.size())
                        {
                        System.out.println("Don't find this job in jobList!");
                        return;
                        }
                       else tmpjob=(Job) (jobList.elementAt(j));
                    }
                    //String[] s=tmpjob.newSubJob(n,dummy);
                    
                    for (int i = 0; i < n; i++)
                    {
                      //  os.println(s[i]);
                    }
                    os.flush();
                }
                else if (lineStr.equals("<get subtask status>"))
                {
                	String jobId=is.readLine();
                    String subtaskId = is.readLine();
                    int j = 0;
                    Job tmpjob=null;
                    synchronized(jobList){
                        for (; j < jobList.size(); j++)
                        {
                            if (tmpJobId.equals( ( (Job) (jobList.
                                elementAt(
                                j))).getJobId()))
                                break;
                        }
                        if(j>=jobList.size())
                        {
                        System.out.println("Don't find this job in jobList!");
                        return;
                        }
                        else tmpjob=(Job) (jobList.elementAt(j));
                    }
                    Vector subjoblist=tmpjob.getSubJobList();
                    synchronized(subjoblist)
					{
                    	int i=subjoblist.size();
                    for(j=0;j<i;j++)
                    {
                    	if (subtaskId.equals(subjoblist.elementAt(j)))
                    		break;
                    }
                    if(j>=i)
                    {
                    	System.out.println("Don't find this subTask in subTasklist!");
                    	return;
                    }
                    os.println(((SubJob)(subjoblist.elementAt(j))).getState());
                    os.flush();
					}
                }
            }
            catch (IOException e)
            {

                flag=false;
            	e.printStackTrace();
                //System.out.println(e.toString());
                try
                {
                	//oob.close();
                	iob.close();
                	os.close();
                    is.close();
                    socket.close();
                    //oob.close();
                }
                catch (IOException e1)
                {
                    log(e1);
                    flag=false;
                }
            }
            catch (ClassNotFoundException e)
            {

                flag=false;
                try
                {
                	//oob.close();
                	iob.close();
                	os.close();
                    is.close();
                    socket.close();
                }
                catch (IOException e1)
                {
                    log(e);
                    flag=false;
                }
                e.printStackTrace();
            }
        	}//while
        } //process
    }

    public void terminate()
    {
        isActive = false; //终止serversocket服务器
    }

    public void log(Exception e)
    {
    e.printStackTrace();
    }

    public void error(String message, Throwable e)
    {
        //错误处理到日志
        System.err.println(new Date() + ":dispatcherNode(" +
                           serverSocket.getLocalPort() +
                           "):error:" + message + ":" + e.getClass().getName() +
                           ":" + e.getMessage());
    } //error

    //do with the job receving
    class JobReceiving
        extends Thread
    {
    	private Job job;
        public JobReceiving(Job tmpJob)
        {
        	job=tmpJob;
        	
        }

        public void run()
        {
            if (! (jobManager.jobReceiving(job)))
            {
                System.out.println("job transform error:");
                return;
            }
            synchronized (dispatcherInfo)
            {
            	System.out.println(job.getJobId());
                dispatcherInfo.insertJob(job.getJobId()); //更新dispacherInfo的信息。
            }
        }
    }

    //do with the job canceling
    class JobCanceling
        extends Thread
    {
        String tjId;
        public JobCanceling(String tmpJobId)
        {
        tjId=tmpJobId;
        }

        public void run()
        {
            if (! (jobManager.cancelJob(tmpJobId)))
            {
                flag1 = false;
                return;
            }
            //synchronized (dispatcherInfo)
            //{
                dispatcherInfo.cancelJob(tmpJobId); //update dispatcherInfo
            //}
            flag1 = true;
            return;
        }
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -