⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 monitornodemanagement.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.monitor;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * <p>Title: </p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class MonitorNodeManagement extends Thread
{
    DispatcherGroup dispGroup;
    MonitorGroup monGroup;
    MonitorInfo monInfo;
    LocalMonitorNode localNode;
    int nodePort;

    ServerSocket serverSock;
    NodeHandler jobHandler;
    boolean flag=true;
    
    public MonitorNodeManagement(DispatcherGroup dispgroup, MonitorGroup mongroup, MonitorInfo moninfo,int nodeport, LocalMonitorNode localnode)
    {
        this.localNode=localnode;
        this.dispGroup=dispgroup;
        this.monGroup=mongroup;
        this.monInfo=moninfo;
        this.nodePort=nodeport;
    } // NodeServerHandler

    public void run()
    {
        Socket clientSock;
        try
        {
            serverSock = new ServerSocket(this.nodePort);
            System.out.print("Begin Monitor Node Management Accept\n");
            while ( flag&&(clientSock = serverSock.accept()) != null)
            {
                /*
                System.out.print("Node Management Accept:");
                System.out.println(clientSock.getInetAddress().getHostAddress());
                */
                jobHandler = new NodeHandler(
                    this.dispGroup, this.monGroup, this.monInfo, clientSock, nodePort, this.localNode);
                jobHandler.start();
                //this.jobServerState = MonitorJobManagement.SERVER_RUNNING;
            } // while
        } // try
        catch (Exception e)
        {
            //this.jobServerState=MonitorJobManagement.SERVER_ERROR;
        } // catch
        System.out.println("MonitorNodeManagement's quit!");
        return;
    } // run
    
    public void quit()
    {
    	System.out.println("MonitorNodeManagement start to quitting...");
    	try{
    		flag=false;
    		if(!serverSock.isClosed())
    		  this.serverSock.close();
    		if(jobHandler.isAlive())
    		   this.jobHandler.quit();
    		this.dispGroup.quit();
    	}
    	catch(Exception e)
		{
    		
		}
    	System.out.println("MonitorNodeManagement's quitting finished!");
    }
} // NodeServerHandler

class NodeHandler
    extends Thread
{
    private Socket clientSock;
    private DispatcherGroup dispGroup;
    private MonitorGroup monitorGroup;
    private MonitorInfo monitorInfo;
    private int nodeServerPort;
    private int taskPort=5007;//the communication port ,
    private LocalMonitorNode localNode;

    public NodeHandler(DispatcherGroup dispgroup, MonitorGroup mongroup,
                       MonitorInfo moninfo, Socket s, int nodeport, LocalMonitorNode localnode)
    {
        this.localNode=localnode;
        clientSock = s;
        this.monitorGroup = mongroup;
        this.monitorInfo = moninfo;
        this.nodeServerPort = nodeport;
        dispGroup=dispgroup;
    } // NodeServerHandler

    public void run()
    {
        BufferedReader is;
        PrintWriter os;
        if (clientSock == null)
        {
            return;
        } // if
        try
        {
            System.out.print("Begin Monitor Node Handler Accept");
            String dispip=clientSock.getInetAddress().getHostAddress();
            System.out.println(clientSock.getInetAddress().getHostAddress());
            is = new BufferedReader(new InputStreamReader(clientSock.
                getInputStream()));
            os = new PrintWriter(clientSock.getOutputStream());
            String lineStr = is.readLine();
            if (lineStr.equals("monitor join"))
            {
//            	log
                if(MonitorConfiuration.MonLogIsActive)
                {
                LogFile lf=new LogFile(MonitorConfiuration.logFileName);
                try{
                lf.logMonitor_login(dispip);
                }catch(Exception e)
        		{
                	e.printStackTrace();
        		}
                }//log
                monitorJoin(is, os);
            } // if
            else if (lineStr.equals("dispatcher join"))
            {
            	if(is.readLine().equals(dispip))
            	{

                   disPatcherJoin(is, os);
            	}
            	else {os.println("LAN");
            	os.flush();
            	System.out.println("this dispatcher(ip is "+dispip+")is locate in LAN, the joinning is rejected!" );
            	}
            } // else if
            else if (lineStr.equals("dispatcher join in"))
            {
//            	log
                if(MonitorConfiuration.MonLogIsActive)
                {
                LogFile lf=new LogFile(MonitorConfiuration.logFileName);
                try{
                lf.logDispatcher_login(dispip);
                }catch(Exception e)
        		{
                	e.printStackTrace();
        		}
                }
                this.dispatcherJoinIn(is, os);
            } // else if
            else if (lineStr.equals("worker join"))
            {
                workerJoin(is, os);
            } // else if
            // 还需要处理worker加入
            else if (lineStr.equals("worker join in"))
            {
                workerJoinIn(is, os);
            } // else if

            is.close();
            os.close();
            this.clientSock.close();
            this.clientSock = null;
            is = null;
            os = null;
        } // try
        catch (Exception e)
        {
            System.out.println(e.toString());
            is = null;
            os = null;
            clientSock=null;
        } // catch

        return;
    } // run

    private void disPatcherJoin(BufferedReader is, PrintWriter os)
        throws Exception
    {
        /*
        MonitorNode monitor = this.monitorGroup.getHeaviestDispMonitor();
        os.println(monitor.getMonConf().getMonitorIp());
        os.println(monitor.getMonConf().getJoinPort());
        */
        os.println(this.localNode.monConfig.getMonitorIp());
        os.println(localNode.monConfig.getJoinPort());
        os.println("dispatcher join finish");
        os.flush();
        return;
    } // DisPatcherJoin

    private void workerJoin(BufferedReader is, PrintWriter os)
        throws Exception
    {
        /*
        MonitorNode monitor = this.monitorGroup.getHeaviestWorkerMonitor();
        os.println(monitor.getMonConf().getMonitorIp());
        os.println(monitor.getMonConf().getJoinPort());
        */
        os.println(this.localNode.monConfig.getMonitorIp());
        os.println(localNode.monConfig.getJoinPort());
        os.println("worker join finish");
        os.flush();
        return;
    } // DisPatcherJoin

    private void monitorJoin(BufferedReader is, PrintWriter os)
    throws Exception
    {
        int count=monitorGroup.getMonitorCount();
        os.println(count);
        for (int i=0; i<count; i++)
        {
            // 需要修改!!!
            MonitorNode monitor = this.monitorGroup.getMonitor(i);
            os.println(monitor.getMonConf().getMonitorIp());
            os.println(monitor.getMonConf().getJoinPort());
        } // for
        os.println("monitor join finish");
        os.flush();
    } // monitorJoin

    private void dispatcherJoinIn(BufferedReader is, PrintWriter os)
    throws Exception
    {
        System.out.println("Dispater Join In");
        String dispIp=is.readLine();
        StringBuffer dispId=new StringBuffer(dispGroup.getId());
        dispId.setCharAt(0,'D');
        int tmp=dispGroup.getNodeCount();
        if (tmp<100)
        {
            dispId.append('0');
        } // if
        dispId.append(tmp);
        String tmpstr=is.readLine();
        int dispPort=Integer.parseInt(tmpstr);
        tmpstr=is.readLine();
        int workerPort=Integer.parseInt(tmpstr);
        tmpstr=is.readLine();
        int taskPort=Integer.parseInt(tmpstr);
        //System.out.println(workerPort);
        os.println(dispId.toString());
        os.println(this.monitorInfo.getMonitorId());
        os.println("dispatcher join in finish");
        os.flush();

        DispatcherNode tmpNode=new DispatcherNode(dispId.toString(),
                                                  dispIp, dispPort,
                                                  workerPort,taskPort,
                                                  this.dispGroup);
        this.dispGroup.addNode(tmpNode);
        tmpNode.start();
//    	log
        if(MonitorConfiuration.MonLogIsActive)
        {
        LogFile lf=new LogFile(MonitorConfiuration.logFileName);
        try{
        lf.logDispatcher_login_Succ(tmpNode.GetIp(),tmpNode.getNodeInfo().getDispatcherId(),this.dispGroup.getNodeCount());
        }catch(Exception e)
		{
        	e.printStackTrace();
		}
        }
    } // dispatcherJoinIn
    
    public void quit()
    {
    	try{
    		if(!this.clientSock.isClosed())
    	        this.clientSock.close();
    	}catch(Exception e)
		{
    		e.printStackTrace();
		}
    }

    private void workerJoinIn(BufferedReader is, PrintWriter os)
    throws Exception
    {
        System.out.println("Worker Join In");
         this.dispGroup.workerJoin(is, os);
        os.println("worker join in finish");
        os.flush();
        System.out.println("worker join in finish");
    } // workerJoinIn

    private void monitorJoinIn(BufferedReader is, PrintWriter os)
    {

    } // monitorJoinIn

} // NodeServerHandler

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -