⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchernode.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Vector;

/**
 * <p>Title: DispatcherNode</p>
 * <p>Description: Handles all communication management of a dispatcher,
 * i.e. the communication with the monitor, with other dispatchers and with
 * the workers.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 *
 * <p>Notes carried over from the original author:</p>
 * <ul>
 *   <li>A dispatcher node has both passive and active connections, i.e. it
 *       uses both a socket and a server socket.</li>
 *   <li>The socket is used to communicate with dispatcher nodes.</li>
 *   <li>The server socket is used to answer connections from the monitor.</li>
 *   <li>Backups (of both monitor and dispatcher) are not considered for now.</li>
 * </ul>
 *
 * <p>Every constructor wires the job/node managers together and immediately
 * starts the {@code Receive_MPoll} and {@code TaskTransfer} threads. The
 * node server, the job server and the {@code CountWorker} are only started
 * once this thread itself is started (see {@link #run()}).</p>
 *
 * @author not attributable
 * @version 1.0
 */
public class DispatcherNode
    extends Thread
{
    /** Defaults offered by {@link #inputPort()} when the operator just presses Enter. */
    private static final int DEFAULT_SERVER_PORT = 5002;
    private static final int DEFAULT_WORKER_PORT = 5003;
    private static final int DEFAULT_LISTEN_PORT = 5004;
    private static final int DEFAULT_TASK_PORT = 5007;

    /** Inclusive range of port numbers accepted from the console. */
    private static final int MIN_PORT = 0;
    private static final int MAX_PORT = 65535;

    /** Address of the local host; stays {@code null} if the lookup fails. */
    private String ipAddr;
    /** Port for dispatcher-to-dispatcher communication; never initialized or read in this class. */
    private int dispatcherPort;

    private DispatcherInfo dispatcherInfo;

    // The element types of these queues are defined by the manager classes,
    // which are not visible here, so the raw Vector type is kept.
    // Original author's note: access to the queues must be mutually exclusive.
    private Vector jobList; // job queue
    private Vector workerNodeList; // worker queue
    private Vector idleWorkerNodeList; // idle worker queue
    private Vector deadWorkerNodeList; // dead worker queue

    private DispatcherJobManagement jobManager;
    private DispatcherNodeManagement nodeManager;
    /** Server thread that answers the monitor's polling (per the original author's note). */
    Receive_MPoll t1;
    TaskTransfer t2;

    // The same instance is handed to both managers and to TaskTransfer;
    // presumably it serves as their shared lock object (TODO confirm).
    // It is deliberately created with "new" so that it is a distinct object
    // rather than the JVM-wide cached Integer returned by Integer.valueOf(0).
    private Integer dummy=new Integer(0);
    private CountWorker cw;

    /**
     * Creates a dispatcher node with a fresh {@code DispatcherInfo}, using
     * whatever port values {@code Parameter} currently holds (the console is
     * not consulted).
     */
    public DispatcherNode()
    {
        initialize(new DispatcherInfo());
    } //DispatcherNode

    /**
     * Asks the operator for the port numbers and then creates a dispatcher
     * node around the given dispatcher information.
     *
     * @param di the dispatcher information shared with all helper components
     */
    public DispatcherNode(DispatcherInfo di)
    {
        // NOTE(review): inputPort() is overridable and is invoked before the
        // fields are set up; overriders must not rely on instance state.
        inputPort();
        initialize(di);
    }

    /**
     * Asks the operator for the port numbers and then creates a dispatcher
     * node with a fresh {@code DispatcherInfo}.
     *
     * @param dispatcherid not used by this constructor; the identity is set
     *                     later through {@link #init(String, String, String)}
     */
    public DispatcherNode(String dispatcherid)
    {
        inputPort();
        // Goes through the shared wiring so that the t1/t2 FIELDS are
        // assigned. Previously this constructor stored the threads in local
        // variables, leaving the fields null and making quit() fail.
        initialize(new DispatcherInfo());
    }

    /**
     * Shared constructor body: resolves the local address, creates the
     * queues and managers, and starts the monitor-poll and task-transfer
     * threads.
     *
     * @param info the dispatcher information to use for this node
     */
    private void initialize(DispatcherInfo info)
    {
        try
        {
            this.ipAddr = InetAddress.getLocalHost().getHostAddress();
        }
        catch (UnknownHostException e)
        {
            // Best effort, as before: the node keeps running with ipAddr == null.
            e.printStackTrace();
        }

        this.dispatcherInfo = info;
        this.jobList = new Vector();
        this.workerNodeList = new Vector();
        this.idleWorkerNodeList = new Vector();
        this.deadWorkerNodeList = new Vector();

        // The managers are only constructed here; their servers are started in run().
        jobManager = new DispatcherJobManagement(jobList,
                workerNodeList,
                idleWorkerNodeList, dispatcherInfo, dummy);
        nodeManager = new DispatcherNodeManagement(Parameter.workerPort, Parameter.listenPort,
            workerNodeList, idleWorkerNodeList, deadWorkerNodeList, jobList,
            dispatcherInfo, dummy);

        t1 = new Receive_MPoll(Parameter.serverPort, dispatcherInfo);
        t1.start();
        t2 = new TaskTransfer(Parameter.taskPort, dispatcherInfo, jobList, jobManager, dummy);
        t2.start();
        // The dispatcher-to-dispatcher thread (DispatcherCommunication) is
        // intentionally not started; it was disabled in the original code.

        cw = new CountWorker(1, workerNodeList);
    }

    /**
     * Interactively reads the four server port numbers from standard input
     * and stores them in {@code Parameter}.
     *
     * <p>For each port, an empty line or end of input selects the default
     * shown in the prompt. Input that is not an integer between
     * {@value #MIN_PORT} and {@value #MAX_PORT} is rejected and the prompt is
     * repeated. If reading fails with an {@link IOException}, the stack trace
     * is printed and the ports not yet read keep their current values.</p>
     */
    protected void inputPort()
    {
        System.out.println("please input the numbers of ports that used to create the server sockets...");

        try
        {
            // Not closed on purpose: closing the reader would close System.in.
            java.io.BufferedReader console =
                new java.io.BufferedReader(new java.io.InputStreamReader(System.in));

            Parameter.serverPort = readPort(console,
                "please input the serverPort that used to communicate with monitor:(5002)>",
                DEFAULT_SERVER_PORT);
            Parameter.workerPort = readPort(console,
                "please input the serverPort that used to communicate with worker:(5003)>",
                DEFAULT_WORKER_PORT);
            Parameter.listenPort = readPort(console,
                "please input the serverPort that used to onpool worker:(5004)>",
                DEFAULT_LISTEN_PORT);
            Parameter.taskPort = readPort(console,
                "please input the task transfer port:(5007)>",
                DEFAULT_TASK_PORT);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Prompts for a single port number until a usable answer is given.
     *
     * @param console     reader the answer is taken from
     * @param prompt      text printed before each attempt
     * @param defaultPort value returned for an empty line or end of input
     * @return the port entered by the operator, or {@code defaultPort}
     * @throws IOException if reading from {@code console} fails
     */
    private static int readPort(java.io.BufferedReader console, String prompt, int defaultPort)
        throws IOException
    {
        while (true)
        {
            System.out.print(prompt);
            String line = console.readLine();
            if (line == null)
            {
                // End of input (stdin closed or redirected): nothing more to ask.
                return defaultPort;
            }

            String text = line.trim();
            if (text.equals(""))
            {
                return defaultPort;
            }

            int port;
            try
            {
                port = Integer.parseInt(text);
            }
            catch (NumberFormatException e)
            {
                port = -1; // not a number: handled as out of range below
            }

            if (port >= MIN_PORT && port <= MAX_PORT)
            {
                return port;
            }
            System.out.println("invalid port '" + text + "': expected a number between "
                + MIN_PORT + " and " + MAX_PORT);
        }
    }

    /**
     * @return the dispatcher information object used by this node
     */
    public DispatcherInfo getDispatcherInfo()
    {
        return dispatcherInfo;
    } // getDispatcherInfo

    /**
     * Initializes the dispatcher information of this node by delegating to
     * {@code DispatcherInfo.init}; the last two arguments of that call are
     * passed as empty strings.
     *
     * @param dispatcherId identifier of this dispatcher
     * @param m_mIPAddr    forwarded as the third argument of {@code DispatcherInfo.init}
     * @param m_mId        forwarded as the second argument of {@code DispatcherInfo.init}
     */
    public void init(String dispatcherId, String m_mIPAddr,String m_mId)
    {
        this.dispatcherInfo.init(dispatcherId, m_mId, m_mIPAddr, "", "");
    }

    /**
     * @return the current value of {@code Parameter.serverPort}
     */
    public int getDispatcherPort()
    {
        return Parameter.serverPort;
    }

    /**
     * @return the current value of {@code Parameter.workerPort}
     */
    public int getWorkerPort()
    {
        return Parameter.workerPort;
    }

    /**
     * @return the current value of {@code Parameter.taskPort}
     */
    public int getTaskPort()
    {
        return Parameter.taskPort;
    }

    /**
     * @return the current value of {@code Parameter.listenPort}
     */
    public int getListenPort()
    {
        return Parameter.listenPort;
    }

    /**
     * Starts the node server, the job server and the worker counter, in that
     * order.
     */
    @Override
    public void run()
    {
        nodeManager.StartNodeServer();
        jobManager.startJobServer();
        cw.start();
    } // run

    /**
     * Shuts the node down: asks the monitor-poll and task-transfer threads to
     * quit if they are still alive, then quits both managers and the worker
     * counter.
     */
    public void quit()
    {
        if (t1.isAlive())
        {
            t1.quit();
        }
        if (t2.isAlive())
        {
            t2.quit();
        }
        nodeManager.quit();
        jobManager.quit();
        cw.quit();
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -