📄 .#dispatchernodemanagement.java.1.9
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Vector;
/**
* <p>Title: </p>
* <p>Description: Manages the worker node queues, handles workers joining and leaving, and gathers statistics on worker status.</p>
* <p>Copyright: Copyright (c) 2004</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class DispatcherNodeManagement
{
private int workerPort; // port used to communicate with workers (the port workers join on)
private int listenPort; // port on which worker heartbeats are listened for
//private int workerHeartbeatPort; // port that periodically receives worker info (both ports should be initialized)
private Vector workerNodeList; // worker queue (live workers)
private Vector idleWorkerNodeList; // idle worker queue
private Vector deadWorkerNodeList; // dead worker queue
private Vector jobList;
public Vector ftWorkerNodeList;// fault-tolerant worker queue
private DispatcherInfo dispatcherInfo;
// Counter used by WorkerJoin.run() to build the numeric suffix of new worker ids.
private static int num = 0;
private LinkedList deadFlag; // (temporary) list of dead worker ids, before they are moved into deadWorkerNodeList
NodeHandler handler;
ListenningHandler lHandler;
TestDeadWorker tHandler;
private JobFaultTolerantHandler jfth=null;
// DispatcherJobManagement jobManager;
// Used as a monitor: WorkerJoin.run() synchronizes on it and calls notifyAll() after queuing a worker.
private Integer dummy;
private ServerSocket workerSocket;
private ServerSocket listenSocket;
// In the code visible here this is referenced only by commented-out setSoTimeout calls.
private static final int TIME_OUT = 1000;
/**
 * Creates a node manager that works on queues supplied by the caller.
 * No socket is opened here: both server sockets stay {@code null} until
 * {@link #InitNodeServer()} is called.
 *
 * @param workerPort         port on which workers connect to join
 * @param listenPort         port on which worker heartbeats are listened for
 * @param workerNodeList     queue of live workers
 * @param idleWorkerNodeList queue of idle workers
 * @param deadWorkerNodeList queue of dead workers
 * @param jobList            job list, also handed to the JobFaultTolerantHandler
 * @param dinfo              dispatcher information object
 * @param djm                object stored as the monitor that is notified when a worker joins
 */
public DispatcherNodeManagement(int workerPort, int listenPort,
        Vector workerNodeList,
        Vector idleWorkerNodeList,
        Vector deadWorkerNodeList, Vector jobList,
        DispatcherInfo dinfo, Integer djm)
{
    // Sockets are created later by InitNodeServer().
    this.workerSocket = null;
    this.listenSocket = null;

    // Ports.
    this.listenPort = listenPort;
    this.workerPort = workerPort;

    // Queues shared with the caller.
    this.jobList = jobList;
    this.deadWorkerNodeList = deadWorkerNodeList;
    this.idleWorkerNodeList = idleWorkerNodeList;
    this.workerNodeList = workerNodeList;

    // Dispatcher description and the monitor object.
    this.dispatcherInfo = dinfo;
    this.dummy = djm;

    // State owned by this instance.
    this.deadFlag = new LinkedList();
    this.ftWorkerNodeList = new Vector();
    this.jfth = new JobFaultTolerantHandler(jobList);
}
/**
 * Binds the worker-join server socket and the heartbeat server socket.
 * <p>
 * Both sockets are bound into locals first and are only stored in the
 * {@code workerSocket} / {@code listenSocket} fields when both binds succeed.
 * If the second bind fails, the already-bound first socket is closed again so
 * that its port is not left occupied by a half-initialized manager.
 *
 * @return {@code true} when both sockets are bound; {@code false} when a bind
 *         failed or the security manager refused permission
 */
public boolean InitNodeServer()
{
    ServerSocket worker = null;
    try
    {
        worker = new ServerSocket(workerPort);
        //worker.setSoTimeout(TIME_OUT);
        ServerSocket listen = new ServerSocket(listenPort);
        //listen.setSoTimeout(TIME_OUT);

        // Publish only a fully initialized pair.
        workerSocket = worker;
        listenSocket = listen;
        return true;
    }
    catch (IOException e)
    {
        log(e);
        System.out.println("Error binding to port");
    }
    catch (SecurityException e)
    {
        log(e);
        System.out.println("The security manager refused permission to bind to port");
    }

    // Only reached on failure: release the first socket if it was bound
    // before the second bind failed.
    if (worker != null)
    {
        try
        {
            worker.close();
        }
        catch (IOException closeError)
        {
            log(closeError);
        }
    }
    return false;
} //InitNodeServer
/**
 * Binds the server sockets via {@link #InitNodeServer()} and, on success,
 * creates and starts the NodeHandler, the ListenningHandler and the
 * TestDeadWorker, in that order.
 *
 * @return {@code true} if the sockets were bound and all three handlers were
 *         started; {@code false} if {@code InitNodeServer()} failed
 */
public boolean StartNodeServer()
{
    // Nothing to start when the sockets could not be bound.
    if (!InitNodeServer())
    {
        return false;
    }

    handler = new NodeHandler();
    handler.start();

    lHandler = new ListenningHandler();
    lHandler.start();

    tHandler = new TestDeadWorker();
    tHandler.start();

    return true;
} // StartNodeServer
/**
 * Shuts the node manager down.
 * <p>
 * Terminates the join handler and the listening handler if they are active,
 * interrupts and terminates the dead-worker tester if it is alive, calls
 * {@code quit()} on every WorkerNode in {@code workerNodeList} (while holding
 * that list's lock), and finally closes both server sockets.
 * <p>
 * Each socket is closed independently: a socket that was never created, or a
 * failure while closing one socket, does not prevent the other from being
 * closed. Close failures are logged instead of being silently dropped.
 */
public void quit()
{
    System.out.println("DispatcherNodeManagement quitting...");
    if(handler!=null&&handler.isActive)
        handler.terminate();
    if(lHandler!=null&&lHandler.isActive)
        lHandler.terminate();
    if(tHandler!=null&&tHandler.isAlive())
    {
        tHandler.interrupt();
        tHandler.terminate();
    }
    synchronized(workerNodeList)
    {
        for(int i=0;i<workerNodeList.size();i++)
        {
            WorkerNode wn=(WorkerNode)workerNodeList.elementAt(i);
            wn.quit();
        }
    }
    // Either socket may be null when InitNodeServer() never ran or failed.
    ServerSocket[] sockets = { this.workerSocket, this.listenSocket };
    for (int i = 0; i < sockets.length; i++)
    {
        ServerSocket socket = sockets[i];
        if (socket == null || socket.isClosed())
        {
            continue;
        }
        try
        {
            socket.close();
        }
        catch (IOException e)
        {
            // Best effort: keep going so the remaining socket is still closed.
            log(e);
        }
    }
    System.out.println("DispatcherNodeManagement quit!");
} // quit
/**
 * Reports how many entries are currently in the live-worker queue.
 *
 * @return current size of {@code workerNodeList}
 */
public int getWorkerCount()
{
    final int liveWorkers = workerNodeList.size();
    return liveWorkers;
} //getWorkerCount
/**
 * Reports how many entries are currently in the idle-worker queue.
 *
 * @return current size of {@code idleWorkerNodeList}
 */
public int getIdleWorkerCount()
{
    final int idleWorkers = idleWorkerNodeList.size();
    return idleWorkers;
} //getIdleWorkerCount
//Inner class: listens on the server socket for workers that want to join
class NodeHandler
extends Thread
{
// Loop flag for run(): run() clears it when accept() fails with an I/O or
// security error, and quit() of the enclosing class reads it before calling terminate().
// NOTE(review): the flag is not volatile; if quit() runs on a different thread
// than run(), confirm that visibility of updates is acceptable.
private boolean isActive = true;
/** Creates the handler; nothing is initialized beyond the field defaults. */
public NodeHandler()
{
}
/**
 * Accept loop of the join handler.
 * <p>
 * While {@code isActive} is set, waits for a connection on
 * {@code workerSocket} and hands each accepted socket to a new WorkerJoin
 * thread. An InterruptedIOException is ignored and the loop continues; any
 * other IOException or a SecurityException is logged and ends the loop.
 * Prints "handler quit!" when the loop has ended.
 */
public void run()
{
    while (isActive)
    {
        try
        {
            Socket accepted = workerSocket.accept();
            // Process the connection on its own thread.
            new WorkerJoin(accepted).start();
        }
        catch (java.io.InterruptedIOException timedOut)
        {
            // Waiting for a connection timed out: go back to the top of the loop.
        }
        catch (IOException ioError)
        {
            log(ioError);
            isActive = false;
        }
        catch (SecurityException refused)
        {
            log(refused);
            isActive = false;
        }
    }
    System.out.println("handler quit!");
}
/**
* Thread started by the dispatcher for each accepted worker connection;
* it handles the worker's join request and puts the worker into the queues.
*/
public class WorkerJoin
extends Thread
{
Socket s;
private BufferedReader is;
private PrintWriter os;
/**
 * Stores the accepted connection; the join exchange itself happens in run().
 *
 * @param sock socket of the connecting worker; run() returns immediately
 *             when it is {@code null}
 */
public WorkerJoin(Socket sock)
{
    s = sock;
}
public void run()
{
if (s == null)
{
return;
}
if(Parameter.dispatcherLogIsActive)
{
LogFile lf=new LogFile(Parameter.logFileName);
try{
lf.logWorker_login(s.getInetAddress().getHostAddress());
}
catch(Exception e)
{
e.printStackTrace();
}
}
try
{
is = new BufferedReader(new InputStreamReader(s.
getInputStream()));
os = new PrintWriter(s.getOutputStream());
//协议传输数据
String lineStr = is.readLine();
if (lineStr.equals("<Worker join>"))
{
os.println("<can accept worker>");
os.flush();
String ipAddr = is.readLine();
String pcWorkLoad = is.readLine();
String workerId;
String dispatcherid = dispatcherInfo.
getDispatcherId();
if (num == 0)
{
workerId = "W" +
dispatcherid.substring(1, dispatcherid.length()) +
"00001";
num++;
}
else
{
String tmp = Integer.toString(num + 1);
int i = 5 - Integer.toString(num + 1).length();
while (i-- != 0)
{
tmp = "0" + tmp;
}
workerId = "W" +
dispatcherid.substring(1, dispatcherid.length()) +
tmp;
num++;
}
WorkerNode w = new WorkerNode(workerId, ipAddr,
pcWorkLoad, deadFlag);
synchronized (workerNodeList)
{
workerNodeList.addElement(w);
}
synchronized (idleWorkerNodeList)
{
idleWorkerNodeList.addElement(w.getWorkerId()); //应该以workerid入队列
dispatcherInfo.setWorkerCount(workerNodeList.size());
dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.
size());
}
synchronized(dummy)
{
try{
dummy.notifyAll();
}catch(IllegalMonitorStateException e)
{
//e.printStackTrace();
System.out.println(e.toString());
}
}
//将必要的dispatchernode信息回传给worker
//os.println(dispatcherInfo.getDispatcherIp());
os.println(workerId);
os.println(dispatcherInfo.getDispatcherId());
os.println(listenPort);
os.flush();
lineStr = is.readLine();
if (lineStr.equals("<join finish>"))
{
w.start();
//return true; //如果不等。。。??
if(Parameter.dispatcherLogIsActive)
{
LogFile lf=new LogFile("displog.log");
try{
lf.logWorker_login_Succ(s.getInetAddress().getHostAddress(),workerId,workerNodeList.size());
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
}
}
catch (IOException e)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -