📄 .#dispatchernodemanagement.java.1.9
字号:
{
wn = (WorkerNode) (deadWorkerNodeList.
elementAt(j));
workerNodeList.addElement(wn);
synchronized (idleWorkerNodeList)
{
idleWorkerNodeList.addElement(wn.
getWorkerId()); //应该以workerid入队列
}
deadWorkerNodeList.removeElementAt(j);
dispatcherInfo.setWorkerCount(
workerNodeList.size());
dispatcherInfo.setIdleWorkerCount(
idleWorkerNodeList.size());
wn.revive();
}
}
synchronized(dummy)
{
try{
dummy.notifyAll();
}catch(IllegalMonitorStateException e)
{
}
}
}
else
wn = (WorkerNode) (workerNodeList.
elementAt(i));
}
//找到这个worker,接着:
if (lineStr.equals("<worker alive>"))
{
SubJob sj = wn.
alive(); //寿命恢复到3
if (sj != null)
{
os.println("<new job>");
os.flush();
if (is.readLine().equals("<new job accept>"))
{
System.out.println("<new job accept>");
//oob = new ObjectOutputStream(s.getOutputStream());
oob.writeObject(sj); //传送datapoll信息给worker
//oob.close();
oob.flush();
wn.
setWorkerState(1); //将worker状态置为busy
sj.setWorkerState(1);
//这里有一个延时问题。
dispatcherInfo.setIdleWorkerCount(
idleWorkerNodeList.size());
}
}
os.println("<dispatcher alive>");
os.flush();
//System.out.println("<dispatcher alive>");
}
else {
if (lineStr.equals("<subjob hava finished>"))
{
is.readLine();//jobid, unuseless
is.readLine();//subjobid, unuseless
SubJob sj = wn.
alive(); //寿命恢复到3
wn.
workFinish();
idleWorkerNodeList.addElement(wn.getWorkerId());
dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.
size());
synchronized(dummy)
{
try{
dummy.notifyAll();
}
catch(IllegalMonitorStateException e)
{
}
}
os.println("<dispatcher hava known>");
os.flush();
sj.finishedTime=System.currentTimeMillis();
sj.computingTime=sj.finishedTime-sj.distributeTime;
calAveTime(sj,sj.computingTime);
if(sj.redun_num!=0)
{//有冗余计算,从urgentJobList中删除
LinkedList tmplist=DispatcherJobManagement.urgentJobList;
synchronized(tmplist)
{
for(int j=0;i<tmplist.size();i++)
{
SubJob tmpsj=(SubJob)tmplist.get(j);
if(sj.getSubJobId().equals(tmpsj.getSubJobId()))
{
tmplist.remove(j);
break;
}
}
}
}
//从runningJobList中删除
LinkedList tmplist=DispatcherJobManagement.runningJobList;
synchronized(tmplist)
{
for(int j=0;i<tmplist.size();i++)
{
SubJob tmpsj=(SubJob)tmplist.get(j);
if(sj.getSubJobId().equals(tmpsj.getSubJobId()))
{
tmplist.remove(j);
break;
}
}
}
}
else if (lineStr.equals("<worker quit>"))
{
//交给workernode的一个函数处理
wn.workerQuit();
os.println("<worker quit agree>");
os.flush();
}
this.s.close();
return;
}//else
}//else
}//try
catch (IOException e)
{
try
{
is.close();
s.close();
os.close();
return;
}
catch (IOException e1)
{
log(e);
System.out.println(e.toString());
flag=false;
//error("Error closing socket", e);
}
//log(e);
System.out.println(e.toString());
//e.printStackTrace();
flag=false;
return;
//error("I/O error", e);
}
}//while
try
{
if(!this.s.isClosed())
this.s.close();
flag=false;
}catch(Exception e)
{
}
return;
}//run
//.....
}
/**
 * Asks this listening handler to stop.
 *
 * <p>Prints a shutdown notice to standard output and then clears both
 * {@code subflag} and {@code isActive}. Presumably the handler's loops test
 * these two flags -- confirm against the loop conditions, which are not
 * visible from this method.
 */
public void terminate()
{
    final String notice = "listening handler quitting...";
    System.out.println(notice);

    // Clear the flags in the same order as before: inner flag first, then the outer one.
    subflag = false;
    isActive = false;
}
/**
 * Forwards a sub-job's computing time to the job that owns it.
 *
 * <p>While holding the {@code jobList} lock, scans the list for the first
 * {@link Job} whose id equals {@code sj.getJobId()} and calls
 * {@code calAveTime(t)} on it. If no job matches, a message is printed and
 * the method returns without doing anything else. After a successful update,
 * and outside the lock, {@code jfth.start()} is invoked.
 *
 * @param sj the sub-job whose owning job should be updated
 * @param t  the time value handed to {@code Job.calAveTime}
 */
public void calAveTime(SubJob sj,long t)
{
    final String wantedId = sj.getJobId();

    synchronized (jobList)
    {
        Job owner = null;
        final int total = jobList.size();
        for (int idx = 0; idx < total; idx++)
        {
            Job candidate = (Job) jobList.elementAt(idx);
            if (wantedId.equals(candidate.getJobId()))
            {
                owner = candidate;
                break;
            }
        }

        if (owner == null)
        {
            // No job with this id is registered; nothing to update.
            System.out.println("Don't find this job in jobList!");
            return;
        }

        owner.calAveTime(t);
    }

    // NOTE(review): jfth is declared outside this method; confirm that calling
    // start() on it here is intended for every finished sub-job.
    jfth.start();
}
}
/**
 * Periodically checks for dead workers and moves their {@link WorkerNode}
 * entries from {@code workerNodeList} to {@code deadWorkerNodeList}.
 *
 * <p>The ids of dead workers are taken from the {@code deadFlag} queue of the
 * enclosing class. For each id the matching node is removed from
 * {@code workerNodeList}, appended to {@code deadWorkerNodeList} and, when the
 * node's {@code atIdleListFlag} is set, its id is also removed from
 * {@code idleWorkerNodeList}. The counters in {@code dispatcherInfo} are
 * refreshed after each removal.
 *
 * <p>Thread-safety: {@link #terminate()} is meant to be called from a thread
 * other than this one, so both stop flags are {@code volatile}.
 *
 * <p>Copyright: Copyright (c) 2004</p>
 * @author not attributable
 * @version 1.0
 */
public class TestDeadWorker
    extends Thread
{
    /** Keeps the outer polling loop in {@link #run()} alive; cleared to stop the thread. */
    private volatile boolean flag = true;
    /** Keeps the drain loop in {@link #poll()} alive; cleared to abort a drain early. */
    private volatile boolean subflag = true;
    /** Pause between two polls, in milliseconds. */
    private static final int POLL_INTERVAL = 10 * 1000;

    /**
     * Polls for dead workers, sleeping {@code POLL_INTERVAL} ms between polls,
     * until {@code flag} is cleared or the thread is interrupted.
     */
    public void run()
    {
        while (flag)
        {
            this.poll();
            try
            {
                if (!interrupted())
                    Thread.sleep(POLL_INTERVAL);
                else
                    flag = false;
            }
            catch (InterruptedException e)
            {
                // An interrupt during the sleep is treated as a request to stop.
                flag = false;
            }
        }
        System.out.println("Testdeadworker handler quit!");
    }

    /**
     * Drains the {@code deadFlag} queue, retiring one worker per queued id.
     *
     * <p>The emptiness check and the removal happen under the same
     * {@code deadFlag} lock, so a concurrent drain cannot empty the queue
     * between the check and {@code removeFirst()}.
     */
    public void poll()
    {
        while (subflag)
        {
            String s = null;
            WorkerNode tmpwn = null;
            synchronized (deadFlag)
            {
                if (deadFlag.size() == 0)
                    break;
                System.out.println("deadwork: " + deadFlag.size());
                s = (String) deadFlag.removeFirst();
            }

            // Step 1: take the node out of the live worker list.
            synchronized (workerNodeList)
            {
                boolean missing = true;
                for (int i = 0; i < workerNodeList.size(); i++)
                {
                    tmpwn = (WorkerNode) (workerNodeList.elementAt(i));
                    if (s.equals(tmpwn.getWorkerId()))
                    {
                        String deadW_ip = tmpwn.getWorkerIp();
                        workerNodeList.removeElementAt(i);
                        dispatcherInfo.setWorkerCount(workerNodeList.size());
                        missing = false;
                        if (Parameter.dispatcherLogIsActive)
                        {
                            LogFile lf = new LogFile(Parameter.logFileName);
                            try
                            {
                                lf.logWorker_quit(deadW_ip, s, dispatcherInfo.getWorkerCount());
                            }
                            catch (Exception e)
                            {
                                // Logging is best-effort; the worker is retired regardless.
                                e.printStackTrace();
                            }
                        }
                        break;
                    }
                }
                if (missing)
                {
                    System.out.println(
                        "Don't find this dead workernode in workerNodeList!");
                    continue; // unknown id: move on to the next queued entry
                }
            }

            // Step 2: remember the node in the dead list.
            synchronized (deadWorkerNodeList)
            {
                deadWorkerNodeList.addElement(tmpwn);
            }

            // Step 3: if the node was flagged as idle, drop its id from the idle list too.
            if (tmpwn.atIdleListFlag)
            {
                synchronized (idleWorkerNodeList)
                {
                    Iterator it = idleWorkerNodeList.iterator();
                    boolean notInIdleList = true;
                    while (it.hasNext())
                    {
                        String tmps = (String) it.next();
                        if (s.equals(tmps))
                        {
                            // Safe despite the live iterator: we leave the loop right away.
                            idleWorkerNodeList.remove(tmps);
                            dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.size());
                            notInIdleList = false;
                            break;
                        }
                    }
                    if (notInIdleList) System.out.println("Don't find this worker at idleWorkerNodelist(but the falg 'atIdleListFlag' is true)!");
                }
            }
        } //while
    } //poll

    /**
     * Requests this thread to stop: aborts a drain in progress and ends the
     * polling loop. A thread currently sleeping notices the request only when
     * its sleep ends.
     */
    public void terminate()
    {
        System.out.println("Testdeadworker handler quitting...");
        subflag = false;
        flag = false;
    }
} //TestDead
/**
 * Reports an exception by printing its {@code toString()} form on standard output.
 *
 * @param e the exception to report
 */
public void log(Exception e)
{
    final String description = e.toString();
    System.out.println(description);
}
} //DispatcherNodeManagement
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -