// workernode.java
// From the source code of P2HP-1, a P2P-based high-performance distributed
// computing platform. Java code, 240 lines.
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.util.LinkedList;
/**
 * Dispatcher-side record of one worker, together with the heartbeat thread
 * that decides when that worker must be considered dead.
 *
 * <p>Running this thread ({@link #run()}) only starts an inner
 * {@link Heartbeat} thread. Every {@code POLL_INTERVAL} milliseconds the
 * heartbeat decrements {@code life}; {@link #alive()} resets it. When a poll
 * finds {@code life} already at zero the worker is declared dead via
 * {@link #workerQuit()}: its sub-job (if any) is pushed back to the head of
 * the job queue and the worker id is appended to the dead-worker list.</p>
 *
 * <p>Thread-safety: {@code life}, {@code workerState} and the heartbeat's run
 * flag are {@code volatile} so that writes made by one thread are visible to
 * the other. This gives visibility only; the decrement in
 * {@link Heartbeat#poll()} is not atomic with respect to {@link #alive()}.
 * The remaining fields are not synchronized.</p>
 *
 * @author not attributable
 * @version 1.0
 */
public class WorkerNode
    extends Thread
{
    /** Number of heartbeat polls a worker survives without calling {@link #alive()}. */
    private static final int INITIAL_LIFE = 3;
    /** Delay between two heartbeat polls, in milliseconds. */
    private static final int POLL_INTERVAL = 20 * 1000;
    private static final int WORKER_IDLE = 0;
    private static final int WORKER_BUSY = 1;
    private static final int WORKER_DEAD = 2;

    /** Not assigned anywhere in this class; returned as-is by {@link #getWrokerInfo()}. */
    private WorkerInfo nodeInfo;
    private String workerId;
    private String ipAddr;
    /** Not read or written anywhere in this class. */
    private int workerPort;
    /** One of WORKER_IDLE, WORKER_BUSY, WORKER_DEAD (or whatever a caller passes to setWorkerState). */
    private volatile int workerState;
    /** Remaining heartbeat polls before the worker is declared dead. */
    private volatile int life;
    /** Sub-job currently assigned to this worker, or null. */
    private SubJob subJob;
    /** Shared job queue that interrupted sub-jobs are pushed back onto; null until setJobApply is called. */
    private LinkedList jobApply = null;
    /** Shared list that receives this worker's id when it is declared dead. */
    private LinkedList deadFlag;
    /** The currently running heartbeat, or null if none has been started. */
    private Heartbeat h;
    /** Guards creation and shutdown of the heartbeat held in {@code h}. */
    private final Object heartbeatLock = new Object();

    public boolean atIdleListFlag = true;
    /** True when this worker is in the redundant-computation state. */
    public boolean isReduncyWorker = false;

    /**
     * Creates an idle worker record with a full life counter and no sub-job.
     *
     * @param workerId   identifier of the worker
     * @param ipAddr     IP address of the worker
     * @param pcWorkLoad accepted for interface compatibility; not used by this class
     * @param deadFlag   list that receives {@code workerId} when the worker is declared dead
     */
    public WorkerNode(String workerId, String ipAddr, String pcWorkLoad, LinkedList deadFlag)
    {
        this.workerId = workerId;
        this.ipAddr = ipAddr;
        this.deadFlag = deadFlag;
        life = INITIAL_LIFE;
        workerState = WORKER_IDLE;
        subJob = null;
    } //WorkerNode

    /**
     * Starts the heartbeat and returns immediately.
     */
    public void run()
    {
        startHeartbeat();
    }

    /**
     * Stops the running heartbeat, if there is one.
     */
    public void quit()
    {
        synchronized (heartbeatLock)
        {
            if (h != null)
            {
                // Clear the run flag first so the thread cannot loop again
                // after the interrupt wakes it from its sleep.
                h.quit();
                h.interrupt();
            }
        }
    }

    public String getWorkerId()
    {
        return workerId;
    }

    /**
     * Returns the worker info object. The method name keeps its original
     * spelling because callers outside this file may depend on it.
     */
    public WorkerInfo getWrokerInfo()
    {
        return nodeInfo;
    } //getWrokerInfo

    public String getWorkerIp()
    {
        return this.ipAddr;
    }

    public int getWorkerState()
    {
        return this.workerState;
    }

    public SubJob getSubJob()
    {
        return this.subJob;
    }

    /**
     * Called whenever contact from the worker is detected: restores the
     * worker's life counter to its initial value and checks whether a sub-job
     * has been assigned to it.
     *
     * @return the assigned sub-job if there is one and the worker is still
     *         idle (i.e. has not started it yet), otherwise null
     */
    public SubJob alive()
    {
        life = INITIAL_LIFE;
        if (subJob != null && workerState == WORKER_IDLE)
        {
            return subJob;
        }
        return null;
    }

    /**
     * Assigns a sub-job to this worker and marks it as no longer on the idle list.
     */
    public void setJob(SubJob subJob)
    {
        this.subJob = subJob;
        this.atIdleListFlag = false;
    }

    public void setWorkerState(int state)
    {
        this.workerState = state;
    }

    /**
     * Sets the shared job queue. Only the first call has an effect; once a
     * queue is set, later calls are ignored.
     */
    public void setJobApply(LinkedList jobApply)
    {
        if (this.jobApply == null)
        {
            this.jobApply = jobApply;
        }
    }

    /**
     * Marks this worker as dead. If a sub-job is attached it is handed back:
     * its state is updated and it is inserted at the head of the job queue.
     * Finally the worker id is appended to the dead-worker list.
     */
    public void workerQuit()
    {
        SubJob job = subJob;
        if (workerState == WORKER_IDLE)
        {
            if (job != null)
            {
                // A sub-job was pre-assigned but never started.
                job.workerQuit();
                job.setWorkerState(job.PRE_SUBTASK);
                requeue(job);
            }
            // Otherwise no sub-job was assigned; nothing to hand back.
        }
        else if (workerState == WORKER_BUSY)
        {
            // A sub-job was assigned and not finished. Guard against a busy
            // worker without a sub-job instead of failing with a
            // NullPointerException before the worker is marked dead.
            if (job != null)
            {
                job.workerQuit();
                // 3 marks the sub-job as "interrupted before completion"
                // (the named constant is not visible from this file).
                job.setWorkerState(3);
                requeue(job);
            }
        }
        workerState = WORKER_DEAD;
        this.subJob = null;
        if (deadFlag != null)
        {
            synchronized (deadFlag)
            {
                deadFlag.addLast(workerId);
            }
        }
    }

    /**
     * Brings a dead worker back: resets it to idle with a full life counter
     * and no sub-job, and starts a fresh heartbeat.
     *
     * <p>The heartbeat is started directly rather than through
     * {@code this.start()}, because a {@code Thread} can be started only once
     * and a second {@code start()} throws {@code IllegalThreadStateException}.</p>
     */
    public void revive()
    {
        workerState = WORKER_IDLE;
        this.subJob = null;
        this.atIdleListFlag = true;
        life = INITIAL_LIFE;
        startHeartbeat();
    }

    /**
     * Reports the current sub-job as finished, detaches it and returns the
     * worker to the idle state. Does not fail when no sub-job is attached.
     */
    public void workFinish()
    {
        if (this.subJob != null)
        {
            this.subJob.jobfinish();
        }
        this.subJob = null;
        this.setWorkerState(WORKER_IDLE);
        this.atIdleListFlag = true;
    }

    /** Stops any previous heartbeat and starts a new one, recording it in {@code h}. */
    private void startHeartbeat()
    {
        synchronized (heartbeatLock)
        {
            if (h != null)
            {
                h.quit();
                h.interrupt();
            }
            h = new Heartbeat();
            h.start();
        }
    }

    /** Inserts the sub-job at the head of the job queue and wakes threads waiting on that queue. */
    private void requeue(SubJob job)
    {
        LinkedList queue = jobApply;
        if (queue == null)
        {
            // No queue has been registered via setJobApply; there is nowhere
            // to hand the sub-job back to.
            return;
        }
        synchronized (queue)
        {
            queue.addFirst(job);
            // Notify on the monitor that is actually held. Calling notifyAll()
            // on the WorkerNode here would throw IllegalMonitorStateException.
            queue.notifyAll();
        }
    }

    /**
     * Periodic liveness check for the enclosing worker. The original design
     * note says the dispatcher treats a worker as down after three consecutive
     * periods without contact and then informs the secondary dispatcher; only
     * the local part (state change, re-queue, dead list, log entry) is visible
     * in this class.
     */
    public class Heartbeat
        extends Thread
    {
        /** Loop condition; cleared by quit(), by an interrupt, or when the worker is declared dead. */
        private volatile boolean flag = true;

        /**
         * Polls, then sleeps for POLL_INTERVAL, until the flag is cleared or
         * the thread is interrupted.
         */
        public void run()
        {
            while (flag)
            {
                this.poll();
                try
                {
                    if (!interrupted())
                    {
                        Thread.sleep(POLL_INTERVAL);
                    }
                    else
                    {
                        flag = false;
                    }
                }
                catch (InterruptedException e)
                {
                    // Interrupted while sleeping: treat it as a stop request.
                    flag = false;
                }
            }
        }

        /** Asks the heartbeat loop to stop after the current iteration. */
        public void quit()
        {
            flag = false;
        }

        /**
         * Uses up one unit of life. If none is left, stops this heartbeat,
         * declares the worker dead and, when dispatcher logging is enabled,
         * writes a timeout entry to the log file.
         */
        public void poll()
        {
            if (life > 0)
            {
                life--;
            }
            else
            {
                flag = false;
                workerQuit();
                if (Parameter.dispatcherLogIsActive)
                {
                    LogFile lf = new LogFile(Parameter.logFileName);
                    try
                    {
                        lf.logWorker_overTime(ipAddr, workerId);
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
    } //heartbeat
}
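// Keyboard shortcuts (code-viewer page residue, not part of the program):
//   Copy code:          Ctrl + C
//   Search code:        Ctrl + F
//   Full-screen mode:   F11
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -
//   Show shortcuts:     ?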