⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jpeerserverthreads.java

📁 JAMPACK Grid programming
💻 JAVA
字号:
/**
 *
 * Program         : JPeerServerThreads.java
 *
 * Author          : Vijayakrishnan Menon
 *
 * Date            : 26th Dec 2005 Updated on 25th Feb 2006
 *
 * Organization    : Centre for Excellence in 
 *						Computational Engineering and Networking (CEN),
 *                   		Amrita Viswa Vidyapeetham
 *
 **/

package JAMPack;

/** The PeerServer is the agent that facilitates point-to-point interprocess 
  * communication with in the Grid Environment. The Environment boots a daemon 
  * thread of this class. This thread listens on a specific port and buffers all
  * sends to this local machine. The receiver is expected to call the receiveCall 
  * method. It waits until a valid, tagged mesage is recieved. by the peerserver.
  * If the tag matches then the PeerAgent comm handel is returned. Otherwise it 
  * again continues to wait until it gets a matching call. One should remember 
  * not to use unpaired receives , it will 'deadlock' the task.  
  **/
  
public class JPeerServerThreads {
	protected JPeerServer _peerDataServerThread;
	protected JPeerServer _peerObjectServerThread;
	
	public JPeerServerThreads(JPeerServer dataServer,JPeerServer objectServer) {
		this._peerDataServerThread = dataServer;
		this._peerObjectServerThread = objectServer;
	}
	
	public void start() {
		this._peerDataServerThread.start();
		this._peerObjectServerThread.start();
	}
	
	public void suspend() {
		this._peerDataServerThread.suspend();
		this._peerObjectServerThread.suspend();		
	}
	
	public void resume() {
		this._peerDataServerThread.resume();
		this._peerObjectServerThread.resume();
	}
} 

/** The actual static server class, that runs two peer IPC servers in 2 threads,
  * one for Data messages and one other for Object transmissions, over the wire.
  * It has a getInstance method that returns not the PeerServers but a Proxy 
  * container for both service threads, the class above this comment 
  **/
 
class JPeerServer extends Thread {
	protected static java.net.ServerSocket _peerDataServer;
	protected static java.net.ServerSocket _peerObjectServer;
	protected static JPeerAgent _agent = new JPeerAgent();
	protected static Object _receiveBlock = new Object();
	protected static java.util.Vector<JPeerAgent> _peerQueue = new java.util.Vector<JPeerAgent>();
	private static JPeerServer _peerDataThread;
	private static JPeerServer _peerObjectThread;
	private static boolean _singleton = false;

	/** The Peer server constructor called only by the environment */
	public static JPeerServerThreads getPeerServerInstance(int dataPort, int objectPort) {
		if(!JPeerServer._singleton){
			try { 
				JPeerServer._peerDataServer = new java.net.ServerSocket(dataPort);
				JPeerServer._peerObjectServer = new java.net.ServerSocket(objectPort);
			}
			catch (java.io.IOException e) { e.printStackTrace() ; }
			
			/*******************************************************************
			 *				Data IPC Server
			 *******************************************************************/
			JPeerServer._peerDataThread = new JPeerServer() {
				
				/** The listening thread which accepts peer socket connections 
				  * from senders.The listener will accept incommeng messages and 
				  * append it into a message pool accessible by receivers. It 
				  * also notifies all the blocked listeners and awakens their 
				  * threads to read the message. The sends are non blocking 
				  * where as the receives cannot proceed to the next task without
				  * the message. Hence they block the current thread and wait 
				  * until the matching send happens. The dialog can happen
				  * in any asynchronous pattern deducible, that might occur in 
				  * realtime. 
				  **/
				public void run() {
					System.out.println("Starting Data IPC Server......on port "+JEnvironment.PEER_DATA_PORT );
					while (true) {
						try { JPeerServer._agent =  new JPeerDataAgent(JPeerServer._peerDataServer.accept()) ;}
						catch(java.io.IOException e) { e.printStackTrace(); }
						synchronized(this._receiveBlock) {
							JPeerServer._peerQueue.add(JPeerServer._agent);
							JPeerServer._receiveBlock.notifyAll();
						}
					}
				}
			};
			
			/********************************************************************
			 *				Object IPC Server
			 ********************************************************************/
			JPeerServer._peerObjectThread = new JPeerServer() {
				
				/** The listening thread which accepts peer socket connections 
				  * from senders.The listener will accept incommeng messages and 
				  * append it into a message pool accessible by receivers. It 
				  * also notifies all the blocked listeners and awakens their 
				  * threads to read the message. The sends are non blocking 
				  * where as the receives cannot proceed to the next task without
				  * the message. Hence they block the current thread and wait 
				  * until the matching send happens. The dialog can happen
				  * in any asynchronous pattern deducible, that might occur in 
				  * realtime. 
				  **/
				public void run() {
					System.out.println("Starting Object IPC Server......on port "+JEnvironment.PEER_OBJECT_PORT);
					while (true) {
						try { JPeerServer._agent =  new JPeerObjectAgent(JPeerServer._peerObjectServer.accept()) ;}
						catch(java.io.IOException e) { e.printStackTrace(); }
						synchronized(this._receiveBlock) {
							JPeerServer._peerQueue.add(JPeerServer._agent);
							JPeerServer._receiveBlock.notifyAll();
						}
					}
				}
			};
						
			JPeerServer._singleton = true;
			return new JPeerServerThreads(JPeerServer._peerDataThread,JPeerServer._peerObjectThread);
		}
		else return null;
	}
	

	/** The block and wait receive method commonly used by any receive in the user's
	  * code. There can be many receives that can happen in seperate threads in a
	  * single local node. So this method is expected to be run by many concurrent 
	  * threads. any thread calling this method will be blocked, util a matching 
	  * message comes in. The method first scans the message queue for the matching 
	  * message. if it finds it takes it and removes it from the queue. or it will 
	  * block and wait until the message arrives and is notified by the server. */
	  
	public static JPeerAgent receiveCall(int tag, int source)  {
		boolean found = false;
		JPeerAgent comm = null;
		int size = JPeerServer._peerQueue.size();
		if( !(JPeerServer._peerQueue.isEmpty()) )
			for(int i=0;i<size;i++) {
				comm = (JPeerAgent)JPeerServer._peerQueue.get(i);
				if(comm.getSource() == source && comm.getTag() == tag){
					JPeerServer._peerQueue.remove(i);
					return comm;
				}
			}
		while (true) {
			try {
				synchronized(JPeerServer._receiveBlock) {
					System.out.println("Blocking...");
					JPeerServer._receiveBlock.wait();
					System.out.println("Checking...");
					if(source == _agent.getSource() && tag == _agent.getTag()) 
						break;					
				}
			}catch(java.lang.InterruptedException e) { e.printStackTrace(); }
		}
		JPeerAgent temp = JPeerServer._agent;
		JPeerServer._peerQueue.remove(temp);
		return temp;
	}
}

/***************************************************************************************

  * The Comm handle, an agent that helps to receive data from a sending peer. This is
  * a base abstraction. There are specialised comm haddles for primitive data and for 
  * higher abstactions (objects).
  *
  * NOTE: Initially I had only the base PeerAgent class, I thought i could handdle both 
  *       object I/O and Data I/O through the same agent , but soo found out that Object 
  *       and data streams dont mix well. So I made it into spesific abstractions like
  *		  bellow.	  
  
****************************************************************************************/

class JPeerAgent {
	protected java.net.Socket _peer;
	protected java.io.BufferedReader _inData;
	protected java.io.ObjectInputStream _inObject;
	protected int _tag;
	protected int _source;

	public JPeerAgent(){}

	public JPeerAgent(java.net.Socket s) {
		this._peer = s;		
	}

	public int getTag(){
		return this._tag;
	}

	public int getSource() {
		return this._source;
	}
	
	public String getData() throws java.io.IOException {
		return this._inData.readLine();
	} 
	
	public Object getObject() throws java.io.IOException,ClassNotFoundException {
		return this._inObject.readObject();
	}

	public void close() { }	
}


/** This one is for data messages and primitive types, arrays of that etc. */

class JPeerDataAgent extends JPeerAgent{
	
	public JPeerDataAgent(java.net.Socket s) {
		super(s);
		try {
			this._inData = new java.io.BufferedReader( new java.io.InputStreamReader(s.getInputStream()) );
			this._tag = Integer.parseInt(_inData.readLine());
			this._source = Integer.parseInt(_inData.readLine());
		}catch(java.io.IOException e) { e.printStackTrace();}
	}
	
	public void close() {
		try {
			this._inData.close();
			this._peer.close();
		}catch(java.io.IOException e) { e.printStackTrace(); }
	}
}

/** This one is for all types of objects, popular ones though. I meant the runtime type known
  * to both sender and receiver. Otherwise a nasty ClassUnknowException will be thrown by the 
  * receiver.  */

class JPeerObjectAgent extends JPeerAgent{
	
	public JPeerObjectAgent(java.net.Socket s) {
		super(s);
		try {
			this._inObject = new java.io.ObjectInputStream( new java.io.BufferedInputStream(s.getInputStream()) );
			this._tag = this._inObject.readInt();
			this._source = this._inObject.readInt();
		}catch(java.io.IOException e) { e.printStackTrace();}
	}
	
	public void close() {
		try {
			this._inObject.close();
			this._peer.close();
		}catch(java.io.IOException e) { e.printStackTrace(); }
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -