📄 .#monitorcommunication.java.1.3
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
import java.util.Vector;
import cn.edu.hust.cgcl.biogrid.monitor.MonitorJob;
/**
* <p>Title: </p>
* <p>Description: Handles communication with the monitor.
* This class is no longer used; its functionality has been replaced by Receive_MPool.java.</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class MonitorCommunication
extends Thread
{
// Timeout constant, 3*60*1000 (three minutes if interpreted as milliseconds).
private static final int TIME_OUT =3*60*1000;
// Controls termination of the accept thread. Declared volatile because it is
// cleared by terminate() from a different thread than the one looping in run().
private volatile boolean isActive = true;
private int serverPort; // server port used for communication with the monitor
private DispatcherInfo dispatcherInfo;
private Vector jobList; // elements are cast to Job by the connection handler
DispatcherJobManagement jobManager;
private MonitorJob tmpMonitorJob; // newly assigned job received from the monitor
private int tmpJobCount=0; // number of jobs
private String tmpJobId; // id of the job to cancel
private boolean flag1 = false; // flag marking that the job cancellation work finished
private Integer dummy; // never assigned in this file; only referenced by disabled code
private ServerSocket serverSocket; // server socket used for communication with the monitor
/**
 * Creates the communication thread; nothing is bound or started here.
 *
 * @param localPort  port the server socket will listen on once run() executes
 * @param nodeInfo   dispatcher information object reported to and updated for the monitor
 * @param jobList    shared list of jobs searched by the connection handlers
 * @param jobManager job manager used to receive and cancel jobs
 */
public MonitorCommunication(int localPort, DispatcherInfo nodeInfo,
                            Vector jobList,
                            DispatcherJobManagement jobManager)
{
    System.out.println("the monitorcommunication start");

    // No job has been received yet and the listening socket is only created in run().
    serverSocket = null;
    tmpMonitorJob = null;

    this.jobManager = jobManager;
    this.jobList = jobList;
    dispatcherInfo = nodeInfo;
    serverPort = localPort;
}
/**
 * Accept loop: binds a server socket to serverPort and starts one Process
 * thread per accepted connection until isActive becomes false.
 *
 * <p>The socket is given an accept timeout of TIME_OUT so that the loop wakes up
 * periodically and re-reads isActive; without it accept() would block
 * indefinitely. The server socket is closed in a finally block so that it is
 * released on every exit path.
 */
public void run()
{
    try
    {
        serverSocket = new ServerSocket(this.serverPort);
        // Makes accept() throw an InterruptedIOException subclass after TIME_OUT ms,
        // which the handler below turns into "check isActive and wait again".
        serverSocket.setSoTimeout(TIME_OUT);
        while (isActive)
        {
            Socket socket = null;
            try
            {
                socket = serverSocket.accept();
                Process p = new Process(socket); // handle the connection on its own thread
                p.start();
            }
            catch (java.io.InterruptedIOException e)
            {
                // accept() timed out: nothing to do, go back to the top of the loop.
            }
            catch (IOException e)
            {
                System.out.println("I/O Error");
                log(e);
                isActive=false;
            }
            catch (SecurityException e)
            {
                System.out.println("An unauthorized client has attempted to connect");
                log(e);
                isActive=false;
            }
            catch (Throwable e)
            {
                System.out.println("Unexpected exception");
                e.printStackTrace();
                isActive=false;
            }
        } // while
    } // try
    catch (IOException e)
    {
        log(e);
        error("Error binding to port", e);
    }
    catch (SecurityException e)
    {
        log(e);
        error("The security manager refused permission to bind to port", e);
    }
    finally
    {
        // serverSocket is still null when the bind itself failed.
        if (serverSocket != null)
        {
            try
            {
                serverSocket.close();
            }
            catch (IOException e)
            {
                log(e);
                error("Error closing server socket", e);
            }
        }
    }
} // run
/**
 * Connection-handling thread (inner class): serves one accepted socket.
 *
 * <p>Commands are read as text lines and dispatched on their exact value:
 * <ul>
 * <li>{@code <Dispatcher poll>} - writes six dispatcherInfo values, one per line,
 *     then reads one acknowledgement line;</li>
 * <li>{@code <Dispatcher cancelJob>} - reads a job id, starts a JobCanceling
 *     thread and answers {@code <cancelJob finish>};</li>
 * <li>{@code <Job transmission>} - reads a count, answers {@code <Send Object>},
 *     reads a serialized MonitorJob and starts a JobReceiving thread;</li>
 * <li>{@code <new task>} - reads a job id and a count and checks that the job is
 *     in jobList (the reply with sub-task ids is disabled in this source);</li>
 * <li>{@code <get subtask status>} - reads a job id and a subtask id and writes
 *     the state of the matching sub-job.</li>
 * </ul>
 * Any other line is ignored. The connection is closed when the peer closes the
 * stream, when an I/O or protocol error occurs, or when a lookup fails.
 */
public class Process
    extends Thread
{
    Socket socket;
    private BufferedReader is;
    private PrintWriter os;
    private ObjectOutputStream oob; // never opened: its creation is disabled in the constructor
    private ObjectInputStream iob;
    private boolean flag=true;

    /**
     * Opens the text and object streams on the given socket. Failures are only
     * printed; the affected stream fields then stay null and run() copes with that.
     *
     * @param sock the accepted connection
     */
    public Process(Socket sock)
    {
        this.socket = sock;
        try
        {
            is = new BufferedReader(new InputStreamReader(socket.
                getInputStream()));
            os = new PrintWriter(socket.getOutputStream());
            // NOTE(review): the ObjectInputStream constructor blocks until the peer has
            // sent a serialization stream header, and this constructor is invoked by the
            // accept loop before start(). It also shares the socket input with the
            // BufferedReader above, whose read-ahead may consume object bytes - confirm
            // against the monitor side of the protocol before changing.
            iob = new ObjectInputStream(socket.getInputStream());
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Monitor onpoll start");
    }

    /**
     * Reads and serves commands until the connection ends, then releases the
     * socket and its streams on every exit path.
     */
    public void run()
    {
        try
        {
            while (flag)
            {
                if (socket == null || socket.isClosed())
                {
                    return;
                }
                if (is == null || os == null)
                {
                    // Stream setup failed in the constructor; nothing can be served.
                    return;
                }
                String lineStr = is.readLine();
                if (lineStr == null)
                {
                    // End of stream: the peer closed the connection.
                    return;
                }
                if (!handleRequest(lineStr))
                {
                    return;
                }
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        catch (ClassNotFoundException e)
        {
            e.printStackTrace();
        }
        catch (NumberFormatException e)
        {
            // A count line that is not an integer is a protocol error.
            e.printStackTrace();
        }
        finally
        {
            flag = false;
            closeConnection();
        }
    }

    /**
     * Dispatches one command line.
     *
     * @return true to keep serving this connection, false to close it
     */
    private boolean handleRequest(String lineStr)
        throws IOException, ClassNotFoundException
    {
        if (lineStr.equals("<Dispatcher poll>"))
        {
            return handlePoll();
        }
        if (lineStr.equals("<Dispatcher cancelJob>"))
        {
            return handleCancelJob();
        }
        if (lineStr.equals("<Job transmission>"))
        {
            return handleJobTransmission();
        }
        if (lineStr.equals("<new task>"))
        {
            return handleNewTask();
        }
        if (lineStr.equals("<get subtask status>"))
        {
            return handleSubtaskStatus();
        }
        return true; // unrecognised lines are ignored
    }

    /** Answers a monitor poll with the current dispatcherInfo values. */
    private boolean handlePoll()
        throws IOException
    {
        System.out.println("<Dispatcher poll>");
        os.println(dispatcherInfo.getDispatcherId());
        os.println(dispatcherInfo.getJobCount());
        os.println(dispatcherInfo.getWorkerCount());
        os.println(dispatcherInfo.getIdleWorkerCount());
        os.println(dispatcherInfo.getCanAcceptJob());
        os.println(dispatcherInfo.getDispatcherLoad());
        os.flush();
        // One more line is consumed after the reply (the original compared it with
        // "<poll finish>" but took no action); only end of stream matters here.
        return is.readLine() != null;
    }

    /** Starts the cancellation of the job whose id follows on the next line. */
    private boolean handleCancelJob()
        throws IOException
    {
        String jobId = is.readLine();
        if (jobId == null)
        {
            return false;
        }
        // Also kept in the enclosing instance's tmpJobId field, as the original code did.
        tmpJobId = jobId;
        JobCanceling jc = new JobCanceling(jobId);
        jc.start();
        os.println("<cancelJob finish>");
        os.flush();
        return true;
    }

    /** Receives a serialized MonitorJob and hands it to a JobReceiving thread. */
    private boolean handleJobTransmission()
        throws IOException, ClassNotFoundException
    {
        System.out.println("Job transmission start:");
        if (iob == null)
        {
            throw new IOException("object input stream is not available");
        }
        tmpJobCount=Integer.parseInt(is.readLine());
        System.out.println(tmpJobCount);
        os.println("<Send Object>");
        os.flush();
        // NOTE(review): native Java deserialization of data read from a socket; only
        // safe if the peer is trusted.
        MonitorJob received = (MonitorJob) iob.readObject();
        tmpMonitorJob = received;
        System.out.println("Read JobDesc Object OK"+received.getJobID());
        // The job is registered on a separate thread.
        Job tmpJob = new Job(received,dispatcherInfo.getDispatcherId());
        JobReceiving jr = new JobReceiving(tmpJob);
        jr.start();
        System.out.println("Job transmission finished");
        return true;
    }

    /**
     * Handles a request from a user's main program for new sub-tasks of a job.
     * The call that would create the sub-tasks and the lines that would return
     * their ids are disabled in this source, so only the lookup and a flush remain.
     */
    private boolean handleNewTask()
        throws IOException
    {
        System.out.println("<new task>");
        String jobId = is.readLine(); // id of the job to execute
        String countLine = is.readLine(); // number of sub-tasks requested
        if (jobId == null || countLine == null)
        {
            return false;
        }
        Integer.parseInt(countLine); // still validated, although the value is currently unused
        if (findJob(jobId) == null)
        {
            System.out.println("Don't find this job in jobList!");
            return false;
        }
        os.flush();
        return true;
    }

    /** Writes the state of one sub-job of one job. */
    private boolean handleSubtaskStatus()
        throws IOException
    {
        String jobId=is.readLine();
        String subtaskId = is.readLine();
        if (jobId == null || subtaskId == null)
        {
            return false;
        }
        // Look the job up by the id just read (the original used the unrelated
        // shared field tmpJobId here).
        Job job = findJob(jobId);
        if (job == null)
        {
            System.out.println("Don't find this job in jobList!");
            return false;
        }
        Vector subjoblist=job.getSubJobList();
        synchronized(subjoblist)
        {
            int count=subjoblist.size();
            int j=0;
            for(;j<count;j++)
            {
                // NOTE(review): this compares a String with the list element itself,
                // which is cast to SubJob below, so it presumably never matches. It
                // should probably compare with the sub-job's id, whose accessor is
                // not visible in this file - confirm and fix against SubJob.
                if (subtaskId.equals(subjoblist.elementAt(j)))
                    break;
            }
            if(j>=count)
            {
                System.out.println("Don't find this subTask in subTasklist!");
                return false;
            }
            os.println(((SubJob)(subjoblist.elementAt(j))).getState());
            os.flush();
        }
        return true;
    }

    /** Returns the Job in jobList whose id equals jobId, or null if there is none. */
    private Job findJob(String jobId)
    {
        synchronized(jobList)
        {
            for (int j = 0; j < jobList.size(); j++)
            {
                Job candidate = (Job) (jobList.elementAt(j));
                if (jobId.equals(candidate.getJobId()))
                {
                    return candidate;
                }
            }
        }
        return null;
    }

    /** Closes every stream and the socket; each close is attempted independently. */
    private void closeConnection()
    {
        if (iob != null)
        {
            try
            {
                iob.close();
            }
            catch (IOException e1)
            {
                log(e1);
            }
        }
        if (os != null)
        {
            os.close();
        }
        if (is != null)
        {
            try
            {
                is.close();
            }
            catch (IOException e1)
            {
                log(e1);
            }
        }
        if (socket != null)
        {
            try
            {
                socket.close();
            }
            catch (IOException e1)
            {
                log(e1);
            }
        }
    }
}
/**
 * Stops the accept loop. Clearing the flag alone is not enough while the thread
 * is blocked in accept(), so the listening socket is closed as well; the blocked
 * accept() then fails with an IOException and the loop in run() ends.
 */
public void terminate()
{
    isActive = false; // stop the server-socket loop
    ServerSocket listening = serverSocket; // null until run() has bound the socket
    if (listening != null)
    {
        try
        {
            listening.close();
        }
        catch (IOException e)
        {
            log(e);
        }
    }
}
/**
 * Prints the stack trace of the given exception to the standard error stream.
 *
 * @param e the exception to report
 */
public void log(Exception e)
{
    e.printStackTrace(System.err);
}
/**
 * Writes a one-line error record to the standard error stream, made of the
 * current date, the port, the message, and the class name and message of the
 * throwable.
 *
 * <p>The port is taken from the server socket when it exists and from the
 * configured serverPort otherwise: this method is called from the bind-failure
 * handlers in run(), where serverSocket is still null.
 *
 * @param message description of what failed
 * @param e       the throwable that caused the failure
 */
public void error(String message, Throwable e)
{
    ServerSocket current = serverSocket;
    int port = (current != null) ? current.getLocalPort() : serverPort;
    System.err.println(new Date() + ":dispatcherNode(" +
                       port +
                       "):error:" + message + ":" + e.getClass().getName() +
                       ":" + e.getMessage());
} //error
/**
 * Thread that registers one received job: it passes the job to
 * jobManager.jobReceiving and, if that returns true, records the job id in
 * dispatcherInfo while holding the dispatcherInfo monitor.
 */
class JobReceiving
    extends Thread
{
    private Job job;

    /**
     * @param tmpJob the job to register
     */
    public JobReceiving(Job tmpJob)
    {
        this.job = tmpJob;
    }

    public void run()
    {
        boolean accepted = jobManager.jobReceiving(job);
        if (accepted)
        {
            synchronized (dispatcherInfo)
            {
                System.out.println(job.getJobId());
                dispatcherInfo.insertJob(job.getJobId()); // update the dispatcherInfo record
            }
        }
        else
        {
            System.out.println("job transform error:");
        }
    }
}
/**
 * Thread that cancels one job: it asks jobManager to cancel the job and, if
 * that returns true, removes the job from dispatcherInfo. The outcome is
 * stored in the enclosing instance's flag1 field.
 *
 * <p>The id captured at construction time (tjId) is used throughout. The
 * enclosing tmpJobId field is shared by all connections and may already have
 * been overwritten by the time this thread runs.
 */
class JobCanceling
    extends Thread
{
    String tjId;

    /**
     * @param tmpJobId id of the job to cancel
     */
    public JobCanceling(String tmpJobId)
    {
        tjId=tmpJobId;
    }

    public void run()
    {
        if (! (jobManager.cancelJob(tjId)))
        {
            flag1 = false;
            return;
        }
        // Not synchronized on dispatcherInfo: the original left that block disabled.
        dispatcherInfo.cancelJob(tjId); // update dispatcherInfo
        flag1 = true;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -