⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchernodemanagement.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
								Vector tmpSjList = tmpjob.subJobList;
								SubJob tmpsubjob = null;
								synchronized (tmpSjList) {
									for (j = 0; j < tmpSjList.size(); j++) {
										tmpsubjob = (SubJob) (jobList
												.elementAt(j));
										//System.out.println(jobId+"  "+tmpjob.getJobId());
										if (ft_subJobId.equals(tmpsubjob
												.getSubJobId())) {
											tmpsubjob.setWorkerState(2);//finished
											break;
										}
									}
									if (j >= tmpSjList.size()) {
										System.out
												.println("Don't find this subjob in subjobList!");
										return;
									}
									//else tmpjob=(Job) (jobList.elementAt(j));
								}

								synchronized (dummy) {
									try {
										dummy.notifyAll();
									} catch (IllegalMonitorStateException e) {

									}
								}
								os.println("<dispatcher hava known>");
								os.flush();
								tmpsubjob.finishedTime = System
										.currentTimeMillis();
								tmpsubjob.computingTime = tmpsubjob.finishedTime
										- tmpsubjob.distributeTime;
							} else if (lineStr.equals("<worker quit>")) {
								//交给workernode的一个函数处理
								synchronized (ftWorkerNodeList) {
									for (i = 0; i < ftWorkerNodeList.size(); i++) {
										if (tmpWorkerId
												.equals((String) (ftWorkerNodeList
														.elementAt(i)))) {
											ftWorkerNodeList.remove(i);
											break;
										}
									}
								}
								os.println("<worker quit agree>");
								os.flush();
							}

							this.s.close();
							return;
						} else {
							synchronized (workerNodeList) {
								for (i = 0; i < workerNodeList.size(); i++) {
									if (tmpWorkerId
											.equals(((WorkerNode) (workerNodeList
													.elementAt(i)))
													.getWorkerId()))
										break;
								}
								if (i >= workerNodeList.size()) {
									//查看是否为已死worker,若找到,复活。
									int j = 0;
									synchronized (deadWorkerNodeList) {
										for (j = 0; j < deadWorkerNodeList
												.size(); j++) {
											if (tmpWorkerId
													.equals(((WorkerNode) (deadWorkerNodeList
															.elementAt(j)))
															.getWorkerId()))
												break;
										}
										if (j >= deadWorkerNodeList.size()) {//if fault tolerant ,then deal with...

											if (lineStr
													.equals("worker fault tolerance")) {

												String ft_jobId = is.readLine();
												String ft_subJobId = is
														.readLine();
												int ft_sjState = Integer
														.parseInt(is.readLine());

												os
														.println("worker fault tolerance finish");
												os.flush();
												ftWorkerNodeList
														.add(tmpWorkerId);
												Job tmpjob = null;
												synchronized (jobList) {
													for (j = 0; j < jobList
															.size(); j++) {
														tmpjob = (Job) (jobList
																.elementAt(j));
														//System.out.println(jobId+"  "+tmpjob.getJobId());
														if (ft_jobId
																.equals(tmpjob
																		.getJobId()))
															break;
													}
													if (j >= jobList.size()) {
														System.out
																.println("Don't find this job in jobList!");
														return;
													}
													//else tmpjob=(Job) (jobList.elementAt(j));
												}
												Vector tmpSjList = tmpjob.subJobList;
												SubJob tmpsubjob = null;
												synchronized (tmpSjList) {
													for (j = 0; j < tmpSjList
															.size(); j++) {
														tmpsubjob = (SubJob) (jobList
																.elementAt(j));
														//System.out.println(jobId+"  "+tmpjob.getJobId());
														if (ft_subJobId
																.equals(tmpsubjob
																		.getSubJobId())) {
															tmpsubjob
																	.setWorkerState(ft_sjState);
															break;
														}
													}
													if (j >= tmpSjList.size()) {
														System.out
																.println("Don't find this subjob in subjobList!");
														return;
													}
													//else tmpjob=(Job) (jobList.elementAt(j));
												}

											} else {
												System.out
														.println("Don't find this worker in workerNodeList and deadWorkerNodeList!");
												return;
											}
										} else {
											wn = (WorkerNode) (deadWorkerNodeList
													.elementAt(j));
											workerNodeList.addElement(wn);
											synchronized (idleWorkerNodeList) {
												idleWorkerNodeList
														.addElement(wn
																.getWorkerId()); //应该以workerid入队列
											}
											deadWorkerNodeList
													.removeElementAt(j);
											dispatcherInfo
													.setWorkerCount(workerNodeList
															.size());
											dispatcherInfo
													.setIdleWorkerCount(idleWorkerNodeList
															.size());
											wn.revive();
										}
									}
									synchronized (dummy) {
										try {
											dummy.notifyAll();
										} catch (IllegalMonitorStateException e) {

										}
									}
								} else
									wn = (WorkerNode) (workerNodeList
											.elementAt(i));
							}
							//找到这个worker,接着:
							if (lineStr.equals("<worker alive>")) {
								SubJob sj = wn.alive(); //寿命恢复到3
								if (sj != null && sj.getState() != 2) {
									os.println("<new job>");
									if(wn.isReduncyWorker)
									  os.println("1");
									else os.println("0");
									os.flush();
									if (is.readLine()
											.equals("<new job accept>")) {
										if (Parameter.jobDebugIsActive)
											System.out
													.println("<new job accept>");
										//oob = new ObjectOutputStream(s.getOutputStream());
										oob.writeObject(sj); //传送datapoll信息给worker
										//oob.close();
										oob.flush();
										wn.setWorkerState(1); //将worker状态置为busy
										sj.setWorkerState(1);
										//这里有一个延时问题。
										dispatcherInfo
												.setIdleWorkerCount(idleWorkerNodeList
														.size());
									}
								}
								os.println("<dispatcher alive>");
								os.flush();
								//System.out.println("<dispatcher alive>");
							} else {
								if (lineStr.equals("<subjob hava finished>")) {
									is.readLine();//jobid, unuseless
									is.readLine();//subjobid, unuseless
									SubJob sj = wn.alive(); //寿命恢复到3
									sj=wn.getSubJob();//在busy状态下,wn.alive()不返回subjob,必须主动取。
									if (sj != null) {
										if(sj.getState()!=2&&Parameter.jobFaultTolerant)
										{//子任务以前没有worker完成
										sj.finishedTime = System
												.currentTimeMillis();
										sj.computingTime = sj.finishedTime
												- sj.distributeTime;
										calAveTime(sj, sj.computingTime);
										}//if
//										if (sj.redun_num != 0) {//有冗余计算,从urgentJobList中删除
//											LinkedList tmplist = DispatcherJobManagement.urgentJobList;
//											synchronized (tmplist) {
//												for (int j = 0; i < tmplist
//														.size(); i++) {
//													SubJob tmpsj = (SubJob) tmplist
//															.get(j);
//													if (sj
//															.getSubJobId()
//															.equals(
//																	tmpsj
//																			.getSubJobId())) {
//														tmplist.remove(j);
//														break;
//													}
//												}
//											}
//										}
										//从runningJobList中删除
										LinkedList tmplist = DispatcherJobManagement.runningJobList;
										synchronized (tmplist) {
											for (int j = 0; i < tmplist.size(); i++) {
												SubJob tmpsj = (SubJob) tmplist
														.get(j);
												if (sj.getSubJobId().equals(
														tmpsj.getSubJobId())) {
													tmplist.remove(j);
													break;
												}
											}
										}
									}//if
									wn.workFinish();
									idleWorkerNodeList.addElement(wn
											.getWorkerId());
									dispatcherInfo
											.setIdleWorkerCount(idleWorkerNodeList
													.size());
									synchronized (dummy) {
										try {
											dummy.notifyAll();
										} catch (IllegalMonitorStateException e) {

										}
									}
									os.println("<dispatcher hava known>");
									os.flush();

								} else if (lineStr.equals("<worker quit>")) {
									//交给workernode的一个函数处理
									wn.workerQuit();
									os.println("<worker quit agree>");
									os.flush();
									if (Parameter.dispatcherLogIsActive) {
										LogFile lf = new LogFile(Parameter.logFileName);
										try {
											lf.logWorker_quit(wn.getWorkerIp(), wn.getWorkerId());
										} catch (Exception e) {
											e.printStackTrace();
										}
									}
								}

								this.s.close();
								return;
							}//else
						}//else
					}//try
					catch (IOException e) {
						try {
							is.close();
							s.close();
							os.close();
							return;
						} catch (IOException e1) {
							log(e);
							System.out.println(e.toString());
							flag = false;
							//error("Error closing socket", e);
						}
						//log(e);
						System.out.println(e.toString());
						//e.printStackTrace();
						flag = false;
						return;
						//error("I/O error", e);
					}
				}//while
				try {
					if (!this.s.isClosed())
						this.s.close();
					flag = false;
				} catch (Exception e) {

				}
				return;
			}//run
			//.....
		}

		/**
		 * Requests shutdown of the listening handler.
		 *
		 * <p>Prints a notice to standard output and then clears {@code subflag} and
		 * {@code isActive}. Both variables are declared outside this method;
		 * presumably they are the loop guards of the handler's threads -- TODO
		 * confirm against their declarations.
		 */
		public void terminate() {
			final String notice = "listening handler quitting...";
			System.out.println(notice);
			subflag = false;
			isActive = false;
		}

		/**
		 * Forwards a timing sample to the job that owns the given sub-job.
		 *
		 * <p>Scans {@code jobList} (while holding its monitor) for the first job
		 * whose id equals {@code sj.getJobId()} and calls
		 * {@code Job.calAveTime(t)} on it. If no job matches, a message is
		 * printed to standard output and nothing else happens.
		 *
		 * @param sj sub-job whose owning job should receive the sample
		 * @param t  value handed unchanged to {@code Job.calAveTime}; the caller
		 *           visible in this file passes the sub-job's computing time
		 */
		public void calAveTime(SubJob sj, long t) {
			final String wantedId = sj.getJobId();
			synchronized (jobList) {
				Job owner = null;
				// Stop at the first job whose id matches.
				for (int idx = 0; owner == null && idx < jobList.size(); idx++) {
					Job candidate = (Job) jobList.elementAt(idx);
					if (wantedId.equals(candidate.getJobId())) {
						owner = candidate;
					}
				}
				if (owner == null) {
					System.out.println("Don't find this job in jobList!");
					return;
				}
				// Still inside the jobList monitor, as in the lookup above.
				owner.calAveTime(t);
			}
		}

	}

	/**
	 * Background thread that periodically checks for dead workers and moves
	 * their nodes from {@code workerNodeList} to {@code deadWorkerNodeList}.
	 *
	 * <p>Each pass drains the {@code deadFlag} queue of worker ids. For every id
	 * the matching {@code WorkerNode} is removed from {@code workerNodeList},
	 * appended to {@code deadWorkerNodeList} and, when its
	 * {@code atIdleListFlag} is set, its id is also removed from
	 * {@code idleWorkerNodeList}. The counters in {@code dispatcherInfo} are
	 * refreshed after each list change.
	 *
	 * <p>Copyright (c) 2004
	 *
	 * @version 1.0
	 */
	public class TestDeadWorker extends Thread {
		/**
		 * Guard of the outer loop in {@link #run()}. Volatile because
		 * {@link #terminate()} is presumably invoked from a thread other than
		 * the one executing {@code run()}; without it the write is not
		 * guaranteed to become visible to the polling loop.
		 */
		private volatile boolean flag = true;

		/** Guard of the drain loop in {@link #poll()}; volatile for the same reason as {@code flag}. */
		private volatile boolean subflag = true;

		/** Pause between two polls, in milliseconds (passed to {@code Thread.sleep}). */
		private static final int POLL_INTERVAL = 10 * 1000;

		/**
		 * Polls, sleeps {@code POLL_INTERVAL} milliseconds, and repeats until
		 * {@code flag} is cleared or the thread is interrupted.
		 */
		public void run() {
			while (flag) {
				this.poll();
				try {
					if (!Thread.interrupted())
						Thread.sleep(POLL_INTERVAL);
					else
						flag = false;
				} catch (InterruptedException e) {
					// Interruption is treated as a stop request; the loop ends
					// and the thread exits right after, so the interrupt status
					// is not restored.
					flag = false;
				}

			}
			System.out.println("Testdeadworker handler quit!");
		}

		/**
		 * Drains {@code deadFlag}, retiring one worker per queued id, until the
		 * queue is empty or {@code subflag} is cleared.
		 */
		public void poll() {
			while (subflag) {
				String deadId;
				int pending;
				// The emptiness check and the removal share one critical
				// section, so a concurrent consumer cannot empty the queue
				// between them and make removeFirst() throw.
				synchronized (deadFlag) {
					pending = deadFlag.size();
					if (pending == 0)
						break;
					deadId = (String) deadFlag.removeFirst();
				}
				System.out.println("deadwork: " + pending);

				WorkerNode deadNode = detachDeadWorker(deadId);
				if (deadNode == null) {
					System.out
							.println("Don't find this dead workernode in workerNodeList!");
					continue;
				}
				// NOTE(review): between detachDeadWorker() and this block the
				// node is in neither list. Closing that window needs nested
				// locking; confirm the lock order used by the rest of the file
				// before changing it.
				synchronized (deadWorkerNodeList) {
					deadWorkerNodeList.addElement(deadNode);
				}
				if (deadNode.atIdleListFlag)
					dropFromIdleList(deadId);
			}//while
		} //poll

		/**
		 * Removes the node with the given id from {@code workerNodeList},
		 * updates the worker count and, when dispatcher logging is active,
		 * writes a log record.
		 *
		 * @param workerId id taken from {@code deadFlag}
		 * @return the removed node, or {@code null} when no node has that id
		 */
		private WorkerNode detachDeadWorker(String workerId) {
			synchronized (workerNodeList) {
				for (int i = 0; i < workerNodeList.size(); i++) {
					WorkerNode node = (WorkerNode) (workerNodeList.elementAt(i));
					if (!workerId.equals(node.getWorkerId()))
						continue;
					String deadIp = node.getWorkerIp();
					workerNodeList.removeElementAt(i);
					dispatcherInfo.setWorkerCount(workerNodeList.size());
					if (Parameter.dispatcherLogIsActive) {
						LogFile lf = new LogFile(Parameter.logFileName);
						try {
							lf.logWorker_quitFromQueue(deadIp, workerId,
									dispatcherInfo.getWorkerCount());
						} catch (Exception e) {
							// A logging failure must not stop the node from
							// being retired.
							e.printStackTrace();
						}
					}
					return node;
				}
				return null;
			}
		}

		/**
		 * Removes the first occurrence of the given id from
		 * {@code idleWorkerNodeList} and refreshes the idle-worker count;
		 * prints a diagnostic when the id is not in the list.
		 *
		 * @param workerId id of a worker whose {@code atIdleListFlag} is set
		 */
		private void dropFromIdleList(String workerId) {
			synchronized (idleWorkerNodeList) {
				if (idleWorkerNodeList.remove(workerId)) {
					dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList
							.size());
				} else {
					System.out
							.println("Don't find this worker at idleWorkerNodelist(but the falg 'atIdleListFlag' is true)!");
				}
			}
		}

		/**
		 * Asks the thread to stop by clearing both loop guards. A sleeping
		 * thread is not woken up, so it notices the request only after the
		 * current sleep ends.
		 */
		public void terminate() {
			System.out.println("Testdeadworker handler quitting...");
			subflag = false;
			flag = false;
		}
	} //TestDead

	/**
	 * Reports an exception by printing its {@code toString()} text to standard
	 * output. No stack trace is printed.
	 *
	 * @param e exception to report
	 */
	public void log(Exception e) {
		final String description = e.toString();
		System.out.println(description);
	}

} //DispatcherNodeManagement

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -