📄 incomingsocketchannelmanager.java
字号:
return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_AUTO_FALLBACK );
}else{
return( TransportCryptoManager.HandshakeListener.MATCH_CRYPTO_NO_AUTO_FALLBACK );
}
}
}
});
}
});
server_selector.start();
}
}finally{
this_mon.exit();
}
}
/**
 * Entry point for a newly accepted inbound connection. Runs the periodic
 * timeout sweep when it is due, drops the connection if no byte-match
 * handlers are registered, applies the configured socket options and then
 * registers the channel for read-select so its first bytes can be matched.
 *
 * @param filter transport filter wrapping the accepted socket channel
 */
protected void process( TCPTransportHelperFilter filter ) {
  // Sweep for timed-out connections at most every 5 seconds, or right away
  // if the clock appears to have gone backwards.
  final long current_time = SystemTime.getCurrentTime();
  final boolean clock_went_back = current_time < last_timeout_check_time;
  if( clock_went_back || current_time - last_timeout_check_time > 5*1000 ) {
    doTimeoutChecks();
    last_timeout_check_time = current_time;
  }

  final SocketChannel channel = filter.getSocketChannel();

  if( match_buffers_cow.isEmpty() ) {
    // Nothing is registered to route this connection to, so just close it.
    if( Logger.isEnabled() ) {
      Logger.log( new LogEvent( LOGID, "Incoming TCP connection from ["
          + channel.socket().getInetAddress().getHostAddress() + ":"
          + channel.socket().getPort() + "] dropped because zero routing handlers registered" ) );
    }
    NetworkManager.getSingleton().closeSocketChannel( channel );
    return;
  }

  // Advanced socket options; a failure here is reported but does not abort the connection.
  try {
    final int send_buffer_bytes = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_SNDBUF" );
    if( send_buffer_bytes > 0 ) {
      channel.socket().setSendBufferSize( send_buffer_bytes );
    }
    final String tos_setting = COConfigurationManager.getStringParameter( "network.tcp.socket.IPTOS" );
    if( tos_setting.length() > 0 ) {
      channel.socket().setTrafficClass( Integer.decode( tos_setting ).intValue() );
    }
  }
  catch( Throwable t ) {
    t.printStackTrace();
  }

  // The filter may already hold data queued internally after the crypto
  // handshake decode (in particular the BT header). There should be more data
  // right behind it that triggers a read-select, giving prompt access to it.
  final IncomingConnection ic = new IncomingConnection( filter, max_match_buffer_size );
  final VirtualChannelSelector read_selector = NetworkManager.getSingleton().getReadSelector();
  try {
    connections_mon.enter();
    connections.add( ic );
    read_selector.register( channel, this, ic );
  }
  finally {
    connections_mon.exit();
  }

  // Data might be queued up in the filter - force one process cycle (NAT check in particular).
  selectSuccess( read_selector, channel, ic );
}
/**
 * Read-select callback for a pending inbound connection. Reads available
 * bytes into the connection's match buffer and checks them against the
 * registered byte patterns. On a match the connection is handed to the
 * matching listener; if the buffer has filled without any match, or the read
 * fails, the connection is removed and closed.
 *
 * @param selector   selector that reported readiness (not used here)
 * @param sc         the channel, used for log output
 * @param attachment the {@link IncomingConnection} registered with the channel
 * @return false when the read yielded zero bytes, true in every other case
 */
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {
  final IncomingConnection conn = (IncomingConnection)attachment;
  try {
    final long read_count = conn.filter.read( new ByteBuffer[]{ conn.buffer }, 0, 1 );
    if( read_count < 0 ) {
      throw new IOException( "end of stream on socket read" );
    }
    if( read_count == 0 ) {
      return false;
    }

    conn.last_read_time = SystemTime.getCurrentTime();
    final MatchListener matched = checkForMatch( conn.buffer, false );

    if( matched != null ) {
      // Match found: stop tracking the connection (without closing it) and hand it over.
      conn.buffer.flip();
      if( Logger.isEnabled() ) {
        final String remote = sc.socket().getInetAddress().getHostAddress()
            + ":" + sc.socket().getPort();
        Logger.log( new LogEvent( LOGID,
            "Incoming TCP stream from [" + remote + "] recognized as "
            + "known byte pattern: "
            + ByteFormatter.nicePrint( conn.buffer.array() ) ) );
      }
      removeConnection( conn, false );
      matched.connectionMatched( conn.filter, conn.buffer );
    }
    else if( conn.buffer.position() >= max_match_buffer_size ) {
      // No match, and enough bytes were read to have compared against every
      // potential match buffer: give up on this connection.
      conn.buffer.flip();
      if( Logger.isEnabled() ) {
        final String remote = sc.socket().getInetAddress().getHostAddress()
            + ":" + sc.socket().getPort();
        Logger.log( new LogEvent( LOGID, LogEvent.LT_WARNING,
            "Incoming TCP stream from [" + remote + "] does not match "
            + "any known byte pattern: "
            + ByteFormatter.nicePrint( conn.buffer.array() ) ) );
      }
      removeConnection( conn, true );
    }
    // Otherwise: no match yet, but more bytes may still arrive.
  }
  catch( Throwable t ) {
    // Logging the failure touches the socket and can itself fail, so guard it separately.
    try {
      if( Logger.isEnabled() ) {
        final String remote = sc.socket().getInetAddress().getHostAddress()
            + ":" + sc.socket().getPort();
        Logger.log( new LogEvent( LOGID, LogEvent.LT_ERROR,
            "Incoming TCP connection [" + remote + "] socket read exception: "
            + t.getMessage() ) );
      }
    }
    catch( Throwable x ) {
      Debug.out( "Caught exception on incoming exception log:" );
      x.printStackTrace();
      System.out.println( "CAUSED BY:" );
      t.printStackTrace();
    }
    removeConnection( conn, true );
  }
  return true;
}
//FAILURE
/**
 * Read-select failure callback: logs the failure and then removes and closes
 * the affected pending connection.
 *
 * @param selector   selector that reported the failure (not used here)
 * @param sc         the channel, used for log output
 * @param attachment the {@link IncomingConnection} registered with the channel
 * @param msg        the failure cause; its message is logged
 */
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc, Object attachment, Throwable msg ) {
  final IncomingConnection failed = (IncomingConnection)attachment;
  if( Logger.isEnabled() ) {
    final String text = "Incoming TCP connection [" + sc
        + "] socket select op failure: "
        + msg.getMessage();
    Logger.log( new LogEvent( LOGID, LogEvent.LT_ERROR, text ) );
  }
  removeConnection( failed, true );
}
/**
 * Stops the current server selector (if there is one), waits one second and
 * then calls {@code start()} again.
 */
private void restart() {
  try {
    this_mon.enter();
    if( server_selector != null ) {
      server_selector.stop();
      server_selector = null;
    }
  }
  finally {
    this_mon.exit();
  }

  // Pause before starting again; a failed or interrupted sleep is only reported.
  try {
    Thread.sleep( 1000 );
  }
  catch( Throwable t ) {
    t.printStackTrace();
  }

  start();
}
/**
 * Stops tracking a pending connection: cancels its read-select registration
 * and removes it from the connection list, optionally closing its socket.
 *
 * @param connection    the pending connection to drop
 * @param close_as_well true to also close the underlying socket channel
 */
protected void removeConnection( IncomingConnection connection, boolean close_as_well ) {
  try {
    connections_mon.enter();
    // Cancel the read op first, then forget the connection.
    final VirtualChannelSelector read_selector = NetworkManager.getSingleton().getReadSelector();
    read_selector.cancel( connection.filter.getSocketChannel() );
    connections.remove( connection );
  }
  finally {
    connections_mon.exit();
  }

  if( !close_as_well ) {
    return;
  }
  // Async close, done outside the monitor.
  NetworkManager.getSingleton().closeSocketChannel( connection.filter.getSocketChannel() );
}
/**
 * Looks for a registered byte matcher that accepts the bytes read so far.
 * <p>
 * Each eligible matcher is handed the buffer rewound to position 0 with the
 * caller's original limit, so a matcher that moves the position or limit and
 * then fails cannot affect the matchers tried after it. The caller's position
 * and limit are always restored before returning, including when a matcher
 * throws.
 *
 * @param to_check  buffer holding the bytes read so far; its position is taken
 *                  as the number of bytes available for comparison
 * @param min_match true to use each matcher's {@code minSize()} /
 *                  {@code minMatches()}, false to use {@code size()} /
 *                  {@code matches()}
 * @return the listener registered for the first accepting matcher, or
 *         {@code null} if none accepted
 */
protected MatchListener checkForMatch( ByteBuffer to_check, boolean min_match ) {
  // Remember the caller's view so it can be restored afterwards.
  final int orig_position = to_check.position();
  final int orig_limit = to_check.limit();

  MatchListener listener = null;
  try {
    for( Iterator i = match_buffers_cow.entrySet().iterator(); i.hasNext(); ) {
      Map.Entry entry = (Map.Entry)i.next();
      NetworkManager.ByteMatcher bm = (NetworkManager.ByteMatcher)entry.getKey();

      final boolean enough_bytes = min_match
          ? orig_position >= bm.minSize()
          : orig_position >= bm.size();
      if( !enough_bytes ) {   // not enough bytes yet to compare
        continue;
      }

      // Present the same rewound view to every matcher. The limit is set
      // first because a previous matcher may have left it lowered.
      to_check.limit( orig_limit );
      to_check.position( 0 );

      final boolean matched = min_match ? bm.minMatches( to_check ) : bm.matches( to_check );
      if( matched ) {   // match found!
        listener = (MatchListener)entry.getValue();
        break;
      }
    }
  }
  finally {
    // Restore limit before position: Buffer.position(n) rejects n greater than
    // the current limit, and a matcher may have left the limit below
    // orig_position.
    to_check.limit( orig_limit );
    to_check.position( orig_position );
  }
  return listener;
}
/**
 * Scans all pending connections and removes (closing the socket of) those
 * that have been idle too long: connections that have read at least once are
 * checked against READ_TIMEOUT, connections that have read nothing yet
 * against CONNECT_TIMEOUT. If the clock is found to have gone backwards, the
 * relevant reference time is reset to the current time instead.
 */
protected void doTimeoutChecks() {
  try {
    connections_mon.enter();

    final long current_time = SystemTime.getCurrentTime();
    ArrayList to_close = null;

    for( int idx = 0; idx < connections.size(); idx++ ) {
      final IncomingConnection ic = (IncomingConnection)connections.get( idx );
      boolean expired = false;

      if( ic.last_read_time <= 0 ) {
        // No bytes have been read yet: measure from the initial connect time.
        if( current_time < ic.initial_connect_time ) {   // time went backwards!
          ic.initial_connect_time = current_time;
        }
        else if( current_time - ic.initial_connect_time > CONNECT_TIMEOUT ) {
          if( Logger.isEnabled() ) {
            Logger.log( new LogEvent( LOGID, "Incoming TCP connection ["
                + ic.filter.getSocketChannel() + "] forcibly timed out after "
                + "60sec due to socket inactivity" ) );
          }
          expired = true;
        }
      }
      else if( current_time < ic.last_read_time ) {   // time went backwards!
        ic.last_read_time = current_time;
      }
      else if( current_time - ic.last_read_time > READ_TIMEOUT ) {
        // At least one read has occurred, but nothing since for too long.
        if( Logger.isEnabled() ) {
          Logger.log( new LogEvent( LOGID, "Incoming TCP connection ["
              + ic.filter.getSocketChannel().socket().getInetAddress().getHostAddress() + ":"
              + ic.filter.getSocketChannel().socket().getPort()
              + "] forcibly timed out due to socket read inactivity ["
              + ic.buffer.position() + " bytes read: "
              + new String( ic.buffer.array() ) + "]" ) );
        }
        expired = true;
      }

      if( expired ) {
        // Collect now, remove after the scan so the list is not modified mid-loop.
        if( to_close == null ) {
          to_close = new ArrayList();
        }
        to_close.add( ic );
      }
    }

    if( to_close != null ) {
      for( Iterator it = to_close.iterator(); it.hasNext(); ) {
        removeConnection( (IncomingConnection)it.next(), true );
      }
    }
  }
  finally {
    connections_mon.exit();
  }
}
/**
 * State kept for one accepted connection while its first bytes are being
 * collected for byte-pattern matching.
 */
protected static class IncomingConnection {
  /** Transport filter wrapping the connection's socket channel. */
  protected final TCPTransportHelperFilter filter;
  /** Buffer collecting the bytes read so far. */
  protected final ByteBuffer buffer;
  /** Time at which this record was created. */
  protected long initial_connect_time;
  /** Time of the most recent read; -1 until a read has been recorded. */
  protected long last_read_time;

  /**
   * @param filter    transport filter for the accepted connection
   * @param buff_size capacity, in bytes, of the match buffer to allocate
   */
  protected IncomingConnection( TCPTransportHelperFilter filter, int buff_size ) {
    this.filter = filter;
    this.buffer = ByteBuffer.allocate( buff_size );
    this.initial_connect_time = SystemTime.getCurrentTime();
    this.last_read_time = -1;
  }
}
/**
 * Listener for byte matches.
 */
public interface MatchListener {
  /**
   * Currently, if message crypto is on and default fallback for incoming
   * connections is not enabled, incoming messages from non-crypto transports
   * (for example, the NAT check) would be bounced. This method allows
   * auto-fallback for such transports.
   *
   * @return presumably true if auto-fallback should be allowed for
   *         connections routed to this listener (confirm against callers)
   */
  boolean autoCryptoFallback();

  /**
   * The given connection has been accepted as matching the byte filter.
   *
   * @param filter      transport filter of the matching accepted connection
   * @param read_so_far bytes already read
   */
  void connectionMatched( TCPTransportHelperFilter filter, ByteBuffer read_so_far );
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -