workernode.java

来自「分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台」· Java 代码 · 共 240 行

JAVA
240
字号
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.util.LinkedList;

/**
 * <p>Title: WorkerNode</p>
 * <p>Description: Dispatcher-side bookkeeping object for a single worker.
 * It stores the worker's id, IP address, state and currently assigned
 * {@code SubJob}, plus a "life" counter. A background {@link Heartbeat}
 * thread decrements the counter once per poll interval and {@link #alive()}
 * resets it; when the counter is exhausted {@link #workerQuit()} is invoked,
 * which re-queues any held sub-job and records the worker id in the shared
 * dead-worker list.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class WorkerNode
    extends Thread
{
    // NOTE(review): nodeInfo is never assigned anywhere in this class, so
    // getWrokerInfo() currently always returns null.
    private WorkerInfo nodeInfo;
    private String workerId;
    private String ipAddr;
    private int workerPort;
    private int workerState;
    // Written by alive()/revive() and read/decremented by the heartbeat thread;
    // volatile so a reset is visible to the heartbeat.
    private volatile int life;
    private SubJob subJob;
    private LinkedList jobApply = null;
    private LinkedList deadFlag;
    private Heartbeat h;

    // Guards creation / shutdown of the heartbeat thread stored in h.
    private final Object heartbeatLock = new Object();

    private static final int POLL_INTERVAL = 20 * 1000;
    private static final int WORKER_IDLE = 0;
    private static final int WORKER_BUSY = 1;
    private static final int WORKER_DEAD = 2;
    // Number of heartbeat polls a worker survives without calling alive().
    private static final int MAX_LIFE = 3;

    public boolean atIdleListFlag=true;
    public boolean isReduncyWorker=false; // this worker is in the redundant-computation state

    /**
     * Creates an idle worker record with a full life counter and no sub-job.
     *
     * @param workerId   identifier of the worker
     * @param ipAddr     IP address of the worker
     * @param pcWorkLoad accepted for interface compatibility; not used by this class
     * @param deadFlag   shared list to which the worker id is appended when the
     *                   worker is declared dead (see {@link #workerQuit()})
     */
    public WorkerNode(String workerId, String ipAddr, String pcWorkLoad,LinkedList deadFlag)
    {
        this.workerId = workerId;
        this.ipAddr = ipAddr;
        this.deadFlag = deadFlag;
        life = MAX_LIFE;
        workerState = WORKER_IDLE;
        subJob = null;
    } //WorkerNode

    /**
     * Thread body: launches the heartbeat monitor and returns immediately.
     */
    public void run()
    {
        // The original stored the heartbeat in a local that shadowed the
        // field h, which left quit() unable to stop it.
        startHeartbeat();
    }

    /**
     * Stops the heartbeat monitor, if one has been started.
     */
    public void quit()
    {
        synchronized (heartbeatLock)
        {
            if (h != null)
            {
                h.quit();
                h.interrupt(); // wake it if it is sleeping between polls
            }
        }
    }

    /**
     * Starts a new heartbeat thread unless one is already actively polling.
     */
    private void startHeartbeat()
    {
        synchronized (heartbeatLock)
        {
            if (h != null && h.isRunning())
            {
                return;
            }
            h = new Heartbeat();
            h.start();
        }
    }

    /** @return the worker id given at construction */
    public String getWorkerId()
    {
        return workerId;
    }

    /** @return the stored worker info (may be null; it is not set by this class) */
    public WorkerInfo getWrokerInfo()
    {
        return nodeInfo;
    } //getWrokerInfo


    /** @return the worker IP address given at construction */
    public String getWorkerIp()
    {
        return this.ipAddr;
    }

    /** @return the current worker state code */
    public int getWorkerState()
    {
        return this.workerState;
    }

    /** @return the sub-job currently attached to this worker, or null */
    public SubJob getSubJob()
    {
        return this.subJob;
    }

    /**
     * Called whenever contact from the worker is heard: restores the life
     * counter to its maximum and checks whether a sub-job has been assigned.
     *
     * @return the attached sub-job if one is set and the worker is still idle
     *         (i.e. it has been assigned but not started), otherwise null
     */
    public SubJob alive()
    {
        life = MAX_LIFE;
        if (subJob != null && workerState == WORKER_IDLE)
        {
            return subJob;
        }
        return null;
    }


    /**
     * Attaches a sub-job to this worker and clears the idle-list flag.
     *
     * @param subJob the sub-job to attach
     */
    public void setJob(SubJob subJob)
    {
        this.subJob = subJob;
        this.atIdleListFlag = false;
    }

    /**
     * Sets the worker state code.
     *
     * @param state the new state code
     */
    public void setWorkerState(int state)
    {
        this.workerState = state;
    }

    /**
     * Sets the queue used to re-submit sub-jobs. Only the first non-null
     * assignment takes effect; later calls are ignored.
     *
     * @param jobApply the job-apply queue
     */
    public void setJobApply(LinkedList jobApply)
    {
        if (this.jobApply == null)
        {
            this.jobApply = jobApply;
        }
    }

    /**
     * Marks this worker as dead. Any sub-job it holds is notified, has its
     * state updated, and is put back at the head of the job-apply queue; the
     * worker id is then appended to the dead-worker list.
     */
    public void workerQuit()
    {
        SubJob pending = subJob;
        if (pending != null)
        {
            if (workerState == WORKER_IDLE)
            {
                // a sub-job was pre-assigned but has not started executing
                pending.workerQuit();
                pending.setWorkerState(pending.PRE_SUBTASK);
                requeue(pending);
            }
            else if (workerState == WORKER_BUSY)
            {
                // a sub-job was assigned and is not finished
                pending.workerQuit();
                pending.setWorkerState(3); // mark the sub-job as interrupted before completion
                requeue(pending);
            }
        }
        workerState = WORKER_DEAD;
        this.subJob = null;
        synchronized (deadFlag)
        {
            deadFlag.addLast(workerId);
        }
    }

    /**
     * Puts a sub-job at the head of the job-apply queue and wakes any threads
     * waiting on that queue. Does nothing if no queue has been set.
     */
    private void requeue(SubJob job)
    {
        if (jobApply == null)
        {
            return;
        }
        synchronized (jobApply)
        {
            jobApply.addFirst(job); // insert at the head of the queue
            // Must notify on the monitor we actually hold. The original called
            // notifyAll() on this, which always threw (and swallowed)
            // IllegalMonitorStateException, so waiters were never woken.
            jobApply.notifyAll();
        }
    }

    /**
     * Returns a worker to the idle state with a full life counter and
     * restarts heartbeat monitoring.
     */
    public void revive()
    {
        workerState = WORKER_IDLE;
        this.subJob = null;
        this.atIdleListFlag = true;
        life = MAX_LIFE;
        // A Thread can be started only once, so calling this.start() again
        // would throw IllegalThreadStateException; start a new heartbeat instead.
        startHeartbeat();
    }

    /**
     * Records completion of the current sub-job (if any) and returns the
     * worker to the idle state.
     */
    public void workFinish()
    {
        if (this.subJob != null)
        {
            this.subJob.jobfinish();
            this.subJob = null;
        }
        this.setWorkerState(WORKER_IDLE);
        this.atIdleListFlag = true;
    }

    /**
     * Liveness monitor for the enclosing worker. Every POLL_INTERVAL
     * milliseconds it decrements the worker's life counter; once the counter
     * has reached zero the worker is declared dead via workerQuit() and the
     * monitor stops.
     *
     * Original author's note (translated): the dispatcher periodically listens
     * for worker messages; if a worker has not contacted the primary
     * dispatcher for three consecutive periods it is considered down, and the
     * primary dispatcher notifies the secondary dispatcher. (The notification
     * itself is not performed in this class.)
     */
    public class Heartbeat
        extends Thread
    {
        // Cleared from other threads via quit(); volatile so the loop sees it.
        private volatile boolean flag = true;

        /**
         * Polls until stopped, interrupted, or the worker is declared dead.
         */
        public void run()
        {
            while (flag)
            {
                this.poll();
                if (!flag)
                {
                    break; // stopped or worker declared dead: no need to sleep again
                }
                try
                {
                    if (interrupted())
                    {
                        flag = false;
                    }
                    else
                    {
                        Thread.sleep(POLL_INTERVAL);
                    }
                }
                catch (InterruptedException e)
                {
                    // interruption is the shutdown signal
                    flag = false;
                }
            }
        }

        /**
         * Requests the polling loop to stop after the current iteration.
         */
        public void quit()
        {
            flag = false;
        }

        /**
         * @return true while this heartbeat has been started, has not
         *         terminated, and has not been asked to stop
         */
        private boolean isRunning()
        {
            return flag && isAlive();
        }

        /**
         * Performs one liveness check: consumes one unit of life, or, if none
         * is left, stops this monitor, declares the worker dead and (when
         * dispatcher logging is enabled) writes a timeout entry to the log.
         */
        public void poll()
        {
            if (life > 0)
            {
                life--;
                return;
            }
            flag = false;
            workerQuit();
            if (Parameter.dispatcherLogIsActive)
            {
                LogFile lf = new LogFile(Parameter.logFileName);
                try
                {
                    lf.logWorker_overTime(ipAddr, workerId);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }
        }
    } //heartbeat

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?