📄 datapollcommunication.java

📁 Source code of the distributed computing platform P2HP-1; P2HP-1 is a P2P-based high-performance computing platform
💻 JAVA
package cn.edu.hust.cgcl.biogrid.worker;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.URL;
import java.net.URLClassLoader;

import cn.edu.hust.cgcl.biogrid.BioSDK.TaskRun;
import cn.edu.hust.cgcl.biogrid.dispatcher.SubJob;
import cn.edu.hust.cgcl.biogrid.service.MessageService;

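/**
 * <p>Title: DataPollCommunication</p>
 * <p>Description: Worker-side thread that fetches a subjob's program and
 * input data from the data server (the "datapoll"), loads and runs the user
 * subtask, and then reports that the task has finished.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * @author not attributable
 * @version 1.0
 */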

public class DataPollCommunication extends Thread {
	SubJob sj;

	MessageService msgService;

	String workerId;

	String d_p_Ip;

	int port;

	WorkerInfo nodeInfo;

	private ObjectInputStream ois;

	private FileInputStream fis;

	public boolean regetDataFlag = false; // true if the data is being fetched again (fault-tolerance handling)

	public DataPollCommunication(SubJob sj, String workerId, WorkerInfo nodeInfo) {
		this.sj = sj;
		this.workerId = workerId;
		this.d_p_Ip = sj.getDataServerIp();
		this.port = sj.getDataServerPort();
		this.nodeInfo = nodeInfo; // once the subjob has finished, its finish flag is set to true
		//this.dataName=new String[1];
		//dataName="bgdata01";
		if(sj.getState()==sj.PRE_SUBTASK)
			this.regetDataFlag=true;
		System.out.println("fetch data from datapoll start:");
	}

	protected void getWorkerPath() {
		Parameter.path = System.getProperty("user.dir");
	}

	public void run() {
		int i = 0;
		// Step 1: fetch the program; a non-zero return value is an error code.
		if ((i = getProgram()) != 0) {
			System.err.println("Failed to get program from datapoll, error code: " + i);
			return;
		}
		// Step 2: fetch the input data, using the re-fetch path on retries.
		if (this.regetDataFlag) {
			if ((i = regetData()) != 0) {
				System.err.println("Failed to get data from datapoll, error code: " + i);
				return;
			}
		} else if ((i = getData()) != 0) {
			System.err.println("Failed to get data from datapoll, error code: " + i);
			return;
		}
		try {
			
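			// Locate the user jar: the program path is either a directory
			// containing usersubtask.jar or the jar file itself.
			URL url = null;
			File program = new File(Parameter.programName);
			if (program.isDirectory()) {
				// File(parent, child) avoids the hard-coded "\\" separator;
				// toURI().toURL() replaces the deprecated File.toURL().
				url = new File(program, "usersubtask.jar").toURI().toURL();
			} else if (program.isFile()) {
				url = program.toURI().toURL();
			}
			if (url == null) {
				// Neither a directory nor a file: nothing to load.
				System.err.println("Program not found: " + Parameter.programName);
				return;
			}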

			URLClassLoader loader = new URLClassLoader(new URL[] { url });
			Class c = loader.loadClass("hpc.application.usersubtask");
			Object obj = c.newInstance();

			TaskRun tr = (TaskRun) obj;
			//System.out.println("dataName ,dataPath ,programName,resultPath: "+Parameter.dataName+" "+Parameter.dataPath+" "+Parameter.programName+""+Parameter.resultPath);
			tr.start(sj, Parameter.dataName, Parameter.dataPath, this.workerId,
					Parameter.programName, Parameter.resultPath);

			this.nodeInfo.setFinishFlag(true);
			TaskTransfer tt = new TaskTransfer(this.nodeInfo);
			tt.taskFinished(Parameter.programName, Parameter.dataName,
					Parameter.resultPath);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private boolean initConnect() {
		msgService = new MessageService();
		return true;
	}

	private int getProgram() {
		System.err.println("getProgram!");
		int i = 0;
		if (!initConnect())
			return -1;

		msgService.clearTryTimes();
		i = msgService.getProgramOnce(d_p_Ip, "worker", "worker",
				Parameter.programPath, Parameter.userID, sj.getJobId(), 
				sj.getSubJobId(), workerId);
		if (i != 0)
			return i;

		Parameter.programName = msgService.getReturn();

		if (Parameter.programName != null) {
			//disconnect();
			return 0;
		}		
		return -1;
	}

	private int getData() {
		System.err.println("getData!");
		int i = 0;
		if (!initConnect()) {
			return -1;
		}
		msgService.clearTryTimes();
		i = msgService.getDataOnce(d_p_Ip, "worker", "worker",
				Parameter.dataPath, Parameter.userID, sj.getJobId(), 
				sj.getSubJobId(), workerId);
		if (i != 0)
			return i;

		//System.out.println("dataPath,userID, sj.getJobId(),sj.getSubJobId(), workerId:  "+Parameter.dataPath+Parameter.userID+sj.getJobId()+sj.getSubJobId()+workerId);
		Parameter.dataName = msgService.getReturn();
		//System.out.println("dataName: "+Parameter.dataName);
		if (Parameter.dataName != null) {
			//disconnect();
			System.out.println("the data is: " + Parameter.dataName);
			return 0;
		}
		//disconnect();
		return -1;
	}

	private int regetData() {
		System.err.println("regetData!");
		int i = 0;
		if (!initConnect()) {
			return -1;
		}
		msgService.clearTryTimes();
		i = msgService.reGetDataOnce(d_p_Ip, "worker", "worker",
				Parameter.dataPath, Parameter.userID, sj.getJobId(),
				sj.getSubJobId(), workerId);
		if (i != 0)
			return i;

		//System.out.println("dataPath,userID, sj.getJobId(),sj.getSubJobId(), workerId:  "+Parameter.dataPath+Parameter.userID+sj.getJobId()+sj.getSubJobId()+workerId);
		Parameter.dataName = msgService.getReturn();
		//System.out.println("dataName: "+Parameter.dataName);
		if (Parameter.dataName != null) {
			//disconnect();
			return 0;
		}
		//disconnect();
		return -1;
	}
	
}
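
The jar lookup in `run()` can be tried on its own, without the rest of the platform. The following is a minimal sketch, not part of P2HP-1: the class `JarLocator` and the method `resolveJarUrl` are hypothetical names introduced here for illustration. It assumes the same rule as `run()`: a directory is expected to contain the jar, and any other path is treated as the jar itself. It builds the URL with `File.toURI().toURL()` so that no platform-specific separator is needed.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

// Hypothetical helper, not part of the original source.
final class JarLocator {

    // Returns a file: URL for the jar. If 'program' is an existing
    // directory, the jar is looked up inside it; otherwise 'program'
    // itself is treated as the jar path.
    static URL resolveJarUrl(File program, String jarName) {
        File jar = program.isDirectory() ? new File(program, jarName) : program;
        try {
            return jar.toURI().toURL();
        } catch (MalformedURLException e) {
            // Wrapped so callers do not have to handle a checked exception.
            throw new IllegalArgumentException("Cannot build URL for " + jar, e);
        }
    }

    public static void main(String[] args) {
        // The system temp directory stands in for the program directory.
        File dir = new File(System.getProperty("java.io.tmpdir"));
        System.out.println(resolveJarUrl(dir, "usersubtask.jar"));
    }
}
```

The returned URL could then be passed to `new URLClassLoader(new URL[] { url })` as in `run()`. Unlike `run()`, this sketch does not check that the jar exists, so a caller would still need that check before loading classes.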
