📄 napletsocket.java
字号:
package naplet.nsocket;
import java.io.*;
import java.net.*;
import java.math.BigInteger;
import java.util.Random;
import java.security.*;
import java.util.StringTokenizer;
import naplet.NapletID;
/**
* NapletSocket is a wrapper of Java Socket.It provides standard socket APIs
* and some additional functions for connection migration.
*
*/
public class NapletSocket
implements java.io.Serializable
{
/**
* Real socket under this NapletSocket
*/
private transient Socket dataSocket;
/**
* need to suspend and resume I/O
*/
private transient NSocketInputStream dataReader;
private transient NSocketOutputStream dataWriter;
/**
* Socket ID to identify this socket
*/
private String localSocketID;
/**
* use to tell which remote socket to resume
*/
private String remoteSocketID;
/**
* current state of NapletSocket
*/
private int iSocketState = Global.CLOSED;
/**
* Connection type of this socket, whether persistent or transient
*/
private boolean bPersistentFlag;
/**
* record when suspend, use to send resume request when resume
*/
private InetAddress remoteAddress;
/** use for diffie-hellman key exchange */
private BigInteger publicKey;
/** remote naplet*/
private NapletID destnid;
private NapletID srcnid;
/**
* Priority for this side of connection. Used when two peers decide to migrate
* at the same time. It is decided by a hash code of naplet id. Two peers of
* a connection should have different priority which is decide in connection
* setup.
*/
private int priority;
/**
* Whether concurrent migration happens for this connection
*/
private boolean dualMigration;
/**
* Whether receving a suspend request just after acknowledging one
*/
private boolean suspendAfterACK;
/**
* Whether just sending a suspend ack message
*/
private boolean suspendACKSent;
/**
* Buffer for this connection to hold in-flight data during agent
* migration
*/
private ByteBuffer buffer;
/**
* construct a NapletSocket using a naplet ID
* @param nid
*/
public NapletSocket( NapletID src, NapletID dest )
throws IOException, NoSocketControllerException
{
this( src, dest, true );
}
/**
* constructor for Napletsocket
*
* @param src Who creates this napletsocket
* @param dest To which naplet
* @param persistent Whether persistent or not
* @throws IOException
*/
public NapletSocket( NapletID src, NapletID dest, boolean persistent )
throws IOException, NoSocketControllerException
{
srcnid = src;
destnid = dest;
int srcCode = src.toString().hashCode();
log( "***dest:" + dest );
//nid.setVersion(".1"); //dest
int destCode = destnid.toString().hashCode();
log( "dest:" + destCode + ",src:" + srcCode );
// !! zhongxl assign priority according to hashcode of naplet ID toString
if ( destCode < srcCode )
{
priority = Global.HIGH_PRIORITY;
}
bPersistentFlag = persistent;
localSocketID = Global.generateID();
dataSocket
= SocketController.registerSocket( this, localSocketID, destnid );
BufferedWriter bufferedwriter
= ( new BufferedWriter
( new OutputStreamWriter( dataSocket.getOutputStream() ) ) );
BufferedReader bufferedreader
= ( new BufferedReader
( new InputStreamReader( dataSocket.getInputStream() ) ) );
String connectmsg = "ConnectMsg:" + destnid + ":" + priority + ":" +
persistent;
BigInteger x = null;
if ( SocketController.GENKEY )
{
x = new BigInteger( Global.KEYSIZE, new Random() );
BigInteger tosend = Global.G.modPow( x, Global.N );
connectmsg += ":" + tosend;
}
bufferedwriter.write( connectmsg );
bufferedwriter.newLine();
bufferedwriter.flush();
// always important to read a ACK
String res = bufferedreader.readLine();
if ( SocketController.GENKEY )
{
StringTokenizer stringtokenizer = new StringTokenizer( res, ":" );
String s1 = stringtokenizer.nextToken(); //ack
String s2 = stringtokenizer.nextToken(); //seed to send
BigInteger seed = new BigInteger( s2 );
publicKey = seed.modPow( x, Global.N );
}
initSocket( dataSocket.getInetAddress(), SocketController.ControlPort );
}
/**
* Now no use in current release, keep just for reference.
* Remove for security concern.
* @param host
* @param port
* @throws IOException
*/
private NapletSocket( String host, int port )
throws IOException
{
if ( host == null )
{
throw new NullPointerException( "remoteHostName" );
}
localSocketID = Global.generateID();
// socket for data, connect to remote server socket
dataSocket = new Socket( host, port );
}
/**
* Called from NapletServerSocket.accept()
* @param socket
* @throws IOException
*/
protected NapletSocket( Socket socket )
throws IOException, NoSocketControllerException
{
this( socket, false );
}
protected NapletSocket( Socket socket, boolean flag )
throws IOException, NoSocketControllerException
{
log( "call in naplet(socket)" );
if ( socket == null )
{
throw new NullPointerException( "socket" );
}
dataSocket = socket;
initSocketServer();
bPersistentFlag = flag;
}
ByteBuffer getBuffer()
{
return buffer;
}
void setBuffer( ByteBuffer buf )
{
buffer = buf;
}
int getPriority()
{
return priority;
}
void setPriority( int pri )
{
priority = pri;
}
/**
* whether concurrent migration happens for this connection.
* @return
*/
public boolean isDualMigration()
{
return dualMigration;
}
/**
* Two peers try to migrate at the same time for this connection.
* @param dual
*/
void setDualMigration( boolean dual )
{
dualMigration = dual;
}
/**
* A suspend request is received before an acknowledgement has been issued.
* @return
*/
boolean isSuspendAfterACK()
{
return suspendAfterACK;
}
void setSuspendAfterACK( boolean sus )
{
suspendAfterACK = sus;
}
/**
* Whether a suspend ack message has been sent.
* @return
*/
boolean isSuspendACKSent()
{
return suspendACKSent;
}
void setSuspendACKSent( boolean sus )
{
suspendACKSent = sus;
}
/**
* initialize some conditions
*/
void initCondition()
{
suspendACKSent = false;
dualMigration = false;
suspendAfterACK = false;
//srcnid=null;
//destnid=null;
}
/**
* Get key for Diffe-Hellman
* @return
*/
public BigInteger getPublicKey()
{
return publicKey;
}
public void setPublicKey( BigInteger key )
{
publicKey = key;
}
/**
* Address of the other peer of this connection.
* @param remote
*/
void setRemoteAddress( InetAddress remote )
{
remoteAddress = remote;
}
/**
* Set destination napletid of this connection
* @param id
*/
void setDestNapletID( NapletID id )
{
destnid = id;
}
public String getDestNapletID()
{
if ( destnid == null )
{
return null;
}
return destnid.toString();
}
/**
* Source napletid of this connection
* @param id
*/
void setSrcNapletID( NapletID id )
{
srcnid = id;
}
public String getSrcNapletID()
{
if ( srcnid == null )
{
return null;
}
return srcnid.toString();
}
/**
* Whether this connection is persistent or not.
* @return
*/
public boolean isPersistent()
{
return bPersistentFlag;
}
/**
* Gracefully close the connection
* @throws IOException
*/
public synchronized void close()
throws IOException
{
while ( iSocketState == Global.SUSPENDED_LOCAL ||
iSocketState == Global.SUSPENDED_REMOTE )
{
try
{
this.wait();
}
catch ( Exception exception )
{
exception.printStackTrace();
}
}
SocketController.removeSocket( localSocketID );
try
{
String msg = "CloseMsg:" + remoteSocketID
+ ":" + dataSocket.getPort();
sendControlMessage( msg );
}
catch ( Exception exception )
{
exception.printStackTrace();
}
dataSocket.close();
iSocketState = Global.CLOSED;
}
/**
* When get a close request, call this to close remote half part
* gracefully. Or else, this side of nsocket wont know the connection
* is being closed by remote part.
* @throws IOException
*/
protected synchronized void closeRemotely()
throws IOException
{
while ( iSocketState == Global.SUSPENDED_LOCAL ||
iSocketState == Global.SUSPENDED_REMOTE )
{
try
{
this.wait();
}
catch ( Exception exception )
{
exception.printStackTrace();
}
}
SocketController.removeSocket( localSocketID );
dataReader.close();
dataReader = null;
dataWriter.close();
dataWriter = null;
dataSocket.close();
iSocketState = Global.CLOSED;
log( "socket closed reomotely" );
}
public InetAddress getInetAddress()
{
if ( dataSocket == null )
{
return null;
}
else
{
return dataSocket.getInetAddress();
}
}
/**
* return an outputstream for this NapletSocket
* @return
* @throws IOException
*/
public synchronized NSocketOutputStream getOutputStream()
throws IOException
{
while ( iSocketState != Global.RESUMED )
{
try
{
wait();
}
catch ( InterruptedException ex )
{
ex.printStackTrace();
}
}
dataWriter
= new NSocketOutputStream( dataSocket.getOutputStream() );
return dataWriter;
}
/**
* Return an inputstream for this napletsocket.
* @return
* @throws IOException
*/
public synchronized NSocketInputStream getInputStream()
throws IOException
{
while ( iSocketState != Global.RESUMED )
{
try
{
wait();
}
catch ( InterruptedException ex )
{
ex.printStackTrace();
}
}
dataReader
= new NSocketInputStream( dataSocket.getInputStream(),
dataSocket.getReceiveBufferSize(), this );
return dataReader;
}
/**
* Gets the local address to which the socket is bound.
* @return
*/
public InetAddress getLocalAddress()
{
return dataSocket.getLocalAddress();
}
/**
* Returns the local port to which this socket is bound.
* @return
*/
public int getLocalPort()
{
return dataSocket.getLocalPort();
}
/**
* Returns the remote port to which this socket is connected.
* @return
*/
public int getPort()
{
return dataSocket.getPort();
}
public int getReceiveBufferSize()
throws SocketException
{
return dataSocket.getReceiveBufferSize();
}
/**
* return the remote socket id of this connection
* @return
*/
protected String getRemoteSocketID()
{
return remoteSocketID;
}
/**
* Returns the remote port to which this socket is connected.
* @return
*/
protected InetAddress getResumeAddress()
{
return remoteAddress;
}
public int getSendBufferSize()
throws SocketException
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -