⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeertransportprotocol.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      requests_discarded++;
      printRequestStats();
      piece.destroy();
  	if (Logger.isEnabled())
		Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR, "Protocol:In: "
				+ error_msg + "but piece block discarded as invalid."));
      return;
    }
    
    final PEPiece pePiece =manager.getPiece(pieceNumber);
    if (pePiece !=null)
        pePiece.setDownloaded(offset);
    
    DiskManagerReadRequest request = manager.createDiskManagerRequest( pieceNumber, offset, length );
    boolean piece_error = true;

    if( hasBeenRequested( request ) ) {  //from active request
      removeRequest( request );
      reSetRequestsTime();
        
      if( manager.isWritten( pieceNumber, offset ) ) {  //oops, looks like this block has already been written
        peer_stats.bytesDiscarded( length );
        manager.discarded( length );

        if( manager.isInEndGameMode() ) {  //we're probably in end-game mode then
        	if (Logger.isEnabled())
						Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_INFORMATION,
								"Protocol:In: " + error_msg + "but piece block ignored as "
										+ "already written in end-game mode."));      
          requests_discarded_endgame++;
        }
        else {
        	if (Logger.isEnabled())
						Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_WARNING,
								"Protocol:In: " + error_msg + "but piece block discarded as "
										+ "already written."));
          requests_discarded++;
        }
        
        printRequestStats();
      }
      else {  //successfully received block!
          manager.writeBlock( pieceNumber, offset, payload, this, false);
          final long now =SystemTime.getCurrentTime();
          if (last_good_data_time !=-1 &&now -last_good_data_time <60 *1000)
              setSnubbed(false);
        requests_completed++;
        piece_error = false;  //dont destroy message, as we've passed the payload on to the disk manager for writing
        last_good_data_time =now;
      }
    }
    else {  //initial request may have already expired, but check if we can use the data anyway
      if( !manager.isWritten( pieceNumber, offset ) ) {
        boolean ever_requested;
        
        try{  recent_outgoing_requests_mon.enter();
          ever_requested = recent_outgoing_requests.containsKey( request );
        }
        finally{  recent_outgoing_requests_mon.exit();  }
        
        if( ever_requested ) { //security-measure: we dont want to be accepting any ol' random block
            manager.writeBlock( pieceNumber, offset, payload, this, true);
            final long now =SystemTime.getCurrentTime();
            if (last_good_data_time !=-1 &&now -last_good_data_time <60 *1000)
                setSnubbed(false);
            reSetRequestsTime();
          requests_recovered++;
          printRequestStats();
          piece_error = false;  //dont destroy message, as we've passed the payload on to the disk manager for writing
          last_good_data_time =now;
      	if (Logger.isEnabled())
			Logger.log(new LogEvent(this, LogIDs.PIECES, "Protocol:In: " + error_msg
					+ "expired piece block data " + "recovered as useful."));
        }
        else {
          
          System.out.println( "[" +client+ "]" +error_msg + "but expired piece block discarded as never requested." );
          
          peer_stats.bytesDiscarded( length );
          manager.discarded( length );
          requests_discarded++;
          printRequestStats();
      	if (Logger.isEnabled())
			Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_ERROR,
					"Protocol:In: " + error_msg + "but expired piece block "
							+ "discarded as never requested."));
        }
      }
      else {
        peer_stats.bytesDiscarded( length );
        manager.discarded( length );
        requests_discarded++;
        printRequestStats();
      	if (Logger.isEnabled())
			Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_ERROR,
					"Protocol:In: " + error_msg
							+ "but expired piece block discarded "
							+ "as already written."));
      }
    }
    
    if( piece_error )
    {
        piece.destroy();
        if (!manager.isWritten(pieceNumber, offset) &&pePiece !=null)
            pePiece.clearDownloaded(offset);
    }
  }
  
  
  
  /**
   * Handles an incoming cancel message by withdrawing the matching piece
   * request from the outgoing piece message handler.
   *
   * @param cancel the received cancel message; it is destroyed by this method
   */
  protected void decodeCancel( BTCancel cancel ) {
    final int piece_number = cancel.getPieceNumber();
    final int piece_offset = cancel.getPieceOffset();
    final int block_length = cancel.getLength();

    // every value we need has been copied out, so the message is released
    // before the handler is touched
    cancel.destroy();

    outgoing_piece_message_handler.removePieceRequest( piece_number, piece_offset, block_length );
  }
  
  
  
  /**
   * Hooks this peer transport up to its connection's message queues and then
   * starts message processing.
   *
   * Three things happen, in this order:
   *  1. a listener is registered on the incoming message queue; it stamps the
   *     receive times, dispatches each message to the matching decodeXxx()
   *     method by message ID, and forwards byte counts to the stats objects;
   *  2. a listener is registered on the outgoing message queue; it stamps the
   *     send times, toggles enhanced message processing around choke/unchoke,
   *     and forwards byte counts to the stats objects;
   *  3. connection.startMessageProcessing() is called with the manager's
   *     upload and download limited rate groups.
   */
  private void registerForMessageHandling() {
    
    //INCOMING MESSAGES
    connection.getIncomingMessageQueue().registerQueueListener( new IncomingMessageQueue.MessageQueueListener() {
      // Dispatches one received message. Returns true for every message ID
      // handled below and false when no branch matched.
      // NOTE(review): presumably the return value tells the queue whether the
      // message was consumed - confirm against MessageQueueListener.
      public boolean messageReceived( Message message ) {      
      	
      	if (Logger.isEnabled())
							Logger.log(new LogEvent(PEPeerTransportProtocol.this, LogIDs.NET,
									"Received [" + message.getDescription() + "] message"));
        
        last_message_received_time = SystemTime.getCurrentTime();
        if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
          last_data_message_received_time = SystemTime.getCurrentTime();
        }
            
          // piece messages are dispatched BEFORE the closing check below, so
          // they still reach decodePiece() while the transport is closing
          if( message.getID().equals( BTMessage.ID_BT_PIECE ) ) {
              decodePiece( (BTPiece)message );
              return true;
            }
            
      	// while closing, every non-piece message is destroyed without decoding
      	if( closing ) {
      		message.destroy();
      		return true;
      	}
      	
        if( message.getID().equals( BTMessage.ID_BT_KEEP_ALIVE ) ) {
          message.destroy();
          
          //make sure they're not spamming us
          // NOTE(review): message.getID() is read here after message.destroy()
          // was called above - confirm getID() stays valid on a destroyed message
          if( !message_limiter.countIncomingMessage( message.getID(), 6, 60*1000 ) ) {  //allow max 6 keep-alives per 60sec
            System.out.println( "Incoming keep-alive message flood detected, dropping spamming peer connection." +PEPeerTransportProtocol.this );
            closeConnectionInternally( "Incoming keep-alive message flood detected, dropping spamming peer connection." );
          }

          return true;
        }

        
        if( message.getID().equals( BTMessage.ID_BT_HANDSHAKE ) ) {
          decodeBTHandshake( (BTHandshake)message );
          return true;
        }
        
        if( message.getID().equals( AZMessage.ID_AZ_HANDSHAKE ) ) {
          decodeAZHandshake( (AZHandshake)message );
          return true;
        }

        if( message.getID().equals( BTMessage.ID_BT_BITFIELD ) ) {
          decodeBitfield( (BTBitfield)message );
          return true;
        }
         
        if( message.getID().equals( BTMessage.ID_BT_CHOKE ) ) {
          decodeChoke( (BTChoke)message );
          // only downgrade when choking_other_peer is also set
          if( choking_other_peer ) {
            connection.enableEnhancedMessageProcessing( false );  //downgrade back to normal handler
          }
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_UNCHOKE ) ) {
          decodeUnchoke( (BTUnchoke)message );
          connection.enableEnhancedMessageProcessing( true );  //make sure we use a fast handler for the resulting download
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_INTERESTED ) ) {
          decodeInterested( (BTInterested)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_UNINTERESTED ) ) {
          decodeUninterested( (BTUninterested)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_HAVE ) ) {
          decodeHave( (BTHave)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_REQUEST ) ) {
          decodeRequest( (BTRequest)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_CANCEL ) ) {
          decodeCancel( (BTCancel)message );
          return true;
        }

        if( message.getID().equals( AZMessage.ID_AZ_PEER_EXCHANGE ) ) {
          decodeAZPeerExchange( (AZPeerExchange)message );
          return true;
        }
        
        // no branch above recognised this message ID
        return false;
      }
      
      // Forwards the received protocol byte count to the peer stats and the manager.
      public void protocolBytesReceived( int byte_count ) {
        //update stats
        peer_stats.protocolBytesReceived( byte_count );
        manager.protocolBytesReceived( byte_count );
      }
      
      // Forwards the received data byte count to the peer stats and the manager.
      public void dataBytesReceived( int byte_count ) {
        //update stats
        peer_stats.dataBytesReceived( byte_count );
        
        manager.dataBytesReceived( byte_count );
        
        	// pick up very slow peers as they may deliver an entire block so slowly that the
        	// request times out if we only use block reception to measure when data's received
        
        //last_good_data_time	= SystemTime.getCurrentTime();
        //last_good_data_time is expressly for data that's fully validated, wanted, and actually written to disk
        //last_data_message_received_time is set for all incoming data messages (even worthless data) 
      }
    });
    
    
    //OUTGOING MESSAGES
    connection.getOutgoingMessageQueue().registerQueueListener( new OutgoingMessageQueue.MessageQueueListener() {
      // always returns true; NOTE(review): presumably means "allow the message to be added" - confirm
      public boolean messageAdded( Message message ) {  return true;  }
      
      public void messageQueued( Message message ) { /* ignore */ }
      
      public void messageRemoved( Message message ) { /*ignore*/ }
        
      // Stamps the send times, switches enhanced message processing on
      // UNCHOKE / off on CHOKE (the latter only when choked_by_other_peer is
      // set), then logs the sent message.
      public void messageSent( Message message ) {
        //update keep-alive info
        last_message_sent_time = SystemTime.getCurrentTime();
        
        if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
          last_data_message_sent_time = SystemTime.getCurrentTime();
        }

        if( message.getID().equals( BTMessage.ID_BT_UNCHOKE ) ) { // is about to send piece data
          connection.enableEnhancedMessageProcessing( true );  //so make sure we use a fast handler
        }
        else if( message.getID().equals( BTMessage.ID_BT_CHOKE ) ) { // is done sending piece data
          if( choked_by_other_peer ) {
            connection.enableEnhancedMessageProcessing( false );  //so downgrade back to normal handler
          }
        }
        
        if (Logger.isEnabled())
							Logger.log(new LogEvent(PEPeerTransportProtocol.this, LogIDs.NET,
									"Sent [" + message.getDescription() + "] message"));
      }
  
      // Forwards the sent protocol byte count to the peer stats and the manager.
      public void protocolBytesSent( int byte_count ) {
        //update stats
        peer_stats.protocolBytesSent( byte_count );
        manager.protocolBytesSent( byte_count );
      }
        
      // Forwards the sent data byte count to the peer stats and the manager.
      public void dataBytesSent( int byte_count ) {
        //update stats
        peer_stats.dataBytesSent( byte_count );
        manager.dataBytesSent( byte_count );
      }
    });

    
    //start message processing
    connection.startMessageProcessing( manager.getUploadLimitedRateGroup(), manager.getDownloadLimitedRateGroup() );
  }
  
  
  
  /**
   * Exposes the connection object held in the plugin_connection field.
   *
   * @return the current value of plugin_connection
   */
  public Connection getConnection() {
    return this.plugin_connection;
  }
  
  
  /**
   * Exposes the supported_messages array as-is (no copy is made).
   *
   * @return the current value of supported_messages; may be null, which is
   *         what supportsMessaging() tests for
   */
  public Message[] getSupportedMessages() {
    return this.supported_messages;
  }
  
  
  /**
   * Reports whether a supported-messages array has been set for this peer.
   *
   * @return false while supported_messages is null, true otherwise
   */
  public boolean supportsMessaging() {
    if( supported_messages == null ) {
      return false;
    }
    return true;
  }
  
  /**
   * Asks the connection's TCP transport for its encryption description.
   *
   * @return whatever connection.getTCPTransport().getEncryption() returns
   */
  public String getEncryption() {
    return connection.getTCPTransport().getEncryption();
  }
  
  
  /**
   * Registers a peer listener.
   *
   * The listener list is never modified in place: while holding
   * peer_listeners_mon, a new list containing the old entries plus the new
   * listener is built and then assigned to peer_listeners_cow.
   *
   * @param listener the listener to append to the list
   */
  public void addListener( PEPeerListener listener ) {
    try {
      peer_listeners_mon.enter();

      if( peer_listeners_cow == null ) {
        peer_listeners_cow = new ArrayList();
      }

      // build the replacement list: existing entries first, new listener last
      final List updated = new ArrayList( peer_listeners_cow.size() + 1 );
      updated.addAll( peer_listeners_cow );
      updated.add( listener );

      peer_listeners_cow = updated;
    }
    finally {
      peer_listeners_mon.exit();
    }
  }
  
  /**
   * Unregisters a peer listener.
   *
   * While holding peer_listeners_mon, a copy of the current list without the
   * given listener is built and assigned to peer_listeners_cow. When the copy
   * ends up empty, the field is set to null instead. Nothing happens if no
   * list exists.
   *
   * @param listener the listener to remove (first matching entry only, per
   *                 List.remove(Object))
   */
  public void removeListener( PEPeerListener listener ) {
    try {
      peer_listeners_mon.enter();

      if( peer_listeners_cow == null ) {
        return;  // nothing registered; the finally block still releases the monitor
      }

      final List remaining = new ArrayList( peer_listeners_cow );
      remaining.remove( listener );

      // an empty list is represented as null
      peer_listeners_cow = remaining.isEmpty() ? null : remaining;
    }
    finally {
      peer_listeners_mon.exit();
    }
  }
  
  
  /**
   * Stores the new peer state, runs post-handshake processing when the state
   * is PEPeer.TRANSFERING, and then notifies every registered listener.
   *
   * @param new_state the state value to store in current_peer_state
   */
  private void changePeerState( int new_state ) {
    current_peer_state = new_state;

    if( PEPeer.TRANSFERING == current_peer_state ) {   //YUCK!
      doPostHandshakeProcessing();
    }

    // read the field once so the whole loop works on a single list instance,
    // without taking peer_listeners_mon
    final List snapshot = peer_listeners_cow;

    if( snapshot == null ) {
      return;  // no listeners registered
    }

    int index = 0;

    while( index < snapshot.size() ) {
      final PEPeerListener listener = (PEPeerListener)snapshot.get( index++ );

      // the field (not new_state) is passed, so each listener gets the value
      // current at the time of its own call
      listener.stateChanged( this, current_peer_state );
    }
  }
  
  
  
  
  /**
   * Sets up peer exchange for this connection, if the manager has it enabled.
   *
   * A peer exchange connection item is requested from the manager. When one
   * is returned, it is either flagged as supported (the peer advertises the
   * AZ peer exchange message type) or has its state maintenance disabled.
   */
  private void doPostHandshakeProcessing() {
    //peer exchange registration
    if( !manager.isPeerExchangeEnabled() ) {
      return;
    }

    //try and register all connections for their peer exchange info
    peer_exchange_item = manager.createPeerExchangeConnection( this );

    if( peer_exchange_item == null ) {
      return;
    }

    //check for peer exchange support
    if( peerSupportsMessageType( AZMessage.ID_AZ_PEER_EXCHANGE ) ) {
      peer_exchange_supported = true;
      return;
    }

    //no need to maintain internal states as we wont be sending/receiving peer exchange messages
    peer_exchange_item.disableStateMaintenance();
  }
  
  
  
  private boolean peerSupportsMessageType( String message_id ) {
    if( supported_messages != null ) {
      for( int i=0; i < supported_messages.length; 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -