⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 .#dispatchernodemanagement.java.1.9

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 9
📖 第 1 页 / 共 3 页
字号:
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectOutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Vector;

/**
 * <p>Title: </p>
 * <p>Description: Manages the worker node queues and handles workers joining and leaving; it is also responsible for collecting statistics on worker status.</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class DispatcherNodeManagement
{
    private int workerPort; // port used to communicate with workers (the port workers join on)
    private int listenPort; // port that listens for worker heartbeats
    //private int workerHeartbeatPort; // port for periodically receiving worker info (both ports should be initialized)
    private Vector workerNodeList; // queue of workers (alive ones)
    private Vector idleWorkerNodeList; // queue of idle workers (entries are worker ids)
    private Vector deadWorkerNodeList; // queue of dead workers
    private Vector jobList;
    
    public Vector ftWorkerNodeList;// queue of fault-tolerant workers
    private DispatcherInfo dispatcherInfo;
    // Counter of workers that have joined; used to build the numeric suffix of new worker ids.
    private static int num = 0;

    private LinkedList deadFlag; // (temporary) list of dead worker ids, before they are moved into deadWorkerNodeList

    NodeHandler handler;
    ListenningHandler lHandler;
    TestDeadWorker tHandler;
    
    private JobFaultTolerantHandler jfth=null;
    
//    DispatcherJobManagement jobManager;
    // Monitor object supplied by the caller; notifyAll() is invoked on it after a worker has been queued.
    private Integer dummy;

    private ServerSocket workerSocket;
    private ServerSocket listenSocket;

    // Only referenced by the commented-out setSoTimeout calls in InitNodeServer.
    private static final int TIME_OUT = 1000;

    /**
     * Creates a node manager that works on the caller-supplied queues.
     * No sockets are opened here; that happens in {@link #InitNodeServer()}.
     *
     * @param workerPort         port of the server socket that accepts joining workers
     * @param listenPort         port of the second server socket (reported to workers on join)
     * @param workerNodeList     list that receives a WorkerNode for every joined worker
     * @param idleWorkerNodeList list that receives the id of every joined worker
     * @param deadWorkerNodeList list of dead workers
     * @param jobList            job list, also handed to the JobFaultTolerantHandler
     * @param dinfo              dispatcher information (id, worker counters)
     * @param djm                object used as a monitor: notifyAll() is called on it after a join
     */
    public DispatcherNodeManagement(int workerPort, int listenPort,
                                    Vector workerNodeList,
                                    Vector idleWorkerNodeList,
                                    Vector deadWorkerNodeList, Vector jobList,
                                    DispatcherInfo dinfo, Integer djm)
    {
        // Server sockets stay unset until InitNodeServer() binds them.
        workerSocket = null;
        listenSocket = null;

        // Ports.
        this.listenPort = listenPort;
        this.workerPort = workerPort;

        // Queues handed in by the caller.
        this.jobList = jobList;
        this.deadWorkerNodeList = deadWorkerNodeList;
        this.idleWorkerNodeList = idleWorkerNodeList;
        this.workerNodeList = workerNodeList;

        // Collaborators handed in by the caller.
        dispatcherInfo = dinfo;
        dummy = djm;

        // State owned by this instance.
        ftWorkerNodeList = new Vector();
        deadFlag = new LinkedList();
        jfth = new JobFaultTolerantHandler(jobList);
    }

    /**
     * Binds the two server sockets of this manager: one on {@code workerPort}
     * and one on {@code listenPort}.
     *
     * <p>The sockets are published to the {@code workerSocket} and
     * {@code listenSocket} fields only when BOTH binds succeed. If the second
     * bind fails, the socket that was already opened is closed again so that a
     * failed initialisation does not leave {@code workerPort} occupied.
     *
     * @return {@code true} if both sockets were bound, {@code false} if binding
     *         failed with an I/O error or was refused by the security manager
     */
    public boolean InitNodeServer()
    {
        ServerSocket boundWorkerSocket = null;
        try
        {
            boundWorkerSocket = new ServerSocket(workerPort);
            //boundWorkerSocket.setSoTimeout(TIME_OUT);
            ServerSocket boundListenSocket = new ServerSocket(listenPort);
            //boundListenSocket.setSoTimeout(TIME_OUT);

            workerSocket = boundWorkerSocket;
            listenSocket = boundListenSocket;
            return true;
        }
        catch (IOException e)
        {
            log(e);
            System.out.println("Error binding to port");
        }
        catch (SecurityException e)
        {
            log(e);
            System.out.println("The security manager refused permission to bind to port");
        }

        // Reaching this point means a bind failed. Release the worker socket if
        // it was opened before the failure; otherwise it would leak.
        if (boundWorkerSocket != null)
        {
            try
            {
                boundWorkerSocket.close();
            }
            catch (IOException e)
            {
                log(e);
            }
        }
        return false;
    } //InitNodeServer;

    /**
     * Initialises the server sockets and, if that succeeds, creates and starts
     * the three service threads (NodeHandler, ListenningHandler, TestDeadWorker).
     *
     * @return {@code true} when the sockets were initialised and the threads
     *         started; {@code false} when {@link #InitNodeServer()} failed, in
     *         which case no thread is created
     */
    public boolean StartNodeServer()
    {
        if (!InitNodeServer())
        {
            return false;
        }

        handler = new NodeHandler();
        handler.start();

        lHandler = new ListenningHandler();
        lHandler.start();

        tHandler = new TestDeadWorker();
        tHandler.start();

        return true;
    } // StartNodeServer

    /**
     * Shuts this manager down: terminates the service threads that are still
     * running, calls {@code quit()} on every node in {@code workerNodeList},
     * and closes both server sockets.
     *
     * <p>Each socket is closed independently and only if it was actually
     * created, so a socket that was never bound (or a failure while closing
     * one of them) no longer prevents the other from being closed. Close
     * failures are logged instead of being silently discarded.
     */
    public void quit()
    {
        System.out.println("DispatcherNodeManagement quitting...");
        if (handler != null && handler.isActive)
        {
            handler.terminate();
        }
        if (lHandler != null && lHandler.isActive)
        {
            lHandler.terminate();
        }
        if (tHandler != null && tHandler.isAlive())
        {
            tHandler.interrupt();
            tHandler.terminate();
        }
        synchronized (workerNodeList)
        {
            for (int i = 0; i < workerNodeList.size(); i++)
            {
                WorkerNode wn = (WorkerNode) workerNodeList.elementAt(i);
                wn.quit();
            }
        }
        // The sockets are null when InitNodeServer() was never called or failed.
        try
        {
            if (workerSocket != null && !workerSocket.isClosed())
            {
                workerSocket.close();
            }
        }
        catch (IOException e)
        {
            log(e);
        }
        try
        {
            if (listenSocket != null && !listenSocket.isClosed())
            {
                listenSocket.close();
            }
        }
        catch (IOException e)
        {
            log(e);
        }
        System.out.println("DispatcherNodeManagement quit!");
    } // quit

    /**
     * Reports how many entries {@code workerNodeList} currently holds.
     *
     * @return the current size of {@code workerNodeList}
     */
    public int getWorkerCount()
    {
        final int workerTotal = workerNodeList.size();
        return workerTotal;
    } //getWorkerCount

    /**
     * Reports how many entries {@code idleWorkerNodeList} currently holds.
     *
     * @return the current size of {@code idleWorkerNodeList}
     */
    public int getIdleWorkerCount()
    {
        final int idleTotal = idleWorkerNodeList.size();
        return idleTotal;
    } //getIdleWorkerCount

    // Inner class: listens on the server socket for workers that want to join
    class NodeHandler
        extends Thread
    {
        private boolean isActive = true;
        /**
         * Creates the handler thread. Nothing is configured here; the thread
         * works on the enclosing instance's workerSocket once started.
         */
        public NodeHandler()
        {
            super();
        }

        /**
         * Accept loop: while {@code isActive} is set, waits for a connection on
         * {@code workerSocket} and hands each accepted socket to a new
         * {@link WorkerJoin} thread. An I/O or security failure is logged and
         * ends the loop; an accept timeout is ignored and the loop continues.
         */
        public void run()
        {
            while (isActive)
            {
                try
                {
                    final Socket accepted = workerSocket.accept();
                    // Handle the join protocol on a separate thread.
                    new WorkerJoin(accepted).start();
                }
                catch (java.io.InterruptedIOException timedOut)
                {
                    // Waiting for a connection timed out: nothing to do, loop again.
                }
                catch (IOException ioFailure)
                {
                    log(ioFailure);
                    isActive = false;
                }
                catch (SecurityException refused)
                {
                    log(refused);
                    isActive = false;
                }
            }
            System.out.println("handler quit!");
        }

        /**
         * Thread started by the dispatcher for each accepted worker connection; it handles the join request and adds the worker to the queues.
         * @return
         */
        public class WorkerJoin
            extends Thread
        {
            Socket s;
            private BufferedReader is;
            private PrintWriter os;

            /**
             * Remembers the connection this thread will serve.
             *
             * @param sock socket of the connecting worker; run() returns
             *             immediately when it is null
             */
            public WorkerJoin(Socket sock)
            {
                s = sock;
            }

            public void run()
            {

                if (s == null)
                {
                    return;
                }
                if(Parameter.dispatcherLogIsActive)
     	       {
     	       	LogFile lf=new LogFile(Parameter.logFileName);
     	       	try{
     	       		lf.logWorker_login(s.getInetAddress().getHostAddress());
     	       	}
     	       	catch(Exception e)
     			{
     	       		e.printStackTrace();
     			}
     	       }
                try
                {
                    is = new BufferedReader(new InputStreamReader(s.
                        getInputStream()));
                    os = new PrintWriter(s.getOutputStream());
                    //协议传输数据
                    String lineStr = is.readLine();

                    if (lineStr.equals("<Worker join>"))
                    {
                        os.println("<can accept worker>");
                        os.flush();
                        String ipAddr = is.readLine();
                        String pcWorkLoad = is.readLine();

                        String workerId;
                        String dispatcherid = dispatcherInfo.
                            getDispatcherId();

                        if (num == 0)
                        {
                            workerId = "W" +
                                dispatcherid.substring(1, dispatcherid.length()) +
                                "00001";
                            num++;
                        }
                        else
                        {
                            String tmp = Integer.toString(num + 1);
                            int i = 5 - Integer.toString(num + 1).length();
                            while (i-- != 0)
                            {
                                tmp = "0" + tmp;
                            }
                            workerId = "W" +
                                dispatcherid.substring(1, dispatcherid.length()) +
                                tmp;
                            num++;
                        }
                        WorkerNode w = new WorkerNode(workerId, ipAddr,
                            pcWorkLoad, deadFlag);
                        synchronized (workerNodeList)
                        {
                            workerNodeList.addElement(w);
                        }
                        synchronized (idleWorkerNodeList)
                        {
                            idleWorkerNodeList.addElement(w.getWorkerId()); //应该以workerid入队列
                        dispatcherInfo.setWorkerCount(workerNodeList.size());
                        dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.
                            size());
                        }
                        synchronized(dummy)
						{
                        	try{
                        		dummy.notifyAll();
                                }catch(IllegalMonitorStateException e)
        						{
                                	//e.printStackTrace();
                                	System.out.println(e.toString());
        						}
						}
                        //将必要的dispatchernode信息回传给worker
                        //os.println(dispatcherInfo.getDispatcherIp());
                        os.println(workerId);
                        os.println(dispatcherInfo.getDispatcherId());
                        os.println(listenPort);
                        os.flush();
                        lineStr = is.readLine();
                        if (lineStr.equals("<join finish>"))
                        {
                            w.start();
                            //return true; //如果不等。。。??
                            if(Parameter.dispatcherLogIsActive)
                 	       {
                 	       	LogFile lf=new LogFile("displog.log");
                 	       	try{
                 	       		lf.logWorker_login_Succ(s.getInetAddress().getHostAddress(),workerId,workerNodeList.size());
                 	       	}
                 	       	catch(Exception e)
                 			{
                 	       		e.printStackTrace();
                 			}
                 	       }
                        }
                    }
                }
                catch (IOException e)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -