⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mainsdk.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.BioSDK;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.text.ParseException;
import java.util.LinkedList;
import java.util.Vector;

import cn.edu.hust.cgcl.biogrid.service.MessageService;

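/**
 * <p>Title: MainSDK</p>
 * <p>Description: Static client-side helpers that request sub-tasks from the
 * monitor and its dispatchers, query the status of a sub-task, fetch a
 * finished sub-task's result and push task data through MessageService.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */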

public class MainSDK
{
    /** Socket timeout in milliseconds; no socket in this class currently applies it. */
    final static int TIME_OUT = 3 * 1000;

    public MainSDK()
    {} // MainSDK

    /**
     * Asks the job's monitor to allocate <code>n</code> sub-tasks, then contacts
     * every dispatcher named in the monitor's answer and appends one
     * {@link SubTask} per returned sub-task id to <code>taskList</code>.
     *
     * @param n        number of sub-tasks requested from the monitor
     * @param job      job whose monitor address and job id are used
     * @param taskList list that receives the created SubTask objects; must not be null
     * @throws ParseException if <code>taskList</code> is null, or if the monitor
     *         or a dispatcher answers with something the protocol does not allow
     *         (including closing the connection early)
     * @throws Exception on socket errors or non-numeric protocol fields
     */
    static public void newSubTask(int n, Job job, LinkedList taskList)
        throws Exception
    {
        if (taskList == null)
        {
            throw new ParseException("method parameter error!", 0);
        } // if

        // The monitor connection is fully consumed and closed before any
        // dispatcher is contacted.
        Vector assignments = requestAssignments(n, job);
        for (int i = 0; i < assignments.size(); i++)
        {
            requestSubTaskIds((String) assignments.get(i), job, taskList);
        } // for
    } // newSubTask

    /**
     * Performs the "&lt;new subtask&gt;" exchange with the monitor and returns the
     * raw assignment lines it sends back, one String per dispatcher.
     * Reading stops at "&lt;dispatcher finish&gt;" or at end of stream.
     */
    private static Vector requestAssignments(int n, Job job)
        throws Exception
    {
        Socket monSock = new Socket(job.monitorIp, job.monitorPort);
        try
        {
            BufferedReader monis = new BufferedReader(new InputStreamReader(
                monSock.getInputStream()));
            PrintWriter monos = new PrintWriter(monSock.getOutputStream());

            monos.println("<new subtask>");
            monos.flush();
            // Constant-first equals: a null reply (monitor closed the
            // connection) is reported as a protocol error, not an NPE.
            if (!"<new subtask next>".equals(monis.readLine()))
            {
                throw new ParseException("protocol parse error!", 0);
            } // if

            monos.println(job.jobId);
            monos.flush();
            monos.println(n);
            monos.flush();

            Vector lines = new Vector();
            String line = monis.readLine();
            while (line != null && !line.equals("<dispatcher finish>"))
            {
                lines.add(line);
                line = monis.readLine();
            } // while
            return lines;
        } // try
        finally
        {
            closeQuietly(monSock);
        } // finally
    } // requestAssignments

    /**
     * Parses one assignment line and asks the named main dispatcher for the
     * sub-task ids, adding a SubTask to <code>taskList</code> for each id.
     *
     * @param assignment line of the form taskCount#priip#priport#backip#backport
     */
    private static void requestSubTaskIds(String assignment, Job job,
                                          LinkedList taskList)
        throws Exception
    {
        // Limit 5 keeps everything after the fourth '#' in the last field,
        // exactly as the former indexOf/substring parsing did.
        String[] fields = assignment.split("#", 5);
        if (fields.length < 5)
        {
            throw new ParseException("protocol parse error!", 0);
        } // if
        int taskCount = Integer.parseInt(fields[0]);
        String mainIp = fields[1];
        int mainPort = Integer.parseInt(fields[2]);
        String subIp = fields[3];
        int subPort = Integer.parseInt(fields[4]);

        // request the sub-tasks from the dispatcher
        Socket dispSock = new Socket(mainIp, mainPort);
        try
        {
            BufferedReader dispis = new BufferedReader(new InputStreamReader(
                dispSock.getInputStream()));
            PrintWriter dispos = new PrintWriter(dispSock.getOutputStream());
            // Never written to, but constructing it puts the object-stream
            // header on the wire ahead of the text lines. Presumably the
            // dispatcher expects that header; kept for wire compatibility.
            new ObjectOutputStream(dispSock.getOutputStream());

            dispos.println("<new task>");
            dispos.println(job.jobId);
            dispos.println(Integer.toString(taskCount));
            // sub-dispatcher information
            dispos.println(subIp);
            dispos.println(subPort);
            dispos.flush();

            for (int j = 0; j < taskCount; j++)
            {
                String subid = dispis.readLine();
                if (subid == null)
                {
                    // Dispatcher closed the connection before sending every
                    // id; do not create sub-tasks with a null id.
                    throw new ParseException("protocol parse error!", 0);
                } // if
                taskList.add(new SubTask(subid, mainIp, mainPort, subIp,
                                         subPort));
            } // for
        } // try
        finally
        {
            closeQuietly(dispSock);
        } // finally
    } // requestSubTaskIds

    /**
     * Fetches the result of a sub-task if it has finished.
     *
     * @param o_task   the sub-task; must be a {@link SubTask}
     * @param job      job supplying the job id, data-server address and credentials
     * @param filepath path handed to MessageService.getResultOnce
     * @return the value of MessageService.getReturn() when the sub-task status
     *         is SubTask.SUBTASK_FINISH; <code>null</code> for any other status
     * @throws ParseException if getResultOnce reports a non-zero code
     * @throws Exception if the status query fails
     */
    static public String getResult(Object o_task, Job job, String filepath)
    throws Exception
    {
        SubTask task = (SubTask) o_task;
        int state = getTaskStatus(task, job.jobId);
        if (state != SubTask.SUBTASK_FINISH)
        {
            // stopped, running, faulted or unknown: nothing to fetch yet
            return null;
        } // if

        MessageService msgService = new MessageService();
        if (msgService.getResultOnce(job.dataServerIp, job.userId, job.userPwd,
                                     filepath, job.userId, job.jobId,
                                     task.subTaskId) != 0)
        {
            throw new ParseException("Data Server Error!", 0);
        } // if
        return msgService.getReturn();
    } // getResult

    /**
     * Sends the "&lt;get subtask status&gt;" request on an already opened
     * dispatcher connection and parses the one-line numeric reply.
     *
     * @throws NumberFormatException if the reply is missing or not an integer
     */
    static private int getInnTaskStatus(SubTask task,String jobid, Socket dispSock, BufferedReader dispis, PrintWriter dispos)
    throws Exception
    {
        dispos.println("<get subtask status>");
        dispos.println(jobid);
        dispos.println(task.subTaskId);
        dispos.flush();
        return Integer.parseInt(dispis.readLine());
    } // getInnTaskStatus

    /**
     * Opens a connection to the task's current main dispatcher, asks for the
     * sub-task status and always closes the socket afterwards.
     *
     * @param failover when true, the extra "fault tolerance" line is sent
     *                 before the status request
     */
    private static int queryStatus(SubTask task, String jobid, boolean failover)
        throws Exception
    {
        Socket dispSock = new Socket(task.mainDispIp, task.mainDispPort);
        try
        {
            BufferedReader dispis = new BufferedReader(new InputStreamReader(
                dispSock.getInputStream()));
            PrintWriter dispos = new PrintWriter(dispSock.getOutputStream());
            // Constructing the stream writes the object-stream header to the
            // socket; presumably expected by the dispatcher, so it is kept.
            new ObjectOutputStream(dispSock.getOutputStream());
            if (failover)
            {
                // the fault-tolerance protocol only adds this one line
                dispos.println("fault tolerance");
            } // if
            return getInnTaskStatus(task, jobid, dispSock, dispis, dispos);
        } // try
        finally
        {
            closeQuietly(dispSock);
        } // finally
    } // queryStatus

    /**
     * Queries the status of a sub-task from its main dispatcher. If that
     * fails with an IOException and a backup dispatcher is recorded, the
     * backup is promoted to main dispatcher on the task (the backup fields
     * are cleared) and the query is retried there once.
     *
     * @param o_task the sub-task; must be a {@link SubTask}
     * @param jobid  id of the job the sub-task belongs to
     * @return the status reported by the dispatcher, or -1 when the query
     *         failed and the task has no main dispatcher address
     * @throws IOException the original failure when no backup dispatcher is
     *         left, or the failure of the retry against the backup
     * @throws Exception on any other error, e.g. a non-numeric reply
     */
    static public int getTaskStatus(Object o_task,String jobid)
    throws Exception
    {
        SubTask task = (SubTask) o_task;
        try
        {
            return queryStatus(task, jobid, false);
        } // try
        catch (IOException e)
        {
            e.printStackTrace();
            if (task.mainDispIp == null)
            {
                return -1;
            } // if
            if (task.subDispIp == null)
            {
                // No backup left: leave the task untouched and surface the
                // real failure instead of connecting to a null address.
                throw e;
            } // if

            task.mainDispIp = task.subDispIp;
            task.mainDispPort = task.subDispPort;
            task.subDispIp = null;
            task.subDispPort = 0;
            return queryStatus(task, jobid, true);
        } // catch
    } // getTaskStatus

    /**
     * Pushes task data for a job through MessageService.updateDataOnce.
     *
     * @param path file path passed to updateDataOnce; when null, behaves
     *             exactly like {@link #updateTaskData(Job, int)}
     * @param job  job supplying data-server address, credentials and job id
     * @param nums value passed through to updateDataOnce
     * @return 1 on success, -1 if updateDataOnce returned a negative code
     */
    public static int updateTaskData(String path, Job job, int nums)
    {
        if (path == null)
        {
            return updateTaskData(job, nums);
        } // if
        MessageService msgService = new MessageService();
        int result = msgService.updateDataOnce(job.dataServerIp,
                                               job.userId, job.userPwd,
                                               path, nums,
                                               job.userId, job.jobId);
        return result < 0 ? -1 : 1;
    } // updateTaskData

    /**
     * Pushes task data for a job through the path-less overload of
     * MessageService.updateDataOnce.
     *
     * @param job  job supplying data-server address, credentials and job id
     * @param nums value passed through to updateDataOnce
     * @return 1 on success, -1 if updateDataOnce returned a negative code
     */
    public static int updateTaskData(Job job, int nums)
    {
        MessageService msgService = new MessageService();
        int result = msgService.updateDataOnce(job.dataServerIp,
                                               job.userId, job.userPwd,
                                               nums,
                                               job.userId, job.jobId);
        return result < 0 ? -1 : 1;
    } // updateTaskData

    /**
     * Closes a socket (and with it both of its streams), ignoring a failure
     * of the close itself so it cannot mask an exception already in flight.
     */
    private static void closeQuietly(Socket sock)
    {
        try
        {
            sock.close();
        } // try
        catch (IOException ignored)
        {
            // The exchange is already over; a failed close has no recovery.
        } // catch
    } // closeQuietly

} // MainSDK

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -