📄 jpeerserverthreads.java
字号:
/**
*
* Program : JPeerServerThreads.java
*
* Author : Vijayakrishnan Menon
*
* Date : 26th Dec 2005 Updated on 25th Feb 2006
*
* Organization : Centre for Excellence in
* Computational Engineering and Networking (CEN),
* Amrita Viswa Vidyapeetham
*
**/
package JAMPack;
/** The PeerServer is the agent that facilitates point-to-point interprocess
* communication with in the Grid Environment. The Environment boots a daemon
* thread of this class. This thread listens on a specific port and buffers all
* sends to this local machine. The receiver is expected to call the receiveCall
* method. It waits until a valid, tagged mesage is recieved. by the peerserver.
* If the tag matches then the PeerAgent comm handel is returned. Otherwise it
* again continues to wait until it gets a matching call. One should remember
* not to use unpaired receives , it will 'deadlock' the task.
**/
public class JPeerServerThreads {
protected JPeerServer _peerDataServerThread;
protected JPeerServer _peerObjectServerThread;
public JPeerServerThreads(JPeerServer dataServer,JPeerServer objectServer) {
this._peerDataServerThread = dataServer;
this._peerObjectServerThread = objectServer;
}
public void start() {
this._peerDataServerThread.start();
this._peerObjectServerThread.start();
}
public void suspend() {
this._peerDataServerThread.suspend();
this._peerObjectServerThread.suspend();
}
public void resume() {
this._peerDataServerThread.resume();
this._peerObjectServerThread.resume();
}
}
/** The actual static server class, that runs two peer IPC servers in 2 threads,
* one for Data messages and one other for Object transmissions, over the wire.
* It has a getInstance method that returns not the PeerServers but a Proxy
* container for both service threads, the class above this comment
**/
class JPeerServer extends Thread {
protected static java.net.ServerSocket _peerDataServer;
protected static java.net.ServerSocket _peerObjectServer;
protected static JPeerAgent _agent = new JPeerAgent();
protected static Object _receiveBlock = new Object();
protected static java.util.Vector<JPeerAgent> _peerQueue = new java.util.Vector<JPeerAgent>();
private static JPeerServer _peerDataThread;
private static JPeerServer _peerObjectThread;
private static boolean _singleton = false;
/** The Peer server constructor called only by the environment */
public static JPeerServerThreads getPeerServerInstance(int dataPort, int objectPort) {
if(!JPeerServer._singleton){
try {
JPeerServer._peerDataServer = new java.net.ServerSocket(dataPort);
JPeerServer._peerObjectServer = new java.net.ServerSocket(objectPort);
}
catch (java.io.IOException e) { e.printStackTrace() ; }
/*******************************************************************
* Data IPC Server
*******************************************************************/
JPeerServer._peerDataThread = new JPeerServer() {
/** The listening thread which accepts peer socket connections
* from senders.The listener will accept incommeng messages and
* append it into a message pool accessible by receivers. It
* also notifies all the blocked listeners and awakens their
* threads to read the message. The sends are non blocking
* where as the receives cannot proceed to the next task without
* the message. Hence they block the current thread and wait
* until the matching send happens. The dialog can happen
* in any asynchronous pattern deducible, that might occur in
* realtime.
**/
public void run() {
System.out.println("Starting Data IPC Server......on port "+JEnvironment.PEER_DATA_PORT );
while (true) {
try { JPeerServer._agent = new JPeerDataAgent(JPeerServer._peerDataServer.accept()) ;}
catch(java.io.IOException e) { e.printStackTrace(); }
synchronized(this._receiveBlock) {
JPeerServer._peerQueue.add(JPeerServer._agent);
JPeerServer._receiveBlock.notifyAll();
}
}
}
};
/********************************************************************
* Object IPC Server
********************************************************************/
JPeerServer._peerObjectThread = new JPeerServer() {
/** The listening thread which accepts peer socket connections
* from senders.The listener will accept incommeng messages and
* append it into a message pool accessible by receivers. It
* also notifies all the blocked listeners and awakens their
* threads to read the message. The sends are non blocking
* where as the receives cannot proceed to the next task without
* the message. Hence they block the current thread and wait
* until the matching send happens. The dialog can happen
* in any asynchronous pattern deducible, that might occur in
* realtime.
**/
public void run() {
System.out.println("Starting Object IPC Server......on port "+JEnvironment.PEER_OBJECT_PORT);
while (true) {
try { JPeerServer._agent = new JPeerObjectAgent(JPeerServer._peerObjectServer.accept()) ;}
catch(java.io.IOException e) { e.printStackTrace(); }
synchronized(this._receiveBlock) {
JPeerServer._peerQueue.add(JPeerServer._agent);
JPeerServer._receiveBlock.notifyAll();
}
}
}
};
JPeerServer._singleton = true;
return new JPeerServerThreads(JPeerServer._peerDataThread,JPeerServer._peerObjectThread);
}
else return null;
}
/** The block and wait receive method commonly used by any receive in the user's
* code. There can be many receives that can happen in seperate threads in a
* single local node. So this method is expected to be run by many concurrent
* threads. any thread calling this method will be blocked, util a matching
* message comes in. The method first scans the message queue for the matching
* message. if it finds it takes it and removes it from the queue. or it will
* block and wait until the message arrives and is notified by the server. */
public static JPeerAgent receiveCall(int tag, int source) {
boolean found = false;
JPeerAgent comm = null;
int size = JPeerServer._peerQueue.size();
if( !(JPeerServer._peerQueue.isEmpty()) )
for(int i=0;i<size;i++) {
comm = (JPeerAgent)JPeerServer._peerQueue.get(i);
if(comm.getSource() == source && comm.getTag() == tag){
JPeerServer._peerQueue.remove(i);
return comm;
}
}
while (true) {
try {
synchronized(JPeerServer._receiveBlock) {
System.out.println("Blocking...");
JPeerServer._receiveBlock.wait();
System.out.println("Checking...");
if(source == _agent.getSource() && tag == _agent.getTag())
break;
}
}catch(java.lang.InterruptedException e) { e.printStackTrace(); }
}
JPeerAgent temp = JPeerServer._agent;
JPeerServer._peerQueue.remove(temp);
return temp;
}
}
/***************************************************************************************
* The Comm handle, an agent that helps to receive data from a sending peer. This is
* a base abstraction. There are specialised comm haddles for primitive data and for
* higher abstactions (objects).
*
* NOTE: Initially I had only the base PeerAgent class, I thought i could handdle both
* object I/O and Data I/O through the same agent , but soo found out that Object
* and data streams dont mix well. So I made it into spesific abstractions like
* bellow.
****************************************************************************************/
class JPeerAgent {
protected java.net.Socket _peer;
protected java.io.BufferedReader _inData;
protected java.io.ObjectInputStream _inObject;
protected int _tag;
protected int _source;
public JPeerAgent(){}
public JPeerAgent(java.net.Socket s) {
this._peer = s;
}
public int getTag(){
return this._tag;
}
public int getSource() {
return this._source;
}
public String getData() throws java.io.IOException {
return this._inData.readLine();
}
public Object getObject() throws java.io.IOException,ClassNotFoundException {
return this._inObject.readObject();
}
public void close() { }
}
/** This one is for data messages and primitive types, arrays of that etc. */
class JPeerDataAgent extends JPeerAgent{
public JPeerDataAgent(java.net.Socket s) {
super(s);
try {
this._inData = new java.io.BufferedReader( new java.io.InputStreamReader(s.getInputStream()) );
this._tag = Integer.parseInt(_inData.readLine());
this._source = Integer.parseInt(_inData.readLine());
}catch(java.io.IOException e) { e.printStackTrace();}
}
public void close() {
try {
this._inData.close();
this._peer.close();
}catch(java.io.IOException e) { e.printStackTrace(); }
}
}
/** This one is for all types of objects, popular ones though. I meant the runtime type known
* to both sender and receiver. Otherwise a nasty ClassUnknowException will be thrown by the
* receiver. */
class JPeerObjectAgent extends JPeerAgent{
public JPeerObjectAgent(java.net.Socket s) {
super(s);
try {
this._inObject = new java.io.ObjectInputStream( new java.io.BufferedInputStream(s.getInputStream()) );
this._tag = this._inObject.readInt();
this._source = this._inObject.readInt();
}catch(java.io.IOException e) { e.printStackTrace();}
}
public void close() {
try {
this._inObject.close();
this._peer.close();
}catch(java.io.IOException e) { e.printStackTrace(); }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -