📄 monitornodemanagement.java
字号:
package cn.edu.hust.cgcl.biogrid.monitor;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * Accept loop for the monitor's node-management port.
 *
 * <p>When started, this thread opens a {@link ServerSocket} on the configured
 * port and hands every accepted connection to a freshly started
 * {@link NodeHandler} thread. {@link #quit()} is meant to be called from
 * another thread: it clears the run flag and closes the server socket, which
 * makes the blocked {@code accept()} call fail and lets {@link #run()} return.</p>
 *
 * <p>Only the most recently created handler is remembered in
 * {@code jobHandler}; earlier handlers are not tracked by this class.</p>
 */
public class MonitorNodeManagement extends Thread
{
    DispatcherGroup dispGroup;
    MonitorGroup monGroup;
    MonitorInfo monInfo;
    LocalMonitorNode localNode;
    int nodePort;
    // The three fields below are written by one thread and read by another
    // (accept thread vs. the caller of quit()), hence volatile.
    volatile ServerSocket serverSock;
    volatile NodeHandler jobHandler;
    volatile boolean flag = true;

    /**
     * Stores the collaborators that are passed on to every {@link NodeHandler}.
     *
     * @param dispgroup dispatcher group handed to each handler and shut down by {@link #quit()}
     * @param mongroup  monitor group handed to each handler
     * @param moninfo   monitor information handed to each handler
     * @param nodeport  TCP port the server socket listens on
     * @param localnode local monitor node handed to each handler
     */
    public MonitorNodeManagement(DispatcherGroup dispgroup, MonitorGroup mongroup, MonitorInfo moninfo, int nodeport, LocalMonitorNode localnode)
    {
        this.localNode = localnode;
        this.dispGroup = dispgroup;
        this.monGroup = mongroup;
        this.monInfo = moninfo;
        this.nodePort = nodeport;
    } // MonitorNodeManagement

    /**
     * Opens the server socket and dispatches each accepted connection to a new
     * {@link NodeHandler} until {@link #quit()} is called or accepting fails.
     * The server socket is always closed before this method returns.
     */
    public void run()
    {
        try
        {
            serverSock = new ServerSocket(this.nodePort);
            System.out.print("Begin Monitor Node Management Accept\n");
            Socket clientSock;
            while (flag && (clientSock = serverSock.accept()) != null)
            {
                NodeHandler handler = new NodeHandler(
                    this.dispGroup, this.monGroup, this.monInfo, clientSock, nodePort, this.localNode);
                jobHandler = handler;
                handler.start();
            } // while
        } // try
        catch (Exception e)
        {
            // After quit() the failure is just the closed socket waking up
            // accept(); anything else is a real error and must be visible.
            if (flag)
            {
                System.out.println("MonitorNodeManagement accept loop failed: " + e.toString());
            }
        } // catch
        finally
        {
            // Covers both a failing loop and a quit() that ran before the
            // socket existed (flag already false, socket opened afterwards).
            closeServerSocket();
        }
        System.out.println("MonitorNodeManagement's quit!");
        return;
    } // run

    /**
     * Stops the accept loop, asks the most recent handler to close its
     * connection and shuts down the dispatcher group.
     *
     * <p>Each step is attempted independently, so a missing server socket or
     * handler (for example when no client has connected yet) no longer
     * prevents the dispatcher group from being shut down.</p>
     */
    public void quit()
    {
        System.out.println("MonitorNodeManagement start to quitting...");
        flag = false;
        closeServerSocket();

        NodeHandler handler = this.jobHandler;
        if (handler != null && handler.isAlive())
        {
            try
            {
                handler.quit();
            }
            catch (Exception e)
            {
                System.out.println("MonitorNodeManagement could not stop node handler: " + e.toString());
            }
        }

        if (this.dispGroup != null)
        {
            try
            {
                this.dispGroup.quit();
            }
            catch (Exception e)
            {
                System.out.println("MonitorNodeManagement could not stop dispatcher group: " + e.toString());
            }
        }
        System.out.println("MonitorNodeManagement's quitting finished!");
    } // quit

    /** Closes the server socket if it exists and is still open; failures are reported, not thrown. */
    private void closeServerSocket()
    {
        ServerSocket server = this.serverSock;
        if (server == null || server.isClosed())
        {
            return;
        }
        try
        {
            server.close();
        }
        catch (Exception e)
        {
            System.out.println("MonitorNodeManagement could not close server socket: " + e.toString());
        }
    } // closeServerSocket
} // MonitorNodeManagement
/**
 * Serves one node-management connection on its own thread.
 *
 * <p>The first line read from the client selects the request:
 * {@code "monitor join"}, {@code "dispatcher join"},
 * {@code "dispatcher join in"}, {@code "worker join"} or
 * {@code "worker join in"}. Any other line is ignored. The connection is
 * closed when the request has been handled, whether or not it succeeded.</p>
 */
class NodeHandler
    extends Thread
{
    // Read by quit() from another thread while run() may clear it.
    private volatile Socket clientSock;
    private DispatcherGroup dispGroup;
    private MonitorGroup monitorGroup;
    private MonitorInfo monitorInfo;
    // Stored by the constructor but not read anywhere in this class.
    private int nodeServerPort;
    // Not read anywhere in this class; dispatcherJoinIn() takes the task port
    // from the client instead.
    private int taskPort=5007;//the communication port ,
    private LocalMonitorNode localNode;

    /**
     * @param dispgroup dispatcher group that new dispatchers and workers are registered with
     * @param mongroup  monitor group listed in the reply to "monitor join"
     * @param moninfo   supplies the monitor id sent back on "dispatcher join in"
     * @param s         accepted client connection; {@code run()} does nothing if it is null
     * @param nodeport  port of the node server that accepted the connection
     * @param localnode local monitor node whose address is returned on join requests
     */
    public NodeHandler(DispatcherGroup dispgroup, MonitorGroup mongroup,
                       MonitorInfo moninfo, Socket s, int nodeport, LocalMonitorNode localnode)
    {
        this.localNode = localnode;
        clientSock = s;
        this.monitorGroup = mongroup;
        this.monitorInfo = moninfo;
        this.nodeServerPort = nodeport;
        dispGroup = dispgroup;
    } // NodeHandler

    /**
     * Reads the request line, dispatches to the matching handler and always
     * closes the streams and the socket afterwards.
     */
    public void run()
    {
        Socket sock = this.clientSock;
        if (sock == null)
        {
            return;
        } // if
        BufferedReader is = null;
        PrintWriter os = null;
        try
        {
            System.out.print("Begin Monitor Node Handler Accept");
            String dispip = sock.getInetAddress().getHostAddress();
            System.out.println(dispip);
            is = new BufferedReader(new InputStreamReader(sock.getInputStream()));
            os = new PrintWriter(sock.getOutputStream());
            String lineStr = is.readLine();
            if (lineStr == null)
            {
                // readLine() returns null when the peer closes the connection
                // without sending anything; there is no request to serve.
                System.out.println("Monitor Node Handler: connection from " + dispip
                                   + " was closed before a request was sent");
            }
            else if (lineStr.equals("monitor join"))
            {
                // log
                if (MonitorConfiuration.MonLogIsActive)
                {
                    LogFile lf = new LogFile(MonitorConfiuration.logFileName);
                    try
                    {
                        lf.logMonitor_login(dispip);
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                } // log
                monitorJoin(is, os);
            } // else if
            else if (lineStr.equals("dispatcher join"))
            {
                // The dispatcher reports its own IP; it is only accepted when
                // that matches the address the connection actually came from.
                // A missing line (peer closed) is rejected the same way.
                String reportedIp = is.readLine();
                if (dispip.equals(reportedIp))
                {
                    disPatcherJoin(is, os);
                }
                else
                {
                    os.println("LAN");
                    os.flush();
                    System.out.println("this dispatcher(ip is "+dispip+")is locate in LAN, the joinning is rejected!" );
                }
            } // else if
            else if (lineStr.equals("dispatcher join in"))
            {
                // log
                if (MonitorConfiuration.MonLogIsActive)
                {
                    LogFile lf = new LogFile(MonitorConfiuration.logFileName);
                    try
                    {
                        lf.logDispatcher_login(dispip);
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
                this.dispatcherJoinIn(is, os);
            } // else if
            else if (lineStr.equals("worker join"))
            {
                workerJoin(is, os);
            } // else if
            // worker joining still needs to be handled
            else if (lineStr.equals("worker join in"))
            {
                workerJoinIn(is, os);
            } // else if
        } // try
        catch (Exception e)
        {
            System.out.println(e.toString());
        } // catch
        finally
        {
            // Runs on success and on failure, so a broken request can no
            // longer leave the connection open.
            closeConnection(is, os, sock);
            this.clientSock = null;
        }
        return;
    } // run

    /**
     * Closes the writer, the reader and the socket, ignoring close failures.
     * The writer goes first so buffered output is flushed before closing the
     * input stream shuts the socket down.
     */
    private void closeConnection(BufferedReader is, PrintWriter os, Socket sock)
    {
        if (os != null)
        {
            os.close();
        }
        if (is != null)
        {
            try
            {
                is.close();
            }
            catch (Exception ignored)
            {
                // nothing useful can be done if closing the reader fails
            }
        }
        try
        {
            if (!sock.isClosed())
            {
                sock.close();
            }
        }
        catch (Exception ignored)
        {
            // nothing useful can be done if closing the socket fails
        }
    } // closeConnection

    /**
     * Answers "dispatcher join" with the local monitor's IP and join port,
     * followed by the line "dispatcher join finish".
     * (Code that was commented out here picked
     * {@code monitorGroup.getHeaviestDispMonitor()} instead of the local node.)
     */
    private void disPatcherJoin(BufferedReader is, PrintWriter os)
        throws Exception
    {
        os.println(this.localNode.monConfig.getMonitorIp());
        os.println(localNode.monConfig.getJoinPort());
        os.println("dispatcher join finish");
        os.flush();
        return;
    } // disPatcherJoin

    /**
     * Answers "worker join" with the local monitor's IP and join port,
     * followed by the line "worker join finish".
     * (Code that was commented out here picked
     * {@code monitorGroup.getHeaviestWorkerMonitor()} instead of the local node.)
     */
    private void workerJoin(BufferedReader is, PrintWriter os)
        throws Exception
    {
        os.println(this.localNode.monConfig.getMonitorIp());
        os.println(localNode.monConfig.getJoinPort());
        os.println("worker join finish");
        os.flush();
        return;
    } // workerJoin

    /**
     * Answers "monitor join" with the number of known monitors, then the IP
     * and join port of each one, then the line "monitor join finish".
     */
    private void monitorJoin(BufferedReader is, PrintWriter os)
        throws Exception
    {
        int count = monitorGroup.getMonitorCount();
        os.println(count);
        for (int i = 0; i < count; i++)
        {
            // needs to be modified!!! (note kept from the original author)
            MonitorNode monitor = this.monitorGroup.getMonitor(i);
            os.println(monitor.getMonConf().getMonitorIp());
            os.println(monitor.getMonConf().getJoinPort());
        } // for
        os.println("monitor join finish");
        os.flush();
    } // monitorJoin

    /**
     * Registers a dispatcher with the dispatcher group.
     *
     * <p>Reads four lines from the client: its IP, dispatcher port, worker
     * port and task port. Replies with the newly assigned dispatcher id, this
     * monitor's id and the line "dispatcher join in finish", then creates,
     * adds and starts the {@code DispatcherNode}.</p>
     *
     * @throws Exception if a port line is missing or not a number, or if any
     *                   collaborator fails
     */
    private void dispatcherJoinIn(BufferedReader is, PrintWriter os)
        throws Exception
    {
        System.out.println("Dispater Join In");
        String dispIp = is.readLine();

        // The id is the group id with its first character replaced by 'D',
        // followed by the current node count.
        // NOTE(review): a single '0' is prepended for every count below 100,
        // so counts below 10 give two digits and 10-99 give three - confirm
        // whether a fixed width was intended before changing the format.
        // NOTE(review): getNodeCount() and addNode() are not atomic across
        // handler threads; concurrent joins could receive the same id - verify.
        StringBuffer dispId = new StringBuffer(dispGroup.getId());
        dispId.setCharAt(0, 'D');
        int nodeCount = dispGroup.getNodeCount();
        if (nodeCount < 100)
        {
            dispId.append('0');
        } // if
        dispId.append(nodeCount);

        int dispPort = Integer.parseInt(is.readLine());
        int workerPort = Integer.parseInt(is.readLine());
        // Named differently from the taskPort field to avoid shadowing it.
        int dispTaskPort = Integer.parseInt(is.readLine());

        os.println(dispId.toString());
        os.println(this.monitorInfo.getMonitorId());
        os.println("dispatcher join in finish");
        os.flush();

        DispatcherNode tmpNode = new DispatcherNode(dispId.toString(),
                                                    dispIp, dispPort,
                                                    workerPort, dispTaskPort,
                                                    this.dispGroup);
        this.dispGroup.addNode(tmpNode);
        tmpNode.start();
        // log
        if (MonitorConfiuration.MonLogIsActive)
        {
            LogFile lf = new LogFile(MonitorConfiuration.logFileName);
            try
            {
                lf.logDispatcher_login_Succ(tmpNode.GetIp(),tmpNode.getNodeInfo().getDispatcherId(),this.dispGroup.getNodeCount());
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    } // dispatcherJoinIn

    /**
     * Closes the client connection if it is still open. Safe to call after
     * {@code run()} has finished and cleared the socket reference.
     */
    public void quit()
    {
        // Snapshot: run() may set the field to null concurrently.
        Socket sock = this.clientSock;
        if (sock == null)
        {
            return;
        }
        try
        {
            if (!sock.isClosed())
            {
                sock.close();
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    } // quit

    /**
     * Delegates a "worker join in" request to the dispatcher group and then
     * sends the line "worker join in finish".
     */
    private void workerJoinIn(BufferedReader is, PrintWriter os)
        throws Exception
    {
        System.out.println("Worker Join In");
        this.dispGroup.workerJoin(is, os);
        os.println("worker join in finish");
        os.flush();
        System.out.println("worker join in finish");
    } // workerJoinIn

    /** Empty placeholder; {@code run()} does not dispatch any request to it. */
    private void monitorJoinIn(BufferedReader is, PrintWriter os)
    {
    } // monitorJoinIn
} // NodeHandler
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -