📄 napletsocket.java
字号:
{
return dataSocket.getSendBufferSize();
}
/**
 * Returns the SO_LINGER value reported by the underlying data socket.
 *
 * @return the result of {@code Socket.getSoLinger()} on the data socket
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public int getSoLinger()
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    return socket.getSoLinger();
}
/**
 * Returns the SO_TIMEOUT value reported by the underlying data socket.
 *
 * @return the result of {@code Socket.getSoTimeout()} on the data socket
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public int getSoTimeout()
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    return socket.getSoTimeout();
}
/**
 * Returns the identifier of the local end of this connection.
 *
 * @return the current value of {@code localSocketID}
 */
protected String getSocketID()
{
    return this.localSocketID;
}
/**
 * Returns the TCP_NODELAY setting reported by the underlying data socket.
 *
 * @return the result of {@code Socket.getTcpNoDelay()} on the data socket
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public boolean getTcpNoDelay()
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    return socket.getTcpNoDelay();
}
/**
 * Performs the ID handshake on the already connected data socket, sending
 * first: writes the local socket ID as one line, reads the remote socket ID
 * as one line, then creates the data streams and marks this socket as
 * resumed.
 *
 * The original documentation says this is called when clients exchange
 * information with the server and that the two parameters can be used to
 * query the socket type of the remote side; this method itself does not
 * read either parameter.
 *
 * @param inetaddress address of the remote peer (unused here)
 * @param port port of the remote peer (unused here)
 * @throws IOException if the exchange fails, or if the peer closes the
 *         connection before its socket ID line has been received
 */
private void initSocket( InetAddress inetaddress, int port )
throws IOException
{
    // NOTE(review): the BufferedReader may read ahead past the ID line;
    // this assumes the peer sends no payload until the handshake is done.
    BufferedReader idReader = new BufferedReader(
        new InputStreamReader( dataSocket.getInputStream() ) );
    BufferedWriter idWriter = new BufferedWriter(
        new OutputStreamWriter( dataSocket.getOutputStream() ) );

    // send local socket ID
    idWriter.write( localSocketID );
    idWriter.newLine();
    idWriter.flush();

    // receive remote ID; readLine() yields null when the stream has ended,
    // in which case there is no peer ID and the socket must not be marked
    // as resumed.
    final String peerId = idReader.readLine();
    if ( peerId == null )
    {
        throw new IOException(
            "Connection closed before the remote socket ID was received" );
    }
    remoteSocketID = peerId;

    // setup I/O streams
    dataReader = new NSocketInputStream( dataSocket.getInputStream(),
                                         dataSocket.getReceiveBufferSize(),
                                         this );
    dataWriter = new NSocketOutputStream( dataSocket.getOutputStream() );
    iSocketState = Global.RESUMED;
}
/**
 * Performs the ID handshake on the already connected data socket, receiving
 * first: generates and registers a local socket ID, reads the remote socket
 * ID as one line, writes the local ID as one line, then creates the data
 * streams and marks this socket as resumed.
 *
 * According to the original documentation this is called when a socket is
 * constructed at the server side from an accepted socket.
 *
 * @throws IOException if the exchange fails, or if the peer closes the
 *         connection before its socket ID line has been received
 * @throws NoSocketControllerException declared by this method; presumably
 *         raised by {@code SocketController.registerSocket} (confirm)
 */
private void initSocketServer()
throws IOException, NoSocketControllerException
{
    localSocketID = Global.generateID();
    // add naplet socket to local table
    SocketController.registerSocket( this, localSocketID );

    // NOTE(review): the BufferedReader may read ahead past the ID line;
    // this assumes the peer sends no payload until the handshake is done.
    BufferedReader idReader = new BufferedReader(
        new InputStreamReader( dataSocket.getInputStream() ) );
    BufferedWriter idWriter = new BufferedWriter(
        new OutputStreamWriter( dataSocket.getOutputStream() ) );

    // read remote socket ID; readLine() yields null when the stream has
    // ended, in which case there is no peer ID to continue with.
    final String peerId = idReader.readLine();
    if ( peerId == null )
    {
        throw new IOException(
            "Connection closed before the remote socket ID was received" );
    }
    remoteSocketID = peerId;

    // send local one
    idWriter.write( localSocketID );
    idWriter.newLine();
    idWriter.flush();

    // setup I/O streams
    log( "begin setting up i/o" );
    dataReader = new NSocketInputStream( dataSocket.getInputStream(),
                                         dataSocket.getReceiveBufferSize(),
                                         this );
    dataWriter = new NSocketOutputStream( dataSocket.getOutputStream() );
    iSocketState = Global.RESUMED;
}
/**
 * Tells whether this socket is in any state other than
 * {@code Global.RESUMED}. Intermediate states (for example a suspend
 * request that has been sent but not yet completed) therefore also count
 * as suspended.
 *
 * @return {@code false} when the state is {@code Global.RESUMED},
 *         {@code true} otherwise
 */
public synchronized boolean isSuspended()
{
    return iSocketState != Global.RESUMED;
}
/**
* Actively resumes a suspended connection.
*
* Opens a new socket to remoteAddress on SocketController.ControlPort,
* creates fresh data streams on it, sends one line
* "ResumeMsg:" + remoteSocketID (followed by ":" + publicKey when
* SocketController.GENKEY is set) and reads a one-line reply.
*
* If the reply equals Global.ACK_STOP_RESUME (ignoring case) the state is
* set to SUSPENDED, the data streams are dropped, and the call blocks until
* another thread changes the state to RESUMED; false is then returned.
* Any other reply completes the resume: the state becomes RESUMED, waiters
* are notified, the streams are resumed and the socket is registered again.
*
* @return true if this call completed the resume; false if the socket was
* already resumed, if the peer answered ACK_STOP_RESUME, or if any
* exception occurred
* @throws IOException declared, but every exception raised inside the try
* block is caught, printed and reported by returning false
*/
public synchronized boolean resume()
throws IOException
{
// nothing to do when the connection is already up
if ( iSocketState == Global.RESUMED )
{
return false;
}
try
{
log( "call resume here to:" + remoteAddress );
// replace the underlying socket with a new connection to the peer
dataSocket
= SocketController.createSocket( remoteAddress,
SocketController.ControlPort );
dataReader = new NSocketInputStream( dataSocket.getInputStream(),
dataSocket.getReceiveBufferSize(),
this );
dataWriter = new NSocketOutputStream( dataSocket.getOutputStream() );
// build the resume request, optionally carrying the public key
String resumemsg = "ResumeMsg:" + remoteSocketID;
if ( SocketController.GENKEY )
{
resumemsg += ":" + publicKey;
}
// the request/reply exchange runs over the new data socket itself
BufferedWriter controlWriter
= ( new BufferedWriter( new OutputStreamWriter(
dataSocket.getOutputStream() ) ) );
BufferedReader controlReader
= ( new BufferedReader( new InputStreamReader(
dataSocket.getInputStream() ) ) );
controlWriter.write( resumemsg );
controlWriter.newLine();
controlWriter.flush();
// NOTE(review): readLine() returns null if the peer closed the stream;
// trim() then throws NullPointerException, which the catch below turns
// into SUSPENDED_LOCAL / false.
String res = controlReader.readLine().trim();
if ( res.equalsIgnoreCase( Global.ACK_STOP_RESUME ) )
{
iSocketState = Global.SUSPENDED;
// set to null so that they can be reestablished later
dataReader = null;
dataWriter = null;
// block until some other thread sets the state to RESUMED and notifies
while ( iSocketState != Global.RESUMED )
{
try
{
log( "stop resume by a ACK_STOP_RESUME " );
wait();
}
catch ( InterruptedException ex )
{
// NOTE(review): the interrupt is only printed and the wait is retried;
// the thread's interrupt status is not restored.
ex.printStackTrace();
}
}
// stop resuming and return
return false;
}
// peer accepted the resume: publish the new state and wake any waiters
iSocketState = Global.RESUMED;
this.notifyAll();
// point the data streams at the new socket's streams
resumeIStream();
resumeOStream();
// register this socket again under its local ID
SocketController.registerSocket( this, localSocketID );
initCondition();
return true;
}
catch ( Exception exception )
{
// NOTE(review): a socket created above is not closed on this path.
exception.printStackTrace();
iSocketState = Global.SUSPENDED_LOCAL;
return false;
}
}
/**
 * Used for concurrent migration.
 *
 * When one side of a connection has just acknowledged a suspend request, it
 * may receive a local suspend request for the same connection. That local
 * request must not be approved until the previously acknowledged one has
 * finished: an ack-wait message keeps the requester waiting and a half
 * resume message wakes it up.
 *
 * The waiting peer will then get its own suspend request approved and go on
 * suspending the connection, so instead of a full resume message (which
 * would update the underlying socket) only a half resume message is sent
 * and the underlying data socket is left untouched.
 *
 * @return {@code false} if the socket is already resumed or sending the
 *         message raised an exception (the state is then set to
 *         {@code Global.SUSPENDED_LOCAL}); {@code true} otherwise
 * @throws IOException declared, but exceptions raised while sending are
 *         caught, printed and reported by returning {@code false}
 */
public synchronized boolean halfResume()
throws IOException
{
    final boolean alreadyResumed = ( iSocketState == Global.RESUMED );
    if ( alreadyResumed )
    {
        return false;
    }

    boolean sent;
    try
    {
        log( "call half resume here" );
        // the reply to the control message is not inspected
        sendControlMessage( "HalfResumeMsg:" + remoteSocketID );
        sent = true;
    }
    catch ( Exception failure )
    {
        failure.printStackTrace();
        iSocketState = Global.SUSPENDED_LOCAL;
        sent = false;
    }
    return sent;
}
/**
* Passively resumes a connection on a socket supplied by the caller.
* According to the original documentation this happens when an agent lands
* on a new host, and it is used by the side that receives a resume request
* rather than the side that issues it.
*
* The given socket becomes the data socket and its address becomes the
* remote address. If a local suspend is pending (suspendAfterACK) the state
* is set to RESUMED only to wake the waiting thread and false is returned.
* Otherwise the socket is registered, missing data streams are created,
* both streams are resumed and the state is set to RESUMED.
*
* @param socket the new connection to the peer
* @return true if the connection was fully resumed; false if it was already
* resumed or a local suspend was pending
* @throws IOException if obtaining or resuming the socket streams fails
* @throws NoSocketControllerException declared by this method; presumably
* raised by SocketController.registerSocket (confirm)
*/
synchronized boolean resume( Socket socket )
throws IOException, NoSocketControllerException
{
log( "called in resume(socket) with state:" + iSocketState +
",suspendAfterACK:" + suspendAfterACK );
// nothing to do when the connection is already up
if ( iSocketState == Global.RESUMED )
{
return false;
}
dataSocket = socket;
remoteAddress = socket.getInetAddress();
// dont update data socket if there is a suspend operation pending.
// NOTE(review): dataSocket and remoteAddress have already been replaced
// above by the time this check runs; only the streams are left as they were.
if ( suspendAfterACK )
{
// wake the thread blocked in suspend(); it finishes the suspension itself
iSocketState = Global.RESUMED;
notifyAll();
return false;
}
SocketController.registerSocket( this, localSocketID );
// concurrent migration
// streams may be null here (resume() drops them on ACK_STOP_RESUME)
if ( dataReader == null )
{
dataReader = new NSocketInputStream( dataSocket.getInputStream(),
dataSocket.getReceiveBufferSize(), this );
}
if ( dataWriter == null )
{
dataWriter = new NSocketOutputStream( dataSocket.getOutputStream() );
// important when remote one is fixed
}
// point both data streams at the new socket's streams
resumeIStream();
resumeOStream();
log( "called in resume(socket), set state to resumed" );
iSocketState = Global.RESUMED;
this.notifyAll();
// init because in case it comes back for the second time
initCondition();
return true;
}
/**
 * Hands the current data socket's input stream to the data reader and then
 * notifies all threads waiting on this socket.
 *
 * @throws IOException if the socket's input stream cannot be obtained or
 *         the reader fails to resume
 */
private synchronized void resumeIStream()
throws IOException
{
    final java.io.InputStream freshInput = dataSocket.getInputStream();
    dataReader.resume( freshInput );
    notifyAll();
}
/**
 * Hands the current data socket's output stream to the data writer.
 * Does nothing (apart from logging) when either the writer or the data
 * socket is missing.
 *
 * @throws IOException if the socket's output stream cannot be obtained or
 *         the writer fails to resume
 */
protected synchronized void resumeOStream()
throws IOException
{
    log( "called in naplet resume ostream" + iSocketState + bPersistentFlag );
    final boolean canResume = ( dataWriter != null ) && ( dataSocket != null );
    if ( canResume )
    {
        dataWriter.resume( dataSocket.getOutputStream() );
    }
}
/**
 * Sets the receive buffer size on the underlying data socket.
 *
 * @param i the value passed to {@code Socket.setReceiveBufferSize(int)}
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public void setReceiveBufferSize( int i )
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    socket.setReceiveBufferSize( i );
}
/**
 * Sets the send buffer size on the underlying data socket.
 *
 * @param i the value passed to {@code Socket.setSendBufferSize(int)}
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public void setSendBufferSize( int i )
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    socket.setSendBufferSize( i );
}
/**
 * Sets SO_LINGER on the underlying data socket.
 *
 * @param bool the on/off flag passed to {@code Socket.setSoLinger}
 * @param i the linger value passed to {@code Socket.setSoLinger}
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public void setSoLinger( boolean bool, int i )
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    socket.setSoLinger( bool, i );
}
/**
 * Sets SO_TIMEOUT on the underlying data socket.
 *
 * @param i the value passed to {@code Socket.setSoTimeout(int)}
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public void setSoTimeout( int i )
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    socket.setSoTimeout( i );
}
/**
 * Sets TCP_NODELAY on the underlying data socket.
 *
 * @param bool the value passed to {@code Socket.setTcpNoDelay(boolean)}
 * @throws SocketException if the underlying socket reports an error, or if
 *         there is currently no underlying socket (the field is set to
 *         {@code null} while the connection is suspended)
 */
public void setTcpNoDelay( boolean bool )
throws SocketException
{
    // Read the field once: it is nulled by suspend()/suspendRemotly().
    final Socket socket = dataSocket;
    if ( socket == null )
    {
        throw new SocketException(
            "No underlying data socket (connection is suspended)" );
    }
    socket.setTcpNoDelay( bool );
}
/**
 * Overwrites the socket state. According to the original documentation it
 * is called from the socket controller to set the state after an ack.
 *
 * NOTE(review): this method is not synchronized and does not notify
 * threads waiting on this socket for a state change; confirm that callers
 * take care of both.
 *
 * @param state the new state value (one of the {@code Global} state
 *        constants, presumably)
 */
void setState( int state )
{
    this.iSocketState = state;
}
/**
 * Returns the current socket state without taking this socket's lock.
 *
 * @return the current value of {@code iSocketState}
 */
int getState()
{
    return this.iSocketState;
}
/**
 * Sends a control message to the peer over the controller's message channel
 * (addressed to {@code SocketController.UDPPort}) and waits for a reply.
 * Simple retransmission is used to improve the chance of delivery: as long
 * as no reply is received the message is sent again, up to
 * {@code Global.UDPRETRY} additional times.
 *
 * If no remote address is known yet it is taken from the data socket.
 *
 * @param msg the control message to send
 * @return the reply received from the channel, or {@code null} if every
 *         attempt ended without a reply
 * @throws IOException if sending or receiving on the channel fails
 */
private String sendControlMessage( String msg )
throws IOException
{
    int resendsLeft = Global.UDPRETRY;
    if ( remoteAddress == null )
    {
        remoteAddress = dataSocket.getInetAddress();
    }

    for ( ;; )
    {
        SocketController.channel.sendMessage( msg, remoteAddress,
                                              SocketController.UDPPort );
        String reply = SocketController.channel.receiveMessage();
        if ( reply != null )
        {
            return reply;
        }
        // no reply: give up once the retransmission budget is used up
        if ( resendsLeft == 0 )
        {
            return null;
        }
        resendsLeft--;
    }
}
/**
 * Receives one message from the controller's message channel.
 *
 * @return whatever {@code SocketController.channel.receiveMessage()}
 *         returns
 * @throws IOException if receiving on the channel fails
 */
private String receiveControlMessage()
throws IOException
{
    return SocketController.channel.receiveMessage();
}
/**
* Suspends the current connection, including the socket and its I/O
* streams. According to the original documentation this happens before
* agent migration.
*
* Three paths are visible in the code:
* 1. A suspend request from the peer has already been acknowledged
* (suspendACKSent) and the state is not RESUMED: mark suspendAfterACK,
* wait once, then set the state to SUSPENDED and return.
* 2. The state is RESUMED: send "SuspendMsg:" + remoteSocketID (followed by
* ":" + publicKey when SocketController.GENKEY is set). If the reply is
* Global.ACK_WAIT_MSG, wait until the state becomes HALF_RESUMED, set
* SUSPENDED and return. Otherwise suspend both streams, close the data
* socket and set the state to SUSPENDED_LOCAL.
* 3. Any other state: nothing is done.
*
* @throws IOException declared; failures while sending the request or
* closing the socket are caught and printed inside the method
*/
public synchronized void suspend()
throws IOException
{
log( "get a suspend req locally with sock state:" + iSocketState );
if ( suspendACKSent )
{
// has acknowledged a request. can't suspend if the previous one
// hasn't finished.
// NOTE(review): the loop body returns unconditionally, so this "while"
// runs at most once and the condition is not re-checked after wait().
while ( iSocketState != Global.RESUMED )
{
try
{
log( "begin waiting!!!!concurrent migration" );
// tells resume(Socket) that a local suspend is pending; not reset here
suspendAfterACK = true;
wait();
}
catch ( Exception e )
{
// NOTE(review): this also swallows InterruptedException from wait()
e.printStackTrace();
}
log( "get notified from concurrent migration" );
// should be suspended
iSocketState = Global.SUSPENDED;
return;
}
}
if ( iSocketState == Global.RESUMED )
{
iSocketState = Global.SUSPEND_SENT;
log( "***set state to:Global.SUSPEND_SENT,susACKSent:" + suspendACKSent );
// remember the peer address so a later resume() knows where to connect
remoteAddress = dataSocket.getInetAddress();
try
{
String msg = "SuspendMsg:" + remoteSocketID;
if ( SocketController.GENKEY )
{
msg += ":" + publicKey;
}
log( "send msg to :" + remoteAddress );
// a response for this control message is returned as res
String res = sendControlMessage( msg );
// NOTE(review): sendControlMessage returns null when no reply arrives;
// trim() then throws NullPointerException, which the catch below prints
// before the local suspend continues.
if ( res.trim().equalsIgnoreCase( Global.ACK_WAIT_MSG ) )
{
// peer asked us to wait; block until the state becomes HALF_RESUMED
while ( iSocketState != Global.HALF_RESUMED )
{
log( "begin waiting!!! by a ACK_WAIT_MSG " );
wait();
}
iSocketState = Global.SUSPENDED;
return;
}
iSocketState = Global.SUSPEND_ACK_RCD;
log( "set state to:SUSPEND_ACK_RCD" );
}
catch ( Exception exception )
{
// NOTE(review): any failure above (including an interrupted wait) is only
// printed; execution falls through to the local suspend below.
exception.printStackTrace();
}
// stop the data streams, then drop the underlying socket
dataReader.suspend();
dataWriter.suspend();
try
{
dataSocket.close();
}
catch ( IOException ioexception )
{
ioexception.printStackTrace();
}
dataSocket = null;
iSocketState = Global.SUSPENDED_LOCAL;
}
}
/**
 * Suspends this connection in response to a suspend request from the peer
 * (the passive counterpart of {@code suspend()}): suspends whichever data
 * streams exist, closes and drops the underlying socket, and sets the state
 * to {@code Global.SUSPENDED_REMOTE}. Does nothing if the state already is
 * {@code Global.SUSPENDED_REMOTE}.
 *
 * @throws IOException if suspending one of the data streams fails; a
 *         failure to close the socket is printed and otherwise ignored
 */
protected synchronized void suspendRemotly()
throws IOException
{
    if ( iSocketState == Global.SUSPENDED_REMOTE )
    {
        return;
    }
    log( "socket suspend remotely:" + dataSocket );

    // possible i/o not setup when suspend caused
    if ( dataWriter != null )
    {
        dataWriter.suspend();
    }
    if ( dataReader != null )
    {
        dataReader.suspend();
    }

    // The socket may already be gone (suspend() sets it to null); check for
    // that explicitly instead of relying on a caught NullPointerException.
    if ( dataSocket != null )
    {
        try
        {
            dataSocket.close();
        }
        catch ( IOException closeFailure )
        {
            // best effort: the connection is being dropped anyway
            closeFailure.printStackTrace();
        }
    }
    dataSocket = null;
    iSocketState = Global.SUSPENDED_REMOTE;
}
/**
 * Debug helper: prints the message to standard output, prefixed with
 * "NapletSocket:", when {@code SocketController.debug} is set.
 *
 * @param info the text to print
 */
private static void log( String info )
{
    if ( !SocketController.debug )
    {
        return;
    }
    System.out.println( "NapletSocket:" + info );
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -