📄 .#dispatchernodemanagement.java.1.9
字号:
{
log(e);
//error("I/O error", e);
}
finally
{
try
{
is.close();
s.close();
os.close();
}
catch (IOException e)
{
log(e);
//error("Error closing socket", e);
}
}
return;
}
//.....
}
/**
 * Asks this handler to stop: prints a notice to standard output and sets
 * {@code isActive} to {@code false}.
 * NOTE(review): presumably {@code isActive} is the loop condition of this
 * handler's run loop (not visible in this part of the file) - confirm.
 */
public void terminate()
{
System.out.println("handler quitting...");
isActive = false;
}
}
/**
*
* <p>Title: </p>
* <p>Description: Receives the heartbeat from each worker.</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class ListenningHandler
extends Thread
{
// Loop condition of run(); run() sets it to false when accept() fails with
// an IOException or a SecurityException.
private boolean isActive = true;
// Second half of the loop condition of Process.run() (flag && subflag).
private boolean subflag=true;
/**
 * Creates the handler. The constructor body is empty; the fields keep
 * their declared initial values.
 */
public ListenningHandler()
{
}
/**
 * Accept loop for incoming connections.
 * <p>
 * While {@code isActive} is true, waits on {@code listenSocket.accept()} and
 * starts one {@link Process} thread per accepted socket. An
 * {@code InterruptedIOException} (accept wait timed out) is ignored and the
 * loop simply re-checks {@code isActive}. An {@code IOException} or
 * {@code SecurityException} is logged and ends the loop. A notice is printed
 * once the loop has exited.
 */
public void run()
{
    while (isActive)
    {
        try
        {
            // Each accepted connection is processed on its own thread.
            final Socket accepted = listenSocket.accept();
            new Process(accepted).start();
        }
        catch (java.io.InterruptedIOException timedOut)
        {
            // Waiting for a connection timed out: nothing to do, go back to
            // the top of the loop.
        }
        catch (IOException ioFailure)
        {
            log(ioFailure);
            isActive = false;
        }
        catch (SecurityException denied)
        {
            log(denied);
            isActive = false;
        }
    }
    System.out.println("listening handler quit!");
}
public class Process
extends Thread
{
// Connection handed to the constructor; closed by terminate().
Socket s;
// Line reader over the socket's input stream (opened in the constructor).
private BufferedReader is;
// Text writer over the socket's output stream (opened in the constructor).
private PrintWriter os;
// Object stream over the same socket output stream; opened in the
// constructor. NOTE(review): no use of it is visible in this part of the
// file - confirm whether it is still needed.
private ObjectOutputStream oob;
// First half of the loop condition of run() (flag && subflag); terminate()
// sets it to false.
private boolean flag=true;
/**
 * Binds this worker thread to an accepted connection and opens its streams.
 * <p>
 * A reader is opened on the socket's input stream, and a
 * {@code PrintWriter} plus an {@code ObjectOutputStream} on its output
 * stream. If any of that fails (including a {@code null} socket), the
 * failure is printed, {@code flag} is cleared so that {@code run()} never
 * enters its loop with unopened streams, and the socket is closed so the
 * connection is not leaked.
 *
 * @param sock the accepted connection to serve
 */
public Process(Socket sock)
{
    this.s = sock;
    try
    {
        is = new BufferedReader(new InputStreamReader(s.getInputStream()));
        //iob = new ObjectInputStream(s.getInputStream());
        os = new PrintWriter(s.getOutputStream());
        oob = new ObjectOutputStream(s.getOutputStream());
    }
    catch (Exception e1)
    {
        System.out.println(e1.toString());
        // Without usable streams run() would dereference a null reader, so
        // stop it before it starts.
        flag = false;
        if (s != null)
        {
            try
            {
                // Closing the socket also closes any stream already opened on it.
                s.close();
            }
            catch (IOException closeFailure)
            {
                System.out.println(closeFailure.toString());
            }
        }
    }
    System.out.println("Worker poll start: ");
}
/**
 * Stops this worker thread and closes its connection.
 * <p>
 * {@code flag} is cleared before the socket is touched, so the
 * {@code run()} loop condition becomes false even when the socket is
 * {@code null} or closing it fails. A failure to close is logged instead of
 * being silently dropped.
 */
public void terminate()
{
    // Clear the flag first: it must not depend on close() succeeding.
    flag = false;
    try
    {
        if (this.s != null && !this.s.isClosed())
        {
            this.s.close();
        }
    }
    catch (IOException e)
    {
        log(e);
    }
}
public void run()
{
while(flag&&subflag)
{
if (s == null)
{
return;
}
try
{
//协议传输数据
String tmpWorkerId = is.readLine();
System.out.println(tmpWorkerId);
System.out.print("idle worker node:> ");
for(int istart=0;istart<idleWorkerNodeList.size();istart++)
System.out.print(idleWorkerNodeList.elementAt(istart)+" ");
System.out.println();
String lineStr = is.readLine();
//System.out.println(lineStr);
int i = 0;
WorkerNode wn = null;
boolean ftFlag=false;
synchronized(ftWorkerNodeList)
{
for (i=0; i < ftWorkerNodeList.size(); i++)
{
if (tmpWorkerId.equals((String)(ftWorkerNodeList.elementAt(i))))
{
ftFlag=true;
break;
}
}
}
if(ftFlag)
{
if (lineStr.equals("<worker alive>"))
{
os.println("<dispatcher alive>");
os.flush();
//System.out.println("<dispatcher alive>");
}
else if (lineStr.equals("<subjob hava finished>"))
{
String ft_jobId=is.readLine();
String ft_subJobId=is.readLine();
int j=0;
Job tmpjob=null;
synchronized(jobList){
for (j=0; j < jobList.size(); j++)
{
tmpjob=(Job) (jobList.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_jobId.equals( tmpjob.getJobId()))
break;
}
if(j>=jobList.size())
{
System.out.println("Don't find this job in jobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
Vector tmpSjList=tmpjob.subJobList;
SubJob tmpsubjob=null;
synchronized(tmpSjList){
for (j=0; j < tmpSjList.size(); j++)
{
tmpsubjob=(SubJob) (jobList.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_subJobId.equals( tmpsubjob.getSubJobId()))
{
tmpsubjob.setWorkerState(2);//finished
break;
}
}
if(j>=tmpSjList.size())
{
System.out.println("Don't find this subjob in subjobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
synchronized(dummy)
{
try{
dummy.notifyAll();
}
catch(IllegalMonitorStateException e)
{
}
}
os.println("<dispatcher hava known>");
os.flush();
tmpsubjob.finishedTime=System.currentTimeMillis();
tmpsubjob.computingTime=tmpsubjob.finishedTime-tmpsubjob.distributeTime;
}
else if (lineStr.equals("<worker quit>"))
{
//交给workernode的一个函数处理
synchronized(ftWorkerNodeList)
{
for (i=0; i < ftWorkerNodeList.size(); i++)
{
if (tmpWorkerId.equals((String)(ftWorkerNodeList.elementAt(i))))
{
ftWorkerNodeList.remove(i);
break;
}
}
}
os.println("<worker quit agree>");
os.flush();
}
this.s.close();
return;
}
else{
synchronized (workerNodeList)
{
for (i=0; i < workerNodeList.size(); i++)
{
if (tmpWorkerId.equals( ( (WorkerNode) (
workerNodeList.elementAt(i))).getWorkerId()))
break;
}
if (i >= workerNodeList.size())
{
//查看是否为已死worker,若找到,复活。
int j = 0;
synchronized (deadWorkerNodeList)
{
for (j=0; j < deadWorkerNodeList.size(); j++)
{
if (tmpWorkerId.equals( ( (WorkerNode) (
deadWorkerNodeList.elementAt(j))).
getWorkerId()))
break;
}
if (j >= deadWorkerNodeList.size())
{//if fault tolerant ,then deal with...
if(lineStr.equals("worker fault tolerance"))
{
String ft_jobId=is.readLine();
String ft_subJobId=is.readLine();
int ft_sjState=Integer.parseInt(is.readLine());
os.println("worker fault tolerance finish");
os.flush();
ftWorkerNodeList.add(tmpWorkerId);
Job tmpjob=null;
synchronized(jobList){
for (j=0; j < jobList.size(); j++)
{
tmpjob=(Job) (jobList.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_jobId.equals( tmpjob.getJobId()))
break;
}
if(j>=jobList.size())
{
System.out.println("Don't find this job in jobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
Vector tmpSjList=tmpjob.subJobList;
SubJob tmpsubjob=null;
synchronized(tmpSjList){
for (j=0; j < tmpSjList.size(); j++)
{
tmpsubjob=(SubJob) (jobList.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_subJobId.equals( tmpsubjob.getSubJobId()))
{
tmpsubjob.setWorkerState(ft_sjState);
break;
}
}
if(j>=tmpSjList.size())
{
System.out.println("Don't find this subjob in subjobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
}
else
{System.out.println(
"Don't find this worker in workerNodeList and deadWorkerNodeList!");
return;
}
}
else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -