📄 connectdisconnectmanager.java
字号:
else remote_address = "<null>";
int remote_port = request.channel.socket().getPort();
msg += "\n channel="+channel+ ", socket="+socket+ ", local_address="+local_address+ ", local_port="+local_port+ ", remote_address="+remote_address+ ", remote_port="+remote_port;
}
LGLogger.log( msg, t );
if( request.channel != null ) {
try{
pending_closes_mon.enter();
pending_closes.addLast( request.channel );
}finally{
pending_closes_mon.exit();
}
}
request.listener.connectFailure( t );
}
}
private void runSelect() {
//do cancellations
try{
new_canceled_mon.enter();
for( Iterator can_it = canceled_requests.iterator(); can_it.hasNext(); ) {
ConnectListener key = (ConnectListener)can_it.next();
//boolean found = false;
for( Iterator pen_it = pending_attempts.keySet().iterator(); pen_it.hasNext(); ) {
ConnectionRequest request = (ConnectionRequest)pen_it.next();
if( request.listener == key ) {
connect_selector.cancel( request.channel );
try{
pending_closes_mon.enter();
pending_closes.addLast( request.channel );
}
finally{
pending_closes_mon.exit();
}
//found = true;
pen_it.remove();
break;
}
}
//if( !found ) Debug.out( "~~~ canceled request not found ~~~" );
}
canceled_requests.clear();
}
finally{
new_canceled_mon.exit();
}
//run select
connect_selector.select( 100 );
//do connect attempt timeout checks
int num_stalled_requests = 0;
for( Iterator i = pending_attempts.keySet().iterator(); i.hasNext(); ) {
ConnectionRequest request = (ConnectionRequest)i.next();
long waiting_time = SystemTime.getCurrentTime() - request.connect_start_time;
if( waiting_time > CONNECT_ATTEMPT_TIMEOUT ) {
i.remove();
connect_selector.cancel( request.channel );
try{
pending_closes_mon.enter();
pending_closes.addLast( request.channel );
}finally{
pending_closes_mon.exit();
}
request.listener.connectFailure( new Throwable( "Connection attempt aborted: timed out after " +CONNECT_ATTEMPT_TIMEOUT/1000+ "sec" ) );
}
else if( waiting_time >= CONNECT_ATTEMPT_STALL_TIME ) {
num_stalled_requests++;
}
else if( waiting_time < 0 ) { //time went backwards
request.connect_start_time = SystemTime.getCurrentTime();
}
}
//check if our connect queue is stalled, and expand if so
if( num_stalled_requests == pending_attempts.size() && pending_attempts.size() < MAX_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
try{
new_canceled_mon.enter();
if( !new_requests.isEmpty() ) {
ConnectionRequest cr = (ConnectionRequest)new_requests.removeFirst();
addNewRequest( cr );
}
}
finally{
new_canceled_mon.exit();
}
}
}
private void doClosings() {
try{
pending_closes_mon.enter();
while( !pending_closes.isEmpty() ) {
SocketChannel channel = (SocketChannel)pending_closes.removeFirst();
if( channel != null ) {
try{
channel.close();
}
catch( Throwable t ) { Debug.printStackTrace(t); }
}
}
}finally{
pending_closes_mon.exit();
}
}
/**
* Request that a new connection be made out to the given address.
* @param address remote ip+port to connect to
* @param listener to receive notification of connect attempt success/failure
*/
protected void requestNewConnection( InetSocketAddress address, ConnectListener listener ) {
if( MAX_SIMULTANIOUS_CONNECT_ATTEMPTS == 0 ) { //outbound connects are disabled, so fail immediately
LGLogger.log( "Aborting connect attempt to [" +address+ "]: Outbound connects disabled in config." );
listener.connectFailure( new Throwable( "Outbound connects disabled in config: MAX_SIMULTANIOUS_CONNECT_ATTEMPTS == 0" ) );
return;
}
ConnectionRequest cr = new ConnectionRequest( address, listener );
try{
new_canceled_mon.enter();
//insert at a random position because new connections are usually added in 50-peer
//chunks, i.e. from a tracker announce reply, and we want to evenly distribute the
//connect attempts if there are multiple torrents running
int insert_pos = 0;
if( new_requests.size() > 0 ) {
insert_pos = random.nextInt( new_requests.size() );
}
new_requests.add( insert_pos, cr );
}finally{
new_canceled_mon.exit();
}
}
/**
* Close the given connection.
* @param channel to close
*/
protected void closeConnection( SocketChannel channel ) {
try{
pending_closes_mon.enter();
pending_closes.addLast( channel );
}finally{
pending_closes_mon.exit();
}
}
/**
* Cancel a pending new connection request.
* @param listener_key used in the initial connect request
*/
protected void cancelRequest( ConnectListener listener_key ) {
try{
new_canceled_mon.enter();
//check if we can cancel it right away
for( Iterator i = new_requests.iterator(); i.hasNext(); ) {
ConnectionRequest request = (ConnectionRequest)i.next();
if( request.listener == listener_key ) {
i.remove();
return;
}
}
canceled_requests.add( listener_key ); //else add for later removal during select
}
finally{
new_canceled_mon.exit();
}
}
private static class ConnectionRequest {
private final InetSocketAddress address;
private final ConnectListener listener;
private final long request_start_time;
private long connect_start_time;
private SocketChannel channel;
private ConnectionRequest( InetSocketAddress _address, ConnectListener _listener ) {
address = _address;
listener = _listener;
request_start_time = SystemTime.getCurrentTime();
}
}
///////////////////////////////////////////////////////////
/**
* Listener for notification of connection establishment.
*/
protected interface ConnectListener {
/**
* The connection establishment process has started,
* i.e. the connection is actively being attempted.
*/
public void connectAttemptStarted();
/**
* The connection attempt succeeded.
* @param channel connected socket channel
*/
public void connectSuccess( SocketChannel channel ) ;
/**
* The connection attempt failed.
* @param failure_msg failure reason
*/
public void connectFailure( Throwable failure_msg );
}
/////////////////////////////////////////////////////////////
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -