📄 .#dispatchernodemanagement.java.1.9

📁 Source code of the distributed computing platform P2HP-1; P2HP-1 is a P2P-based high-performance computing platform
                {
                    log(e);
                    //error("I/O error", e);
                }
                finally
                {
                    try
                    {
                        is.close();
                        s.close();
                        os.close();
                    }
                    catch (IOException e)
                    {
                        log(e);
                        //error("Error closing socket", e);
                    }
                }
                return;
            }
//.....
        }

        public void terminate()
        {
        	System.out.println("handler quitting...");
            isActive = false;
           
        }

    }

    /**
     *
     * <p>Title: </p>
     * <p>Description: Receives the heartbeat of each worker.</p>
     * <p>Copyright: Copyright (c) 2004</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    public class ListenningHandler
        extends Thread
    {
        private boolean isActive = true;
        private boolean subflag=true;
        public ListenningHandler()
        {

        }

        public void run()
        {
            while (isActive)
            {
                Socket socket = null;
                try
                {
                    socket = listenSocket.accept();
                    Process l = new Process(socket); // handle the connection once accepted
                    l.start();
                }
                catch (java.io.InterruptedIOException e)
                {
                // accept() timed out; go back to the top of the loop (no handling needed)
                }
                catch (IOException e)
                {
                    log(e);
                    isActive=false;
                    // error("I/O Error", e);
                }
                catch (SecurityException e)
                {
                    log(e);
                    isActive=false;
                    //error("An unauthorized client has attempted to connect", e);
                }
                /*catch (Throwable e)
                                 {
                    error("Unexpected exception", e);
                                 }*/
            }//while
            System.out.println("listening handler quit!");
        }

        public class Process
            extends Thread
        {
            Socket s;
            private BufferedReader is;
            private PrintWriter os;
            private ObjectOutputStream oob;
            private boolean flag=true;

            public Process(Socket sock)
            {
                this.s = sock;
                try{
                is = new BufferedReader(new InputStreamReader(s.
                    getInputStream()));
                //iob = new ObjectInputStream(s.getInputStream());
                os = new PrintWriter(s.getOutputStream());
                oob = new ObjectOutputStream(s.getOutputStream());
                }catch(Exception e1)
				{
                	//e1.printStackTrace();
                	System.out.println(e1.toString());
				}
                System.out.println("Worker poll start: ");
            }
            
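            public void terminate()
            {
                // Clear the flag first so run() stops even if close() throws.
                flag=false;
                try
                {
                    if(!this.s.isClosed())
                        this.s.close();
                }catch(Exception e)
                {
                    // Best effort: the socket is being discarded anyway.
                }
            }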

            public void run()
            {
            	while(flag&&subflag)
            	{

                if (s == null)
                {
                    return;
                }
                try
                {
                    // Read the protocol data: the worker ID first, then the message line
                    String tmpWorkerId = is.readLine();
                    System.out.println(tmpWorkerId);
                    System.out.print("idle worker node:> ");
                    for(int istart=0;istart<idleWorkerNodeList.size();istart++)
                    	System.out.print(idleWorkerNodeList.elementAt(istart)+"  ");
                    System.out.println();

                    String lineStr = is.readLine();
                    //System.out.println(lineStr);
                    
                    int i = 0;
                    WorkerNode wn = null;
                    boolean ftFlag=false;
                    synchronized(ftWorkerNodeList)
					{
                    	for (i=0; i < ftWorkerNodeList.size(); i++)
                        {
                            if (tmpWorkerId.equals((String)(ftWorkerNodeList.elementAt(i))))
                                {
                            	ftFlag=true;
                            	break;
                                }
                        }
					}
                    if(ftFlag)
                    {
                    	if (lineStr.equals("<worker alive>"))
                        {
                            os.println("<dispatcher alive>");
                            os.flush();
                            //System.out.println("<dispatcher alive>");
                        }
                        else if (lineStr.equals("<subjob hava finished>"))
                        {
                        	String ft_jobId=is.readLine();
                        	String ft_subJobId=is.readLine();
                        	int j=0;
                        	Job tmpjob=null;
                        	synchronized(jobList){
                                for (j=0; j < jobList.size(); j++)
                                {
                                	tmpjob=(Job) (jobList.elementAt(j));
                                	//System.out.println(jobId+"  "+tmpjob.getJobId());
                                    if (ft_jobId.equals( tmpjob.getJobId()))
                                        break;
                                }
                                if(j>=jobList.size())
                                {
                                System.out.println("Job not found in jobList!");
                                return;
                                }
                                //else tmpjob=(Job) (jobList.elementAt(j));
                            }
                        	Vector tmpSjList=tmpjob.subJobList;
                        	SubJob tmpsubjob=null;
                        	synchronized(tmpSjList){
                                for (j=0; j < tmpSjList.size(); j++)
                                {
                                	// Read from the sub-job list being iterated, not jobList (which holds Job objects)
                                	tmpsubjob=(SubJob) (tmpSjList.elementAt(j));
                                	//System.out.println(jobId+"  "+tmpjob.getJobId());
                                    if (ft_subJobId.equals( tmpsubjob.getSubJobId()))
                                        {
                                    	tmpsubjob.setWorkerState(2);//finished
                                    	break;
                                        }
                                }
                                if(j>=tmpSjList.size())
                                {
                                System.out.println("Subjob not found in subJobList!");
                                return;
                                }
                                //else tmpjob=(Job) (jobList.elementAt(j));
                            }
                        	
                            synchronized(dummy)
    						{
                            try{
                            dummy.notifyAll();
                            }
                            catch(IllegalMonitorStateException e)
    						{
                            	
    						}
    						}
                            os.println("<dispatcher hava known>");
                            os.flush();
                            tmpsubjob.finishedTime=System.currentTimeMillis();
                            tmpsubjob.computingTime=tmpsubjob.finishedTime-tmpsubjob.distributeTime;
                        }
                        else if (lineStr.equals("<worker quit>"))
                        {
                            // To be handed off to a WorkerNode method
                        	synchronized(ftWorkerNodeList)
							{
		                    	for (i=0; i < ftWorkerNodeList.size(); i++)
		                        {
		                            if (tmpWorkerId.equals((String)(ftWorkerNodeList.elementAt(i))))
		                                {
		                            	ftWorkerNodeList.remove(i);
		                            	break;
		                                }
		                        }
							}
                            os.println("<worker quit agree>");
                            os.flush();
                        }
                        
                        this.s.close();
                        return;
                    }
                    else{
                    synchronized (workerNodeList)
                    {                    
                        for (i=0; i < workerNodeList.size(); i++)
                        {
                            if (tmpWorkerId.equals( ( (WorkerNode) (
                                workerNodeList.elementAt(i))).getWorkerId()))
                                break;
                        }
                        if (i >= workerNodeList.size())
                        {
                            // Check whether this is a dead worker; if found, revive it.
                            int j = 0;
                            synchronized (deadWorkerNodeList)
                            {
                                for (j=0; j < deadWorkerNodeList.size(); j++)
                                {
                                    if (tmpWorkerId.equals( ( (WorkerNode) (
                                        deadWorkerNodeList.elementAt(j))).
                                        getWorkerId()))
                                        break;
                                }
                                if (j >= deadWorkerNodeList.size())
                                {//if fault tolerant ,then deal with...
                                	
                                	if(lineStr.equals("worker fault tolerance"))
                                    {
                                		
                                    	String ft_jobId=is.readLine();
                                    	String ft_subJobId=is.readLine();
                                    	int ft_sjState=Integer.parseInt(is.readLine());
                                    	
                                    	os.println("worker fault tolerance finish");
                                    	os.flush();
                                    	ftWorkerNodeList.add(tmpWorkerId);
                                    	Job tmpjob=null;
                                    	synchronized(jobList){
                                            for (j=0; j < jobList.size(); j++)
                                            {
                                            	tmpjob=(Job) (jobList.elementAt(j));
                                            	//System.out.println(jobId+"  "+tmpjob.getJobId());
                                                if (ft_jobId.equals( tmpjob.getJobId()))
                                                    break;
                                            }
                                            if(j>=jobList.size())
                                            {
                                            System.out.println("Job not found in jobList!");
                                            return;
                                            }
                                            //else tmpjob=(Job) (jobList.elementAt(j));
                                        }
                                    	Vector tmpSjList=tmpjob.subJobList;
                                    	SubJob tmpsubjob=null;
                                    	synchronized(tmpSjList){
                                            for (j=0; j < tmpSjList.size(); j++)
                                            {
                                            	// Read from the sub-job list being iterated, not jobList (which holds Job objects)
                                            	tmpsubjob=(SubJob) (tmpSjList.elementAt(j));
                                            	//System.out.println(jobId+"  "+tmpjob.getJobId());
                                                if (ft_subJobId.equals( tmpsubjob.getSubJobId()))
                                                    {
                                                	tmpsubjob.setWorkerState(ft_sjState);
                                                	break;
                                                    }
                                            }
                                            if(j>=tmpSjList.size())
                                            {
                                            System.out.println("Subjob not found in subJobList!");
                                            return;
                                            }
                                            //else tmpjob=(Job) (jobList.elementAt(j));
                                        }
                                    	
                                    }
                                	else
                                    {System.out.println(
                                        "Worker not found in workerNodeList or deadWorkerNodeList!");
                                    return;
                                    }
                                }
                                else
