// File: tasktransfer.java
/*
* Created on 2004-12-4
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Vector;
import cn.edu.hust.cgcl.biogrid.monitor.MonitorJob;
/**
 * Listener thread of a dispatcher: binds a {@link ServerSocket} to the task
 * port, accepts connections one at a time and serves exactly one request per
 * connection (see {@link #process(Socket)}).
 *
 * <p>Requests are identified by their first text line:
 * <ul>
 *   <li>{@code <Dispatcher cancelJob>} - cancel a job (done on a helper thread);</li>
 *   <li>{@code <Job transmission>} - receive a serialized {@code MonitorJob};</li>
 *   <li>{@code <new task>} - allocate sub-job ids for a job;</li>
 *   <li>{@code <get subtask status>} - report the state of one sub-job;</li>
 *   <li>{@code fault tolerance} - request a replacement sub-job.</li>
 * </ul>
 *
 * <p>{@link #quit()} is intended to be called from a thread other than the one
 * executing {@link #run()}; the fields shared between the two are therefore
 * declared {@code volatile}.
 */
public class TaskTransfer extends Thread {

    /** Dispatcher bookkeeping; updated when jobs are received or cancelled. */
    private final DispatcherInfo dispatcherInfo;

    /** Shared list of {@code Job} objects; always accessed under its own monitor here. */
    private final Vector jobList;

    /** Job manager used by the receive/cancel helpers (package-visible, as before). */
    DispatcherJobManagement jobManager;

    /** TCP port the listener binds to. */
    private final int taskPort;

    /** Set by {@link #run()}, closed by {@link #quit()} from another thread. */
    private volatile ServerSocket serverSocket;

    /** Loop flag of {@link #run()}; cleared by {@link #quit()}. */
    private volatile boolean isActive = true;

    /** Opaque value handed through to {@code Job.newSubJob}; its meaning is not visible in this file. */
    private final Integer dummy;

    /**
     * Creates the listener; nothing is bound until the thread is started.
     *
     * @param port       TCP port to listen on
     * @param info       dispatcher bookkeeping object
     * @param joblist    shared list of {@code Job} instances
     * @param jobmanager job manager used to receive and cancel jobs
     * @param dum        value passed through to {@code Job.newSubJob}
     */
    public TaskTransfer(int port, DispatcherInfo info, Vector joblist,
                        DispatcherJobManagement jobmanager, Integer dum) {
        this.taskPort = port;
        this.dispatcherInfo = info;
        this.jobList = joblist;
        this.jobManager = jobmanager;
        this.dummy = dum;
    }

    /**
     * Binds the server socket and serves connections sequentially until
     * {@link #quit()} clears the active flag. A failure while accepting or
     * processing one connection is printed and the loop continues.
     */
    public void run() {
        try {
            serverSocket = new ServerSocket(this.taskPort);
            while (isActive) {
                try {
                    Socket socket = serverSocket.accept();
                    this.process(socket); // handle the request once connected
                } catch (Exception e1) {
                    // Also reached when quit() closes the socket under a blocked accept().
                    System.out.println(e1);
                }
            } // while
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Covers the case where the loop ends without quit() having closed
            // the socket (e.g. quit() ran before the socket was bound).
            closeServerSocket();
        }
        System.out.println("TaskTransfer handler quit!");
    } // run

    /**
     * Serves one request on {@code sock} and always closes the socket afterwards.
     * Any exception raised while handling the request is printed, not propagated.
     *
     * <p>Stream set-up order matters: the {@link ObjectInputStream} is created
     * before the first text line is read, and its constructor consumes the
     * serialization stream header from the socket. The peer is therefore
     * expected to send that header before any text line.
     *
     * @param sock the accepted client connection
     */
    public void process(Socket sock) {
        try {
            BufferedReader is = new BufferedReader(
                    new InputStreamReader(sock.getInputStream()));
            PrintWriter os = new PrintWriter(sock.getOutputStream());
            ObjectInputStream iob = new ObjectInputStream(sock.getInputStream());

            // Transmit data according to the protocol: first line selects the request.
            String lineStr = is.readLine();
            if (lineStr == null) {
                // Peer closed the connection without sending a command.
                System.out.println("TaskTransfer: connection closed before a command was received");
                return;
            }

            if (lineStr.equals("<Dispatcher cancelJob>")) {
                handleCancelJob(is, os);
            } else if (lineStr.equals("<Job transmission>")) {
                handleJobTransmission(is, os, iob);
            } else if (lineStr.equals("<new task>")) {
                handleNewTask(is, os);
            } else if (lineStr.equals("<get subtask status>")) {
                handleSubtaskStatus(is, os);
            } else if (lineStr.equals("fault tolerance")) {
                handleFaultTolerance(is, os);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                sock.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    } // process

    /**
     * Handles {@code <Dispatcher cancelJob>}: reads the job id, starts the
     * cancellation on a separate thread and acknowledges immediately (the
     * acknowledgement does not reflect whether the cancellation succeeded).
     */
    private void handleCancelJob(BufferedReader is, PrintWriter os) throws Exception {
        String tmpJobId = is.readLine(); // the id of the job that will be cancelled
        JobCanceling jc = new JobCanceling(tmpJobId);
        jc.start();
        os.println("<cancelJob finish>");
        os.flush();
    }

    /**
     * Handles {@code <Job transmission>}: reads a count line, asks the peer for
     * the object, reads one {@code MonitorJob} and registers the resulting job.
     */
    private void handleJobTransmission(BufferedReader is, PrintWriter os,
                                       ObjectInputStream iob) throws Exception {
        System.out.println("Job transmission start:");
        int tmpJobCount = Integer.parseInt(is.readLine());
        if (Parameter.jobDebugIsActive) {
            System.out.println(tmpJobCount);
        }
        os.println("<Send Object>");
        os.flush();

        // NOTE(review): native Java deserialization of data read from a socket;
        // only safe if every peer that can reach this port is trusted.
        MonitorJob tmpMonitorJob = (MonitorJob) iob.readObject(); // received monitor job info
        if (Parameter.jobDebugIsActive) {
            System.out.println("Read JobDesc Object OK "+tmpMonitorJob.getJobID());
        }

        Job tmpJob = new Job(tmpMonitorJob, dispatcherInfo.getDispatcherId());
        // The registration work happens inside the JobReceiving constructor;
        // the thread itself is intentionally never started (as in the original).
        new JobReceiving(tmpJob);
        System.out.println("Job transmission finished");
    }

    /**
     * Handles {@code <new task>}: reads job id, number of sub-tasks, backup ip
     * and backup port, then writes one line per allocated sub-task id.
     * Sends nothing if the job is unknown.
     */
    private void handleNewTask(BufferedReader is, PrintWriter os) throws Exception {
        if (Parameter.jobDebugIsActive) {
            System.out.println("<new task>");
        }
        String tmpJobId = is.readLine();            // id of the job requesting sub-tasks
        int n = Integer.parseInt(is.readLine());    // number of sub-tasks requested
        if (Parameter.jobDebugIsActive) {
            System.out.println("task number: "+n);
        }
        String tmpBackupIp = is.readLine();
        int tmpBackupPort = Integer.parseInt(is.readLine());

        Job tmpjob = findJob(tmpJobId, false);
        if (tmpjob == null) {
            return;
        }

        String[] s = tmpjob.newSubJob(n, dummy, tmpBackupIp, tmpBackupPort);
        for (int i = 0; i < n; i++) {
            os.println(s[i]);
        }
        os.flush();
    }

    /**
     * Handles {@code <get subtask status>}: reads job id and sub-task id and
     * writes the sub-task's state. Sends nothing if either id is unknown.
     */
    private void handleSubtaskStatus(BufferedReader is, PrintWriter os) throws Exception {
        String jobId = is.readLine();
        String subtaskId = is.readLine();

        Job tmpjob = findJob(jobId, false);
        if (tmpjob == null) {
            return;
        }

        Vector subjoblist = tmpjob.getSubJobList();
        synchronized (subjoblist) {
            int count = subjoblist.size();
            for (int j = 0; j < count; j++) {
                SubJob sub = (SubJob) subjoblist.elementAt(j);
                if (subtaskId != null && subtaskId.equals(sub.getSubJobId())) {
                    // State is read and sent while the list monitor is held.
                    os.println(sub.getState());
                    os.flush();
                    return;
                }
            }
            System.out.println("Don't find this subTask in subTasklist!");
        }
    }

    /**
     * Handles {@code fault tolerance}: skips one line, reads job id and
     * sub-task id, requests a replacement sub-job, marks the job with state 4
     * and writes that state back. Sends nothing if the job is unknown.
     */
    private void handleFaultTolerance(BufferedReader is, PrintWriter os) throws Exception {
        is.readLine(); // the "<get subtask status>" line is discarded
        String jobId = is.readLine();
        String subtaskId = is.readLine();
        System.out.println("size of jobList:"+jobList.size());

        Job tmpjob = findJob(jobId, true);
        if (tmpjob == null) {
            return;
        }

        tmpjob.newSubJob(subtaskId, dummy); // fault tolerant: apply for a new sub-job
        tmpjob.state = 4;                   // fault tolerant
        os.println(tmpjob.state);
        os.flush();
    }

    /**
     * Looks up a job by id in {@code jobList} while holding the list's monitor.
     * Prints a message and returns {@code null} when no job matches (a
     * {@code null} id never matches).
     *
     * @param jobId id to look for
     * @param trace when {@code true}, prints every comparison made
     * @return the matching job, or {@code null}
     */
    private Job findJob(String jobId, boolean trace) {
        synchronized (jobList) {
            for (int j = 0; j < jobList.size(); j++) {
                Job candidate = (Job) jobList.elementAt(j);
                if (trace) {
                    System.out.println(jobId+" "+candidate.getJobId());
                }
                if (jobId != null && jobId.equals(candidate.getJobId())) {
                    return candidate;
                }
            }
            System.out.println("Don't find this job in jobList!");
            return null;
        }
    }

    /** Closes the server socket if it exists and is still open; errors are printed. */
    private void closeServerSocket() {
        ServerSocket ss = this.serverSocket; // single volatile read
        if (ss != null && !ss.isClosed()) {
            try {
                ss.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers a received job. All work is done synchronously in the
     * constructor: the job is handed to the job manager and, on success,
     * recorded in {@code dispatcherInfo}. The class still extends
     * {@code Thread}, but no {@code run()} is defined and it is not started
     * by this file.
     */
    class JobReceiving
        extends Thread {
        private Job job;

        public JobReceiving(Job tmpJob) {
            job = tmpJob;
            if (!(jobManager.jobReceiving(job))) {
                System.out.println("job receiviving error!");
                return;
            }
            synchronized (dispatcherInfo) {
                dispatcherInfo.insertJob(job.getJobId()); // update the dispatcherInfo record
            }
        }
    }

    /**
     * Cancels a job on its own thread: asks the job manager to cancel it and,
     * on success, updates {@code dispatcherInfo}.
     */
    class JobCanceling
        extends Thread {
        String tjId;

        public JobCanceling(String tmpJobId) {
            tjId = tmpJobId;
        }

        public void run() {
            if (!(jobManager.cancelJob(tjId))) {
                System.out.println("Fail to cancel job from jobManager ");
                return;
            }
            // NOTE(review): unlike insertJob above, this call is not wrapped in
            // synchronized (dispatcherInfo); confirm cancelJob is safe on its own.
            dispatcherInfo.cancelJob(tjId); // update dispatcherInfo
        }
    }

    /**
     * Stops the listener: clears the active flag and closes the server socket,
     * which unblocks a pending {@code accept()} in {@link #run()}. Safe to call
     * before the socket has been bound.
     */
    public void quit() {
        System.out.println("Task transfer handler quitting...");
        isActive = false;
        closeServerSocket();
    }
} // TaskTransfer
// NOTE: the text that followed here was keyboard-shortcut help from the web code viewer this file was copied from; it is not part of the Java source.