⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeercontrolimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
	}
	
	public void
	processTrackerResponse(
		TRTrackerAnnouncerResponse	response )
	{
		// only process new peers if we're still running
		if ( is_running ){
			analyseTrackerResponse( response );
		}
	}
	
	private void
	addExtendedPeersFromTracker(
		Map		extensions )
	{
		final Map	protocols = (Map)extensions.get("protocols");
		
		if ( protocols != null ){
			
			System.out.println( "PEPeerControl: tracker response contained protocol extensions");
			
			final Iterator protocol_it = protocols.keySet().iterator();
			
			while( protocol_it.hasNext()){
				
				final String	protocol_name = (String)protocol_it.next();
				
				final Map	protocol = (Map)protocols.get(protocol_name);
				
				final List	transports = PEPeerTransportFactory.createExtendedTransports( this, protocol_name, protocol );
				
				for (int i=0;i<transports.size();i++){
					
					final PEPeer	transport = (PEPeer)transports.get(i);
					
					addPeer( transport );
				}
			}
		}
	}
	
	public List
	getPeers()
	{
		return( peer_transports_cow );
	}
	
	public List
	getPeers(
		String	address )
	{		
		List	result = new ArrayList();
	
		Iterator	it = peer_transports_cow.iterator();
			
		while( it.hasNext()){
			
			PEPeerTransport	peer = (PEPeerTransport)it.next();
			
			if ( peer.getIp().equals( address )){
				
				result.add( peer );
			}
		}
		
		return( result );
	}
	
	public PeerDescriptor[]
	getPendingPeers(
		String	address )
	{
		return((PeerDescriptor[])peer_database.getDiscoveredPeers());
	}
	
	public void
	addPeer(
		PEPeer		_transport )
	{
		if ( !( _transport instanceof PEPeerTransport )){
			
			throw( new RuntimeException("invalid class"));
		}
		
		final PEPeerTransport	transport = (PEPeerTransport)_transport;
		
	    if (!ip_filter.isInRange(transport.getIp(), adapter.getDisplayName())) {

			final ArrayList peer_transports = peer_transports_cow;
		
	    	if ( !peer_transports.contains(transport)){
			
	    		addToPeerTransports( transport );
			
	    	}else{
	    		Debug.out( "addPeer():: peer_transports.contains(transport): SHOULD NEVER HAPPEN !" );
	    		transport.closeConnection( "already connected" );
		}
	    }else{
	    	
	        transport.closeConnection( "IP address blocked by filters" );
	}
	}
	
	
	public void
	removePeer(
		PEPeer	_transport )
	{
		removePeer( _transport, "remove peer" );
	}
	
	public void
	removePeer(
		PEPeer	_transport,
		String	reason )
	{
		if ( !( _transport instanceof PEPeerTransport )){
			
			throw( new RuntimeException("invalid class"));
		}
		
		PEPeerTransport	transport = (PEPeerTransport)_transport;
		
		closeAndRemovePeer( transport, reason, true );
	}

  private void closeAndRemovePeer( PEPeerTransport peer, String reason, boolean log_if_not_found ) {
    boolean removed = false;
    
	// copy-on-write semantics
    try{
      peer_transports_mon.enter();
          
        if ( peer_transports_cow.contains( peer )){

          final ArrayList new_peer_transports = new ArrayList( peer_transports_cow );
          
          new_peer_transports.remove( peer );
           
          peer_transports_cow = new_peer_transports;
          
          removed = true;
        }
    }
    finally{ 
      peer_transports_mon.exit();
    }
    
    if( removed ) {
    	peer.closeConnection( reason );
      peerRemoved( peer );  //notify listeners      
    }
    else {
    	if ( log_if_not_found ){
    		// we know this happens due to timing issues... Debug.out( "closeAndRemovePeer(): peer not removed" );
    	}
    }
  }
  
  
  
	private void closeAndRemoveAllPeers( String reason, boolean reconnect ) {
		ArrayList peer_transports;
		
		try{
			peer_transports_mon.enter();
			
			peer_transports = peer_transports_cow;
			
			peer_transports_cow = new ArrayList( 0 );  
		}
		finally{
			peer_transports_mon.exit();
		}
		
    for( int i=0; i < peer_transports.size(); i++ ) {
	    final PEPeerTransport peer = (PEPeerTransport)peer_transports.get( i );
	      
	    try{
	    	peer.closeConnection( reason );
	     
	    }catch( Throwable e ){
	    		
    		// if something goes wrong with the close process (there's a bug in there somewhere whereby
    		// we occasionally get NPEs then we want to make sure we carry on and close the rest
    		
	    	Debug.printStackTrace(e);
	    }
	    
	    try{
	    	peerRemoved( peer );  //notify listeners
	    	
	    }catch( Throwable e ){
    		   		
	    	Debug.printStackTrace(e);
	    }
    }
    
    if( reconnect ) {
      for( int i=0; i < peer_transports.size(); i++ ) {
    	  
        final PEPeerTransport peer = (PEPeerTransport)peer_transports.get( i );
        
        PEPeerTransport	reconnected_peer = peer.reconnect();
 
        if ( reconnected_peer != null ){
        	
			addToPeerTransports( reconnected_peer );
        }
      }
    }
  }
	
	
	
	
	public void 
	addPeer( 
		String 	ip_address, 
		int		tcp_port, 
		int		udp_port,
		boolean use_crypto ) 
	{
		final byte type = use_crypto ? PeerItemFactory.HANDSHAKE_TYPE_CRYPTO : PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
		final PeerItem peer_item = PeerItemFactory.createPeerItem( ip_address, tcp_port, PeerItem.convertSourceID( PEPeerSource.PS_PLUGIN ), type, udp_port, PeerItemFactory.CRYPTO_LEVEL_1, 0 );
		
		byte	crypto_level = PeerItemFactory.CRYPTO_LEVEL_1;
		
		if( !isAlreadyConnected( peer_item ) ) {
			
			String fail_reason;
			
			if ( TCPNetworkManager.TCP_OUTGOING_ENABLED && tcp_port > 0){

				fail_reason = makeNewOutgoingConnection( PEPeerSource.PS_PLUGIN, ip_address, tcp_port, udp_port, true, use_crypto, crypto_level );  //directly inject the the imported peer
				
			}else if ( UDPNetworkManager.UDP_OUTGOING_ENABLED && udp_port > 0 ){
				
				fail_reason = makeNewOutgoingConnection( PEPeerSource.PS_PLUGIN, ip_address, tcp_port, udp_port, false, use_crypto, crypto_level );  //directly inject the the imported peer

			}else{
			
				fail_reason = "No usable protocol";
			}
		
			if( fail_reason != null )  Debug.out( "injected peer was not added - " + fail_reason );
		}
	}
	
	
	
	private void 
	addPeersFromTracker(
		TRTrackerAnnouncerResponsePeer[]		peers )
	{
		
		for (int i = 0; i < peers.length; i++){
			final TRTrackerAnnouncerResponsePeer	peer = peers[i];
			
			final ArrayList peer_transports = peer_transports_cow;
			
			boolean already_connected = false;
			
      		for( int x=0; x < peer_transports.size(); x++ ) {
				final PEPeerTransport transport = (PEPeerTransport)peer_transports.get( x );
				
				// allow loopback connects for co-located proxy-based connections and testing
				
				if( peer.getAddress().equals( transport.getIp() )){
					
					final boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) ||
					transport.getIp().equals( "127.0.0.1" );
					
					if( !same_allowed || peer.getPort() == transport.getPort() ) {
						already_connected = true;
						break;
					}
				}
			}
			
			if( already_connected )  continue;
			
			if( peer_database != null ) {				
				byte type = peer.getProtocol() == DownloadAnnounceResultPeer.PROTOCOL_CRYPT ? PeerItemFactory.HANDSHAKE_TYPE_CRYPTO : PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
				
				byte crypto_level = peer.getAZVersion() < TRTrackerAnnouncer.AZ_TRACKER_VERSION_3?PeerItemFactory.CRYPTO_LEVEL_1:PeerItemFactory.CRYPTO_LEVEL_2;
				
				PeerItem item = PeerItemFactory.createPeerItem( 
									peer.getAddress(), 
									peer.getPort(), 
									PeerItem.convertSourceID( peer.getSource() ), 
									type, 
									peer.getUDPPort(), 
									crypto_level,
									peer.getUploadSpeed());
				
				peer_database.addDiscoveredPeer( item );
			}
			
			int	http_port = peer.getHTTPPort();
			
			if ( http_port != 0 ){
				
				adapter.addHTTPSeed( peer.getAddress(), http_port );
			}
		}
	}
	
	
	/**
	 * Request a new outgoing peer connection.
	 * @param address ip of remote peer
	 * @param port remote peer listen port
	 * @return null if the connection was added to the transport list, reason if rejected
	 */
	private String 
	makeNewOutgoingConnection( 
		String		peer_source,
		String 		address, 
		int 		tcp_port,
		int			udp_port,
		boolean		use_tcp,
		boolean 	require_crypto,
		byte		crypto_level ) 
	{    
		//make sure this connection isn't filtered
   
		if( ip_filter.isInRange( address, adapter.getDisplayName() ) ) {
			return "IPFilter block";
		}
		
		//make sure we need a new connection
		final int needed = getMaxNewConnectionsAllowed();
		
		if( needed == 0 ){
			
			if ( 	peer_source != PEPeerSource.PS_PLUGIN ||
					!doOptimisticDisconnect( AddressUtils.isLANLocalAddress( address ) != AddressUtils.LAN_LOCAL_NO)){
			
				return "Too many connections";
			}
		}
		
		//make sure not already connected to the same IP address; allow loopback connects for co-located proxy-based connections and testing
		final boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) || address.equals( "127.0.0.1" );
		if( !same_allowed && PeerIdentityManager.containsIPAddress( _hash, address ) ){  
			return "Already connected to IP";
		}
		
		if( PeerUtils.ignorePeerPort( tcp_port ) ) {
			if (Logger.isEnabled())
				Logger.log(new LogEvent(disk_mgr.getTorrent(), LOGID,
						"Skipping connect with " + address + ":" + tcp_port
								+ " as peer port is in ignore list."));
			return "TCP port in ignore list";
		}
		
		//start the connection
		PEPeerTransport real = PEPeerTransportFactory.createTransport( this, peer_source, address, tcp_port, udp_port, use_tcp, require_crypto, crypto_level );
		
		addToPeerTransports( real );
		return null;
	}
	
	
	/**
	 * A private method that checks if PEPieces being downloaded are finished
	 * If all blocks from a PEPiece are written to disk, this method will
     * queue the piece for hash check.
	 * Elsewhere, if it passes sha-1 check, it will be marked as downloaded,
	 * otherwise, it will unmark it as fully downloaded, so blocks can be retreived again.
	 */
	private void checkCompletedPieces() {
		if ((mainloop_loop_count %MAINLOOP_ONE_SECOND_INTERVAL) !=0)
			return;
		
		//for every piece
		for (int i = 0; i <_nbPieces; i++) {
			final DiskManagerPiece dmPiece =dm_pieces[i];
			//if piece is completly written, not already checking, and not Done
			if (dmPiece.isNeedsCheck())
			{
				//check the piece from the disk
				dmPiece.setChecking();
				
				DiskManagerCheckRequest req = 
					disk_mgr.createCheckRequest(
						i, new Integer(CHECK_REASON_DOWNLOADED));
						
				req.setAdHoc( false );
				
				disk_mgr.enqueueCheckRequest( req, this );
			}
		}
	}

	/** Checks given piece to see if it's active but empty, and if so deactivates it.
	 * @param pieceNumber to check
	 * @return true if the piece was removed and is no longer active (pePiece ==null)
	 */ 
	private boolean checkEmptyPiece(final int pieceNumber)
	{
        if (piecePicker.isInEndGameMode())
            return false;   // be sure to not remove pieces in EGM
		final PEPiece pePiece =pePieces[pieceNumber];
		final DiskManagerPiece dmPiece =dm_pieces[pieceNumber];
		if (pePiece == null || pePiece.isRequested())
			return false;
		if (dmPiece.getNbWritten() >0 ||pePiece.getNbRequests() >0 ||pePiece.getSpeed() >0 ||pePiece.getReservedBy() !=null)
			return false;
		removePiece(pePiece, pieceNumber);
		return true;
	}
	
	/**
	 * Check if a piece's Speed is too fast for it to be getting new data
	 * and if a reserved pieced failed to get data within 120 seconds
	 */
	private void checkSpeedAndReserved()
	{
		if ( (mainloop_loop_count %MAINLOOP_FIVE_SECOND_INTERVAL) != 0 ){
			return;
		}
		
		final int nbPieces =_nbPieces;
		final PEPieceImpl[] pieces =pePieces;
		//for every piece
		for (int i =0; i <nbPieces; i++)
		{
			final PEPieceImpl pePiece =pieces[i];
			// these checks are only against pieces being downloaded yet needing requests still/again
			if (pePiece !=null)
			{
                final long timeSinceActivity =pePiece.getTimeSinceLastActivity();
				if (timeSinceActivity >4 *1000)
				{
                    final int oldSpeed =pePiece.getSpeed();
					// maybe piece's speed is too high for it to get new data
					if (oldSpeed >0)
                    {
                        final DiskManagerPiece dmPiece =dm_pieces[i];
                        if (pePiece.isRequested() ||timeSinceActivity >29 *1000)
                            pePiece.setSpeed(0);
                        else
                        {
                            final long calcSpeed =((dmPiece.getNbWritten() *DiskManager.BLOCK_SIZE) /timeSinceActivity) -1;
                            if (calcSpeed <oldSpeed)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -