📄 dispatchercommunication.java
字号:
package cn.edu.hust.cgcl.biogrid.worker;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.net.Socket;
import cn.edu.hust.cgcl.biogrid.dispatcher.SubJob;
/**
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class DispatcherCommunication
extends Thread
{
private String dIpAddr;
private String subdIpAddr;
private String workerId;
private String dispatcherId;
private int workerHeartbeatPort;
private int subWorkerHeartbeatPort;
public SubJob sj;
private WorkerInfo nodeInfo;
private boolean isActive = true;
// private boolean quitFlag=false;
DataPollCommunication dpc;
private static final int TIME_OUT = 20 * 1000;
private static final int INTERVAL = 5 * 1000;
private Socket nodeSocket = null;
private BufferedReader is;
private PrintWriter os;
private ObjectInputStream iob;
public DispatcherCommunication(WorkerInfo nodeInfo)
{
this.sj = null;
this.nodeInfo = nodeInfo;
this.workerId = nodeInfo.getWorkerId();
this.dispatcherId = nodeInfo.get_f_d_Id();
this.dIpAddr = nodeInfo.get_f_d_Ip();
this.workerHeartbeatPort = nodeInfo.get_f_d_port();
this.subdIpAddr=nodeInfo.get_s_d_Ip();
this.subWorkerHeartbeatPort=nodeInfo.get_s_d_port();
}
public void run()
{
while (isActive)
{
//System.out.println(dIpAddr+workerHeartbeatPort);
heartbeat();
try
{
if(!interrupted())
{
Thread.sleep(INTERVAL);
}
else {
isActive=false;
}
}
catch (InterruptedException e)
{
isActive=false;//get back to work
}
}
// System.out.println("DispatcherCommunication's quit!");
try{
if(!this.nodeSocket.isClosed())
this.nodeSocket.close();
}catch(Exception e)
{
}
}
public void quit()
{
System.out.println("DispatcherCommunication handler quitting...");
isActive = false;
try{
if(!this.nodeSocket.isClosed())
this.nodeSocket.close();
}catch(Exception e)
{
}
}
public boolean heartbeat()
{
// boolean flag = false;
if (nodeSocket == null)
{
if (!initCommunication())
{
return false;
}
}
{
try
{
os.println(this.workerId);
// System.out.print("poll start: ");
os.println("<worker alive>");
os.flush();
String stmp = is.readLine();
if (stmp.equals("<new job>"))
{
String rtmp= is.readLine();
System.out.println("new job start!");
//........
os.println("<new job accept>");
os.flush();
//iob=new ObjectInputStream(nodeSocket.getInputStream());
sj = (SubJob) iob.readObject();
nodeInfo.sj=sj;
System.out.println("the new job id is: "+sj.getSubJobId());
//iob.close();
//set the info of beckup Dispatcher.
nodeInfo.secondDispatcherIPAddr=sj.subDispatcherIp;
nodeInfo.secondDispatcherPort=sj.subDispatcherPort;
this.subdIpAddr=sj.subDispatcherIp;
this.subWorkerHeartbeatPort=sj.subDispatcherPort;
nodeInfo.setFinishFlag(false);
dpc = new DataPollCommunication(sj, workerId, nodeInfo);
if(rtmp.equalsIgnoreCase("1"))
dpc.regetDataFlag=true;//判断是否是容错处理,需再去数据。
else dpc.regetDataFlag=false;
stmp = is.readLine();//"<dispatcher alive>"
dpc.start();
}
}
catch (IOException e)
{
System.out.println(e.toString());
//e.printStackTrace();
try
{
if(!nodeSocket.isClosed())
nodeSocket.close();
// is.close();
// os.close();
nodeSocket=null;
is=null;
os=null;
/*{//容错,需第二个dispatcher,现测试,删除
this.dIpAddr=this.subdIpAddr;
this.workerHeartbeatPort=this.subWorkerHeartbeatPort;
subdIpAddr=null;
subWorkerHeartbeatPort=0;
if (subdIpAddr==null)
{
return false;
} // if
if (!this.initCommunication())
{
return false;
} // if
os.println(this.workerId);
// System.out.print("poll start: ");
this.os.println("worker fault tolerance");
os.println(sj.getJobId());
os.println(sj.getSubJobId());
os.println(sj.getState());
os.flush();
String msg=is.readLine();
if (!msg.equals("worker fault tolerance finish"))
{
return false;
} // if
}//容错*/
//continue;
// return true;
} // try
catch (Exception e1)
{
e1.printStackTrace();
return false;
} // catch
return false;
} // catch
catch (ClassNotFoundException e)
{
System.out.println(e.toString());
try
{
nodeSocket.close();
is.close();
os.close();
}
catch (Exception e1)
{
e1.printStackTrace();
return false;
} // catch
return false;
} // catch
return true;
} // while
//return flag;
}
private boolean initCommunication()
{
try
{
this.nodeSocket = new Socket(dIpAddr, workerHeartbeatPort);
nodeSocket.setSoTimeout(TIME_OUT);
is = new BufferedReader(new InputStreamReader(nodeSocket.
getInputStream()));
os = new PrintWriter(nodeSocket.getOutputStream());
iob = new ObjectInputStream(nodeSocket.getInputStream());
}
catch (Exception e)
{
try
{
if (nodeSocket != null)
nodeSocket.close();
if (os != null)
os.close();
if (is != null)
is.close();
}
catch (Exception e1)
{
e1.printStackTrace();
}
//iob=null;
return false;
}
return true;
}
private void log(Exception e)
{
e.printStackTrace();
}
// 调用函数之前需要判断是否是worker容错:"worker fault tolerance"
static public boolean handleWorkerFaultTolerance(SubJob job,
BufferedReader is,
PrintWriter os)
throws Exception
{
String jobid=is.readLine();
String subjobid=is.readLine();
int subjobstate=Integer.parseInt(is.readLine());
os.println("worker fault tolerance finish");
os.flush();
return true;
} // handleWorkerFaultTolerance
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -