📄 monitorjobmanagement.java
字号:
package cn.edu.hust.cgcl.biogrid.monitor;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.ParseException;
import java.util.Iterator;
import java.util.LinkedList;
import cn.edu.hust.cgcl.biogrid.user.desc.JobDesc;
/**
 * Manages the monitor's job server: creates and stops the listening thread
 * ({@link MonitorJobServerHandler}) and provides lookup of the jobs that have
 * been added to the shared job list.
 *
 * <p>The job list instance created here is handed to the server handler, so it
 * is shared with the server threads; {@link #GetJob(String)} locks on the list
 * while it searches.</p>
 */
public class MonitorJobManagement
{
    /** State code: the job server is running. */
    public static final int SERVER_RUNNING = 0;
    /** State code: the job server is stopped (also reported before it was ever started). */
    public static final int SERVER_STOPPED = 1;
    /** State code: the job server terminated because of an error. */
    public static final int SERVER_ERROR = 2;
    //public static int JOB_SERVER_PORT=4567;

    private MonitorGroup monGroup;
    private int jobPort;
    private MonitorJobServerHandler jobServHandler;
    private int jobServerState;
    private LinkedList jobList;
    MonitorInfo monitorInfo;

    /**
     * Creates the manager; no thread is started and no socket is opened until
     * {@link #StartJobServer()} is called.
     *
     * @param mongroup    monitor group passed through to the server handler
     * @param monitorinfo monitor information passed through to the server handler
     * @param jobport     TCP port the job server will listen on
     */
    public MonitorJobManagement(MonitorGroup mongroup, MonitorInfo monitorinfo, int jobport)
    {
        monGroup = mongroup;
        this.jobPort = jobport;
        monitorInfo = monitorinfo;
        jobServHandler = null;
        jobServerState = SERVER_STOPPED;
        jobList = new LinkedList();
    } // MonitorJobManagement

    /**
     * Starts the job server thread if it is not currently alive.
     *
     * <p>A {@link Thread} may be started only once, so a handler that has
     * already terminated is replaced by a new instance rather than restarted
     * (calling {@code start()} on it again would throw
     * {@code IllegalThreadStateException}). Calling this method while the
     * handler thread is still alive does nothing.</p>
     */
    public void StartJobServer()
    {
        if (jobServHandler != null && jobServHandler.isAlive())
        {
            return; // already serving (or still starting up)
        }
        jobServHandler = new MonitorJobServerHandler(this.monGroup,
                                                     monitorInfo,
                                                     jobPort,
                                                     jobList);
        System.out.print("Begin jobServHandler\n");
        jobServHandler.start();
    } // StartJobServer

    /**
     * Asks the job server handler to stop. Safe to call when the server was
     * never started.
     */
    public void quit()
    {
        System.out.println("MonitorJobManagement start to quitting...");
        if (jobServHandler != null)
        {
            jobServHandler.quit();
        }
        System.out.println("MonitorJobManagement's quitting finished!");
    } // quit

    /**
     * Returns the current job server state.
     *
     * @return {@link #SERVER_STOPPED} when no handler has been created yet;
     *         otherwise the state reported by the handler (which is also
     *         cached in this object)
     */
    public int getJobServerState()
    {
        if (jobServHandler == null)
        {
            return MonitorJobManagement.SERVER_STOPPED;
        }
        this.jobServerState = jobServHandler.getJobServerState();
        return this.jobServerState;
    } // getJobServerState

    /**
     * Looks up a job in the shared job list by its id.
     *
     * @param jobId id to search for; {@code null} never matches
     * @return the first job whose {@code getJobID()} equals {@code jobId},
     *         or {@code null} when there is none
     */
    public MonitorJob GetJob(String jobId)
    {
        if (jobId == null)
        {
            return null;
        }
        synchronized (jobList)
        {
            // Iterate instead of calling get(i): indexed access on a
            // LinkedList walks the list on every call.
            for (Iterator it = jobList.iterator(); it.hasNext();)
            {
                MonitorJob candidate = (MonitorJob) it.next();
                if (jobId.equals(candidate.getJobID()))
                {
                    return candidate;
                }
            }
            return null;
        } // synchronized
    } // GetJob
} // MonitorJobManagement
/**
 * Thread that listens on the job port and starts one {@link JobServerHandler}
 * thread for every accepted connection.
 *
 * <p>The state reported by {@link #getJobServerState()} is
 * {@code SERVER_STOPPED} until the listening socket is open,
 * {@code SERVER_RUNNING} while the accept loop is active,
 * {@code SERVER_STOPPED} again after {@link #quit()}, and
 * {@code SERVER_ERROR} when the loop ended because of an exception that was
 * not caused by {@link #quit()}.</p>
 */
class MonitorJobServerHandler
    extends Thread
{
    MonitorGroup monitorGroup;
    MonitorInfo monitorInfo;
    private int serverPort;
    // Read by other threads through getJobServerState().
    private volatile int jobServerState;
    private LinkedList jobList;
    // Written by run(), read by quit() from another thread.
    private volatile ServerSocket serverSock;
    // Set by quit() so run() can tell a deliberate shutdown from a failure.
    private volatile boolean quitRequested;

    /**
     * @param mongroup    monitor group handed to every connection handler
     * @param monitorinfo monitor information handed to every connection handler
     * @param servport    TCP port to listen on
     * @param joblist     shared job list handed to every connection handler
     */
    public MonitorJobServerHandler(MonitorGroup mongroup, MonitorInfo monitorinfo, int servport, LinkedList joblist)
    {
        this.monitorGroup = mongroup;
        this.monitorInfo = monitorinfo;
        serverPort = servport;
        this.jobServerState = MonitorJobManagement.SERVER_STOPPED;
        this.jobList = joblist;
    } // MonitorJobServerHandler

    /**
     * Opens the server socket and accepts connections until {@link #quit()}
     * is called or an error occurs.
     */
    public void run()
    {
        boolean failed = false;
        ServerSocket listener = null;
        try
        {
            listener = new ServerSocket(this.serverPort);
            serverSock = listener;
            // The server is running as soon as it listens, not only after the
            // first client has connected.
            this.jobServerState = MonitorJobManagement.SERVER_RUNNING;
            System.out.println("Begin MonitorJobServerHandler listen");
            // quit() sets the flag before it looks at serverSock, and this
            // thread publishes serverSock before it checks the flag, so a
            // quit() that raced with start-up is noticed by one side or the other.
            while (!quitRequested)
            {
                Socket clientSock = listener.accept();
                JobServerHandler jobHandler = new JobServerHandler(
                    clientSock, this.monitorGroup, monitorInfo, this.jobList);
                jobHandler.start();
            } // while
        } // try
        catch (Exception e)
        {
            // Closing the socket in quit() makes accept() throw; that is a
            // normal shutdown, not an error.
            if (!quitRequested)
            {
                failed = true;
                System.out.println(e.toString());
            }
        } // catch
        finally
        {
            if (listener != null)
            {
                try
                {
                    listener.close();
                }
                catch (IOException ignored)
                {
                    // Nothing useful can be done if closing the listener fails.
                }
            }
            this.jobServerState = failed
                ? MonitorJobManagement.SERVER_ERROR
                : MonitorJobManagement.SERVER_STOPPED;
        }
        System.out.println("MonitorJobServerHandler's quit!");
    } // run

    /**
     * Requests shutdown: marks the handler as quitting and closes the
     * listening socket (if it was opened) so that a blocked
     * {@code accept()} returns. Safe to call before the thread has started.
     */
    public void quit()
    {
        quitRequested = true;
        ServerSocket listener = this.serverSock;
        if (listener == null || listener.isClosed())
        {
            return;
        }
        try
        {
            listener.close();
        }
        catch (IOException ignored)
        {
            // Best effort: the accept loop also checks quitRequested.
        }
    } // quit

    // Empty placeholder; not called anywhere in this class.
    private void JobReceived()
    {
        return;
    } // JobReceived

    // Empty placeholder; not called anywhere in this class.
    private void ChooseDisatcherGroup()
    {
        return;
    } // ChooseDisatcherGroup

    /**
     * @return the current state, one of the {@code MonitorJobManagement.SERVER_*} codes
     */
    public int getJobServerState()
    {
        return this.jobServerState;
    } // getJobServerState
} // MonitorJobServerHandler
/**
 * Thread that serves a single client connection of the job server.
 *
 * <p>The client sends one text line naming the request:</p>
 * <ul>
 * <li>{@code "new task"} - the handler answers {@code "Begin transferring"},
 *     reads one serialized {@code JobDesc} from the socket, wraps it in a
 *     {@code MonitorJob} and appends it to the shared job list;</li>
 * <li>{@code "can accept task"} - the handler answers
 *     {@code "monitor node can"} or {@code "monitor node cannot"} depending on
 *     {@code monitorInfo.canAcceptJob};</li>
 * <li>{@code "submit task"} - accepted but not implemented (no reply);</li>
 * <li>anything else, or a connection closed before a line was sent, is a
 *     protocol error.</li>
 * </ul>
 * <p>The connection is always closed when {@link #run()} returns.</p>
 */
class JobServerHandler
    extends Thread
{
    Socket clientSock;
    MonitorGroup monitorGroup;
    MonitorInfo monitorInfo;
    private LinkedList jobList;
    //String exceptId;

    /**
     * @param sock        accepted client connection; when {@code null},
     *                    {@link #run()} returns immediately
     * @param mongroup    monitor group (stored; not used by the implemented requests)
     * @param monitorinfo monitor information consulted for "can accept task"
     * @param joblist     shared job list that received jobs are appended to
     */
    public JobServerHandler(Socket sock, MonitorGroup mongroup, MonitorInfo monitorinfo, LinkedList joblist)
    {
        this.jobList = joblist;
        clientSock = sock;
        monitorGroup = mongroup;
        this.monitorInfo = monitorinfo;
        // Log the incoming connection; skipped for a null socket, which run()
        // tolerates as well.
        if (clientSock != null && MonitorConfiuration.MonLogIsActive)
        {
            LogFile lf = new LogFile(MonitorConfiuration.logFileName);
            lf.logJob_Submit(clientSock.getInetAddress().getHostAddress());
        }
        //exceptId=exceptid;
    } // JobServerHandler

    /**
     * Reads the request line, dispatches on it and closes the connection.
     * Any exception is printed to standard output; none is propagated.
     */
    public void run()
    {
        if (clientSock == null)
        {
            return;
        }
        BufferedReader is = null;
        PrintWriter os = null;
        try
        {
            System.out.println("Begin JobServerHandler");
            is = new BufferedReader(new InputStreamReader(clientSock.
                getInputStream()));
            os = new PrintWriter(clientSock.getOutputStream());
            String lineStr = is.readLine();
            if (lineStr == null)
            {
                // Peer closed the connection without sending a request.
                throw new ParseException("Protocol parse error!", 0);
            }
            if (lineStr.equals("new task"))
            {
                // Submission of a new job.
                receiveNewJob(os);
            }
            else if (lineStr.equals("can accept task"))
            {
                // Query: can this monitor node accept a job?
                if (this.monitorInfo.canAcceptJob)
                {
                    os.println("monitor node can");
                }
                else
                {
                    os.println("monitor node cannot");
                }
                os.flush();
            }
            else if (lineStr.equals("submit task"))
            {
                // Accepting a submitted job: not implemented.
            }
            else
            {
                throw new ParseException("Protocol parse error!", 0);
            }
        } // try
        catch (Exception e)
        {
            System.out.println(e.toString());
        } // catch
        finally
        {
            closeConnection(is, os);
        }
    } // run

    /**
     * Handles a "new task" request: acknowledges it, reads the serialized job
     * description from the socket and appends the resulting job to the shared list.
     *
     * <p>Declared to throw {@code Exception} because the project types used
     * here ({@code MonitorJob}, {@code LogFile}) are not visible in this file
     * and may declare checked exceptions; {@link #run()} is the single place
     * where they are caught.</p>
     */
    private void receiveNewJob(PrintWriter os) throws Exception
    {
        os.println("Begin transferring");
        os.flush();
        // The object stream reads the raw socket stream while run() has already
        // wrapped it in a BufferedReader. This only works if the client waits
        // for "Begin transferring" before sending the object; otherwise the
        // reader may have buffered part of it.
        // NOTE(security): readObject() deserializes whatever the peer sends;
        // only trusted clients should be able to reach this port.
        ObjectInputStream ois = new ObjectInputStream(clientSock.
            getInputStream());
        JobDesc job;
        try
        {
            job = (JobDesc) ois.readObject();
        }
        finally
        {
            ois.close();
        }
        MonitorJob mj = new MonitorJob(job);
        int queuedJobs;
        // Several connection threads share this list and
        // MonitorJobManagement.GetJob locks on it, so writers must lock too.
        synchronized (this.jobList)
        {
            this.jobList.add(mj);
            queuedJobs = this.jobList.size();
        }
        if (MonitorConfiuration.MonLogIsActive)
        {
            LogFile lf = new LogFile(MonitorConfiuration.logFileName);
            lf.logJob_Submit_Succ(clientSock.getInetAddress().getHostAddress(), mj.JobID, queuedJobs);
        }
        // TODO: choosing a monitor node for the job and replying to the client
        // with that node's id and IP is not implemented; the job is only queued.
    } // receiveNewJob

    /** Closes the writer, the reader and the socket, ignoring close failures. */
    private void closeConnection(BufferedReader is, PrintWriter os)
    {
        if (os != null)
        {
            os.close(); // flushes pending output; PrintWriter does not throw
        }
        if (is != null)
        {
            try
            {
                is.close();
            }
            catch (IOException ignored)
            {
                // The connection is being torn down anyway.
            }
        }
        Socket sock = clientSock;
        clientSock = null;
        if (sock != null)
        {
            try
            {
                sock.close();
            }
            catch (IOException ignored)
            {
                // The connection is being torn down anyway.
            }
        }
    } // closeConnection
} // JobServerHandler
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -