⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectdisconnectmanager.java

📁 Azureus is a powerful, full-featured, cross-platform java BitTorrent client
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on Sep 13, 2004
 * Created by Alon Rohter
 * Copyright (C) 2004 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SARL au capital de 30,000 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.networkmanager;

import java.net.*;
import java.nio.channels.*;
import java.util.*;

import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.logging.LGLogger;
import org.gudy.azureus2.core3.util.*;



/**
 * Manages new connection establishment and ended connection termination.
 */
public class ConnectDisconnectManager {
  private static int MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = 3;
  public static int MAX_SIMULTANIOUS_CONNECT_ATTEMPTS = 5;  //NOTE: WinXP SP2 limits to 10 max at any given time

  /**
   * Reloads the simultaneous-connect limits from configuration.
   * The maximum is read from "network.max.simultaneous.connect.attempts";
   * the minimum is derived as (maximum - 2), clamped up to 1, except that a
   * maximum of 0 yields a minimum of 0 (outbound connecting disabled).
   */
  private static void refreshConnectAttemptLimits() {
    MAX_SIMULTANIOUS_CONNECT_ATTEMPTS = COConfigurationManager.getIntParameter( "network.max.simultaneous.connect.attempts" );
    MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = MAX_SIMULTANIOUS_CONNECT_ATTEMPTS - 2;
    if( MIN_SIMULTANIOUS_CONNECT_ATTEMPTS < 1 ) {
      if( MAX_SIMULTANIOUS_CONNECT_ATTEMPTS == 0 ) {
        MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = 0;  //max 0 = outbound disabled
      }
      else {
        MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = 1;
      }
    }
  }

  static {
    //load the limits once at class initialization...
    refreshConnectAttemptLimits();

    //...and again every time the configured maximum changes
    COConfigurationManager.addParameterListener( "network.max.simultaneous.connect.attempts", new ParameterListener() {
      public void parameterChanged( String parameterName ) {
        refreshConnectAttemptLimits();
      }
    });
  }
  
  // Connect attempt timeout, in milliseconds. Where it is enforced is not visible in this part of the file.
  private static final int CONNECT_ATTEMPT_TIMEOUT = 30*1000;  //30sec
  // Connect attempt "stall" threshold, in milliseconds. NOTE(review): usage is outside this part of the file - confirm semantics there.
  private static final int CONNECT_ATTEMPT_STALL_TIME = 3*1000;  //3sec
  // When true, addNewRequest() prints queue-wait/connect timing and queue sizes to System.out on connect success ("S:") and failure ("F:").
  private static final boolean SHOW_CONNECT_STATS = false;
  
  // Selector created for OP_CONNECT; addNewRequest() registers each outgoing channel with it.
  private final VirtualChannelSelector connect_selector = new VirtualChannelSelector( VirtualChannelSelector.OP_CONNECT );
  
  // Queue of ConnectionRequest objects waiting to be started; addNewOutboundRequests() takes them from the head.
  private final LinkedList new_requests = new LinkedList();
  // NOTE(review): not used in the visible part of the file - presumably holds cancellations awaiting processing; confirm.
  private final ArrayList canceled_requests = new ArrayList();
  // Monitor held while new_requests is dequeued; the name suggests it also guards canceled_requests - confirm.
  private final AEMonitor	new_canceled_mon= new AEMonitor( "ConnectDisconnectManager:NCM");
  
  // In-progress connect attempts, keyed by ConnectionRequest; values are always null, so the map is used as a set.
  private final HashMap pending_attempts = new HashMap();
  
  // Channels of failed attempts queued for closing; presumably drained by doClosings() - confirm.
  private final LinkedList pending_closes = new LinkedList();
  // Monitor held whenever pending_closes is appended to.
  private final AEMonitor	pending_closes_mon = new AEMonitor( "ConnectDisconnectManager:PC");
     
  // NOTE(review): usage is outside the visible part of the file.
  private final Random random = new Random();
  
  
  
  /**
   * Creates the manager and immediately starts its processing thread.
   * The thread is named "ConnectDisconnectManager", is marked as a daemon,
   * and runs {@link #mainLoop()}.
   */
  protected ConnectDisconnectManager() {
    Thread worker = new AEThread( "ConnectDisconnectManager" ) {
      public void runSupport() {
        mainLoop();
      }
    };

    worker.setDaemon( true );
    worker.start();
  }
  

  /**
   * Body of the manager thread; never returns.
   * Each pass starts queued outbound requests, then calls runSelect(),
   * then calls doClosings(), always in that order.
   */
  private void mainLoop() {
    for( ;; ) {
      addNewOutboundRequests();
      runSelect();
      doClosings();
    }
  }
  
  
  /**
   * Starts queued outbound connection requests until the number of in-progress
   * attempts reaches MIN_SIMULTANIOUS_CONNECT_ATTEMPTS or the request queue is empty.
   *
   * Each request is removed from the queue while holding new_canceled_mon, but the
   * attempt itself is started only after the monitor has been released:
   * addNewRequest() invokes the request's listener and performs socket
   * setup/bind/connect, and doing that while holding the queue monitor would stall
   * every other thread waiting on it and would run external listener code under
   * this lock.
   */
  private void addNewOutboundRequests() {
    while( pending_attempts.size() < MIN_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
      ConnectionRequest cr;

      try{
        new_canceled_mon.enter();

        if( new_requests.isEmpty() )  break;  //nothing left to start; finally still releases the monitor
        cr = (ConnectionRequest)new_requests.removeFirst();
      }
      finally{
        new_canceled_mon.exit();
      }

      addNewRequest( cr );  //deliberately outside the monitor
    }
  }
  
  

  private void addNewRequest( final ConnectionRequest request ) {
    request.listener.connectAttemptStarted();
    
    try {
      request.channel = SocketChannel.open();
      
      try {  //advanced socket options
        int rcv_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_RCVBUF" );
        if( rcv_size > 0 ) {
          LGLogger.log( "Setting socket receive buffer size for outgoing connection [" +request.address+ "] to: " + rcv_size );
          request.channel.socket().setReceiveBufferSize( rcv_size );
        }
      
        int snd_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_SNDBUF" );
        if( snd_size > 0 ) {
          LGLogger.log( "Setting socket send buffer size for outgoing connection [" +request.address+ "] to: " + snd_size );
          request.channel.socket().setSendBufferSize( snd_size );
        }

        String ip_tos = COConfigurationManager.getStringParameter( "network.tcp.socket.IPTOS" );
        if( ip_tos.length() > 0 ) {
          LGLogger.log( "Setting socket TOS field for outgoing connection [" +request.address+ "] to: " + ip_tos );
          request.channel.socket().setTrafficClass( Integer.decode( ip_tos ).intValue() );
        }

        String bindIP = COConfigurationManager.getStringParameter("Bind IP", "");
        if ( bindIP.length() > 6 ) {
          LGLogger.log( "Binding outgoing connection [" +request.address+ "] to local IP address: " + bindIP );
          request.channel.socket().bind( new InetSocketAddress( InetAddress.getByName( bindIP ), 0 ) );
        }
      }
      catch( Throwable t ) {
        String msg = "Error while processing advanced socket options.";
        Debug.out( msg, t );
        LGLogger.logUnrepeatableAlert( msg, t );
        //dont pass the exception outwards, so we will continue processing connection without advanced options set
      }
      
      request.channel.configureBlocking( false );
      request.channel.connect( request.address );
      
      connect_selector.register( request.channel, new VirtualChannelSelector.VirtualSelectorListener() {
        public void selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {         
          try {
            if( request.channel.finishConnect() ) {
                  
              if( SHOW_CONNECT_STATS ) {
                long queue_wait_time = request.connect_start_time - request.request_start_time;
                long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
                int num_queued = new_requests.size();
                int num_connecting = pending_attempts.size();
                System.out.println("S: queue_wait_time="+queue_wait_time+
                                    ", connect_time="+connect_time+
                                    ", num_queued="+num_queued+
                                    ", num_connecting="+num_connecting);
              }
                  
              request.listener.connectSuccess( request.channel );
            }
            else { //should never happen
              Debug.out( "finishConnect() failed" );
              request.listener.connectFailure( new Throwable( "finishConnect() failed" ) );
              try{
                pending_closes_mon.enter();
                  
                pending_closes.addLast( request.channel );
              }finally{
                pending_closes_mon.exit();
              }
            }
          }
          catch( Throwable t ) {
                
            if( SHOW_CONNECT_STATS ) {
              long queue_wait_time = request.connect_start_time - request.request_start_time;
              long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
              int num_queued = new_requests.size();
              int num_connecting = pending_attempts.size();
              System.out.println("F: queue_wait_time="+queue_wait_time+
                                  ", connect_time="+connect_time+
                                  ", num_queued="+num_queued+
                                  ", num_connecting="+num_connecting);
            }
                
            request.listener.connectFailure( t );
            try{
              pending_closes_mon.enter();
                
              pending_closes.addLast( request.channel );
            }finally{
                	
              pending_closes_mon.exit();
            }
          }
            
          pending_attempts.remove( request );
        }
        
        public void selectFailure( VirtualChannelSelector selector, SocketChannel sc,Object attachment, Throwable msg ) {
          Debug.out( "selectFailure" );
          
          try{
          	pending_closes_mon.enter();
          
            pending_closes.addLast( request.channel );
          }finally{
          	
          	pending_closes_mon.exit();
          }

          request.listener.connectFailure( msg );

          pending_attempts.remove( request );
          
        }
      }, null );

      request.connect_start_time = SystemTime.getCurrentTime();
      pending_attempts.put( request, null );
    }
    catch( Throwable t ) {
      
      String full = request.address.toString();
      String hostname = request.address.getHostName();
      int port = request.address.getPort();
      boolean unresolved = request.address.isUnresolved();
      InetAddress	inet_address = request.address.getAddress();
      String full_sub = inet_address==null?request.address.toString():inet_address.toString();
      String host_address = inet_address==null?request.address.toString():inet_address.getHostAddress();
      
      String msg = "ConnectDisconnectManager::address exception: full="+full+ ", hostname="+hostname+ ", port="+port+ ", unresolved="+unresolved+ ", full_sub="+full_sub+ ", host_address="+host_address;
      if( request.channel != null ) {
        String channel = request.channel.toString();
        String socket = request.channel.socket().toString();
        String local_address = request.channel.socket().getLocalAddress().toString();
        int local_port = request.channel.socket().getLocalPort();
           SocketAddress ra = request.channel.socket().getRemoteSocketAddress();
        String remote_address;
           if( ra != null )  remote_address = ra.toString();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -