⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchernodemanagement.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Vector;

/**
 * <p>Title: </p>
 * <p>Description: Manages the worker-node queues: handles workers joining and leaving, and also gathers statistics about worker status.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class DispatcherNodeManagement {
	private int workerPort; // port used to communicate with workers (the port workers join on)

	private int listenPort; // port that listens for worker heartbeats

	//private int workerHeartbeatPort; // port that periodically receives worker info (both ports should be initialized)
	private Vector workerNodeList; // queue of workers (the live ones); WorkerJoin adds WorkerNode objects

	private Vector idleWorkerNodeList; // queue of idle workers; WorkerJoin adds worker IDs, not WorkerNode objects

	private Vector deadWorkerNodeList; // queue of dead workers

	// Job list supplied by the caller; handed to JobFaultTolerantHandler in StartNodeServer().
	private Vector jobList;

	public Vector ftWorkerNodeList;// queue of fault-tolerant workers (elements are compared as worker-ID strings)

	private DispatcherInfo dispatcherInfo;

	// Running counter used by WorkerJoin to build the numeric suffix of new worker IDs.
	// Static, so it is shared by every instance of this class.
	private static int num = 0;

	private LinkedList deadFlag; // temporary list of dead worker IDs (before they are moved into deadWorkerNodeList)

	NodeHandler handler;

	ListenningHandler lHandler;

	TestDeadWorker tHandler;

	private JobFaultTolerantHandler jfth = null;

	//    DispatcherJobManagement jobManager;
	// Used purely as a monitor: WorkerJoin calls notifyAll() on it after registering a worker.
	// NOTE(review): presumably the job manager waits on this object - confirm against the caller.
	private Integer dummy;

	// Server socket bound to workerPort by InitNodeServer(); NodeHandler accepts on it.
	private ServerSocket workerSocket;

	// Server socket bound to listenPort by InitNodeServer(); ListenningHandler accepts on it.
	private ServerSocket listenSocket;

	// Only referenced by the commented-out setSoTimeout(...) calls in the visible part of this file.
	private static final int TIME_OUT = 1000;

	/**
	 * Creates a node manager on top of the caller-supplied worker and job lists.
	 * The lists are stored by reference, not copied. No socket is opened here:
	 * both server sockets stay {@code null} until {@link #InitNodeServer()} runs.
	 *
	 * @param workerPort port the worker-join server socket will be bound to
	 * @param listenPort port the heartbeat server socket will be bound to
	 * @param workerNodeList list of live workers
	 * @param idleWorkerNodeList list of idle workers
	 * @param deadWorkerNodeList list of dead workers
	 * @param jobList job list
	 * @param dinfo dispatcher information (ID and worker counters)
	 * @param djm object kept as the monitor that is notified whenever a worker joins
	 */
	public DispatcherNodeManagement(int workerPort, int listenPort,
			Vector workerNodeList, Vector idleWorkerNodeList,
			Vector deadWorkerNodeList, Vector jobList, DispatcherInfo dinfo,
			Integer djm) {
		// Collaborators handed in by the caller.
		this.dispatcherInfo = dinfo;
		this.dummy = djm;

		// Shared collections handed in by the caller.
		this.jobList = jobList;
		this.deadWorkerNodeList = deadWorkerNodeList;
		this.idleWorkerNodeList = idleWorkerNodeList;
		this.workerNodeList = workerNodeList;

		// Port configuration.
		this.listenPort = listenPort;
		this.workerPort = workerPort;

		// State owned by this instance.
		this.deadFlag = new LinkedList();
		this.ftWorkerNodeList = new Vector();
		this.listenSocket = null;
		this.workerSocket = null;
	}

	/**
	 * Binds the worker-join server socket to {@code workerPort} and the
	 * heartbeat server socket to {@code listenPort}.
	 *
	 * <p>The {@code workerSocket} and {@code listenSocket} fields are only
	 * assigned when both binds succeed. If the second bind fails, the socket
	 * that was already bound is closed again so its port is not left occupied.
	 *
	 * @return {@code true} when both sockets are bound; {@code false} when
	 *         either bind fails (the failure is logged and reported on stdout)
	 */
	public boolean InitNodeServer() {
		ServerSocket boundWorkerSocket = null;
		try {
			boundWorkerSocket = new ServerSocket(workerPort);
			//boundWorkerSocket.setSoTimeout(TIME_OUT);
			ServerSocket boundListenSocket = new ServerSocket(listenPort);
			//boundListenSocket.setSoTimeout(TIME_OUT);

			// Publish both sockets together so the fields never describe a half-initialized server.
			workerSocket = boundWorkerSocket;
			listenSocket = boundListenSocket;
			return true;
		} catch (IOException e) {
			log(e);
			System.out.println("Error binding to port");
		} catch (SecurityException e) {
			log(e);
			System.out
					.println("The security manager refused permission to bind to port");
		}

		// Only reached on failure: release the worker port if it was bound before the error.
		if (boundWorkerSocket != null) {
			try {
				boundWorkerSocket.close();
			} catch (IOException e) {
				log(e);
			}
		}
		return false;
	} //InitNodeServer;

	/**
	 * Binds the server sockets and, if that succeeds, starts the background
	 * threads: the worker-join acceptor ({@code NodeHandler}), the heartbeat
	 * acceptor ({@code ListenningHandler}), the dead-worker checker
	 * ({@code TestDeadWorker}) and, when {@code Parameter.jobFaultTolerant} is
	 * set, the {@code JobFaultTolerantHandler}.
	 *
	 * @return {@code true} if the sockets were bound and the threads started;
	 *         {@code false} if {@link #InitNodeServer()} failed (nothing is started)
	 */
	public boolean StartNodeServer() {
		if (!InitNodeServer()) {
			return false;
		}

		handler = new NodeHandler();
		handler.start();
		lHandler = new ListenningHandler();
		lHandler.start();
		tHandler = new TestDeadWorker();
		tHandler.start();

		if (Parameter.jobFaultTolerant) {
			// Assign the field rather than a shadowing local variable; quit()
			// checks this field to interrupt and stop the fault-tolerance thread.
			jfth = new JobFaultTolerantHandler(jobList);
			jfth.start();
		}
		return true;
	} // StartNodeServer

	/**
	 * Shuts this manager down: asks every background thread that is still
	 * alive to stop, calls {@code quit()} on each worker in the live-worker
	 * list, and closes both server sockets.
	 *
	 * <p>Each server socket is closed independently, so a socket that was
	 * never opened or that fails to close does not prevent the other one from
	 * being closed.
	 */
	public void quit() {
		System.out.println("DispatcherNodeManagement quitting...");
		if (jfth != null && jfth.isAlive()) {
			jfth.interrupt();
			jfth.quit();
		}
		if (handler != null && handler.isAlive()) {
			handler.terminate();
		}
		if (lHandler != null && lHandler.isAlive()) {
			lHandler.terminate();
		}
		if (tHandler != null && tHandler.isAlive()) {
			tHandler.interrupt();
			tHandler.terminate();
		}
		synchronized (workerNodeList) {
			for (int i = 0; i < workerNodeList.size(); i++) {
				WorkerNode wn = (WorkerNode) workerNodeList.elementAt(i);
				wn.quit();
			}
		}
		// Closing the server sockets also makes any accept() blocked on them fail,
		// which is how the acceptor threads leave their loops.
		closeServerSocketQuietly(this.workerSocket);
		closeServerSocketQuietly(this.listenSocket);
		System.out.println("DispatcherNodeManagement quit!");
	} // StopNodeServer

	/**
	 * Closes one server socket if it exists and is still open. A failure is
	 * reported on stdout instead of being propagated, so shutdown continues.
	 */
	private void closeServerSocketQuietly(ServerSocket socket) {
		if (socket == null || socket.isClosed()) {
			return;
		}
		try {
			socket.close();
		} catch (IOException e) {
			System.out.println("Error closing server socket: " + e.toString());
		}
	}

	/**
	 * Reports how many entries the live-worker list currently holds.
	 *
	 * @return current size of {@code workerNodeList}
	 */
	public int getWorkerCount() {
		final int liveWorkers = workerNodeList.size();
		return liveWorkers;
	} //getWorkerCount

	/**
	 * Reports how many entries the idle-worker list currently holds.
	 *
	 * @return current size of {@code idleWorkerNodeList}
	 */
	public int getIdleWorkerCount() {
		final int idleWorkers = idleWorkerNodeList.size();
		return idleWorkers;
	}

	//内联类:生成serversocket侦听是否有worker要加入
	/**
	 * Accept loop for the worker-join server socket. Every accepted connection
	 * is handed to its own {@link WorkerJoin} thread, which runs the join
	 * handshake with that worker.
	 */
	class NodeHandler extends Thread {
		// volatile: terminate() writes this flag from a different thread than the one executing run().
		private volatile boolean isActive = true;

		public NodeHandler() {
		}

		/**
		 * Accepts connections until the flag is cleared or accept() fails with
		 * an I/O or security error.
		 */
		public void run() {
			while (isActive) {
				try {
					Socket socket = workerSocket.accept();
					WorkerJoin w = new WorkerJoin(socket); // handle the connection on its own thread
					w.start();
				} catch (java.io.InterruptedIOException e) {
					// Accept timed out: nothing to do, go back to the top of the loop.
				} catch (IOException e) {
					log(e);
					isActive = false;
				} catch (SecurityException e) {
					log(e);
					isActive = false;
				}
			}
			System.out.println("handler quit!");
		}

		/**
		 * Runs the join handshake for one worker connection and, on success,
		 * registers the worker in the live and idle lists.
		 *
		 * <p>Protocol as implemented below: the worker sends
		 * {@code <Worker join>}; the dispatcher answers
		 * {@code <can accept worker>}; the worker sends its address line and its
		 * workload line; the dispatcher answers with the new worker ID, the
		 * dispatcher ID and the heartbeat port; the worker confirms with
		 * {@code <join finish>}, after which the WorkerNode is started.
		 */
		public class WorkerJoin extends Thread {
			Socket s;

			private BufferedReader is;

			private PrintWriter os;

			public WorkerJoin(Socket sock) {
				this.s = sock;
			}

			public void run() {

				if (s == null) {
					return;
				}
				if (Parameter.dispatcherLogIsActive) {
					LogFile lf = new LogFile(Parameter.logFileName);
					try {
						lf.logWorker_login(s.getInetAddress().getHostAddress());
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
				try {
					is = new BufferedReader(new InputStreamReader(s
							.getInputStream()));
					os = new PrintWriter(s.getOutputStream());
					// Protocol exchange starts here.
					String lineStr = is.readLine();

					// Constant-first equals: readLine() returns null when the peer
					// has closed the connection, which must not raise an NPE.
					if ("<Worker join>".equals(lineStr)) {
						os.println("<can accept worker>");
						os.flush();
						String ipAddr = is.readLine();
						String pcWorkLoad = is.readLine();
						if (ipAddr == null || pcWorkLoad == null) {
							// The peer hung up mid-handshake; do not register a
							// worker that has no address or workload.
							return;
						}

						String workerId = nextWorkerId(dispatcherInfo
								.getDispatcherId());
						WorkerNode w = new WorkerNode(workerId, ipAddr,
								pcWorkLoad, deadFlag);
						synchronized (workerNodeList) {
							workerNodeList.addElement(w);
						}
						synchronized (idleWorkerNodeList) {
							idleWorkerNodeList.addElement(w.getWorkerId()); // the idle list stores worker IDs
							dispatcherInfo
									.setWorkerCount(workerNodeList.size());
							dispatcherInfo
									.setIdleWorkerCount(idleWorkerNodeList
											.size());
						}
						// Wake up whoever is waiting on the dummy monitor.
						synchronized (dummy) {
							try {
								dummy.notifyAll();
							} catch (IllegalMonitorStateException e) {
								System.out.println(e.toString());
							}
						}
						// Send the worker the dispatcher details it needs.
						os.println(workerId);
						os.println(dispatcherInfo.getDispatcherId());
						os.println(listenPort);
						os.flush();
						lineStr = is.readLine();
						// NOTE(review): if the confirmation never arrives, the worker
						// stays in both lists without being started - confirm whether
						// it should be removed again.
						if ("<join finish>".equals(lineStr)) {
							w.start();
							if (Parameter.dispatcherLogIsActive) {
								LogFile lf = new LogFile(Parameter.logFileName);
								try {
									lf.logWorker_login_Succ(s.getInetAddress()
											.getHostAddress(), workerId,
											workerNodeList.size());
								} catch (Exception e) {
									e.printStackTrace();
								}
							}
						}
					}
				} catch (IOException e) {
					log(e);
				} finally {
					closeConnection();
				}
			}

			/**
			 * Produces the next worker ID: "W", then the dispatcher ID without
			 * its first character, then the sequence number left-padded with
			 * zeros to at least five digits.
			 */
			private String nextWorkerId(String dispatcherId) {
				int sequence;
				// Several WorkerJoin threads can run at once; take the next value
				// of the shared static counter atomically so IDs are never duplicated.
				synchronized (DispatcherNodeManagement.class) {
					sequence = ++num;
				}
				String suffix = Integer.toString(sequence);
				// Bounded by length, so numbers wider than five digits are used unpadded.
				while (suffix.length() < 5) {
					suffix = "0" + suffix;
				}
				return "W" + dispatcherId.substring(1) + suffix;
			}

			/**
			 * Closes the reader, the socket and the writer. Each step is
			 * guarded separately so a stream that was never opened, or one
			 * failing close, does not skip the remaining ones.
			 */
			private void closeConnection() {
				if (is != null) {
					try {
						is.close();
					} catch (IOException e) {
						log(e);
					}
				}
				try {
					s.close();
				} catch (IOException e) {
					log(e);
				}
				if (os != null) {
					os.close();
				}
			}
		}

		/**
		 * Clears the run flag. This does not itself interrupt a pending
		 * accept(); the loop re-checks the flag once accept() returns or throws.
		 */
		public void terminate() {
			System.out.println("handler quitting...");
			isActive = false;

		}

	}

	/**
	 *
	 * <p>Title: </p>
	 * <p>Description:接收各个worker的heartbeat </p>
	 * <p>Copyright: Copyright (c) 2004</p>
	 * <p>Company: </p>
	 * @author not attributable
	 * @version 1.0
	 */
	public class ListenningHandler extends Thread {
		private boolean isActive = true;

		private boolean subflag = true;

		public ListenningHandler() {

		}

		public void run() {
			while (isActive) {
				Socket socket = null;
				try {
					socket = listenSocket.accept();
					Process l = new Process(socket); //连接后处理
					l.start();
				} catch (java.io.InterruptedIOException e) {
					//等待连接超时,返回循环开头;(无处理)
				} catch (IOException e) {
					log(e);
					isActive = false;
					// error("I/O Error", e);
				} catch (SecurityException e) {
					log(e);
					isActive = false;
					//error("An unauthorized client has attempted to connect", e);
				}
			}//while
			System.out.println("listening handler quit!");
		}

		public class Process extends Thread {
			Socket s;

			private BufferedReader is;

			private PrintWriter os;

			private ObjectOutputStream oob;

			private boolean flag = true;

			public Process(Socket sock) {
				this.s = sock;
				try {
					is = new BufferedReader(new InputStreamReader(s
							.getInputStream()));
					os = new PrintWriter(s.getOutputStream());
					oob = new ObjectOutputStream(s.getOutputStream());
				} catch (Exception e1) {
					System.out.println(e1.toString());
				}
				System.out.println("Worker poll start: ");
			}

			public void terminate() {
				try {
					if (!this.s.isClosed())
						this.s.close();
					flag = false;
				} catch (Exception e) {

				}
			}

			public void run() {
				while (flag && subflag) {

					if (s == null) {
						return;
					}
					try {
						//协议传输数据
						String tmpWorkerId = is.readLine();
						if (Parameter.jobDebugIsActive) {
							System.out.println(tmpWorkerId);
							System.out.print("idle worker node:> ");
							for (int istart = 0; istart < idleWorkerNodeList
									.size(); istart++)
								System.out.print(idleWorkerNodeList
										.elementAt(istart)
										+ "  ");
							System.out.println();
						}

						String lineStr = is.readLine();

						int i = 0;
						WorkerNode wn = null;
						boolean ftFlag = false;
						synchronized (ftWorkerNodeList) {
							for (i = 0; i < ftWorkerNodeList.size(); i++) {
								if (tmpWorkerId
										.equals((String) (ftWorkerNodeList
												.elementAt(i)))) {
									ftFlag = true;
									break;
								}
							}
						}
						if (ftFlag) {
							if (lineStr.equals("<worker alive>")) {
								os.println("<dispatcher alive>");
								os.flush();
								//System.out.println("<dispatcher alive>");
							} else if (lineStr.equals("<subjob hava finished>")) {
								String ft_jobId = is.readLine();
								String ft_subJobId = is.readLine();
								int j = 0;
								Job tmpjob = null;
								synchronized (jobList) {
									for (j = 0; j < jobList.size(); j++) {
										tmpjob = (Job) (jobList.elementAt(j));
										//System.out.println(jobId+"  "+tmpjob.getJobId());
										if (ft_jobId.equals(tmpjob.getJobId()))
											break;
									}
									if (j >= jobList.size()) {
										System.out
												.println("Don't find this job in jobList!");
										return;
									}
									//else tmpjob=(Job) (jobList.elementAt(j));
								}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -