⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeertransportprotocol.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
	  }
	  
	  field.get(DirectByteBuffer.SS_PEER, dataf);
	  
	  try{
		  closing_mon.enter();
		  if (closing)
			  bitfield.destroy();
		  else
		  {
			  final BitFlags tempHavePieces;
			  if (peerHavePieces ==null)
			  {
				  tempHavePieces =new BitFlags(nbPieces);
			  } else
			  {
				  tempHavePieces =peerHavePieces;
				  removeAvailability();
			  }
			  for (int i =0; i <nbPieces; i++)
			  {
				  final int index =i /8;
				  final int bit =7 -(i %8);
				  final byte bData =dataf[index];
				  final byte b =(byte) (bData >>bit);
				  if ((b &0x01) ==1)
				  {
					  tempHavePieces.set(i);
					  manager.updateSuperSeedPiece(this, i);
				  }
			  }
			  bitfield.destroy();
			  
			  peerHavePieces =tempHavePieces;
			  addAvailability();

			  checkSeed();
			  checkInterested();
		  }
	  }
	  finally{
		  closing_mon.exit();
	  }
  }
  
  
  
  /**
   * Handles a choke message received from the remote peer.
   * <p>
   * The message is always released. If the peer was not already choking us,
   * the choked flag is raised, outstanding requests are cancelled, and the
   * time elapsed since {@code unchokedTime} is added to
   * {@code unchokedTimeTotal} -- but only when that span is positive and
   * {@code isSnubbed()} reports false.
   *
   * @param choke the received choke message; destroyed by this method
   */
  protected void decodeChoke( BTChoke choke ) {
    choke.destroy();

    if( choked_by_other_peer ) {
      return;  // state unchanged: peer was already choking us
    }

    choked_by_other_peer = true;
    cancelRequests();

    final long unchokedDuration = SystemTime.getCurrentTime() - unchokedTime;
    final boolean countable = unchokedDuration > 0 && !isSnubbed();
    if( countable ) {
      unchokedTimeTotal += unchokedDuration;
    }
  }
  
  
  /**
   * Handles an unchoke message received from the remote peer.
   * <p>
   * The message is always released. If the peer had been choking us, the
   * choked flag is cleared and, unless {@code isSnubbed()} reports true,
   * {@code unchokedTime} is stamped with the current time.
   *
   * @param unchoke the received unchoke message; destroyed by this method
   */
  protected void decodeUnchoke( BTUnchoke unchoke ) {
    unchoke.destroy();

    if( !choked_by_other_peer ) {
      return;  // we were not choked, nothing changes
    }

    choked_by_other_peer = false;

    if( isSnubbed() ) {
      return;  // do not start the unchoked timer while snubbed
    }

    unchokedTime = SystemTime.getCurrentTime();
  }
  
  
  /**
   * Handles an interested message received from the remote peer.
   * <p>
   * The message is released and the "other peer interested in me" flag is
   * set to the negation of {@code isSeed()}, so a known seed is never
   * recorded as interested.
   *
   * @param interested the received message; destroyed by this method
   */
  protected void decodeInterested( BTInterested interested ) {
      interested.destroy();

      // Known seeds are not allowed to be interested in us.
      final boolean knownSeed = isSeed();
      other_peer_interested_in_me = !knownSeed;
  }
  
  
  /**
   * Handles an uninterested message received from the remote peer.
   * <p>
   * The message is released, the "other peer interested in me" flag is
   * cleared, and any pending have messages held by the outgoing have
   * aggregator (when one exists) are forced out.
   *
   * @param uninterested the received message; destroyed by this method
   */
  protected void decodeUninterested( BTUninterested uninterested ) {
    uninterested.destroy();
    other_peer_interested_in_me = false;

    if( outgoing_have_message_aggregator == null ) {
      return;  // no aggregator, so nothing is pending
    }

    // Flush pending haves now: one of them might make the other peer interested again.
    outgoing_have_message_aggregator.forceSendOfPending();
  }
  
  
  
  
    /**
     * Handles a have message announcing that the remote peer holds a piece.
     * <p>
     * The message is released immediately after its piece number is read.
     * A piece number outside {@code [0, nbPieces)} closes the connection.
     * Nothing further happens while {@code closing} is set. Otherwise the
     * peer's piece flags are created on demand and, if the piece was not yet
     * flagged, this method: sends an interested message when we were not
     * interested and the disk manager finds the piece interesting, flags the
     * piece, notifies the manager, re-checks seed status, clears the peer's
     * interested flag if it is a seed, and records the new piece in the peer
     * stats.
     *
     * @param have the received have message; destroyed by this method
     */
    protected void decodeHave(BTHave have) {
        final int pieceNumber = have.getPieceNumber();
        have.destroy();

        final boolean outOfRange = (pieceNumber >= nbPieces) || (pieceNumber < 0);
        if (outOfRange) {
            closeConnectionInternally("invalid pieceNumber: " + pieceNumber);
            return;
        }

        if (closing) {
            return;
        }

        if (peerHavePieces == null) {
            peerHavePieces = new BitFlags(nbPieces);
        }

        if (peerHavePieces.flags[pieceNumber]) {
            return;  // already known, nothing new to record
        }

        // Announce interest before flagging the piece, if this piece is one we want.
        if (!interested_in_other_peer && diskManager.isInteresting(pieceNumber)) {
            connection.getOutgoingMessageQueue().addMessage(new BTInterested(), false);
            interested_in_other_peer = true;
        }
        peerHavePieces.set(pieceNumber);

        final int pieceLength = manager.getPieceLength(pieceNumber);
        manager.havePiece(pieceNumber, pieceLength, this);

        // The peer may be a seed using a lazy bitfield, or may have just become a seed.
        checkSeed();
        // Seeds are never considered interested (isSeed() is always evaluated here).
        other_peer_interested_in_me = other_peer_interested_in_me & !isSeed();

        peer_stats.hasNewPiece(pieceLength);
    }
  
  
  /**
   * Handles a block request received from the remote peer.
   * <p>
   * The message is released once its piece number, offset and length have
   * been read. A request rejected by {@code manager.validateReadRequest}
   * closes the connection. A valid request is queued on the outgoing piece
   * message handler unless we are currently choking the peer, in which case
   * it is only logged (when logging is enabled) and otherwise ignored.
   *
   * @param request the received request message; destroyed by this method
   */
  protected void decodeRequest( BTRequest request ) {
    final int pieceNumber = request.getPieceNumber();
    final int pieceOffset = request.getPieceOffset();
    final int blockLength = request.getLength();
    request.destroy();

    final boolean valid = manager.validateReadRequest( pieceNumber, pieceOffset, blockLength );
    if( !valid ) {
      closeConnectionInternally( "request for piece #" + pieceNumber + ":" + pieceOffset + "->" + (pieceOffset + blockLength -1) + " is an invalid request" );
      return;
    }

    if( choking_other_peer ) {
      // Requests from a choked peer are dropped; only leave a trace in the log.
      if( Logger.isEnabled() ) {
        Logger.log( new LogEvent( this, LOGID, "decodeRequest(): peer request for piece #"
            + pieceNumber + ":" + pieceOffset + "->" + (pieceOffset + blockLength -1)
            + " ignored as peer is currently choked." ) );
      }
      return;
    }

    outgoing_piece_message_handler.addPieceRequest( pieceNumber, pieceOffset, blockLength );
  }
  
  
  
  /**
   * Handles a piece message (a block of piece data) received from the remote peer.
   * <p>
   * The block is first checked with {@code manager.validatePieceReply}; an invalid
   * block is counted as discarded and the message destroyed. A valid block then
   * falls into one of these cases:
   * <ul>
   *   <li>it matches an active request ({@code hasBeenRequested}): the request is
   *       removed; the block is written via {@code manager.writeBlock} unless
   *       {@code manager.isWritten} reports it already written, in which case it
   *       is counted as discarded;</li>
   *   <li>it matches no active request and is not yet written: it is written only
   *       if the request is found in {@code recent_outgoing_requests}, otherwise
   *       it is counted as discarded;</li>
   *   <li>it matches no active request and is already written: counted as
   *       discarded.</li>
   * </ul>
   * The message is destroyed at the end of this method unless the payload was
   * handed to {@code manager.writeBlock}.
   *
   * @param piece the received piece message
   */
  protected void decodePiece( BTPiece piece ) {
	  final int pieceNumber = piece.getPieceNumber();
	  final int offset = piece.getPieceOffset();
	  final DirectByteBuffer payload = piece.getPieceData();
	  final int length = payload.remaining( DirectByteBuffer.SS_PEER );

	  /*
    if ( AEDiagnostics.CHECK_DUMMY_FILE_DATA ){
      int pos = payload.position( DirectByteBuffer.SS_PEER );
      long  off = ((long)number) * getControl().getPieceLength(0) + offset;
      for (int i=0;i<length;i++){
        byte  v = payload.get( DirectByteBuffer.SS_PEER );
        if ((byte)off != v ){      
          System.out.println( "piece: read is bad at " + off + ": expected = " + (byte)off + ", actual = " + v );
          break;
        }
        off++;           
      }
      payload.position( DirectByteBuffer.SS_PEER, pos );
    }
	   */

	  // Common log-message prefix, wrapped in an object so the string is only
	  // built when toString() is invoked by a concatenation below.
	  final Object error_msg = 
		  new Object()
	  {
		  public final String
		  toString()
		  {
			  return( "decodePiece(): Peer has sent piece #" + pieceNumber + ":" + offset + "->"	+ (offset + length -1) + ", " );
		  }
	  };

	  // Reject blocks the manager does not consider a valid reply.
	  if( !manager.validatePieceReply( pieceNumber, offset, payload ) ) {
		  peer_stats.bytesDiscarded( length );
		  manager.discarded( this, length );
		  requests_discarded++;
		  printRequestStats();
		  piece.destroy();
		  if (Logger.isEnabled())
			  Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR,
					  error_msg
					  +"but piece block discarded as invalid."));
		  return;
	  }

	  final DiskManagerReadRequest request = manager.createDiskManagerRequest( pieceNumber, offset, length );
	  // Stays true unless the payload is handed to manager.writeBlock; when true,
	  // the message is destroyed at the end of this method.
	  boolean piece_error = true;

	  if( hasBeenRequested( request ) ) {  //from active request
		  removeRequest( request );
		  final long now =SystemTime.getCurrentTime();
		  reSetRequestsTime(now);

		  if( manager.isWritten( pieceNumber, offset ) ) {  //oops, looks like this block has already been written
			  peer_stats.bytesDiscarded( length );
			  manager.discarded( this, length );

			  if( manager.isInEndGameMode() ) {  //we're probably in end-game mode then
				  // Un-snub when the previous good data arrived at most 60 *1000 time units
				  // ago (presumably milliseconds, i.e. 60 seconds -- confirm against SystemTime).
				  if (last_good_data_time !=-1 &&now -last_good_data_time <=60 *1000)
					  setSnubbed(false);
				  last_good_data_time =now;
				  requests_discarded_endgame++;
				  if (Logger.isEnabled())
					  Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_INFORMATION,
							  error_msg
							  +"but piece block ignored as already written in end-game mode."));      
			  }
			  else {
				  // if they're not snubbed, then most likely this peer got a re-request after some other peer
				  // snubbed themselves, and the slow peer finally finished the piece, but before this peer did
				  // so give credit to this peer anyway for having delivered a block at this time
				  if (!isSnubbed())
					  last_good_data_time =now;
				  if (Logger.isEnabled())
					  Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_WARNING,
							  error_msg
							  +"but piece block discarded as already written."));
				  requests_discarded++;
			  }

			  printRequestStats();
		  }
		  else {  //successfully received block!
			  manager.writeBlock( pieceNumber, offset, payload, this, false);
			  if (last_good_data_time !=-1 &&now -last_good_data_time <=60 *1000)
				  setSnubbed(false);
			  last_good_data_time =now;
			  requests_completed++;
			  piece_error = false;  //dont destroy message, as we've passed the payload on to the disk manager for writing
		  }
	  }
	  else {  //initial request may have already expired, but check if we can use the data anyway
		  if( !manager.isWritten( pieceNumber, offset ) ) {
			  final boolean ever_requested;

			  // Look the request up in the recent-requests map while holding its monitor.
			  try{  recent_outgoing_requests_mon.enter();
			  ever_requested = recent_outgoing_requests.containsKey( request );
			  }
			  finally{  recent_outgoing_requests_mon.exit();  }

			  if( ever_requested ) { //security-measure: we dont want to be accepting any ol' random block
				  manager.writeBlock( pieceNumber, offset, payload, this, true);
				  final long now =SystemTime.getCurrentTime();
				  if (last_good_data_time !=-1 &&now -last_good_data_time <=60 *1000)
					  setSnubbed(false);
				  reSetRequestsTime(now);
				  last_good_data_time =now;
				  requests_recovered++;
				  printRequestStats();
				  piece_error = false;  //dont destroy message, as we've passed the payload on to the disk manager for writing
				  if (Logger.isEnabled())
					  Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_INFORMATION,
							  error_msg
							  +"expired piece block data recovered as useful."));
			  }
			  else {

				  // NOTE(review): this prints to stdout unconditionally, in addition to
				  // the Logger call below -- looks like leftover debug output; confirm.
				  System.out.println( "[" +client+ "]" +error_msg + "but expired piece block discarded as never requested." );

				  peer_stats.bytesDiscarded( length );
				  manager.discarded( this, length );
				  requests_discarded++;
				  printRequestStats();
				  if (Logger.isEnabled())
					  Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_ERROR,
							  error_msg
							  +"but expired piece block discarded as never requested."));
			  }
		  }
		  else {
			  peer_stats.bytesDiscarded( length );
			  manager.discarded( this, length );
			  requests_discarded++;
			  printRequestStats();
			  if (Logger.isEnabled())
				  Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_WARNING,
						  error_msg
						  +"but expired piece block discarded as already written."));
		  }
	  }

	  // The payload was not passed on for writing, so release the message here.
	  if( piece_error )
	  {
		  piece.destroy();
	  }
  }
  
  
  
  /**
   * Handles a cancel message received from the remote peer.
   * <p>
   * The piece number, offset and length are read, the message is released,
   * and the matching request is removed from the outgoing piece message
   * handler.
   *
   * @param cancel the received cancel message; destroyed by this method
   */
  protected void decodeCancel( BTCancel cancel ) {
    final int pieceNumber = cancel.getPieceNumber();
    final int pieceOffset = cancel.getPieceOffset();
    final int blockLength = cancel.getLength();
    cancel.destroy();

    outgoing_piece_message_handler.removePieceRequest( pieceNumber, pieceOffset, blockLength );
  }
  
  
  
  private void registerForMessageHandling() {
    
    //INCOMING MESSAGES
    connection.getIncomingMessageQueue().registerQueueListener( new IncomingMessageQueue.MessageQueueListener() {
      public final boolean messageReceived( Message message ) {      
      	
      	if (Logger.isEnabled())
							Logger.log(new LogEvent(PEPeerTransportProtocol.this, LogIDs.NET,
									"Received [" + message.getDescription() + "] message"));
        final long now =SystemTime.getCurrentTime();
        last_message_received_time =now;
        if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
          last_data_message_received_time =now;
        }
            
          if( message.getID().equals( BTMessage.ID_BT_PIECE ) ) {
              decodePiece( (BTPiece)message );
              return true;
            }
            
      	if( closing ) {
      		message.destroy();
      		return true;
      	}
      	
        if( message.getID().equals( BTMessage.ID_BT_KEEP_ALIVE ) ) {
          message.destroy();
          
          //make sure they're not spamming us
          if( !message_limiter.countIncomingMessage( message.getID(), 6, 60*1000 ) ) {  //allow max 6 keep-alives per 60sec
            System.out.println( "Incoming keep-alive message flood detected, dropping spamming peer connection." +PEPeerTransportProtocol.this );
            closeConnectionInternally( "Incoming keep-alive message flood detected, dropping spamming peer connection." );
          }

          return true;
        }

        
        if( message.getID().equals( BTMessage.ID_BT_HANDSHAKE ) ) {
          decodeBTHandshake( (BTHandshake)message );
          return true;
        }
        
        if( message.getID().equals( AZMessage.ID_AZ_HANDSHAKE ) ) {
          decodeAZHandshake( (AZHandshake)message );
          return true;
        }

        if( message.getID().equals( BTMessage.ID_BT_BITFIELD ) ) {
          decodeBitfield( (BTBitfield)message );
          return true;
        }
         
        if( message.getID().equals( BTMessage.ID_BT_CHOKE ) ) {
          decodeChoke( (BTChoke)message );
          if( choking_other_peer ) {
            connection.enableEnhancedMessageProcessing( false );  //downgrade back to normal handler
          }
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_UNCHOKE ) ) {
          decodeUnchoke( (BTUnchoke)message );
          connection.enableEnhancedMessageProcessing( true );  //make sure we use a fast handler for the resulting download
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_INTERESTED ) ) {
          decodeInterested( (BTInterested)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_UNINTERESTED ) ) {
          decodeUninterested( (BTUninterested)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_HAVE ) ) {
          decodeHave( (BTHave)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_REQUEST ) ) {
          decodeRequest( (BTRequest)message );
          return true;
        }
        
        if( message.getID().equals( BTMessage.ID_BT_CANCEL ) ) {
          decodeCancel( (BTCancel)message );
          return true;
        }

        if( message.getID().equals( AZMessage.ID_AZ_PEER_EXCHANGE ) ) {
          de

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -