⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeertransportprotocol.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
  
  
  protected void decodeBTHandshake( BTHandshake handshake ) {
    PeerIdentityDataID  my_peer_data_id = manager.getPeerIdentityDataID();
      
    if( !Arrays.equals( manager.getHash(), handshake.getDataHash() ) ) {
      closeConnectionInternally( "handshake has wrong infohash" );
      handshake.destroy();
      return;
    }
    
    peer_id = handshake.getPeerId();

    //decode a client identification string from the given peerID
    client = PeerClassifier.getClientDescription( peer_id );

    //make sure the client type is not banned
    if( !PeerClassifier.isClientTypeAllowed( client ) ) {
      closeConnectionInternally( client+ " client type not allowed to connect, banned" );
      handshake.destroy();
      return;
    }

    //make sure we are not connected to ourselves
    if( Arrays.equals( manager.getPeerId(), peer_id ) ) {
      manager.peerVerifiedAsSelf( this );  //make sure we dont do it again
      closeConnectionInternally( "given peer id matches myself" );
      handshake.destroy();
      return;
    }

    //make sure we are not already connected to this peer
    boolean sameIdentity = PeerIdentityManager.containsIdentity( my_peer_data_id, peer_id );
    boolean sameIP = false;
      
      
    //allow loopback connects for co-located proxy-based connections and testing
    boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) || ip.equals( "127.0.0.1" );
    if( !same_allowed ){  
      if( PeerIdentityManager.containsIPAddress( my_peer_data_id, ip )) {
        sameIP = true;
      }
    }
      
    if( sameIdentity ) {
    	boolean close = true;
    	
    	if( connection.isLANLocal() ) {   //this new connection is lan-local    		
    		PEPeerTransport existing = manager.getTransportFromIdentity( peer_id );
    		if( existing != null && !existing.isLANLocal() ) {  //so drop the existing connection if it is an external (non lan-local) one
    			Debug.out( "dropping existing non-lanlocal peer connection [" +existing+ "]" );
    			manager.removePeer( existing );
    			close = false;    			
    		}
    	}
    	
      if( close ) {
      closeConnectionInternally( "peer matches already-connected peer id" );
      handshake.destroy();
      return;
    }
    }
    
    if( sameIP ) {
      closeConnectionInternally( "peer matches already-connected IP address, duplicate connections not allowed" );
      handshake.destroy();
      return;
    }

    //make sure we haven't reached our connection limit
    int maxAllowed = manager.getMaxNewConnectionsAllowed();
    if( maxAllowed == 0 ) {
    	String msg = "too many existing peer connections [p" +
    								PeerIdentityManager.getIdentityCount( my_peer_data_id )+
    								"/g" +PeerIdentityManager.getTotalIdentityCount()+
    								", pmx" +PeerUtils.MAX_CONNECTIONS_PER_TORRENT+ "/gmx" +
    								PeerUtils.MAX_CONNECTIONS_TOTAL+"/dmx" + manager.getMaxConnections()+ "]";
    	//System.out.println( msg );
      closeConnectionInternally( msg );
      handshake.destroy();
      return;
    }

    try{
        closing_mon.enter();
      
        if( closing ){
        	
        	String msg = "connection already closing";
        	
        	closeConnectionInternally( msg );
        	
        	handshake.destroy();
        	
        	return;
        }
        
        if ( !PeerIdentityManager.addIdentity( my_peer_data_id, peer_id, ip )){
        	
            closeConnectionInternally( "peer matches already-connected peer id" );
            
            handshake.destroy();
            
            return;
        }
        
        identityAdded = true;
        
    }finally{
    	
    	closing_mon.exit();
    }
 
    if (Logger.isEnabled())
			Logger.log(new LogEvent(this, LOGID, "In: has sent their handshake"));

    /*
     * Waiting until we've received the initiating-end's full handshake, before sending back our own,
     * really should be the "proper" behavior.  However, classic BT trackers running NAT checking will
     * only send the first 48 bytes (up to infohash) of the peer handshake, skipping peerid, which means
     * we'll never get their complete handshake, and thus never reply, which causes the NAT check to fail.
     * So, we need to send our handshake earlier, after we've verified the infohash.
     * 
      if( incoming ) {  //wait until we've received their handshake before sending ours
        sendBTHandshake();
      }
    */
    
    
    //extended protocol processing
    if( (handshake.getReserved()[0] & 128) == 128 ) {  //if first (high) bit is set
      if( !manager.isAZMessagingEnabled() ) {
      	if (Logger.isEnabled())
					Logger.log(new LogEvent(this, LOGID,
							"Ignoring peer's extended AZ messaging support,"
									+ " as disabled for this download."));
      }
      else if( client.indexOf( "Plus!" ) != -1 ) {
      	if (Logger.isEnabled())
					Logger.log(new LogEvent(this, LOGID, "Handshake mistakingly indicates"
							+ " extended AZ messaging support...ignoring."));
      }
      else {
      	if (Logger.isEnabled() && client.indexOf("Azureus") == -1) {
					Logger.log(new LogEvent(this, LOGID, "Handshake claims extended AZ "
							+ "messaging support....enabling AZ mode."));
				}
        
        az_messaging_mode = true;
        connection.getIncomingMessageQueue().setDecoder( new AZMessageDecoder() );
        connection.getOutgoingMessageQueue().setEncoder( new AZMessageEncoder() );
      
        sendAZHandshake();
      }
    }

    handshake.destroy();
    
    
    /*
    for( int i=0; i < reserved.length; i++ ) {
      int val = reserved[i] & 0xFF;
      if( val != 0 ) {
        System.out.println( "Peer "+ip+" ["+client+"] sent reserved byte #"+i+" to " +val);
      }
    }
    */

    if( !az_messaging_mode ) {  //otherwise we'll do this after receiving az handshake
     
      connection.getIncomingMessageQueue().resumeQueueProcessing();  //HACK: because BT decoder is auto-paused after initial handshake, so it doesn't accidentally decode the next AZ message
             
      changePeerState( PEPeer.TRANSFERING );
      
      connection_state = PEPeerTransport.CONNECTION_FULLY_ESTABLISHED;
      
      sendBitField(); 
	  addAvailability();
    }
    
  }
  
  
  
  protected void decodeAZHandshake( AZHandshake handshake ) {
    client = handshake.getClient()+ " " +handshake.getClientVersion();

    if( handshake.getTCPListenPort() > 0 ) {  //use the ports given in handshake
      tcp_listen_port = handshake.getTCPListenPort();
      udp_listen_port = handshake.getUDPListenPort();
      int type = handshake.getHandshakeType() == AZHandshake.HANDSHAKE_TYPE_CRYPTO ? PeerItemFactory.HANDSHAKE_TYPE_CRYPTO : PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
      
      //remake the id using the peer's remote listen port instead of their random local port
      peer_item_identity = PeerItemFactory.createPeerItem( ip, tcp_listen_port, PeerItem.convertSourceID( peer_source ), type );
    }

    //find mutually available message types
    ArrayList messages = new ArrayList();

    for( int i=0; i < handshake.getMessageIDs().length; i++ ) {
      Message msg = MessageManager.getSingleton().lookupMessage( handshake.getMessageIDs()[i] );
      
      if( msg != null ) {  //mutual support!
        messages.add( msg );
      }
    }
    
    supported_messages = (Message[])messages.toArray( new Message[messages.size()] );
     
    changePeerState( PEPeer.TRANSFERING );
    
    connection_state = PEPeerTransport.CONNECTION_FULLY_ESTABLISHED;

    sendBitField();
    
    handshake.destroy();

    addAvailability();
  }
  
  
  

  
  
  
  protected void decodeBitfield( BTBitfield bitfield )
  {
	  DirectByteBuffer field =bitfield.getBitfield();
	  
	  byte[] dataf =new byte[(nbPieces +7) /8];
	  
	  if( field.remaining( DirectByteBuffer.SS_PEER ) < dataf.length ) {
		  String error = toString() + " has sent invalid Bitfield: too short [" +field.remaining( DirectByteBuffer.SS_PEER )+ "<" +dataf.length+ "]";
		  Debug.out( error );
		  if (Logger.isEnabled())
			  Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR, error ));
		  bitfield.destroy();
		  return;
	  }
	  
	  field.get(DirectByteBuffer.SS_PEER, dataf);
	  
	  try{
		  closing_mon.enter();
		  if (closing)
			  bitfield.destroy();
		  else
		  {
			  final BitFlags tempHavePieces;
			  if (peerHavePieces ==null)
			  {
				  tempHavePieces =new BitFlags(nbPieces);
			  } else
			  {
				  tempHavePieces =peerHavePieces;
				  removeAvailability();
			  }
			  for (int i =0; i <nbPieces; i++)
			  {
				  final int index =i /8;
				  final int bit =7 -(i %8);
				  final byte bData =dataf[index];
				  final byte b =(byte) (bData >>bit);
				  if ((b &0x01) ==1)
				  {
					  tempHavePieces.set(i);
					  manager.updateSuperSeedPiece(this, i);
				  }
			  }
			  bitfield.destroy();
			  
			  peerHavePieces =tempHavePieces;
			  addAvailability();

			  checkSeed();
			  checkInterested();
		  }
	  }
	  finally{
		  closing_mon.exit();
	  }
  }
  
  
  
  protected void decodeChoke( BTChoke choke ) {    
	    choke.destroy();
    choked_by_other_peer = true;
    cancelRequests();
  }
  
  
  protected void decodeUnchoke( BTUnchoke unchoke ) {
	    unchoke.destroy();
    choked_by_other_peer = false;
  }
  
  
  protected void decodeInterested( BTInterested interested ) {
	    interested.destroy();                                                   
	  // Don't allow known seeds to be interested in us
	  if (!seed)
		  other_peer_interested_in_me =true;
  }
  
  
  protected void decodeUninterested( BTUninterested uninterested ) {
	    uninterested.destroy();
    other_peer_interested_in_me = false;

    //force send any pending haves in case one of them would make the other peer interested again
    if( outgoing_have_message_aggregator != null ) {
      outgoing_have_message_aggregator.forceSendOfPending();
    }

  }
  
  
  
  
  protected void decodeHave( BTHave have ) {
    int piece_number = have.getPieceNumber();
	have.destroy();
	
    if ((piece_number >=nbPieces) || (piece_number < 0)) {
      closeConnectionInternally( "invalid piece_number: " + piece_number );
      return;
    }

    try{  closing_mon.enter();
    
    	if( closing )  return;
    
		if (peerHavePieces ==null)
			peerHavePieces =new BitFlags(nbPieces);
    
		if ( peerHavePieces.flags[piece_number]){
			
				// BitComet 0.6 (for example) sometimes sends haves for bits already marked.
			
			// Debug.out( "Received have but bit already set: " + this );
			
		}else{
	    	peerHavePieces.set(piece_number);
	    	int pieceLength = manager.getPieceLength(piece_number);
	    	peer_stats.hasNewPiece(pieceLength);
	    	manager.havePiece(piece_number, pieceLength, this);
		}
		
    	checkSeed();

    	if (!interested_in_other_peer) {
    		checkInterested(piece_number);
    	}
    }
    finally{ closing_mon.exit();  }    
  }
  
  
  
  protected void decodeRequest( BTRequest request ) {
    int number = request.getPieceNumber();
    int offset = request.getPieceOffset();
    int length = request.getLength();
    request.destroy();  
    
    if( !manager.checkBlock( number, offset, length ) ) {
      closeConnectionInternally( "request for piece #" + number + ":" + offset + "->" + (offset + length -1) + " is an invalid request" );
      return;
    }
      
    if( !choking_other_peer ) {
      outgoing_piece_message_handler.addPieceRequest( number, offset, length );
    }
    else {
    	if (Logger.isEnabled())
				Logger.log(new LogEvent(this, LOGID, "Protocol:In: peer request for piece #"
						+ number + ":" + offset + "->" + (offset + length -1)
						+ " ignored as peer is currently choked."));
    }

  }
  
  
  
  protected void decodePiece( BTPiece piece ) {
	  final int pieceNumber = piece.getPieceNumber();
	  final int offset = piece.getPieceOffset();
	  final DirectByteBuffer payload = piece.getPieceData();
	  final int length = payload.remaining( DirectByteBuffer.SS_PEER );
    
    /*
    if ( AEDiagnostics.CHECK_DUMMY_FILE_DATA ){
      int pos = payload.position( DirectByteBuffer.SS_PEER );
      long  off = ((long)number) * getControl().getPieceLength(0) + offset;
      for (int i=0;i<length;i++){
        byte  v = payload.get( DirectByteBuffer.SS_PEER );
        if ((byte)off != v ){      
          System.out.println( "piece: read is bad at " + off + ": expected = " + (byte)off + ", actual = " + v );
          break;
        }
        off++;           
      }
      payload.position( DirectByteBuffer.SS_PEER, pos );
    }
    */
    
	  final String error_msg = "Peer has sent piece #" + pieceNumber + ":" + offset + "->"	+ (offset + length -1) + ", ";
    
    if( !manager.checkBlock( pieceNumber, offset, payload ) ) {
      peer_stats.bytesDiscarded( length );
      manager.discarded( length );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -