📄 dispatchernode.java
字号:
package cn.edu.hust.cgcl.biogrid.monitor;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.LinkedList;
import java.util.Vector;
import cn.edu.hust.cgcl.biogrid.dispatcher.DispatcherInfo;
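/**
* <p>Title: DispatcherNode</p>
* <p>Description: Monitor-side thread that represents one dispatcher. It polls the
* dispatcher at a fixed interval and accepts job dispatch and cancel requests for it.</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/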
public class DispatcherNode
extends Thread
{
// Descriptor of this dispatcher; created in the parameterized constructor from the id and IP.
private DispatcherInfo nodeInfo;
// Identifier of the dispatcher this node represents.
private String dispatcherId;
// Host address used when opening nodeSocket.
private String ipAddr;
// Port used, together with ipAddr, when opening nodeSocket.
private int dispatcherPort;
// Returned by getWorkerPort(); no other use is visible in this part of the file.
private int workerPort;
// Returned by getTaskPort(); no other use is visible in this part of the file.
private int taskPort;
// Disabled state of an earlier flag-based job hand-off; kept for reference only.
// private boolean newJobFlag = false;
// private boolean cancelJobFlag = false;
// private MonitorJob tmpNewJob = null;
// private int newJobCount = 0;
// private String cancelJobId;
// Loop condition of run(); cleared by quit(), by a failed poll, or when the thread is interrupted.
public boolean isActive = true;
// Group this node belongs to; run() and quit() call removeNode(this) on it.
private DispatcherGroup disGroup = null;
// Countdown decremented in run() when a poll fails; run() sets it back to 3 on every iteration.
private int failureCount = 3;
// Connection to the dispatcher, opened in init().
private Socket nodeSocket;
// Character reader over nodeSocket's input stream, created in init().
private BufferedReader is;
// Writer over nodeSocket's output stream, created in init().
private PrintWriter os;
//private ObjectOutputStream oob;
// Pause between polls in run(); passed to Thread.sleep, so 10 seconds in milliseconds.
private static final int POLL_INTERVAL = 10 * 1000;
// Only referenced by a commented-out setSoTimeout call in the visible code;
// presumably a socket timeout in milliseconds -- TODO confirm.
private static final int TIME_OUT = 1000;
// NOTE(review): not used in the visible part of the file; purpose unclear.
LinkedList tmpString = new LinkedList();
public DispatcherNode()
{
DispatcherInfo nodeInfo = null;
} //DispatcherNode
/**
 * Creates a node for the dispatcher reachable at the given address.
 *
 * @param dispatcherId   identifier of the dispatcher
 * @param ipAddr         address of the dispatcher
 * @param dispatcherPort port later used to open the socket to the dispatcher
 * @param workerport     value returned by {@link #getWorkerPort()}
 * @param taskport       value returned by {@link #getTaskPort()}
 * @param dgroup         group from which this node removes itself when it stops
 */
public DispatcherNode(String dispatcherId, String ipAddr,
                      int dispatcherPort, int workerport, int taskport,
                      DispatcherGroup dgroup)
{
    // Identity and addressing.
    this.dispatcherId = dispatcherId;
    this.ipAddr = ipAddr;

    // Ports.
    this.dispatcherPort = dispatcherPort;
    workerPort = workerport;
    taskPort = taskport;

    // Descriptor and owning group.
    this.nodeInfo = new DispatcherInfo(dispatcherId, ipAddr);
    disGroup = dgroup;

    // Start with the full failure countdown.
    this.failureCount = 3;
} // DispatcherNode
/**
 * Returns the dispatcher address this node was created with.
 *
 * @return the value of {@code ipAddr}
 */
public String GetIp()
{
    return ipAddr;
}
/**
 * Returns the dispatcher port this node was created with.
 *
 * @return the value of {@code dispatcherPort}
 */
public int GetPort()
{
    return dispatcherPort;
}
/**
 * Returns the task port this node was created with.
 *
 * @return the value of {@code taskPort}
 */
public int getTaskPort()
{
    return taskPort;
}
/**
 * Returns the worker port this node was created with.
 *
 * @return the value of {@code workerPort}
 */
public int getWorkerPort()
{
    return workerPort;
}
/**
 * Gets the node information.
 *
 * @return the {@code DispatcherInfo} held by this node; {@code null} when the
 *         node was built with the no-argument constructor
 */
public DispatcherInfo getNodeInfo()
{
    return nodeInfo;
} // getNodeInfo
/**
 * Gets the job information.
 *
 * @return whatever {@code nodeInfo.getDispacherJob()} returns
 */
public Vector getJobInfo()
{
    Vector jobs = nodeInfo.getDispacherJob();
    return jobs;
} // getJobInfo
/**
 * Tells whether this dispatcher can accept another job.
 *
 * @return the value reported by {@code nodeInfo.getCanAcceptJob()}
 */
public boolean canAcceptJob()
{
    boolean accepting = nodeInfo.getCanAcceptJob();
    return accepting;
} // canAcceptJob
/**
 * Accepts a new computing job by passing it straight to
 * {@code newJobHandle(job, jobcount)}.
 *
 * <p>An earlier, now disabled, variant stored the job behind a
 * {@code newJobFlag} and returned {@code false} while a previous job was
 * still pending (true on success, false otherwise). The current
 * implementation always reports success.</p>
 *
 * @param job      the job to hand over
 * @param jobcount count forwarded together with the job
 * @return always {@code true}
 */
public boolean dispatchJob(MonitorJob job, int jobcount)
{
    this.newJobHandle(job, jobcount);
    final boolean accepted = true;
    return accepted;
} // dispatchJob
/**
 * Ends a computing job by passing its id straight to
 * {@code cancelJobHandle(jobId)}.
 *
 * <p>An earlier, now disabled, variant recorded the id behind a
 * {@code cancelJobFlag} and returned {@code false} while a previous
 * cancellation was still pending. The current implementation always
 * reports success.</p>
 *
 * @param jobId identifier of the job to cancel
 * @return always {@code true}
 */
public boolean cancelJob(String jobId)
{
    this.cancelJobHandle(jobId);
    final boolean accepted = true;
    return accepted;
} // cancelJob
/**
 * Thread body: polls the dispatcher until this node is deactivated, then
 * removes the node from its group, logs the departure and closes the socket.
 *
 * <p>Each iteration calls {@code onPoll()} and then sleeps for
 * {@code POLL_INTERVAL} milliseconds. The loop ends once {@code isActive}
 * is false, which happens after a failed poll, after {@link #quit()}, or
 * when the thread is interrupted.</p>
 */
public void run()
{
    while (isActive)
    {
        if (!this.onPoll())
        {
            isActive = false;
            failureCount--;
            // NOTE(review): failureCount is set back to 3 at the end of every
            // iteration and isActive is cleared on the first failed poll. Unless
            // onPoll() itself changes failureCount, this branch is never taken
            // and the node stops after a single failed poll (following one more
            // sleep). Confirm whether three retries were intended.
            if (failureCount <= 0)
            {
                // quit thread
                this.disGroup.removeNode(this);
                System.out.print("Dispatcher Node Failure Exit: ");
                System.out.println(this.ipAddr);
                logDispatcherQuit();
                // This exit path previously returned without closing the socket.
                closeNodeSocketQuietly();
                return;
            } // if
        }
        failureCount = 3;
        try
        {
            // Thread.interrupted() also clears the interrupt flag; a pending
            // interrupt stops the loop instead of sleeping.
            if (!interrupted())
            {
                Thread.sleep(POLL_INTERVAL);
            }
            else
            {
                isActive = false;
            }
        }
        catch (InterruptedException e)
        {
            // Interrupted while sleeping: stop polling.
            isActive = false;
        }
    } //while
    System.out.println("dispatchernode's quit!");
    this.disGroup.removeNode(this);
    logDispatcherQuit();
    closeNodeSocketQuietly();
    return;
} //run

/** Writes the dispatcher-quit record to the monitor log when logging is enabled. */
private void logDispatcherQuit()
{
    if (MonitorConfiuration.MonLogIsActive)
    {
        LogFile lf = new LogFile(MonitorConfiuration.logFileName);
        try
        {
            lf.logDispatcher_quit(this.ipAddr, this.dispatcherId, this.disGroup.getNodeCount());
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

/** Closes nodeSocket if it was opened and is still open; close failures are ignored. */
private void closeNodeSocketQuietly()
{
    if (this.nodeSocket == null)
    {
        // The socket was never opened, so there is nothing to release.
        return;
    }
    try
    {
        if (!this.nodeSocket.isClosed())
        {
            this.nodeSocket.close();
        }
    }
    catch (Exception ignored)
    {
        // Best effort: the thread is exiting and nothing can be done about a failed close.
    }
}
/**
 * Stops this node: removes it from its group, clears {@code isActive} so the
 * polling loop in {@link #run()} ends, and closes the socket if one is open.
 *
 * <p>Closing is best effort; a failure to close is ignored.</p>
 */
public void quit()
{
    this.disGroup.removeNode(this);
    isActive = false;
    if (this.nodeSocket == null)
    {
        // The socket was never opened. Previously this case was handled by
        // catching the resulting NullPointerException in an empty catch block.
        return;
    }
    try
    {
        if (!this.nodeSocket.isClosed())
        {
            this.nodeSocket.close();
        }
    }
    catch (Exception ignored)
    {
        // Best effort: the node is shutting down and a failed close is not actionable.
    }
}
private boolean init()
{
try
{
this.nodeSocket = new Socket(ipAddr, dispatcherPort);
//System.out.println(dispatcherPort);
//nodeSocket.setSoTimeout(TIME_OUT);
is = new BufferedReader(new InputStreamReader(nodeSocket.
getInputStream()));
os = new PrintWriter(nodeSocket.getOutputStream());
}
catch (Exception e)
{
System.out.println(e.toString());
try
{
if (is != null)
is.close();
if (os != null)
os.close();
if (nodeSocket != null)
nodeSocket.close();
}
catch (Exception e1)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -