⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeertransportprotocol.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
  
  
  
  /**
   * Sends a keep-alive message when nothing has been sent to the peer for more than two minutes.
   * <p>
   * If no send time has been recorded yet ({@code last_message_sent_time == 0}) or the clock has
   * moved backwards, the send timestamp is simply reset to the current time and nothing is sent.
   */
  public void doKeepAliveCheck() {
    final long current = SystemTime.getCurrentTime();
    final long idle_millis = current - last_message_sent_time;

    final boolean nothing_sent_yet = last_message_sent_time == 0;

    if( nothing_sent_yet || idle_millis < 0 ) {
      //brand new connection (or clock went backwards): just restart the timer
      last_message_sent_time = current;
      return;
    }

    if( idle_millis <= 2*60*1000 ) {
      return;  //still inside the 2min keep-alive window
    }

    sendKeepAlive();

    //not strictly accurate, but it prevents queueing further keep-alives
    //before this one has actually gone out
    last_message_sent_time = current;
  }

  
  /**
   * Closes the connection if it has stalled.
   * <p>
   * A fully established connection is closed when neither any message nor a data message has been
   * received for more than five minutes. A connection still waiting for the handshake is closed
   * after more than three minutes. Timestamps that lie in the future (clock went backwards) are
   * clamped to the current time first. Per the original author's note, the pending/connecting
   * states are timed out by the ConnectDisconnectManager and are not dealt with here.
   *
   * @return {@code true} if the connection was closed by this check, {@code false} otherwise
   */
  public boolean doTimeoutChecks() {
    final long current = SystemTime.getCurrentTime();

    if( connection_state == PEPeerTransport.CONNECTION_FULLY_ESTABLISHED ) {
      final long message_timeout = 5*60*1000;  //5min

      //never let the reference timestamps sit ahead of the clock
      if( last_message_received_time > current ) {
        last_message_received_time = current;
      }
      if( last_data_message_received_time > current ) {
        last_data_message_received_time = current;
      }

      if( current - last_message_received_time > message_timeout
          && current - last_data_message_received_time > message_timeout ) {
        closeConnectionInternally( "timed out while waiting for messages" );
        return true;
      }

      return false;
    }

    if( connection_state == PEPeerTransport.CONNECTION_WAITING_FOR_HANDSHAKE ) {
      final long handshake_timeout = 3*60*1000;  //3min

      if( connection_established_time > current ) {
        connection_established_time = current;
      }
      else if( current - connection_established_time > handshake_timeout ) {
        //don't get stuck in the handshaking phase
        closeConnectionInternally( "timed out while waiting for handshake" );
        return true;
      }
    }

    return false;
  }
  
  
  
  /**
   * Adjusts the transport mode and the outgoing request read-ahead to the current transfer rates.
   * <p>
   * The send rate (data + protocol) selects the read-ahead depth and may raise the transport to
   * FAST or TURBO mode; the receive rate (data + protocol) may also raise the transport mode.
   * The transport mode is never lowered by this method. Nothing happens while the transport,
   * the peer stats or the outgoing piece message handler are unavailable.
   */
  public void doPerformanceTuningCheck() {
    final Transport transport = connection.getTransport();

    if( transport == null || peer_stats == null || outgoing_piece_message_handler == null ) {
      return;  //nothing to tune against yet
    }

    //---- send speed -based tuning ----
    final long upload_rate = peer_stats.getDataSendRate() + peer_stats.getProtocolSendRate();
    final int read_ahead;

    if( upload_rate >= 1250000 ) {  // 10 Mbit/s and above: turbo
      transport.setTransportMode( Transport.TRANSPORT_MODE_TURBO );
      read_ahead = upload_rate >= 3125000 ? 256 : 128;  // 25 Mbit/s gets the deepest read-ahead
    }
    else if( upload_rate >= 125000 ) {  // 1 Mbit/s
      if( transport.getTransportMode() < Transport.TRANSPORT_MODE_FAST ) {
        transport.setTransportMode( Transport.TRANSPORT_MODE_FAST );
      }
      read_ahead = 32;
    }
    else if( upload_rate >= 62500 ) {  // 500 Kbit/s
      read_ahead = 16;
    }
    else if( upload_rate >= 31250 ) {  // 250 Kbit/s
      read_ahead = 8;
    }
    else if( upload_rate >= 12500 ) {  // 100 Kbit/s
      read_ahead = 4;
    }
    else {
      read_ahead = 2;
    }

    outgoing_piece_message_handler.setRequestReadAhead( read_ahead );

    //---- receive speed -based tuning ----
    final long download_rate = peer_stats.getDataReceiveRate() + peer_stats.getProtocolReceiveRate();

    if( download_rate >= 1250000 ) {  // 10 Mbit/s
      transport.setTransportMode( Transport.TRANSPORT_MODE_TURBO );
    }
    else if( download_rate >= 125000  // 1 Mbit/s
        && transport.getTransportMode() < Transport.TRANSPORT_MODE_FAST ) {
      transport.setTransportMode( Transport.TRANSPORT_MODE_FAST );
    }
  }
  
  
  
  
  /**
   * @return the current value of {@code connection_state}
   */
  public int getConnectionState() {
    return connection_state;
  }
  
  
  
  
  /**
   * Reports how long ago the last data message was received.
   * <p>
   * A receive timestamp lying in the future (clock went backwards) is clamped to the current
   * time before the difference is taken.
   *
   * @return elapsed time since the last data message was received, or -1 if none has been received
   */
  public long getTimeSinceLastDataMessageReceived() {
    if( last_data_message_received_time == -1 ) {
      return -1;  //never received
    }

    final long current = SystemTime.getCurrentTime();

    if( current < last_data_message_received_time ) {
      last_data_message_received_time = current;  //time went backwards
    }

    return current - last_data_message_received_time;
  }
  
	/**
	 * Reports how long ago {@code last_good_data_time} was recorded.
	 * <p>
	 * A timestamp lying in the future (clock went backwards) is clamped to the current time
	 * before the difference is taken.
	 *
	 * @return elapsed time since {@code last_good_data_time}, or -1 if it was never set
	 */
	public long getTimeSinceGoodDataReceived()
	{
		if( last_good_data_time == -1 ) {
			return -1;  //never received
		}

		final long current = SystemTime.getCurrentTime();

		if( current < last_good_data_time ) {
			last_good_data_time = current;  //time went backwards
		}

		return current - last_good_data_time;
	}
  
  
  /**
   * Reports how long ago the last data message was sent.
   * <p>
   * A send timestamp lying in the future (clock went backwards) is clamped to the current time
   * before the difference is taken.
   *
   * @return elapsed time since the last data message was sent, or -1 if none has been sent
   */
  public long getTimeSinceLastDataMessageSent() {
    if( last_data_message_sent_time == -1 ) {
      return -1;  //never sent
    }

    final long current = SystemTime.getCurrentTime();

    if( current < last_data_message_sent_time ) {
      last_data_message_sent_time = current;  //time went backwards
    }

    return current - last_data_message_sent_time;
  }
  
  
  
  
  /**
   * Reports how long the connection has been established.
   * <p>
   * An establishment timestamp lying in the future (clock went backwards) is clamped to the
   * current time before the difference is taken.
   *
   * @return elapsed time since {@code connection_established_time}, or 0 while that timestamp
   *         is still unset (0), i.e. while the transport is being connected
   */
  public long getTimeSinceConnectionEstablished() {
    if( connection_established_time == 0 ) {
      return 0;  //fudge it while the transport is being connected
    }

    final long current = SystemTime.getCurrentTime();

    if( current < connection_established_time ) {
      connection_established_time = current;
    }

    return current - connection_established_time;
  }
  
  /**
   * @return the current value of {@code consecutive_no_request_count}
   */
  public int getConsecutiveNoRequestCount() {
    return consecutive_no_request_count;
  }
  
  /**
   * Overwrites {@code consecutive_no_request_count}.
   *
   * @param num the new counter value
   */
  public void setConsecutiveNoRequestCount( int num ) {
    consecutive_no_request_count = num;
  }

  
  /**
   * Processes the BT handshake received from the remote peer.
   * <p>
   * The connection is closed, the handshake destroyed and the method returns early when any of
   * these checks fails, in this order:
   * <ol>
   *   <li>the handshake's data hash differs from the manager's hash;</li>
   *   <li>the client description decoded from the peer id is not an allowed client type;</li>
   *   <li>the peer id equals our own peer id;</li>
   *   <li>the peer id is already registered, unless this connection is LAN-local and the existing
   *       one is not (the existing one is then removed instead);</li>
   *   <li>the IP is already registered, unless "Allow Same IP Peers" is set or the IP is
   *       127.0.0.1;</li>
   *   <li>{@code manager.getMaxNewConnectionsAllowed()} is 0 and
   *       {@code manager.doOptimisticDisconnect(...)} returns false;</li>
   *   <li>this transport is already closing, or the identity cannot be added.</li>
   * </ol>
   * Otherwise, if the high bit of the first reserved byte is set, AZ messaging is enabled for the
   * download and the client string does not contain "Plus!", the AZ decoder/encoder are installed
   * and our AZ handshake is sent. When AZ messaging is not in use, queue processing is resumed, the
   * connection is marked fully established, our bitfield is sent and availability is added; in AZ
   * mode that final step is left for after the AZ handshake has been received.
   *
   * @param handshake the received handshake; {@code destroy()} is called on it on every return
   *                  path of this method
   */
  protected void decodeBTHandshake( BTHandshake handshake ) {
    PeerIdentityDataID  my_peer_data_id = manager.getPeerIdentityDataID();
      
    //the peer must be talking about the same torrent as we are
    if( !Arrays.equals( manager.getHash(), handshake.getDataHash() ) ) {
      closeConnectionInternally( "handshake has wrong infohash" );
      handshake.destroy();
      return;
    }
    
    peer_id = handshake.getPeerId();

    //decode a client identification string from the given peerID
    client = PeerClassifier.getClientDescription( peer_id );

    //make sure the client type is not banned
    if( !PeerClassifier.isClientTypeAllowed( client ) ) {
      closeConnectionInternally( client+ " client type not allowed to connect, banned" );
      handshake.destroy();
      return;
    }

    //make sure we are not connected to ourselves
    if( Arrays.equals( manager.getPeerId(), peer_id ) ) {
      manager.peerVerifiedAsSelf( this );  //make sure we dont do it again
      closeConnectionInternally( "given peer id matches myself" );
      handshake.destroy();
      return;
    }

    //make sure we are not already connected to this peer
    boolean sameIdentity = PeerIdentityManager.containsIdentity( my_peer_data_id, peer_id, getPort());
    boolean sameIP = false;
      
      
    //allow loopback connects for co-located proxy-based connections and testing
    boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) || ip.equals( "127.0.0.1" );
    if( !same_allowed ){  
      if( PeerIdentityManager.containsIPAddress( my_peer_data_id, ip )) {
        sameIP = true;
      }
    }
      
    if( sameIdentity ) {
    	//default is to reject this (duplicate) connection; only a lan-local newcomer may displace an external one
    	boolean close = true;
    	
    	if( connection.isLANLocal() ) {   //this new connection is lan-local    		
    		final PEPeerTransport existing = manager.getTransportFromIdentity( peer_id );
    		if( existing != null && !existing.isLANLocal() ) {  //so drop the existing connection if it is an external (non lan-local) one
    			Debug.out( "dropping existing non-lanlocal peer connection [" +existing+ "]" );
    			manager.removePeer( existing );
    			close = false;    			
    		}
    	}
    	
      if( close ) {
      closeConnectionInternally( "peer matches already-connected peer id" );
      handshake.destroy();
      return;
    }
    }
    
    if( sameIP ) {
      closeConnectionInternally( "peer matches already-connected IP address, duplicate connections not allowed" );
      handshake.destroy();
      return;
    }

    //make sure we haven't reached our connection limit
    //(doOptimisticDisconnect is only invoked when no new connections are allowed)
    final int maxAllowed = manager.getMaxNewConnectionsAllowed();
    if (maxAllowed ==0 &&!manager.doOptimisticDisconnect( isLANLocal()))
    {
        final String msg = "too many existing peer connections [p" +
            PeerIdentityManager.getIdentityCount( my_peer_data_id )
            +"/g" +PeerIdentityManager.getTotalIdentityCount()
            +", pmx" +PeerUtils.MAX_CONNECTIONS_PER_TORRENT+ "/gmx"
            +PeerUtils.MAX_CONNECTIONS_TOTAL+"/dmx" + manager.getMaxConnections()+ "]";
        //System.out.println( msg );
        closeConnectionInternally( msg );
        handshake.destroy();
        return;
    }

    //the closing-flag check and the identity registration are both done while holding closing_mon
    try{
        closing_mon.enter();
      
        if( closing ){
        	
        	final String msg = "connection already closing";
        	
        	closeConnectionInternally( msg );
        	
        	handshake.destroy();
        	
        	return;
        }
        
        //addIdentity returning false is treated as "already connected to this peer id"
        if ( !PeerIdentityManager.addIdentity( my_peer_data_id, peer_id, getPort(), ip )){
        	
            closeConnectionInternally( "peer matches already-connected peer id" );
            
            handshake.destroy();
            
            return;
        }
        
        //NOTE(review): presumably consulted when closing so the registration can be undone - confirm
        identityAdded = true;
        
    }finally{
    	
    	closing_mon.exit();
    }
 
    if (Logger.isEnabled())
			Logger.log(new LogEvent(this, LOGID, "In: has sent their handshake"));

    /*
     * Waiting until we've received the initiating-end's full handshake, before sending back our own,
     * really should be the "proper" behavior.  However, classic BT trackers running NAT checking will
     * only send the first 48 bytes (up to infohash) of the peer handshake, skipping peerid, which means
     * we'll never get their complete handshake, and thus never reply, which causes the NAT check to fail.
     * So, we need to send our handshake earlier, after we've verified the infohash.
     * 
      if( incoming ) {  //wait until we've received their handshake before sending ours
        sendBTHandshake();
      }
    */
    
    
    //extended protocol processing
    if( (handshake.getReserved()[0] & 128) == 128 ) {  //if first (high) bit is set
      if( !manager.isAZMessagingEnabled() ) {
      	if (Logger.isEnabled())
					Logger.log(new LogEvent(this, LOGID,
							"Ignoring peer's extended AZ messaging support,"
									+ " as disabled for this download."));
      }
      else if( client.indexOf( "Plus!" ) != -1 ) {
      	//clients whose description contains "Plus!" set the bit without really supporting AZ messaging
      	if (Logger.isEnabled())
					Logger.log(new LogEvent(this, LOGID, "Handshake mistakingly indicates"
							+ " extended AZ messaging support...ignoring."));
      }
      else {
      	//only worth logging when the claim comes from a client not identified as Azureus
      	if (Logger.isEnabled() && client.indexOf("Azureus") == -1) {
					Logger.log(new LogEvent(this, LOGID, "Handshake claims extended AZ "
							+ "messaging support....enabling AZ mode."));
				}
        
        //switch both directions of the connection over to AZ message coding, then send our AZ handshake
        az_messaging_mode = true;
        connection.getIncomingMessageQueue().setDecoder( new AZMessageDecoder() );
        connection.getOutgoingMessageQueue().setEncoder( new AZMessageEncoder() );
      
        sendAZHandshake();
      }
    }

    handshake.destroy();
    
    
    /*
    for( int i=0; i < reserved.length; i++ ) {
      int val = reserved[i] & 0xFF;
      if( val != 0 ) {
        System.out.println( "Peer "+ip+" ["+client+"] sent reserved byte #"+i+" to " +val);
      }
    }
    */

    if( !az_messaging_mode ) {  //otherwise we'll do this after receiving az handshake
     
      connection.getIncomingMessageQueue().resumeQueueProcessing();  //HACK: because BT decoder is auto-paused after initial handshake, so it doesn't accidentally decode the next AZ message
             
      changePeerState( PEPeer.TRANSFERING );
      
      connection_state = PEPeerTransport.CONNECTION_FULLY_ESTABLISHED;
      
      sendBitField(); 
	  addAvailability();
    }
    
  }
  
  
  
  /**
   * Processes the AZ handshake received from the remote peer.
   * <p>
   * Records the client name/version, adopts the listen ports advertised in the handshake (when a
   * TCP listen port greater than 0 is given) and rebuilds {@code peer_item_identity} from them,
   * collects the advertised message ids that the MessageManager can resolve into
   * {@code supported_messages}, then marks the connection fully established, sends our bitfield,
   * destroys the handshake and adds availability.
   *
   * @param handshake the received AZ handshake; destroyed before this method returns
   */
  protected void decodeAZHandshake( AZHandshake handshake ) {
    client = handshake.getClient()+ " " +handshake.getClientVersion();

    final boolean ports_advertised = handshake.getTCPListenPort() > 0;

    if( ports_advertised ) {  //use the ports given in handshake
      tcp_listen_port = handshake.getTCPListenPort();
      udp_listen_port = handshake.getUDPListenPort();
      udp_non_data_port = handshake.getUDPNonDataListenPort();

      final byte handshake_type;
      if( handshake.getHandshakeType() == AZHandshake.HANDSHAKE_TYPE_CRYPTO ) {
        handshake_type = PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;
      }
      else {
        handshake_type = PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
      }

      //remake the id using the peer's remote listen port instead of their random local port
      peer_item_identity = PeerItemFactory.createPeerItem(
          ip,
          tcp_listen_port,
          PeerItem.convertSourceID( peer_source ),
          handshake_type,
          udp_listen_port,
          crypto_level,
          0 );
    }

    //find mutually available message types
    final ArrayList mutual = new ArrayList();

    for( int idx = 0; idx < handshake.getMessageIDs().length; idx++ ) {
      final Message known = MessageManager.getSingleton().lookupMessage( handshake.getMessageIDs()[idx] );

      if( known == null ) {
        continue;  //not resolvable on our side
      }

      mutual.add( known );  //mutual support!
    }

    supported_messages = (Message[])mutual.toArray( new Message[mutual.size()] );

    changePeerState( PEPeer.TRANSFERING );
    connection_state = PEPeerTransport.CONNECTION_FULLY_ESTABLISHED;

    sendBitField();

    handshake.destroy();

    addAvailability();
  }
  
  
  

  
  
  
  protected void decodeBitfield( BTBitfield bitfield )
  {
	  final DirectByteBuffer field =bitfield.getBitfield();
	  
	  final byte[] dataf =new byte[(nbPieces +7) /8];
	  
	  if( field.remaining( DirectByteBuffer.SS_PEER ) < dataf.length ) {
		  final String error = toString() + " has sent invalid Bitfield: too short [" +field.remaining( DirectByteBuffer.SS_PEER )+ "<" +dataf.length+ "]";
		  Debug.out( error );
		  if (Logger.isEnabled())
			  Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR, error ));
		  bitfield.destroy();
		  return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -