⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectdisconnectmanager.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        try{  new_canceled_mon.enter();
          canceled = canceled_requests.contains( request.listener );
        }
        finally{ new_canceled_mon.exit(); }
        
        if( canceled ) {
        	closeConnection( request.channel );
        }
        else {
        	connect_selector.cancel( request.channel );
          request.listener.connectSuccess( request.channel );
        }
      }
      else { //should never happen
        Debug.out( "finishConnect() failed" );
        request.listener.connectFailure( new Throwable( "finishConnect() failed" ) );
        
        closeConnection( request.channel );
      }
    }
    catch( Throwable t ) {
          
      if( SHOW_CONNECT_STATS ) {
        long queue_wait_time = request.connect_start_time - request.request_start_time;
        long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
        int num_queued = new_requests.size();
        int num_connecting = pending_attempts.size();
        System.out.println("F: queue_wait_time="+queue_wait_time+
                            ", connect_time="+connect_time+
                            ", num_queued="+num_queued+
                            ", num_connecting="+num_connecting);
      }
          
      request.listener.connectFailure( t );
      
      closeConnection( request.channel );
    }
  }
  

  
  /**
   * Runs a single pass of the connect loop, in four steps:
   * <ol>
   *   <li>drop pending attempts whose listener has been canceled,</li>
   *   <li>run the connect selector,</li>
   *   <li>fail pending attempts that exceeded their timeout and count the stalled ones,</li>
   *   <li>if every remaining pending attempt is stalled and the simultaneous-attempt
   *       limit has not been reached, start one more queued request.</li>
   * </ol>
   */
  private void runSelect() {
    //step 1: process cancellations
    try{
      new_canceled_mon.enter();

      Iterator canceled_it = canceled_requests.iterator();
      while( canceled_it.hasNext() ) {
        ConnectListener canceled_listener = (ConnectListener)canceled_it.next();

        //only the first pending attempt registered with this listener is dropped
        Iterator pending_it = pending_attempts.keySet().iterator();
        while( pending_it.hasNext() ) {
          ConnectionRequest candidate = (ConnectionRequest)pending_it.next();
          if( candidate.listener != canceled_listener ) {
            continue;
          }

          connect_selector.cancel( candidate.channel );
          closeConnection( candidate.channel );
          pending_it.remove();
          break;
        }
      }

      //every cancel request is considered handled after this pass, matched or not
      canceled_requests.clear();
    }
    finally{
      new_canceled_mon.exit();
    }

    //step 2: run select; a failure is only reported, the remaining steps still run
    try{
      connect_selector.select( 100 );
    }
    catch( Throwable t ) {
      Debug.out( "connnectSelectLoop() EXCEPTION: ", t );
    }

    //step 3: connect attempt timeout checks
    int stalled_count = 0;
    final long now = SystemTime.getCurrentTime();
    Iterator attempts_it = pending_attempts.keySet().iterator();
    while( attempts_it.hasNext() ) {
      final ConnectionRequest attempt = (ConnectionRequest)attempts_it.next();
      final long elapsed = now - attempt.connect_start_time;

      if( elapsed > attempt.connect_timeout ) {
        //timed out: forget the attempt, release its channel, then notify the listener
        attempts_it.remove();

        final SocketChannel timed_out_channel = attempt.channel;
        connect_selector.cancel( timed_out_channel );
        closeConnection( timed_out_channel );

        final InetSocketAddress remote = attempt.address;
        final InetAddress resolved = remote.getAddress();

        //fall back to the socket address' own text form when no InetAddress is available
        final String target = ( resolved == null )
            ? remote.toString()
            : ( resolved.getHostAddress() + ":" + remote.getPort() );

        attempt.listener.connectFailure( new Throwable( "Connection attempt to " + target + " aborted: timed out after " + attempt.connect_timeout/1000+ "sec" ) );
        continue;
      }

      if( elapsed >= CONNECT_ATTEMPT_STALL_TIME ) {
        stalled_count++;
      }
      else if( elapsed < 0 ) {  //time went backwards, restart the measurement from now
        attempt.connect_start_time = now;
      }
    }

    //step 4: expand the connect queue, but only when all pending attempts are stalled
    //and the simultaneous-attempt limit has not been reached
    final int still_pending = pending_attempts.size();
    if( stalled_count != still_pending || still_pending >= MAX_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
      return;
    }

    ConnectionRequest next_request = null;
    try{
      new_canceled_mon.enter();

      if( !new_requests.isEmpty() ) {
        next_request = (ConnectionRequest)new_requests.removeFirst();
      }
    }
    finally{
      new_canceled_mon.exit();
    }

    //the request is started after the monitor has been released
    if( next_request != null ) {
      addNewRequest( next_request );
    }
  }
  
  
  /**
   * Closes every channel that is due for closing.
   * Delayed closes whose scheduled time has passed are first moved to the
   * pending queue; the pending queue is then drained, cancelling each channel
   * on the connect selector before closing it. All of this happens while
   * holding pending_closes_mon.
   */
  private void doClosings() {
    try{
      pending_closes_mon.enter();

      final long now = SystemTime.getCurrentTime();

      for( Iterator delayed_it = delayed_closes.entrySet().iterator(); delayed_it.hasNext(); ) {
        final Map.Entry delayed = (Map.Entry)delayed_it.next();
        final long remaining = ((Long)delayed.getValue()).longValue() - now;

        //keep waiting only while the remaining time lies within [0, 60s];
        //a larger value is treated as due as well
        //NOTE(review): presumably this guards against the clock having jumped backwards - confirm
        final boolean still_waiting = remaining >= 0 && remaining <= 60*1000;
        if( still_waiting ) {
          continue;
        }

        pending_closes.addLast( delayed.getKey() );
        delayed_it.remove();
      }

      while( !pending_closes.isEmpty() ) {
        final SocketChannel to_close = (SocketChannel)pending_closes.removeFirst();
        if( to_close == null ) {
          continue;
        }

        connect_selector.cancel( to_close );

        try{
          to_close.close();
        }
        catch( Throwable t ) {
          //close failures are deliberately ignored
        }
      }
    }
    finally{
      pending_closes_mon.exit();
    }
  }
  
  
  /**
   * Queue a request for a new outbound connection to the given address,
   * using the default timeout CONNECT_ATTEMPT_TIMEOUT.
   * @param address remote ip+port to connect to
   * @param listener to receive notification of connect attempt success/failure
   */
  public void requestNewConnection( InetSocketAddress address, ConnectListener listener ) {
    //delegate to the three-argument variant with the default timeout
    requestNewConnection(
        address,
        listener,
        CONNECT_ATTEMPT_TIMEOUT );
  }
  
  /**
   * Queue a request for a new outbound connection to the given address,
   * with a caller-supplied timeout.
   * @param address remote ip+port to connect to
   * @param listener to receive notification of connect attempt success/failure
   * @param connect_timeout how long the connect attempt may take before it is failed,
   *        in the same unit as SystemTime.getCurrentTime() (presumably milliseconds)
   */
  public void requestNewConnection( InetSocketAddress address, ConnectListener listener, long connect_timeout ) {
    final ConnectionRequest request = new ConnectionRequest( address, listener, connect_timeout );

    try{
      new_canceled_mon.enter();

      //New connections are usually added in 50-peer chunks, i.e. from a tracker
      //announce reply. Inserting at a random queue position evenly distributes
      //the connect attempts if there are multiple torrents running.
      final int slot_count = new_requests.size() + 1;
      new_requests.add( random.nextInt( slot_count ), request );
    }
    finally{
      new_canceled_mon.exit();
    }
  }
  
  /**
   * Schedule the given connection for closing without any delay.
   * @param channel to close
   */
  public void closeConnection( SocketChannel channel ) {
    //a delay of 0 means "no delay" to the two-argument variant
    closeConnection( channel, 0 );
  }

  /**
   * Schedule the given connection for closing, optionally after a delay.
   * The channel is only queued here; nothing is closed by this method itself.
   * @param channel to close
   * @param delay 0 to queue the channel for closing right away, any other value to
   *        record a close time of SystemTime.getCurrentTime() + delay
   */
  public void closeConnection( SocketChannel channel, int delay ) {
    try{
      pending_closes_mon.enter();

      if( delay != 0 ) {
        //delayed close: (re)register the channel with its scheduled close time
        delayed_closes.put( channel, new Long( SystemTime.getCurrentTime() + delay ) );
        return;
      }

      //immediate close: queue the channel unless it is already scheduled,
      //either as a delayed close or in the pending queue
      final boolean already_scheduled =
          delayed_closes.containsKey( channel ) || pending_closes.contains( channel );

      if( !already_scheduled ) {
        pending_closes.addLast( channel );
      }
    }
    finally{
      pending_closes_mon.exit();
    }
  }
  
  
  /**
   * Cancel a pending new connection request.
   * If a request with this listener is still waiting in the queue of new requests,
   * the first such request is removed immediately. Otherwise the listener is
   * recorded in canceled_requests so the cancellation can be handled later,
   * during select.
   * @param listener_key used in the initial connect request
   */
  public void cancelRequest( ConnectListener listener_key ) {
    try{
      new_canceled_mon.enter();

      //first try to cancel right away, straight from the queue of new requests
      boolean removed_from_queue = false;
      final Iterator queued_it = new_requests.iterator();
      while( !removed_from_queue && queued_it.hasNext() ) {
        final ConnectionRequest queued = (ConnectionRequest)queued_it.next();
        if( queued.listener == listener_key ) {
          queued_it.remove();
          removed_from_queue = true;
        }
      }

      //not queued any more: remember the listener for later removal during select
      if( !removed_from_queue ) {
        canceled_requests.add( listener_key );
      }
    }
    finally{
      new_canceled_mon.exit();
    }
  }
  
  

  /**
   * Bookkeeping record for a single outbound connection request.
   */
  private static class ConnectionRequest {
    /** Remote ip+port to connect to. */
    private final InetSocketAddress address;
    /** Receiver of the success/failure notification for this request. */
    private final ConnectListener listener;
    /** Time at which this request object was created. */
    private final long request_start_time;
    /** Start time of the connect attempt; not set by the constructor. */
    private long connect_start_time;
    /** Longest time the connect attempt may take before it is failed. */
    private final long connect_timeout;
    /** Channel used for the connect attempt; not set by the constructor. */
    private SocketChannel channel;

    /**
     * Creates a request and stamps it with the current system time.
     * @param _address remote ip+port to connect to
     * @param _listener to receive notification of connect attempt success/failure
     * @param _connect_timeout longest time the connect attempt may take
     */
    private ConnectionRequest( InetSocketAddress _address, ConnectListener _listener, long _connect_timeout  ) {
      this.request_start_time = SystemTime.getCurrentTime();
      this.address = _address;
      this.listener = _listener;
      this.connect_timeout = _connect_timeout;
    }
  }
  
  
///////////////////////////////////////////////////////////  
  
  /**
   * Callback interface through which the outcome of a connection request is reported.
   */
  public interface ConnectListener {

    /**
     * Called once the connection establishment process has started,
     * i.e. the connection is actively being attempted.
     */
    void connectAttemptStarted();

    /**
     * Called when the connection attempt succeeded.
     * @param channel connected socket channel
     */
    void connectSuccess( SocketChannel channel );

    /**
     * Called when the connection attempt failed.
     * @param failure_msg failure reason
     */
    void connectFailure( Throwable failure_msg );
  }
   
/////////////////////////////////////////////////////////////
   
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -