📄 nsocketinputstream.java
字号:
package naplet.nsocket;
import java.io.*;
import java.net.*;
/**
 * NSocketInputStream is an {@link InputStream} wrapper used by NapletSocket.
 * It provides the standard Java input stream API plus suspend/resume hooks
 * used for connection migration.
 *
 * <p>While the stream is suspended, readers block until
 * {@link #resume(InputStream)} installs a replacement underlying stream.
 * Data held in the buffer of the owning NapletSocket is served to readers
 * before any data from the underlying stream.
 *
 * <p>All mutable state is guarded by this object's monitor.
 * NOTE(review): the read methods call the underlying stream while holding
 * the monitor, so {@link #suspend()} and {@link #close()} cannot start until
 * an in-progress read returns.
 */
public class NSocketInputStream
    extends InputStream
{
    /**
     * Underlying input stream; null once closed or after suspend() has
     * released the old connection.
     */
    private InputStream iStream = null;

    /**
     * Buffer that receives the data drained from the underlying stream
     * while suspending.
     */
    private final ByteBuffer buffer;

    /**
     * Number of read calls currently in progress on this input stream.
     */
    private int iStreamUserCount = 0;

    /**
     * Current state of this stream (one of the Global state constants).
     */
    private int iStreamState = Global.RESUMED;

    /**
     * Owning naplet socket; its buffer is consulted before the underlying
     * stream on every read.
     */
    private final NapletSocket nsocket;

    /**
     * Creates a stream that reads from the given underlying stream.
     *
     * @param inputstream underlying stream to read from; must not be null
     * @param i requested byte-buffer size; validated to be at least 1 but
     *          otherwise unused, the internal buffer is always created with
     *          Global.BUFFER_SIZE
     * @param nsock owning naplet socket; must not be null
     * @throws NullPointerException if inputstream or nsock is null
     * @throws IllegalArgumentException if i is less than 1
     * @throws IOException never thrown by the current implementation; kept
     *         in the signature for existing callers
     */
    protected NSocketInputStream
        ( InputStream inputstream, int i, NapletSocket nsock )
        throws IOException
    {
        if ( inputstream == null )
        {
            throw new NullPointerException( "iStream" );
        }
        if ( i < 1 )
        {
            throw new IllegalArgumentException
                ( "iByteBufferSize must be greater than 0" );
        }
        if ( nsock == null )
        {
            throw new NullPointerException( "nsocket" );
        }
        iStream = inputstream;
        // A pool of byte arrays could be used here to save allocation overhead.
        buffer = new ByteBuffer( Global.BUFFER_SIZE );
        nsocket = nsock;
    }

    /**
     * Closes the underlying stream and marks this stream as closed. Blocks
     * while the stream is suspended. Calling close() on an already closed
     * stream has no effect.
     *
     * @throws IOException if closing the underlying stream fails; the stream
     *         is still marked closed in that case
     */
    public synchronized void close()
        throws IOException
    {
        awaitNotSuspended();
        if ( iStreamState == Global.CLOSED )
        {
            return;
        }
        try
        {
            if ( iStream != null )
            {
                iStream.close();
            }
        }
        finally
        {
            // A null underlying stream is how the read methods detect that
            // the socket has been disconnected.
            iStream = null;
            iStreamState = Global.CLOSED;
            this.notifyAll();
        }
    }

    /**
     * Prints a debug line when SocketController.debug is enabled.
     *
     * @param info text to print
     */
    private static void log( String info )
    {
        if ( SocketController.debug )
        {
            System.out.println( "NapletSocketInStream:" + info );
        }
    }

    /**
     * Reads a single byte, taking it from the NapletSocket buffer when that
     * buffer holds data and from the underlying stream otherwise.
     *
     * @return the value returned by the buffer or the underlying stream
     * @throws SocketException if this stream has no underlying stream
     *         (for example after close())
     * @throws IOException if the underlying stream fails
     */
    public synchronized int read()
        throws IOException
    {
        testState();
        try
        {
            if ( hasBufferedData() )
            {
                return nsocket.getBuffer().read();
            }
            if ( iStream == null )
            {
                throw new SocketException( "NSocketInputStream is closed" );
            }
            return iStream.read();
        }
        finally
        {
            // Always release the reader slot, otherwise suspend() would wait
            // forever for the user count to reach zero.
            releaseReader();
        }
    }

    /**
     * Reads up to is.length bytes into the given array.
     *
     * @param is destination array
     * @return the number of bytes read, see {@link #read(byte[], int, int)}
     * @throws IOException see {@link #read(byte[], int, int)}
     */
    public synchronized int read( byte[] is )
        throws IOException
    {
        return read( is, 0, is.length );
    }

    /**
     * Reads up to len bytes into the given array. If the NapletSocket buffer
     * holds data, the bytes come from that buffer only; otherwise they come
     * from the underlying stream.
     *
     * <p>A SocketException from the underlying stream switches this stream
     * to the suspended state and returns 0, so the next read blocks until
     * resume() is called. Any other IOException from the underlying stream
     * is logged and also reported as 0 bytes read.
     * NOTE(review): returning 0 hides I/O errors from callers; this
     * best-effort behaviour is kept as it was, confirm that it is intended.
     *
     * @param is destination array
     * @param off offset in the array at which the data is written
     * @param len maximum number of bytes to read
     * @return the number of bytes read, 0 on an I/O error as described
     *         above, or whatever the underlying stream returns at end of
     *         stream
     * @throws SocketException if this stream has no underlying stream
     *         (for example after close())
     * @throws IOException declared for the InputStream contract
     */
    public synchronized int read( byte[] is, int off, int len )
        throws IOException
    {
        testState();
        try
        {
            if ( hasBufferedData() )
            {
                int fromBuffer = nsocket.getBuffer().read( is, off, len );
                // Build the debug text only when it will be printed.
                if ( SocketController.debug )
                {
                    if ( fromBuffer < 600 )
                    {
                        log( "!!!read some thing from buffer:"
                             + new String( is ).trim()
                             + ",amoun:" + fromBuffer + ",and buf empty:"
                             + nsocket.getBuffer().isEmpty() + "!!!" );
                    }
                    else
                    {
                        log( "!!!read some thing from buffer with amount:"
                             + fromBuffer + ",and buf empty:"
                             + nsocket.getBuffer().isEmpty() );
                    }
                }
                return fromBuffer;
            }
            if ( iStream == null )
            {
                // The underlying stream is gone: report a disconnection.
                throw new SocketException( "NSocketInputStream is closed" );
            }
            try
            {
                return iStream.read( is, off, len );
            }
            catch ( SocketException se )
            {
                // The connection went away under the reader; park further
                // reads until resume() supplies a new stream.
                iStreamState = Global.SUSPENDED;
                return 0;
            }
            catch ( IOException ioexception )
            {
                log( "get an io exc in input" );
                return 0;
            }
        }
        finally
        {
            // Always release the reader slot, otherwise suspend() would wait
            // forever for the user count to reach zero.
            releaseReader();
        }
    }

    /**
     * Installs a new underlying stream and wakes up blocked readers. Has no
     * effect when the stream is already resumed or is closed.
     *
     * @param inputstream replacement underlying stream
     */
    protected synchronized void resume( InputStream inputstream )
    {
        if ( iStreamState != Global.RESUMED
             && iStreamState != Global.CLOSED )
        {
            iStreamState = Global.RESUMED;
            log( "called resume in stream, set state to resumed" );
            iStream = inputstream;
            this.notifyAll();
        }
    }

    /**
     * Suspends this stream. Has no effect when the stream is already
     * suspended or is closed. Otherwise it waits for in-progress reads to
     * finish and then:
     * <ul>
     * <li>if the local buffer already holds data, closes the underlying
     *     stream without reading from it;</li>
     * <li>otherwise reads the underlying stream until end of stream, stores
     *     the bytes in the local buffer and the NapletSocket buffer, and
     *     closes the underlying stream.</li>
     * </ul>
     * If draining fails, the failure is logged and the underlying stream is
     * left as it is.
     */
    protected synchronized void suspend()
    {
        if ( iStreamState == Global.SUSPENDED
             || iStreamState == Global.CLOSED )
        {
            return;
        }
        iStreamState = Global.SUSPENDED;
        log( "called in stream, set state to suspend" );
        awaitNoReaders();

        if ( iStream == null )
        {
            // Nothing to drain or close.
            return;
        }
        if ( !buffer.isEmpty() )
        {
            // Data from an earlier drain is still held locally: just drop
            // the old connection.
            closeUnderlyingQuietly();
            return;
        }
        try
        {
            byte[] temp = new byte[buffer.size()];
            int fromStream;
            // Keep reading until the peer stops sending data.
            while ( ( fromStream = iStream.read( temp ) ) != -1 )
            {
                int wrt = buffer.write( temp, 0, fromStream );
                if ( SocketController.debug )
                {
                    if ( wrt < 70 )
                    {
                        log( "!!!!!sth into app buffer:"
                             + new String( temp ).toString().trim() + "!!!" );
                    }
                    else
                    {
                        log( "!!!!!sth into app buffer with amount:" + wrt );
                    }
                }
                // NOTE(review): if the NapletSocket buffer is this same
                // 'buffer' object (it is installed below on the first
                // chunk), later chunks look like they are written twice.
                // Verify against ByteBuffer and NapletSocket before
                // changing this.
                if ( nsocket.getBuffer() == null )
                {
                    nsocket.setBuffer( buffer );
                }
                else
                {
                    nsocket.getBuffer().resize( nsocket.getBuffer().size()
                        + nsocket.getReceiveBufferSize() );
                    nsocket.getBuffer().write( temp, 0, fromStream );
                }
            }
            closeUnderlyingQuietly();
        }
        catch ( Exception ex )
        {
            // Best effort: a failure while draining must not abort the
            // suspension, but it should not vanish silently either.
            log( "draining stopped during suspend: " + ex );
        }
    }

    /**
     * Blocks while the stream is suspended, then registers the caller as an
     * active reader. Every call must be paired with a call to
     * releaseReader().
     */
    private synchronized void testState()
    {
        awaitNotSuspended();
        iStreamUserCount++;
    }

    /**
     * Tells whether the NapletSocket buffer exists and holds data.
     * Must be called with the monitor held.
     */
    private boolean hasBufferedData()
    {
        return nsocket.getBuffer() != null
            && !nsocket.getBuffer().isEmpty();
    }

    /**
     * Unregisters an active reader and wakes up waiting threads.
     * Must be called with the monitor held.
     */
    private void releaseReader()
    {
        iStreamUserCount--;
        this.notifyAll();
    }

    /**
     * Waits until the stream is no longer suspended. An interrupt does not
     * end the wait; the interrupt status is restored before returning.
     * Must be called with the monitor held.
     */
    private void awaitNotSuspended()
    {
        boolean interrupted = false;
        while ( iStreamState == Global.SUSPENDED )
        {
            try
            {
                this.wait();
            }
            catch ( InterruptedException ie )
            {
                interrupted = true;
            }
        }
        if ( interrupted )
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits until no read is in progress. An interrupt does not end the
     * wait; the interrupt status is restored before returning.
     * Must be called with the monitor held.
     */
    private void awaitNoReaders()
    {
        boolean interrupted = false;
        while ( iStreamUserCount > 0 )
        {
            try
            {
                this.wait();
            }
            catch ( InterruptedException ie )
            {
                interrupted = true;
            }
        }
        if ( interrupted )
        {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the underlying stream, logging instead of propagating a close
     * failure, and forgets the stream.
     * Must be called with the monitor held.
     */
    private void closeUnderlyingQuietly()
    {
        try
        {
            if ( iStream != null )
            {
                iStream.close();
            }
        }
        catch ( IOException ioexception )
        {
            log( "ignoring failure to close underlying stream: "
                 + ioexception );
        }
        iStream = null;
    }
} // end of class inputstream
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -