📄 dispatchernodemanagement.java
字号:
Vector tmpSjList = tmpjob.subJobList;
SubJob tmpsubjob = null;
synchronized (tmpSjList) {
for (j = 0; j < tmpSjList.size(); j++) {
tmpsubjob = (SubJob) (jobList
.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_subJobId.equals(tmpsubjob
.getSubJobId())) {
tmpsubjob.setWorkerState(2);//finished
break;
}
}
if (j >= tmpSjList.size()) {
System.out
.println("Don't find this subjob in subjobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
synchronized (dummy) {
try {
dummy.notifyAll();
} catch (IllegalMonitorStateException e) {
}
}
os.println("<dispatcher hava known>");
os.flush();
tmpsubjob.finishedTime = System
.currentTimeMillis();
tmpsubjob.computingTime = tmpsubjob.finishedTime
- tmpsubjob.distributeTime;
} else if (lineStr.equals("<worker quit>")) {
//交给workernode的一个函数处理
synchronized (ftWorkerNodeList) {
for (i = 0; i < ftWorkerNodeList.size(); i++) {
if (tmpWorkerId
.equals((String) (ftWorkerNodeList
.elementAt(i)))) {
ftWorkerNodeList.remove(i);
break;
}
}
}
os.println("<worker quit agree>");
os.flush();
}
this.s.close();
return;
} else {
synchronized (workerNodeList) {
for (i = 0; i < workerNodeList.size(); i++) {
if (tmpWorkerId
.equals(((WorkerNode) (workerNodeList
.elementAt(i)))
.getWorkerId()))
break;
}
if (i >= workerNodeList.size()) {
//查看是否为已死worker,若找到,复活。
int j = 0;
synchronized (deadWorkerNodeList) {
for (j = 0; j < deadWorkerNodeList
.size(); j++) {
if (tmpWorkerId
.equals(((WorkerNode) (deadWorkerNodeList
.elementAt(j)))
.getWorkerId()))
break;
}
if (j >= deadWorkerNodeList.size()) {//if fault tolerant ,then deal with...
if (lineStr
.equals("worker fault tolerance")) {
String ft_jobId = is.readLine();
String ft_subJobId = is
.readLine();
int ft_sjState = Integer
.parseInt(is.readLine());
os
.println("worker fault tolerance finish");
os.flush();
ftWorkerNodeList
.add(tmpWorkerId);
Job tmpjob = null;
synchronized (jobList) {
for (j = 0; j < jobList
.size(); j++) {
tmpjob = (Job) (jobList
.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_jobId
.equals(tmpjob
.getJobId()))
break;
}
if (j >= jobList.size()) {
System.out
.println("Don't find this job in jobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
Vector tmpSjList = tmpjob.subJobList;
SubJob tmpsubjob = null;
synchronized (tmpSjList) {
for (j = 0; j < tmpSjList
.size(); j++) {
tmpsubjob = (SubJob) (jobList
.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_subJobId
.equals(tmpsubjob
.getSubJobId())) {
tmpsubjob
.setWorkerState(ft_sjState);
break;
}
}
if (j >= tmpSjList.size()) {
System.out
.println("Don't find this subjob in subjobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
} else {
System.out
.println("Don't find this worker in workerNodeList and deadWorkerNodeList!");
return;
}
} else {
wn = (WorkerNode) (deadWorkerNodeList
.elementAt(j));
workerNodeList.addElement(wn);
synchronized (idleWorkerNodeList) {
idleWorkerNodeList
.addElement(wn
.getWorkerId()); //应该以workerid入队列
}
deadWorkerNodeList
.removeElementAt(j);
dispatcherInfo
.setWorkerCount(workerNodeList
.size());
dispatcherInfo
.setIdleWorkerCount(idleWorkerNodeList
.size());
wn.revive();
}
}
synchronized (dummy) {
try {
dummy.notifyAll();
} catch (IllegalMonitorStateException e) {
}
}
} else
wn = (WorkerNode) (workerNodeList
.elementAt(i));
}
//找到这个worker,接着:
if (lineStr.equals("<worker alive>")) {
SubJob sj = wn.alive(); //寿命恢复到3
if (sj != null && sj.getState() != 2) {
os.println("<new job>");
if(wn.isReduncyWorker)
os.println("1");
else os.println("0");
os.flush();
if (is.readLine()
.equals("<new job accept>")) {
if (Parameter.jobDebugIsActive)
System.out
.println("<new job accept>");
//oob = new ObjectOutputStream(s.getOutputStream());
oob.writeObject(sj); //传送datapoll信息给worker
//oob.close();
oob.flush();
wn.setWorkerState(1); //将worker状态置为busy
sj.setWorkerState(1);
//这里有一个延时问题。
dispatcherInfo
.setIdleWorkerCount(idleWorkerNodeList
.size());
}
}
os.println("<dispatcher alive>");
os.flush();
//System.out.println("<dispatcher alive>");
} else {
if (lineStr.equals("<subjob hava finished>")) {
is.readLine();//jobid, unuseless
is.readLine();//subjobid, unuseless
SubJob sj = wn.alive(); //寿命恢复到3
sj=wn.getSubJob();//在busy状态下,wn.alive()不返回subjob,必须主动取。
if (sj != null) {
if(sj.getState()!=2&&Parameter.jobFaultTolerant)
{//子任务以前没有worker完成
sj.finishedTime = System
.currentTimeMillis();
sj.computingTime = sj.finishedTime
- sj.distributeTime;
calAveTime(sj, sj.computingTime);
}//if
// if (sj.redun_num != 0) {//有冗余计算,从urgentJobList中删除
// LinkedList tmplist = DispatcherJobManagement.urgentJobList;
// synchronized (tmplist) {
// for (int j = 0; i < tmplist
// .size(); i++) {
// SubJob tmpsj = (SubJob) tmplist
// .get(j);
// if (sj
// .getSubJobId()
// .equals(
// tmpsj
// .getSubJobId())) {
// tmplist.remove(j);
// break;
// }
// }
// }
// }
//从runningJobList中删除
LinkedList tmplist = DispatcherJobManagement.runningJobList;
synchronized (tmplist) {
for (int j = 0; i < tmplist.size(); i++) {
SubJob tmpsj = (SubJob) tmplist
.get(j);
if (sj.getSubJobId().equals(
tmpsj.getSubJobId())) {
tmplist.remove(j);
break;
}
}
}
}//if
wn.workFinish();
idleWorkerNodeList.addElement(wn
.getWorkerId());
dispatcherInfo
.setIdleWorkerCount(idleWorkerNodeList
.size());
synchronized (dummy) {
try {
dummy.notifyAll();
} catch (IllegalMonitorStateException e) {
}
}
os.println("<dispatcher hava known>");
os.flush();
} else if (lineStr.equals("<worker quit>")) {
//交给workernode的一个函数处理
wn.workerQuit();
os.println("<worker quit agree>");
os.flush();
if (Parameter.dispatcherLogIsActive) {
LogFile lf = new LogFile(Parameter.logFileName);
try {
lf.logWorker_quit(wn.getWorkerIp(), wn.getWorkerId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
this.s.close();
return;
}//else
}//else
}//try
catch (IOException e) {
try {
is.close();
s.close();
os.close();
return;
} catch (IOException e1) {
log(e);
System.out.println(e.toString());
flag = false;
//error("Error closing socket", e);
}
//log(e);
System.out.println(e.toString());
//e.printStackTrace();
flag = false;
return;
//error("I/O error", e);
}
}//while
try {
if (!this.s.isClosed())
this.s.close();
flag = false;
} catch (Exception e) {
}
return;
}//run
//.....
}
/**
 * Requests that this listening handler stop.
 *
 * Prints a notice to standard output, then clears the {@code subflag} and
 * {@code isActive} flags. Both fields are declared outside the visible source;
 * presumably the handler's loops test them to decide when to exit — TODO confirm.
 * NOTE(review): if terminate() is called from a different thread than the one
 * running the handler, these fields should be volatile (declaration not visible here).
 */
public void terminate() {
System.out.println("listening handler quitting...");
subflag = false;
isActive = false;
}
/**
 * Forwards a timing sample for a sub-job to the job that owns it.
 *
 * The owning job is found in {@code jobList} by comparing job ids. The whole
 * lookup and the forwarded call run while holding the {@code jobList} monitor.
 * If no job matches, a diagnostic line is printed and nothing else happens.
 *
 * @param sj the sub-job whose parent job (matched via {@code sj.getJobId()}) is updated
 * @param t  the value handed to {@code Job.calAveTime}; the caller visible in this
 *           file passes {@code sj.computingTime} (finishedTime - distributeTime)
 */
public void calAveTime(SubJob sj, long t) {
    final String wantedJobId = sj.getJobId();
    synchronized (jobList) {
        Job owner = null;
        // Stop scanning as soon as the owning job has been located.
        for (int idx = 0; owner == null && idx < jobList.size(); idx++) {
            final Job candidate = (Job) jobList.elementAt(idx);
            if (wantedJobId.equals(candidate.getJobId())) {
                owner = candidate;
            }
        }
        if (owner == null) {
            System.out.println("Don't find this job in jobList!");
            return;
        }
        owner.calAveTime(t);
    }
}
}
/**
 * Background thread that periodically retires dead workers.
 *
 * <p>Description: every {@link #POLL_INTERVAL} milliseconds it drains the worker
 * ids queued in {@code deadFlag}. Each matching WorkerNode is moved from
 * {@code workerNodeList} into {@code deadWorkerNodeList}. If the node's
 * {@code atIdleListFlag} is set, its id is also removed from
 * {@code idleWorkerNodeList}. (Who enqueues ids into {@code deadFlag} is not
 * visible here — presumably the liveness checker; verify.)</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * @author not attributable
 * @version 1.0
 */
public class TestDeadWorker extends Thread {
    // volatile: terminate() writes these from another thread while run()/poll() read them.
    private volatile boolean flag = true;
    private volatile boolean subflag = true;
    private static final int POLL_INTERVAL = 10 * 1000;

    /** Polls for dead workers until terminated or interrupted. */
    public void run() {
        while (flag) {
            this.poll();
            try {
                if (!interrupted())
                    Thread.sleep(POLL_INTERVAL);
                else
                    flag = false;
            } catch (InterruptedException e) {
                // Interrupted while sleeping (e.g. by terminate()): stop polling.
                flag = false;
            }
        }
        System.out.println("Testdeadworker handler quit!");
    }

    /**
     * Drains {@code deadFlag}. Each queued worker id is moved to the dead list
     * and removed from the idle list when needed. Ids with no matching node in
     * {@code workerNodeList} are reported and skipped.
     */
    public void poll() {
        while (subflag) {
            String s;
            synchronized (deadFlag) {
                // Test and take under one lock. Checking size() outside the lock and
                // calling removeFirst() inside it could race with another consumer.
                if (deadFlag.size() == 0)
                    break;
                System.out.println("deadwork: " + deadFlag.size());
                s = (String) deadFlag.removeFirst();
            }
            WorkerNode tmpwn = moveToDeadList(s);
            if (tmpwn == null) {
                System.out
                        .println("Don't find this dead workernode in workerNodeList!");
                continue;
            }
            // Logged after the list locks are released so file I/O does not block
            // connection handlers waiting on workerNodeList.
            if (Parameter.dispatcherLogIsActive) {
                LogFile lf = new LogFile(Parameter.logFileName);
                try {
                    lf.logWorker_quitFromQueue(tmpwn.getWorkerIp(), s,
                            dispatcherInfo.getWorkerCount());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (tmpwn.atIdleListFlag) {
                removeFromIdleList(s);
            }
        }
    } //poll

    /**
     * Removes the node with the given id from workerNodeList and appends it to
     * deadWorkerNodeList as a single step, updating the published worker count.
     *
     * @param workerId id of the worker to retire
     * @return the moved node, or null if no node in workerNodeList has that id
     */
    private WorkerNode moveToDeadList(String workerId) {
        synchronized (workerNodeList) {
            for (int i = 0; i < workerNodeList.size(); i++) {
                WorkerNode wn = (WorkerNode) (workerNodeList.elementAt(i));
                if (workerId.equals(wn.getWorkerId())) {
                    workerNodeList.removeElementAt(i);
                    dispatcherInfo.setWorkerCount(workerNodeList.size());
                    // Nested inside the workerNodeList lock, in the same order the
                    // connection handler takes them (workerNodeList, then
                    // deadWorkerNodeList). A worker reporting in concurrently can
                    // therefore never find itself missing from both lists.
                    synchronized (deadWorkerNodeList) {
                        deadWorkerNodeList.addElement(wn);
                    }
                    return wn;
                }
            }
        }
        return null;
    }

    /**
     * Removes a worker id from idleWorkerNodeList and refreshes the idle count.
     * Prints a diagnostic if the id is unexpectedly absent.
     *
     * @param workerId id of the worker to remove from the idle list
     */
    private void removeFromIdleList(String workerId) {
        synchronized (idleWorkerNodeList) {
            if (idleWorkerNodeList.remove(workerId)) {
                dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.size());
            } else {
                System.out
                        .println("Don't find this worker at idleWorkerNodelist(but the falg 'atIdleListFlag' is true)!");
            }
        }
    }

    /** Stops polling and wakes the thread if it is sleeping between polls. */
    public void terminate() {
        System.out.println("Testdeadworker handler quitting...");
        subflag = false;
        flag = false;
        // Without this the thread could sleep for up to POLL_INTERVAL before exiting.
        interrupt();
    }
} //TestDead
/**
 * Reports an exception by printing its {@code toString()} form to standard output.
 *
 * @param e the exception to report; must be non-null, since {@code toString()} is invoked on it
 */
public void log(Exception e) {
    final String description = e.toString();
    System.out.println(description);
}
} //DispatcherNodeManagement
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -