📄 mainsdk.java
字号:
package cn.edu.hust.cgcl.biogrid.BioSDK;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.text.ParseException;
import java.util.LinkedList;
import java.util.Vector;
import cn.edu.hust.cgcl.biogrid.service.MessageService;
/**
* <p>Title: </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class MainSDK
{
final static int TIME_OUT = 3 * 1000;
public MainSDK()
{} // MainSDK
static public void newSubTask(int n, Job job, LinkedList taskList)
throws Exception
{
if (taskList == null)
{
throw new ParseException("method parameter error!", 0);
} // if
//System.out.println("monitorIp + monitorPort: "+job.monitorIp+job.monitorPort);
Socket monSock = new Socket(job.monitorIp, job.monitorPort);
//monSock.setSoTimeout(TIME_OUT);
BufferedReader monis = new BufferedReader(new InputStreamReader(monSock.
getInputStream()));
PrintWriter monos = new PrintWriter(monSock.getOutputStream());
monos.println("<new subtask>");
monos.flush();
String tmpline = monis.readLine();
if (!tmpline.equals("<new subtask next>"))
{
throw new ParseException("protocol parse error!", 0);
} // if
monos.println(job.jobId);
monos.flush();
monos.println(n);
monos.flush();
Vector vec = new Vector();
tmpline = monis.readLine();
while (tmpline != null && !tmpline.equals("<dispatcher finish>"))
{
vec.add(tmpline);
tmpline = monis.readLine();
} // while
monSock.close();
for (int i = 0; i < vec.size(); i++)
{
// format is : taskCount#priip#priport#backip#backport
tmpline = (String) vec.get(i);
int index1 = tmpline.indexOf('#', 0);
int taskCount = Integer.parseInt(tmpline.substring(0, index1));
//int index2 = tmpline.lastIndexOf('#');
int index2 = tmpline.indexOf('#', index1+1);
String tmpmainip = tmpline.substring(index1 + 1, index2);
// int tmpport = Integer.parseInt(tmpline.substring(index2 + 1,
// tmpline.length()));
index1=index2+1;
index2 = tmpline.indexOf('#', index1);
int tmpmainport = Integer.parseInt(tmpline.substring(index1,index2));
index1 = index2 + 1;
index2 = tmpline.indexOf('#', index1);
String tmpsubip = tmpline.substring(index1, index2);
index1=index2+1;
//index2 = tmpline.indexOf('#', index1);
//int tmpsubport = Integer.parseInt(tmpline.substring(index1,index2));
int tmpsubport = Integer.parseInt(tmpline.substring(index1,
tmpline.length()));
// 向dispatcher申请子任务
//System.out.println("dispatcherip + port: "+tmpmainip+tmpmainport);
Socket dispSock = new Socket(tmpmainip, tmpmainport);
BufferedReader dispis = new BufferedReader(new InputStreamReader(
dispSock.getInputStream()));
PrintWriter dispos = new PrintWriter(dispSock.getOutputStream());
ObjectOutputStream oos = new ObjectOutputStream(dispSock.
getOutputStream());
dispos.println("<new task>");
dispos.println(job.jobId);
dispos.println(Integer.toString(taskCount));
// sub-dispatcher information
dispos.println(tmpsubip);
dispos.println(tmpsubport);
dispos.flush();
for (int j = 0; j < taskCount; j++)
{
String subid = dispis.readLine();
taskList.add(new SubTask
(subid, tmpmainip, tmpmainport, tmpsubip,
tmpsubport));
} // for
dispSock.close();
dispSock = null;
} // for
} // newTask
// static public int getResult(SubTask task, Job job, String filepath)
// {
// int state;
// try
// {
// state = getTaskStatus(task,job.jobId);
// } //try
// catch (Exception e)
// {
// return -1;
// } // catch
// switch (state)
// {
// case SubTask.SUBTASK_STOP:
// break;
// case SubTask.SUBTASK_RUNNING:
// break;
// case SubTask.SUBTASK_FINISH:
// MessageService msgService = new MessageService();
// int result = msgService.conncetRequest(job.dataServerIp);
// if (result < 0)
// {
// return -1;
// } // if
// if ( (result = msgService.registerServ("worker", "worker",
// "admin")) < 0)
// {
// msgService.disConnect();
// return -1;
// } // if
// if ( result=( msgService.getSubTaskResult(filepath,
// "worker",
// job.jobId,
// task.subTaskId)) < 0)
// {
// msgService.disConnect();
// return -1;
// } // if
// msgService.disConnect();
// break;
// case SubTask.SUBTASK_FAULT:
// break;
// default:
// break;
// }
// return state;
// } // getResult
static public String getResult(Object o_task, Job job, String filepath)
throws Exception
{
int state;
SubTask task=(SubTask)o_task;
state = getTaskStatus(task, job.jobId);
String result=null;
switch (state)
{
case SubTask.SUBTASK_STOP:
break;
case SubTask.SUBTASK_RUNNING:
break;
case SubTask.SUBTASK_FINISH:
MessageService msgService = new MessageService();
/*if (msgService.conncetRequest(job.dataServerIp) < 0)
{
throw new ParseException("Data Server Error!", 0);
} // if
if ( ( msgService.registerServ(job.userId, job.userPwd,
"worker")) < 0)
{
msgService.disConnect();
throw new ParseException("Data Server Error!", 0);
} */// if
/*if ( ( result=msgService.getSubTaskResult(filepath,
job.userId,
job.jobId,
task.subTaskId)) == null)*/
if(msgService.getResultOnce(job.dataServerIp, job.userId, job.userPwd,
filepath, job.userId, job.jobId, task.subTaskId)!=0)
{
//msgService.disConnect();
throw new ParseException("Data Server Error!", 0);
} // if
result=msgService.getReturn();
//msgService.disConnect();
return result;
case SubTask.SUBTASK_FAULT:
break;
default:
break;
}
return null;
} // getResult
static private int getInnTaskStatus(SubTask task,String jobid, Socket dispSock, BufferedReader dispis, PrintWriter dispos)
throws Exception
{
int state;
dispos.println("<get subtask status>");
dispos.println(jobid);
dispos.println(task.subTaskId);
dispos.flush();
String tmpline = dispis.readLine();
state = Integer.parseInt(tmpline);
return state;
}
static public int getTaskStatus(Object o_task,String jobid)
throws Exception
{
SubTask task=(SubTask)o_task;
Socket dispSock = null;
BufferedReader dispis = null;
PrintWriter dispos = null;
int state=-1;
try
{
dispSock = new Socket(task.mainDispIp, task.mainDispPort);
//dispSock.setSoTimeout(TIME_OUT);
dispis = new BufferedReader(new InputStreamReader(dispSock.getInputStream()));
dispos = new PrintWriter(dispSock.getOutputStream());
ObjectOutputStream oos = new ObjectOutputStream(dispSock.
getOutputStream());
state=getInnTaskStatus(task, jobid, dispSock, dispis, dispos);
dispis.close();
dispos.close();
dispSock.close();
return state;
} // try
catch (IOException e)
{
e.printStackTrace();
dispis.close();
dispos.close();
dispSock.close();
dispis=null;
dispos=null;
dispSock=null;
if (task.mainDispIp==null)
{
return state;
} // if
task.mainDispIp=task.subDispIp;
task.mainDispPort=task.subDispPort;
task.subDispIp=null;
task.subDispPort=0;
dispSock = new Socket(task.mainDispIp, task.mainDispPort);
//dispSock.setSoTimeout(TIME_OUT);
dispis = new BufferedReader(new InputStreamReader(dispSock.getInputStream()));
dispos = new PrintWriter(dispSock.getOutputStream());
ObjectOutputStream oos = new ObjectOutputStream(dispSock.
getOutputStream());
// 这里容错协议,只是增加了一行:"fault tolerance"
dispos.println("fault tolerance");
state=getInnTaskStatus(task, jobid, dispSock, dispis, dispos);
dispis.close();
dispos.close();
dispSock.close();
dispis=null;
dispos=null;
dispSock=null;
return state;
} // catch
} // getTaskStatus
public static int updateTaskData(String path, Job job, int nums)
{
MessageService msgService = new MessageService();
int result = -1;
/*int result = msgService.conncetRequest(job.dataServerIp);
if (result < 0)
{
return -1;
} // if
/*if ( (result = msgService.registerServ(job.userId, job.userPwd,
"worker")) < 0)
{
msgService.disConnect();
return -1;
}*/ // if
if (path!=null)
{
if ( (result = msgService.updateDataOnce(job.dataServerIp,
job.userId, job.userPwd, path, nums,
job.userId, job.jobId)) < 0)
{
//msgService.disConnect();
return -1;
} // if
}
else
{
if ( (result = msgService.updateDataOnce(job.dataServerIp,
job.userId, job.userPwd, nums,
job.userId, job.jobId)) < 0)
{
//msgService.disConnect();
return -1;
} // if
}
//msgService.disConnect();
return 1;
}
public static int updateTaskData(Job job, int nums)
{
MessageService msgService = new MessageService();
int result=-1;
/*= msgService.conncetRequest(job.dataServerIp);
if (result < 0)
{
return -1;
} // if
if ( (result = msgService.registerServ(job.userId, job.userPwd,
"worker")) < 0)
{
msgService.disConnect();
return -1;
} // if
if ( (result = msgService.updateTaskData( nums,
job.userId, job.jobId)) < 0)
{
msgService.disConnect();
return -1;
} // if
msgService.disConnect();*/
if ( (result = msgService.updateDataOnce(job.dataServerIp,
job.userId, job.userPwd, nums,
job.userId, job.jobId)) < 0)
{
//msgService.disConnect();
return -1;
}
return 1;
}
} // MainSDK
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -