
📄 receive_mpoll.java

Source code of the distributed computing platform P2HP-1; P2HP-1 is a P2P-based high-performance computing platform
/*
 * Created on 2004-12-4
 *
 * TODO To change the template for this generated file go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
package cn.edu.hust.cgcl.biogrid.dispatcher;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
//import java.util.Vector;



/**
 * @author Administrator
 *
 * TODO To change the template for this generated type comment go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
public class Receive_MPoll extends Thread{
	private int serverPort;
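	// Assigned in run() and closed from quit(), which may run on another thread.
	private volatile ServerSocket serverSocket;

    // Cleared by quit(); read by every Process handler thread, hence volatile.
    private volatile boolean subflag=true;
	
	DispatcherInfo dispatcherInfo;
	
	// Cleared by quit(); read by the accept loop in run(), hence volatile.
	private volatile boolean isActive=true;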
	public Receive_MPoll(int port,DispatcherInfo info)
	{
		this.serverPort=port;
		this.dispatcherInfo=info;
	}
	
	public void run()
    {
        try
        {
            serverSocket = new ServerSocket(this.serverPort);
            while (isActive)
            {
                Socket socket = null;
                try
                {
                    socket = serverSocket.accept();
                    Process p = new Process(socket); // handle the accepted connection
                    p.start();
                }
                catch (Exception e)
				{
                	System.out.println(e.toString());
                	//e.printStackTrace();
				}
            } // while
        } // try
        /*catch(SocketTimeoutException e)
		{
        	System.out.println("The monitor has downed now!");
		}*/
        catch (Exception e)
        {
        	e.printStackTrace();
        }
        System.out.println("Receive poll handler quit!");
    } // run
	
	
	/**
    *
    * <p>Title: </p>
    * <p>Description: connection-handling thread (inner class) </p>
    * <p>Copyright: Copyright (c) 2004</p>
    * <p>Company: </p>
    * @author not attributable
    * @version 1.0
    */
   public class Process
       extends Thread
   {
       Socket socket;
       private BufferedReader is;
       private PrintWriter os;
       //private ObjectOutputStream oob;
       //private ObjectInputStream iob;
       private boolean flag=true;
       public Process(Socket sock)
       {
           this.socket = sock;
           try{
           is = new BufferedReader(new InputStreamReader(socket.
               getInputStream()));
           os = new PrintWriter(socket.getOutputStream());
           //iob = new ObjectInputStream(socket.getInputStream());
           //oob = new ObjectOutputStream(socket.getOutputStream());
           }
           catch(Exception e)
			{
           	e.printStackTrace();
			}
           System.out.println("Monitor onpoll start");
       }
       

       public void run()
       {
           //process the query from monitor
       	while(subflag&&flag)
       	{
           if (socket == null || socket.isClosed())
           {
               return;
           }
           try
           {
               //transmit data based on the protocol
               String lineStr = is.readLine();
               if (lineStr == null)
               {
                   //readLine() returns null once the peer closes the stream: stop this handler
                   flag=false;
                   continue;
               }
               /*******************************
                * To communicate with monitor, and receive job and count
                ********************************/
               System.out.println(lineStr);
               if (lineStr.equals("<Dispatcher poll>"))
               {
               	System.out.println("<Dispatcher poll>");
                   //dispatcherInfo.dispatcherUpdate();
                   //the update cost about one second 
                   //System.out.println("idleworkercount: "+dispatcherInfo.getIdleWorkerCount());
                   //System.out.println("workercount: "+dispatcherInfo.getWorkerCount());
                   //oob.writeObject(dispatcherInfo); 
                   //oob.flush();
                   os.println(dispatcherInfo.getDispatcherId());
                   //os.println(dispatcherInfo.getFirstMonitorId());
                   //os.println(dispatcherInfo.getFirstMonitorIp());
                   //os.println(dispatcherInfo.getSecondMonitorId());
                   //os.println(dispatcherInfo.getSecondMonitorIp());
                   os.println(dispatcherInfo.getJobCount());
                   os.println(dispatcherInfo.getWorkerCount());
                   os.println(dispatcherInfo.getIdleWorkerCount());
                   os.println(dispatcherInfo.getCanAcceptJob());
                   os.println(dispatcherInfo.getDispatcherLoad());
                   //os.println(dispatcherInfo.getDispacherJob());
                   os.flush();
                   //System.out.println(lineStr);
                   lineStr = is.readLine();
                   if ("<poll finish>".equals(lineStr)) //null-safe: lineStr is null if the monitor hung up
                   {
                       //System.out.println(lineStr);
                   //return; 
                   }
               }
               else{
                System.out.println("what: "+lineStr);
               	System.out.println("communication error!");
               }
           }
           catch (Exception e)
           {
            flag=false;
            System.out.println(e.toString());
               try
               {
                socket.close();
               }
               catch (IOException e1)
               {
                   e1.printStackTrace();
                   flag=false;
               }
           }
       	}//while
       	try{
       	if(!socket.isClosed())
       		socket.close();
       	}catch(Exception e)
		{
       		//nothing more to do if closing the socket fails
		}
       	return;
       }//run
     } //process
   
   public void quit()
   {
   	System.out.println("Receive poll handler quitting...");
   	subflag=false;
   	isActive=false;
   	
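   	//serverSocket is still null if quit() is called before run() has opened it
   	if(this.serverSocket!=null&&!this.serverSocket.isClosed())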
   	{
   	try{
   	     this.serverSocket.close();
   	}catch(Exception e)
		{
   		e.printStackTrace();
		}
   	}
   }
}
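
The Process handler above implies a simple line-based exchange: the monitor sends `<Dispatcher poll>`, the dispatcher answers with six lines (dispatcher id, job count, worker count, idle worker count, can-accept-job flag, load), and the monitor closes the round with `<poll finish>`. The monitor side is not part of this file, so the following is only a minimal sketch of what such a client might look like. The class name `MonitorPollSketch` and its methods are hypothetical, and the reply lines are kept as raw strings because the types returned by `DispatcherInfo` are not shown here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical monitor-side client; not part of the original P2HP-1 sources.
final class MonitorPollSketch {
    // The handler writes six lines per poll: id, job count, worker count,
    // idle worker count, can-accept-job flag, dispatcher load.
    static final int REPLY_LINES = 6;

    // Runs one poll round over already-open streams and returns the raw reply lines.
    static List<String> pollOnce(BufferedReader in, PrintWriter out) throws IOException {
        out.println("<Dispatcher poll>");
        out.flush();
        List<String> reply = new ArrayList<>();
        for (int i = 0; i < REPLY_LINES; i++) {
            String line = in.readLine();
            if (line == null) { // stream ended before the full reply arrived
                throw new IOException("dispatcher closed the connection mid-reply");
            }
            reply.add(line);
        }
        out.println("<poll finish>");
        out.flush();
        return reply;
    }

    // Opens a connection to the dispatcher's poll port, polls once, and closes it.
    static List<String> poll(String host, int port) throws IOException {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream())) {
            return pollOnce(in, out);
        }
    }
}
```

A caller would use something like `MonitorPollSketch.poll("localhost", 9000)`, where the host and port are placeholders for wherever `Receive_MPoll` was started. Since the handler loops until the socket closes, a longer-lived client could also keep one connection open and call `pollOnce` repeatedly.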
