⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchercommunication.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.worker;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.net.Socket;

import cn.edu.hust.cgcl.biogrid.dispatcher.SubJob;

/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class DispatcherCommunication
    extends Thread
{
    private String dIpAddr;
    private String subdIpAddr;
    private String workerId;
    private String dispatcherId;
    private int workerHeartbeatPort;
    private int subWorkerHeartbeatPort;
    public SubJob sj;
    private WorkerInfo nodeInfo;
    private boolean isActive = true;
//  private boolean quitFlag=false;
    DataPollCommunication dpc;

    private static final int TIME_OUT = 20 * 1000;
    private static final int INTERVAL = 5 * 1000;

    private Socket nodeSocket = null;

    private BufferedReader is;
    private PrintWriter os;
    private ObjectInputStream iob;
   

    public DispatcherCommunication(WorkerInfo nodeInfo)
    {
        this.sj = null;
        this.nodeInfo = nodeInfo;
        this.workerId = nodeInfo.getWorkerId();
        this.dispatcherId = nodeInfo.get_f_d_Id();
        this.dIpAddr = nodeInfo.get_f_d_Ip();
        this.workerHeartbeatPort = nodeInfo.get_f_d_port();
        this.subdIpAddr=nodeInfo.get_s_d_Ip();
        this.subWorkerHeartbeatPort=nodeInfo.get_s_d_port();
    }

    public void run()
    {
        while (isActive)
        {
            //System.out.println(dIpAddr+workerHeartbeatPort);
            heartbeat();
            try
            {
            	if(!interrupted())
        		{
                Thread.sleep(INTERVAL);
        		}
            	else {
            				isActive=false;
            	}
            }
            catch (InterruptedException e)
            {
                isActive=false;//get back to work
            }
        }
//        System.out.println("DispatcherCommunication's quit!");
        try{
        if(!this.nodeSocket.isClosed())
        	this.nodeSocket.close();
        }catch(Exception e)
		{
        	
		}
    }
    
    public void quit()
    {
    	System.out.println("DispatcherCommunication handler quitting...");
        isActive = false;
        try{
            if(!this.nodeSocket.isClosed())
            	this.nodeSocket.close();
            }catch(Exception e)
    		{
            	
    		}
    }

    public boolean heartbeat()
    {
//        boolean flag = false;
        if (nodeSocket == null)
        {
            if (!initCommunication())
            {
                return false;
            }
        }
        {
            try
            {
                os.println(this.workerId);
//                System.out.print("poll start: ");
                os.println("<worker alive>");
                os.flush();
                String stmp = is.readLine();
                if (stmp.equals("<new job>"))
                {

                    String rtmp= is.readLine(); 
                    System.out.println("new job start!");
                    //........
                    os.println("<new job accept>");
                    os.flush();
                    //iob=new ObjectInputStream(nodeSocket.getInputStream());
                    sj = (SubJob) iob.readObject();
                    nodeInfo.sj=sj;
                    System.out.println("the new job id is: "+sj.getSubJobId());
                    //iob.close();
                    
                    //set the info of beckup Dispatcher. 
                    nodeInfo.secondDispatcherIPAddr=sj.subDispatcherIp;
                    nodeInfo.secondDispatcherPort=sj.subDispatcherPort;
                    this.subdIpAddr=sj.subDispatcherIp;
                    this.subWorkerHeartbeatPort=sj.subDispatcherPort;
                    
                    nodeInfo.setFinishFlag(false);
                    dpc = new DataPollCommunication(sj, workerId, nodeInfo);
                    if(rtmp.equalsIgnoreCase("1"))
                    	dpc.regetDataFlag=true;//判断是否是容错处理,需再去数据。
                    else dpc.regetDataFlag=false;
                    stmp = is.readLine();//"<dispatcher alive>"
                    dpc.start();
                }
            }
            catch (IOException e)
            {
                System.out.println(e.toString());
                //e.printStackTrace();
                try
                {
                	if(!nodeSocket.isClosed())
                       nodeSocket.close();
//                    is.close();
//                    os.close();
                    nodeSocket=null;
                    is=null;
                    os=null;

                 /*{//容错,需第二个dispatcher,现测试,删除
                    this.dIpAddr=this.subdIpAddr;
                    this.workerHeartbeatPort=this.subWorkerHeartbeatPort;
                    subdIpAddr=null;
                    subWorkerHeartbeatPort=0;
                    if (subdIpAddr==null)
                    {
                        return false;
                    } // if
                    if (!this.initCommunication())
                    {
                        return false;
                    } // if
                    
                    os.println(this.workerId);
//                    System.out.print("poll start: ");

                    this.os.println("worker fault tolerance");
                    os.println(sj.getJobId());
                    os.println(sj.getSubJobId());
                    os.println(sj.getState());
                    os.flush();
                    String msg=is.readLine();
                    if (!msg.equals("worker fault tolerance finish"))
                    {
                        return false;
                    } // if
                   }//容错*/
                    //continue;
//                    return true;
                } // try
                catch (Exception e1)
                {
                    e1.printStackTrace();
                    return false;
                } // catch
                return false;
            } // catch
            catch (ClassNotFoundException e)
            {
                System.out.println(e.toString());
                try
                {
                    nodeSocket.close();
                    is.close();
                    os.close();
                }
                catch (Exception e1)
                {
                    e1.printStackTrace();
                    return false;
                } // catch
                return false;
            } // catch
            return true;
        } // while

        //return flag;

    }

    private boolean initCommunication()
    {
        try
        {
            this.nodeSocket = new Socket(dIpAddr, workerHeartbeatPort);

            nodeSocket.setSoTimeout(TIME_OUT);
            is = new BufferedReader(new InputStreamReader(nodeSocket.
                getInputStream()));
            os = new PrintWriter(nodeSocket.getOutputStream());
            iob = new ObjectInputStream(nodeSocket.getInputStream());
        }
        catch (Exception e)
        {
            try
            {
                if (nodeSocket != null)
                    nodeSocket.close();
                if (os != null)
                    os.close();
                if (is != null)
                    is.close();
            }
            catch (Exception e1)
            {
                e1.printStackTrace();
            }
            //iob=null;
            return false;
        }
        return true;
    }


    

    private void log(Exception e)
    {
        e.printStackTrace();
    }

    // 调用函数之前需要判断是否是worker容错:"worker fault tolerance"
    static public boolean handleWorkerFaultTolerance(SubJob job,
        BufferedReader is,
        PrintWriter os)
        throws Exception
    {
        String jobid=is.readLine();
        String subjobid=is.readLine();
        int subjobstate=Integer.parseInt(is.readLine());
        os.println("worker fault tolerance finish");
        os.flush();

        return true;
    } // handleWorkerFaultTolerance
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -