📄 dispatchernodemanagement.java
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Vector;
/**
* <p>Title: </p>
* <p>Description: Manages the worker node queues, handles workers joining and leaving, and collects statistics on worker status. </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class DispatcherNodeManagement {
private int workerPort; // port used to communicate with workers (the port workers join on)
private int listenPort; // port that listens for worker heartbeats
//private int workerHeartbeatPort; // port that periodically receives worker info (both ports should be initialized)
private Vector workerNodeList; // queue of live workers (holds WorkerNode objects)
private Vector idleWorkerNodeList; // queue of idle workers (entries are worker ids)
private Vector deadWorkerNodeList; // queue of dead workers
// Job list; elements are cast to Job by the heartbeat processing code.
private Vector jobList;
public Vector ftWorkerNodeList;// queue of fault-tolerant workers (entries are worker id strings)
// Source of the dispatcher id used when building worker ids; also receives worker counts.
private DispatcherInfo dispatcherInfo;
// Sequence counter used to build the numeric suffix of new worker ids.
private static int num = 0;
private LinkedList deadFlag; // temporary list of dead worker ids (before they are moved into deadWorkerNodeList)
// Thread that accepts worker join connections on workerSocket.
NodeHandler handler;
// Thread that accepts heartbeat connections on listenSocket.
ListenningHandler lHandler;
// Thread started by StartNodeServer(); presumably detects dead workers (class not visible here) - confirm.
TestDeadWorker tHandler;
// Job fault-tolerance handler thread; quit() stops it when it is non-null and alive.
private JobFaultTolerantHandler jfth = null;
// DispatcherJobManagement jobManager;
// Monitor object: WorkerJoin synchronizes on it and calls notifyAll() after a worker is registered.
private Integer dummy;
// Server socket bound to workerPort by InitNodeServer().
private ServerSocket workerSocket;
// Server socket bound to listenPort by InitNodeServer().
private ServerSocket listenSocket;
// Accept timeout value; in the visible code it is referenced only by the commented-out setSoTimeout calls.
private static final int TIME_OUT = 1000;
/**
 * Creates the node manager around the caller-supplied shared collections.
 * No sockets are opened here; that is done by InitNodeServer().
 *
 * @param workerPort         port on which workers connect to join
 * @param listenPort         port on which worker heartbeats are received
 * @param workerNodeList     shared list of live workers
 * @param idleWorkerNodeList shared list of idle workers
 * @param deadWorkerNodeList shared list of dead workers
 * @param jobList            shared job list
 * @param dinfo              dispatcher information object
 * @param djm                object stored as the monitor that is notified when a worker joins
 */
public DispatcherNodeManagement(int workerPort, int listenPort,
        Vector workerNodeList, Vector idleWorkerNodeList,
        Vector deadWorkerNodeList, Vector jobList, DispatcherInfo dinfo,
        Integer djm) {
    // Server sockets stay unset until InitNodeServer() binds them.
    this.workerSocket = null;
    this.listenSocket = null;

    // Network configuration.
    this.listenPort = listenPort;
    this.workerPort = workerPort;

    // Collections shared with the rest of the dispatcher.
    this.jobList = jobList;
    this.deadWorkerNodeList = deadWorkerNodeList;
    this.idleWorkerNodeList = idleWorkerNodeList;
    this.workerNodeList = workerNodeList;

    // Collections owned by this manager.
    this.deadFlag = new LinkedList();
    this.ftWorkerNodeList = new Vector();

    this.dispatcherInfo = dinfo;
    this.dummy = djm;
}
/**
 * Binds the two server sockets: one on workerPort (worker join requests)
 * and one on listenPort (worker heartbeats).
 *
 * The socket fields are only assigned when BOTH binds succeed. If the second
 * bind fails, the first socket is closed again so the port is not left
 * occupied and a later retry can succeed.
 *
 * @return true if both sockets were bound, false otherwise
 */
public boolean InitNodeServer() {
    ServerSocket boundWorkerSocket = null;
    try {
        boundWorkerSocket = new ServerSocket(workerPort);
        //boundWorkerSocket.setSoTimeout(TIME_OUT);
        ServerSocket boundListenSocket = new ServerSocket(listenPort);
        //boundListenSocket.setSoTimeout(TIME_OUT);
        workerSocket = boundWorkerSocket;
        listenSocket = boundListenSocket;
        return true;
    } catch (IOException e) {
        log(e);
        System.out.println("Error binding to port");
    } catch (SecurityException e) {
        log(e);
        System.out
                .println("The security manager refused permission to bind to port");
    }
    // Partial failure: release the worker port if it was already bound.
    if (boundWorkerSocket != null) {
        try {
            boundWorkerSocket.close();
        } catch (IOException e) {
            log(e);
        }
    }
    return false;
} //InitNodeServer;
/**
 * Binds the server sockets and starts the background threads: the
 * NodeHandler (worker joins), the ListenningHandler (heartbeats), the
 * TestDeadWorker thread and, when Parameter.jobFaultTolerant is set, the
 * JobFaultTolerantHandler.
 *
 * @return true if the sockets were bound and the threads started,
 *         false if InitNodeServer() failed
 */
public boolean StartNodeServer() {
    if (!InitNodeServer()) {
        return false;
    }
    handler = new NodeHandler();
    handler.start();
    lHandler = new ListenningHandler();
    lHandler.start();
    tHandler = new TestDeadWorker();
    tHandler.start();
    if (Parameter.jobFaultTolerant) {
        // Assign the FIELD (not a shadowing local variable) so that quit()
        // can find this thread and stop it.
        jfth = new JobFaultTolerantHandler(jobList);
        jfth.start();
    }
    return true;
} // StartNodeServer
/**
 * Shuts the node manager down: asks every running helper thread to stop,
 * calls quit() on every worker in workerNodeList, and closes both server
 * sockets.
 *
 * Each socket is closed independently and null-safe, so a socket that was
 * never opened, or a failure while closing one socket, does not prevent the
 * other one from being closed. Close failures are logged, not swallowed.
 */
public void quit() {
    System.out.println("DispatcherNodeManagement quitting...");
    if (jfth != null && jfth.isAlive()) {
        jfth.interrupt();
        jfth.quit();
    }
    if (handler != null && handler.isAlive()) {
        handler.terminate();
    }
    if (lHandler != null && lHandler.isAlive()) {
        lHandler.terminate();
    }
    if (tHandler != null && tHandler.isAlive()) {
        tHandler.interrupt();
        tHandler.terminate();
    }
    synchronized (workerNodeList) {
        for (int i = 0; i < workerNodeList.size(); i++) {
            WorkerNode wn = (WorkerNode) workerNodeList.elementAt(i);
            wn.quit();
        }
    }
    // Closing the server sockets also unblocks any accept() call the
    // handler threads are waiting in.
    if (this.workerSocket != null && !this.workerSocket.isClosed()) {
        try {
            this.workerSocket.close();
        } catch (IOException e) {
            log(e);
        }
    }
    if (this.listenSocket != null && !this.listenSocket.isClosed()) {
        try {
            this.listenSocket.close();
        } catch (IOException e) {
            log(e);
        }
    }
    System.out.println("DispatcherNodeManagement quit!");
} // StopNodeServer
/**
 * Reports how many entries workerNodeList currently holds.
 *
 * @return the current size of workerNodeList
 */
public int getWorkerCount() {
    final int liveWorkers = workerNodeList.size();
    return liveWorkers;
} //getWorkerCount
/**
 * Reports how many entries idleWorkerNodeList currently holds.
 *
 * @return the current size of idleWorkerNodeList
 */
public int getIdleWorkerCount() {
    final int idleWorkers = idleWorkerNodeList.size();
    return idleWorkers;
}
// Inner class: listens on workerSocket for workers that want to join and
// hands every accepted connection to its own WorkerJoin thread.
class NodeHandler extends Thread {
    // Written by terminate() from another thread, hence volatile.
    private volatile boolean isActive = true;

    public NodeHandler() {
    }

    /**
     * Accept loop. Runs until terminate() clears the flag or accept() fails
     * (for example because the server socket was closed).
     */
    public void run() {
        while (isActive) {
            Socket socket = null;
            try {
                socket = workerSocket.accept();
                WorkerJoin w = new WorkerJoin(socket); // handle the connection
                w.start();
            } catch (java.io.InterruptedIOException e) {
                // accept timed out: go back to the top of the loop
            } catch (IOException e) {
                log(e);
                isActive = false;
                //error("I/O Error", e);
            } catch (SecurityException e) {
                log(e);
                isActive = false;
                //error("An unauthorized client has attempted to connect", e);
            }
        }
        System.out.println("handler quit!");
    }

    /**
     * Runs the join handshake with one worker and, on success, registers the
     * worker in the worker queues.
     */
    public class WorkerJoin extends Thread {
        Socket s;
        private BufferedReader is;
        private PrintWriter os;

        public WorkerJoin(Socket sock) {
            this.s = sock;
        }

        /**
         * Builds the next worker id: "W" + dispatcher id without its first
         * character + a sequence number zero-padded to at least 5 digits.
         * The shared static counter is advanced under a class-wide lock so
         * concurrent joins never receive the same id.
         */
        private String nextWorkerId(String dispatcherId) {
            String sequence;
            synchronized (DispatcherNodeManagement.class) {
                num++;
                sequence = Integer.toString(num);
            }
            // "> 0" (not "!= 0") so numbers longer than 5 digits are simply
            // left unpadded instead of driving the loop counter negative.
            for (int pad = 5 - sequence.length(); pad > 0; pad--) {
                sequence = "0" + sequence;
            }
            return "W"
                    + dispatcherId.substring(1, dispatcherId.length())
                    + sequence;
        }

        public void run() {
            if (s == null) {
                return;
            }
            if (Parameter.dispatcherLogIsActive) {
                LogFile lf = new LogFile(Parameter.logFileName);
                try {
                    lf.logWorker_login(s.getInetAddress().getHostAddress());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            try {
                is = new BufferedReader(new InputStreamReader(s
                        .getInputStream()));
                os = new PrintWriter(s.getOutputStream());
                // protocol exchange
                String lineStr = is.readLine();
                // Constant-first equals: readLine() returns null when the
                // peer closes the connection.
                if ("<Worker join>".equals(lineStr)) {
                    os.println("<can accept worker>");
                    os.flush();
                    String ipAddr = is.readLine();
                    String pcWorkLoad = is.readLine();
                    if (ipAddr == null || pcWorkLoad == null) {
                        // Peer disconnected mid-handshake: do not register
                        // a worker without an address or workload.
                        return;
                    }
                    String workerId = nextWorkerId(dispatcherInfo
                            .getDispatcherId());
                    WorkerNode w = new WorkerNode(workerId, ipAddr,
                            pcWorkLoad, deadFlag);
                    synchronized (workerNodeList) {
                        workerNodeList.addElement(w);
                    }
                    synchronized (idleWorkerNodeList) {
                        // the idle queue stores worker ids
                        idleWorkerNodeList.addElement(w.getWorkerId());
                        dispatcherInfo
                                .setWorkerCount(workerNodeList.size());
                        dispatcherInfo
                                .setIdleWorkerCount(idleWorkerNodeList
                                        .size());
                    }
                    // Wake up anything waiting on the shared monitor.
                    synchronized (dummy) {
                        try {
                            dummy.notifyAll();
                        } catch (IllegalMonitorStateException e) {
                            //e.printStackTrace();
                            System.out.println(e.toString());
                        }
                    }
                    // send the necessary dispatcher node info back to the worker
                    //os.println(dispatcherInfo.getDispatcherIp());
                    os.println(workerId);
                    os.println(dispatcherInfo.getDispatcherId());
                    os.println(listenPort);
                    os.flush();
                    lineStr = is.readLine();
                    if ("<join finish>".equals(lineStr)) {
                        w.start();
                        if (Parameter.dispatcherLogIsActive) {
                            LogFile lf = new LogFile(Parameter.logFileName);
                            try {
                                lf.logWorker_login_Succ(s.getInetAddress()
                                        .getHostAddress(), workerId,
                                        workerNodeList.size());
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                log(e);
                //error("I/O error", e);
            } finally {
                // Close every resource independently and null-safe: the
                // streams may never have been created, and one failing
                // close must not skip the others.
                if (is != null) {
                    try {
                        is.close();
                    } catch (IOException e) {
                        log(e);
                    }
                }
                try {
                    s.close();
                } catch (IOException e) {
                    log(e);
                    //error("Error closing socket", e);
                }
                if (os != null) {
                    os.close();
                }
            }
            return;
        }
        //.....
    }

    /**
     * Asks the accept loop to stop. A thread blocked in accept() only
     * notices once the server socket is closed or accept() returns.
     */
    public void terminate() {
        System.out.println("handler quitting...");
        isActive = false;
    }
}
/**
*
* <p>Title: </p>
* <p>Description: Receives the heartbeat of each worker. </p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class ListenningHandler extends Thread {
// Loop flag for run(); run() clears it when accept() fails with an I/O or security error.
private boolean isActive = true;
// Second loop condition read by every Process thread (while (flag && subflag)).
private boolean subflag = true;
// No setup needed; the thread uses the enclosing instance's listenSocket.
public ListenningHandler() {
}
/**
 * Accept loop for heartbeat connections: every connection accepted on
 * listenSocket is handed to a freshly started Process thread. The loop ends
 * once isActive is false, which also happens when accept() fails with an
 * I/O or security error.
 */
public void run() {
    while (isActive) {
        try {
            Socket accepted = listenSocket.accept();
            // One Process thread per accepted connection.
            new Process(accepted).start();
        } catch (java.io.InterruptedIOException timedOut) {
            // accept timed out: nothing to do, try again
        } catch (IOException ioFailure) {
            log(ioFailure);
            isActive = false;
        } catch (SecurityException denied) {
            log(denied);
            isActive = false;
        }
    }
    System.out.println("listening handler quit!");
}
public class Process extends Thread {
// Connection to the worker, as passed to the constructor.
Socket s;
// Line reader over the socket's input stream (created in the constructor).
private BufferedReader is;
// Text writer over the socket's output stream (created in the constructor).
private PrintWriter os;
// Object stream over the same socket output stream (created in the constructor).
private ObjectOutputStream oob;
// Loop flag for run(); cleared by terminate().
private boolean flag = true;
/**
 * Wraps the given socket and opens its streams: a line reader on the input
 * side, then a PrintWriter and an ObjectOutputStream on the output side.
 * Any exception while opening the streams is only printed; the affected
 * stream fields then stay unset.
 *
 * @param sock the accepted worker connection
 */
public Process(Socket sock) {
    s = sock;
    try {
        InputStreamReader rawReader = new InputStreamReader(s.getInputStream());
        is = new BufferedReader(rawReader);
        // Creation order of the two output wrappers is kept as-is.
        os = new PrintWriter(s.getOutputStream());
        oob = new ObjectOutputStream(s.getOutputStream());
    } catch (Exception setupFailure) {
        System.out.println(setupFailure.toString());
    }
    System.out.println("Worker poll start: ");
}
/**
 * Stops this poller: clears the loop flag read by run() and closes the
 * worker socket if it is still open.
 *
 * The flag is cleared BEFORE the socket is touched, so the run() loop is
 * told to stop even when the socket is null or close() fails. Close
 * failures are logged, not silently ignored.
 */
public void terminate() {
    flag = false;
    if (this.s == null) {
        return;
    }
    try {
        if (!this.s.isClosed()) {
            this.s.close();
        }
    } catch (IOException e) {
        log(e);
    }
}
public void run() {
while (flag && subflag) {
if (s == null) {
return;
}
try {
//协议传输数据
String tmpWorkerId = is.readLine();
if (Parameter.jobDebugIsActive) {
System.out.println(tmpWorkerId);
System.out.print("idle worker node:> ");
for (int istart = 0; istart < idleWorkerNodeList
.size(); istart++)
System.out.print(idleWorkerNodeList
.elementAt(istart)
+ " ");
System.out.println();
}
String lineStr = is.readLine();
int i = 0;
WorkerNode wn = null;
boolean ftFlag = false;
synchronized (ftWorkerNodeList) {
for (i = 0; i < ftWorkerNodeList.size(); i++) {
if (tmpWorkerId
.equals((String) (ftWorkerNodeList
.elementAt(i)))) {
ftFlag = true;
break;
}
}
}
if (ftFlag) {
if (lineStr.equals("<worker alive>")) {
os.println("<dispatcher alive>");
os.flush();
//System.out.println("<dispatcher alive>");
} else if (lineStr.equals("<subjob hava finished>")) {
String ft_jobId = is.readLine();
String ft_subJobId = is.readLine();
int j = 0;
Job tmpjob = null;
synchronized (jobList) {
for (j = 0; j < jobList.size(); j++) {
tmpjob = (Job) (jobList.elementAt(j));
//System.out.println(jobId+" "+tmpjob.getJobId());
if (ft_jobId.equals(tmpjob.getJobId()))
break;
}
if (j >= jobList.size()) {
System.out
.println("Don't find this job in jobList!");
return;
}
//else tmpjob=(Job) (jobList.elementAt(j));
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -