📄 dispatchernode.java
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Vector;
/**
* <p>Title: </p>
* <p>Description: 负责dispathcer所有的通信管理工作,包括与monitor、dispatcher和worker</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
//dispatcher节点的特点是既有被动连接,又有主动连接;即socket和serversocket;
//socket用于和dispatcher节点的通信 ;
//serversocket用于响应monitor的连接;
//暂时不考虑备份(包括monito和dispatcher)
public class DispatcherNode
extends Thread
{
private String ipAddr;
private int dispatcherPort; //与dispatcher通信端口//hasn't be initialized.
private DispatcherInfo dispatcherInfo;
//private int status;
private Vector jobList; //job队列
private Vector workerNodeList; //worker队列
private Vector idleWorkerNodeList; //空闲worker队列
private Vector deadWorkerNodeList; //死亡的worker队列
//注意:队列处理要互斥*/
private DispatcherJobManagement jobManager;
private DispatcherNodeManagement nodeManager;
Receive_MPoll t1;
TaskTransfer t2;
private Integer dummy=new Integer(0);
private CountWorker cw;
public DispatcherNode()
{
try
{
this.ipAddr = InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e)
{
e.printStackTrace();
}
this.jobList = new Vector();
this.workerNodeList = new Vector();
this.idleWorkerNodeList = new Vector();
this.deadWorkerNodeList = new Vector();
//dispatcherInfo=null;//must notice this place....
dispatcherInfo = new DispatcherInfo(); //dispatcherInfo的内容要初始化。
jobManager = new DispatcherJobManagement(jobList,
workerNodeList,
idleWorkerNodeList,dispatcherInfo,dummy);
//jobManager.startJobServer();
nodeManager = new DispatcherNodeManagement(Parameter.workerPort, Parameter.listenPort,
workerNodeList, idleWorkerNodeList, deadWorkerNodeList,jobList,
dispatcherInfo,dummy);
//nodeManager.StartNodeServer();
t1 = new Receive_MPoll(Parameter.serverPort,dispatcherInfo); //启动一个服务器线程,用于响应monitor的轮训
t1.start();
t2=new TaskTransfer(Parameter.taskPort,dispatcherInfo,jobList,jobManager,dummy);
t2.start();
//Thread t3 = new DispatcherCommunication(ipAddr, dispatcherPort,backupServerPort,dispatcherInfo,jobList,workerNodeList,idleWorkerNodeList,deadWorkerNodeList); //启动一个线程,用于和dispatcher通信
// t3.start();
cw=new CountWorker(1,workerNodeList);
} //DispatcherNode
public DispatcherNode(DispatcherInfo di)
{
inputPort();
try
{
this.ipAddr = InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e)
{
e.printStackTrace();
}
this.dispatcherInfo = di;
this.jobList = new Vector();
this.workerNodeList = new Vector();
this.idleWorkerNodeList = new Vector();
this.deadWorkerNodeList = new Vector();
jobManager = new DispatcherJobManagement(jobList,
workerNodeList,
idleWorkerNodeList,dispatcherInfo,dummy);
//jobManager.startJobServer();
nodeManager = new DispatcherNodeManagement(Parameter.workerPort, Parameter.listenPort,
workerNodeList, idleWorkerNodeList, deadWorkerNodeList,jobList,
dispatcherInfo,dummy);
//nodeManager.StartNodeServer();
t1 = new Receive_MPoll(Parameter.serverPort,dispatcherInfo); //启动一个服务器线程,用于响应monitor的轮训
t1.start();
t2=new TaskTransfer(Parameter.taskPort,dispatcherInfo,jobList,jobManager,dummy);
t2.start();
//Thread t3 = new DispatcherCommunication(ipAddr, dispatcherPort,backupServerPort,dispatcherInfo,jobList,workerNodeList,idleWorkerNodeList,deadWorkerNodeList); //启动一个线程,用于和dispatcher通信
//t3.start();
cw=new CountWorker(1,workerNodeList);
}
public DispatcherNode(String dispatcherid)
{
inputPort();
try
{
this.ipAddr = InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException e)
{
e.printStackTrace();
}
this.jobList = new Vector();
this.workerNodeList = new Vector();
this.idleWorkerNodeList = new Vector();
this.deadWorkerNodeList = new Vector();
//dispatcherInfo=null;//must notice this place....
dispatcherInfo = new DispatcherInfo(); //dispatcherInfo的内容要初始化。
jobManager = new DispatcherJobManagement(jobList,
workerNodeList,
idleWorkerNodeList,dispatcherInfo,dummy);
//jobManager.startJobServer();
nodeManager = new DispatcherNodeManagement(Parameter.workerPort, Parameter.listenPort,
workerNodeList, idleWorkerNodeList, deadWorkerNodeList,jobList,
dispatcherInfo,dummy);
//nodeManager.StartNodeServer();
Thread t1 = new Receive_MPoll(Parameter.serverPort,dispatcherInfo); //启动一个服务器线程,用于响应monitor的轮训
t1.start();
Thread t2=new TaskTransfer(Parameter.taskPort,dispatcherInfo,jobList,jobManager,dummy);
t2.start();
//Thread t3 = new DispatcherCommunication(ipAddr, dispatcherPort,backupServerPort,dispatcherInfo,jobList,workerNodeList,idleWorkerNodeList,deadWorkerNodeList); //启动一个线程,用于和dispatcher通信
// t3.start();
cw=new CountWorker(1,workerNodeList);
}
protected void inputPort()
{
System.out.println("please input the numbers of ports that used to create the server sockets...");
try
{
java.io.InputStreamReader inputstreamreader = new java.io.InputStreamReader(System.in);
java.io.BufferedReader bufferedreader = new java.io.BufferedReader(inputstreamreader);
System.out.print("please input the serverPort that used to communicate with monitor:(5002)>");
String port = bufferedreader.readLine(); port=port.trim();
if(port.equals(""))
Parameter.serverPort=5002;
else
Parameter.serverPort=Integer.parseInt(port);
System.out.print("please input the serverPort that used to communicate with worker:(5003)>");
port=bufferedreader.readLine().trim();
if(port.equals(""))
Parameter.workerPort=5003;
else
Parameter.workerPort=Integer.parseInt(port);
System.out.print("please input the serverPort that used to onpool worker:(5004)>");
port=bufferedreader.readLine().trim();
if(port.equals(""))
Parameter.listenPort=5004;
else
Parameter.listenPort=Integer.parseInt(port);
System.out.print("please input the task transfer port:(5007)>");
port=bufferedreader.readLine().trim();
if(port.equals(""))
Parameter.taskPort=5007;
else
Parameter.taskPort=Integer.parseInt(port);
}
catch(IOException e)
{
e.printStackTrace();
}
}
public DispatcherInfo getDispatcherInfo()
{
return dispatcherInfo;
} // GetMonitorInfo
public void init(String dispatcherId, String m_mIPAddr,String m_mId)
{
this.dispatcherInfo.init(dispatcherId,m_mId,m_mIPAddr,"","");
//this.dispatcherInfo=new DispatcherInfo(dispatcherId,m_mId,m_mIPAddr,"","");
}
public int getDispatcherPort()
{
return Parameter.serverPort;
}
public int getWorkerPort()
{
return Parameter.workerPort;
}
public int getTaskPort()
{
return Parameter.taskPort;
}
public int getListenPort()
{
return Parameter.listenPort;
}
public void run()
{
nodeManager.StartNodeServer();
jobManager.startJobServer();
cw.start();
} // run
public void quit()
{
if(t1.isAlive()) t1.quit();
if(t2.isAlive()) t2.quit();
nodeManager.quit();
jobManager.quit();
cw.quit();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -