📄 tcptransportimpl.java
字号:
if( !data_already_read.hasRemaining() ) {
data_already_read = null;
break;
}
}
if( !buffers[ array_offset + length - 1 ].hasRemaining() ) { //the last buffer has nothing left to read into normally
return inserted; //so return right away, skipping socket read
}
}
long bytes_read = filter.read( buffers, array_offset, length );
if( stats != null ) stats.bytesRead( (int)bytes_read ); //TODO
if( bytes_read == 0 ) {
requestReadSelect();
}
return bytes_read;
}
/**
 * Request that the outbound connection for this transport be established.
 * When the "Proxy.Data.Enable" setting is on, the socket is opened to
 * ProxyLoginHandler.SOCKS_SERVER_ADDRESS and handed to a ProxyLoginHandler
 * before handleCrypto() is run; otherwise handleCrypto() runs as soon as
 * the socket to the given address is connected.
 * Returns without doing anything if the transport has already been closed.
 * @param address remote peer address to connect to
 * @param listener notified of attempt start, success and failure
 */
public void establishOutboundConnection( final InetSocketAddress address, final ConnectListener listener ) {
  if( has_been_closed ) {
    return;
  }

  if( filter != null ) {  //a filter is already installed: report success straight away
    Debug.out( "socket_channel != null" );
    listener.connectSuccess();
    return;
  }

  final boolean proxy_enabled = COConfigurationManager.getBooleanParameter( "Proxy.Data.Enable" );
  final TCPTransport self = this;

  final ConnectDisconnectManager.ConnectListener pending_request = new ConnectDisconnectManager.ConnectListener() {
    public void connectAttemptStarted() {
      listener.connectAttemptStarted();
    }

    public void connectSuccess( final SocketChannel channel ) {
      if( channel == null ) {
        final String error = "connectSuccess:: given channel == null";
        Debug.out( error );
        listener.connectFailure( new Exception( error ) );
        return;
      }

      if( has_been_closed ) {  //transport was closed while the connect was in flight: just drop the channel
        NetworkManager.getSingleton().getConnectDisconnectManager().closeConnection( channel );
        return;
      }

      connect_request_key = null;
      description = ( is_inbound_connection ? "R" : "L" ) + ": "
          + channel.socket().getInetAddress().getHostAddress() + ": "
          + channel.socket().getPort();

      if( !proxy_enabled ) {  //direct connection: continue with filter setup
        handleCrypto( address, channel, listener );
        return;
      }

      //connected to the proxy server: log in first
      Logger.log( new LogEvent( LOGID, "Socket connection established to proxy server [" +description+ "], login initiated..." ) );

      //transparent filter used while the proxy login runs over this transport
      filter = TCPTransportHelperFilterFactory.createTransparentFilter( channel );

      new ProxyLoginHandler( self, address, new ProxyLoginHandler.ProxyListener() {
        public void connectSuccess() {
          Logger.log( new LogEvent( LOGID, "Proxy [" +description+ "] login successful." ) );
          handleCrypto( address, channel, listener );
        }

        public void connectFailure( Throwable failure_msg ) {
          NetworkManager.getSingleton().getConnectDisconnectManager().closeConnection( channel );
          listener.connectFailure( failure_msg );
        }
      });
    }

    public void connectFailure( Throwable failure_msg ) {
      connect_request_key = null;
      listener.connectFailure( failure_msg );
    }
  };

  //remember the request so close() can cancel it while it is still pending
  connect_request_key = pending_request;

  final InetSocketAddress target;
  if( proxy_enabled ) {
    target = ProxyLoginHandler.SOCKS_SERVER_ADDRESS;
  }
  else {
    target = address;
  }

  NetworkManager.getSingleton().getConnectDisconnectManager().requestNewConnection( target, pending_request );
}
/**
 * Installs the transport filter for a freshly connected outbound channel and
 * reports the outcome to the listener.
 * With connect_with_crypto set, the channel is handed to the TransportCryptoManager
 * and the filter it produces is used; otherwise a transparent filter is created directly.
 * If the handshake fails and fallback is permitted, the channel is closed and a new
 * non-crypto connection attempt is started for the same address.
 * @param address remote address, reused for the non-crypto reconnect
 * @param channel connected socket channel
 * @param listener notified of the final success or failure
 */
protected void handleCrypto( final InetSocketAddress address, final SocketChannel channel, final ConnectListener listener ) {
  if( !connect_with_crypto ) {  //plain transport
    filter = TCPTransportHelperFilterFactory.createTransparentFilter( channel );

    if( Logger.isEnabled() ) {
      Logger.log( new LogEvent( LOGID,
          "Outgoing TCP stream to " + channel.socket().getRemoteSocketAddress()
          + " established, type = " + filter.getName()
          + ", fallback = " + ( fallback_count == 0 ? "no" : "yes" ) ) );
    }

    registerSelectHandling();
    listener.connectSuccess();
    return;
  }

  //attempt encrypted transport
  TransportCryptoManager.getSingleton().manageCrypto( channel, shared_secret, false, new TransportCryptoManager.HandshakeListener() {
    public void handshakeSuccess( TCPTransportHelperFilter _filter ) {
      filter = _filter;

      if( Logger.isEnabled() ) {
        Logger.log( new LogEvent( LOGID,
            "Outgoing TCP stream to " + channel.socket().getRemoteSocketAddress()
            + " established, type = " + filter.getName() ) );
      }

      registerSelectHandling();
      listener.connectSuccess();
    }

    public void handshakeFailure( Throwable failure_msg ) {
      final boolean may_fall_back = fallback_allowed && NetworkManager.OUTGOING_HANDSHAKE_FALLBACK_ALLOWED;

      if( !may_fall_back ) {  //no fallback: drop the channel and report the failure
        NetworkManager.getSingleton().getConnectDisconnectManager().closeConnection( channel );
        listener.connectFailure( failure_msg );
        return;
      }

      if( Logger.isEnabled() ) {
        Logger.log( new LogEvent( LOGID, description+ " | crypto handshake failure [" +failure_msg.getMessage()+ "], attempting non-crypto fallback." ) );
      }

      connect_with_crypto = false;
      fallback_count++;

      NetworkManager.getSingleton().getConnectDisconnectManager().closeConnection( channel );
      close();
      //close() sets has_been_closed, which would make the reconnect below return early; clear it again
      has_been_closed = false;
      establishOutboundConnection( address, listener );
    }

    public int getMaximumPlainHeaderLength() {
      throw( new RuntimeException() );  //not supported here: this is the outgoing side
    }

    public int matchPlainHeader( ByteBuffer buffer ) {
      throw( new RuntimeException() );  //not supported here: this is the outgoing side
    }
  });
}
/**
 * Requests the given size for both the send and the receive buffer of the
 * underlying socket, then logs the requested size next to the sizes the
 * socket reports back.
 * Only emits a debug message when no filter is installed; any failure while
 * talking to the socket is passed to Debug.out instead of being propagated.
 * @param size_in_bytes requested buffer size
 */
private void setTransportBuffersSize( int size_in_bytes ) {
  if( filter == null ) {
    Debug.out( "socket_channel == null" );
    return;
  }

  try {
    filter.getSocketChannel().socket().setSendBufferSize( size_in_bytes );
    filter.getSocketChannel().socket().setReceiveBufferSize( size_in_bytes );

    //read the values back: they are logged alongside the requested size
    final int actual_send = filter.getSocketChannel().socket().getSendBufferSize();
    final int actual_receive = filter.getSocketChannel().socket().getReceiveBufferSize();

    final String message = "Setting new transport [" + description
        + "] buffer sizes: SND=" + size_in_bytes + " [" + actual_send
        + "] , RCV=" + size_in_bytes + " [" + actual_receive + "]";

    Logger.log( new LogEvent( LOGID, message ) );
  }
  catch( Throwable failure ) {
    Debug.out( failure );
  }
}
/**
 * Set the transport to the given speed mode, resizing the socket buffers to match:
 * 8KB for TRANSPORT_MODE_NORMAL, 64KB for TRANSPORT_MODE_FAST and 512KB for
 * TRANSPORT_MODE_TURBO.
 * An unrecognised mode is reported via Debug.out and otherwise ignored, so the
 * previously recorded mode stays in effect and getTransportMode() never reports
 * a value that was not applied.
 * @param mode to change to
 */
public void setTransportMode( int mode ) {
  if( mode == transport_mode )  return;  //already in mode

  final int buffer_size;

  switch( mode ) {
    case TRANSPORT_MODE_NORMAL:
      buffer_size = 8 * 1024;
      break;

    case TRANSPORT_MODE_FAST:
      buffer_size = 64 * 1024;
      break;

    case TRANSPORT_MODE_TURBO:
      buffer_size = 512 * 1024;
      break;

    default:
      Debug.out( "invalid transport mode given: " +mode );
      return;  //do not record a mode that is not one of the known constants
  }

  setTransportBuffersSize( buffer_size );
  transport_mode = mode;
}
/**
 * Reports the speed mode currently recorded for this transport.
 * @return current mode
 */
public int getTransportMode() {
  return transport_mode;
}
/**
 * Names the filter currently installed on this transport.
 * @return the filter's name, or an empty string when no filter is installed
 */
public String getEncryption() {
  if( filter == null ) {
    return "";
  }
  return filter.getName();
}
/**
 * Close the transport connection.
 * Marks the transport as closed, cancels a connect request that is still
 * pending, resets is_ready_for_read / is_ready_for_write and, when a filter is
 * installed, cancels its channel on the read and write selectors, closes the
 * channel and drops the filter.
 */
public void close() {
  has_been_closed = true;

  if( connect_request_key != null ) {
    NetworkManager.getSingleton().getConnectDisconnectManager().cancelRequest( connect_request_key );
  }

  is_ready_for_read = false;
  is_ready_for_write = false;

  if( filter == null ) {  //nothing was ever connected, so there is no channel to tear down
    return;
  }

  NetworkManager.getSingleton().getReadSelector().cancel( filter.getSocketChannel() );
  NetworkManager.getSingleton().getWriteSelector().cancel( filter.getSocketChannel() );
  NetworkManager.getSingleton().getConnectDisconnectManager().closeConnection( filter.getSocketChannel() );

  filter = null;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -