⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeercontrolimpl.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
          new_peer_transports.remove( peer );
           
          peer_transports_cow = new_peer_transports;
          
          removed = true;
        }
    }
    finally{ 
      peer_transports_mon.exit();
    }
    
    if( removed ) {
    	peer.closeConnection( reason );
      peerRemoved( peer );  //notify listeners      
    }
    else {
    	if ( log_if_not_found ){
    		Debug.out( "closeAndRemovePeer(): peer not removed" );
    	}
    }
  }
  
  
  
	private void closeAndRemoveAllPeers( String reason, boolean reconnect ) {
		ArrayList peer_transports;
		
		try{
			peer_transports_mon.enter();
			
			peer_transports = peer_transports_cow;
			
			peer_transports_cow = new ArrayList( 0 );  
		}
		finally{
			peer_transports_mon.exit();
		}
		
    for( int i=0; i < peer_transports.size(); i++ ) {
	    PEPeerTransport peer = (PEPeerTransport)peer_transports.get( i );
	      
	    try{
	    	peer.closeConnection( reason );
	     
	    }catch( Throwable e ){
	    		
    		// if something goes wrong with the close process (there's a bug in there somewhere whereby
    		// we occasionally get NPEs then we want to make sure we carry on and close the rest
    		
	    	Debug.printStackTrace(e);
	    }
	    
	    try{
	    	peerRemoved( peer );  //notify listeners
	    	
	    }catch( Throwable e ){
    		   		
	    	Debug.printStackTrace(e);
	    }
    }
    
    if( reconnect ) {
      for( int i=0; i < peer_transports.size(); i++ ) {
        PEPeerTransport peer = (PEPeerTransport)peer_transports.get( i );
        
        if( peer.getTCPListenPort() > 0 ) {
        	boolean use_crypto = peer.getPeerItemIdentity().getHandshakeType() == PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;
          PEPeerTransport new_conn = PEPeerTransportFactory.createTransport( this, peer.getPeerSource(), peer.getIp(), peer.getTCPListenPort(), use_crypto );
          addToPeerTransports( new_conn );
        }
      }
    }
  }
	
	
	
	
	public void addPeer( String ip_address, int port ) {	//TODO do plugins need a way to force crypto???	
		PeerItem peer_item = PeerItemFactory.createPeerItem( ip_address, port, PeerItem.convertSourceID( PEPeerSource.PS_PLUGIN ), PeerItemFactory.HANDSHAKE_TYPE_PLAIN );
		
		if( !isAlreadyConnected( peer_item ) ) {
			boolean use_crypto = peer_item.getHandshakeType() == PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;
			boolean added = makeNewOutgoingConnection( PEPeerSource.PS_PLUGIN, ip_address, port, use_crypto );  //directly inject the the imported peer
			if( !added )  Debug.out( "injected peer was not added" );
	}
	}
	
	
	
	private void 
	addPeersFromTracker(
		TRTrackerAnnouncerResponsePeer[]		peers )
	{
		
		for (int i = 0; i < peers.length; i++){
			TRTrackerAnnouncerResponsePeer	peer = peers[i];
			
			ArrayList peer_transports = peer_transports_cow;
			
			boolean already_connected = false;
			
      		for( int x=0; x < peer_transports.size(); x++ ) {
				PEPeerTransport transport = (PEPeerTransport)peer_transports.get( x );
				
				// allow loopback connects for co-located proxy-based connections and testing
				
				if( peer.getAddress().equals( transport.getIp() )){
					
					boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) ||
					transport.getIp().equals( "127.0.0.1" );
					
					if( !same_allowed || peer.getPort() == transport.getPort() ) {
						already_connected = true;
						break;
					}
				}
			}
			
			if( already_connected )  continue;
			
			if( peer_database != null ) {				
				int type = peer.getProtocol() == TRTrackerAnnouncerResponsePeer.PROTOCOL_CRYPT ? PeerItemFactory.HANDSHAKE_TYPE_CRYPTO : PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
				PeerItem item = PeerItemFactory.createPeerItem( peer.getAddress(), peer.getPort(), PeerItem.convertSourceID( peer.getSource() ), type );
				peer_database.addDiscoveredPeer( item );
			}
		}
	}
	
	
	/**
	 * Request a new outgoing peer connection.
	 * @param address ip of remote peer
	 * @param port remote peer listen port
	 * @return true if the connection was added to the transport list, false if rejected
	 */
	private boolean 
	makeNewOutgoingConnection( 
		String	peer_source,
		String 	address, 
		int port,
		boolean require_crypto ) 
	{    
		//make sure this connection isn't filtered
    if( ip_filter.isInRange( address, adapter.getDisplayName() ) ) {
			return false;
		}
		
		//make sure we need a new connection
		int needed = getMaxNewConnectionsAllowed();
		if( needed == 0 )  return false;
		
		//make sure not already connected to the same IP address; allow loopback connects for co-located proxy-based connections and testing
		boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) || address.equals( "127.0.0.1" );
		if( !same_allowed && PeerIdentityManager.containsIPAddress( _hash, address ) ){  
			return false;
		}
		
		if( PeerUtils.ignorePeerPort( port ) ) {
    	if (Logger.isEnabled())
				Logger.log(new LogEvent(disk_mgr.getTorrent(), LOGID,
						"Skipping connect with " + address + ":" + port
								+ " as peer port is in ignore list."));
			return false;
		}
		
		//start the connection
		PEPeerTransport real = PEPeerTransportFactory.createTransport( this, peer_source, address, port, require_crypto );
		
		addToPeerTransports( real );
		return true;
	}
	
	
	/**
	 * A private method that checks if PEPieces being downloaded are finished
	 * If all blocks from a PEPiece are written to disk, this method will
     * queue the piece for hash check.
	 * Elsewhere, if it passes sha-1 check, it will be marked as downloaded,
	 * otherwise, it will unmark it as fully downloaded, so blocks can be retreived again.
	 */
	private void checkCompletedPieces() {
		//for every piece
		for (int i = 0; i <_nbPieces; i++) {
			final DiskManagerPiece dmPiece =dm_pieces[i];
			//if piece is completly written, not already checking, and not Done
			if (dmPiece.calcWritten() &&!dmPiece.isChecking() &&!dmPiece.isDone())
			{
				//check the piece from the disk
				dmPiece.setChecking();
				
				DiskManagerCheckRequest req = 
					disk_mgr.createCheckRequest(
						i, new Integer(CHECK_REASON_DOWNLOADED));
						
				req.setAdHoc( false );
				
				disk_mgr.enqueueCheckRequest( req, this );
			}
		}
	}

	/** Checks given piece to see if it's active but empty, and if so deactivates it.
	 * @param pieceNumber to check
	 * @return true if the piece was removed and is no longer active (pePiece ==null)
	 */ 
	private boolean checkEmptyPiece(final int pieceNumber)
	{
		final PEPiece pePiece =pePieces[pieceNumber];
		final DiskManagerPiece dmPiece =dm_pieces[pieceNumber];
		if (pePiece ==null ||dmPiece.isRequested())
			return false;
		if (dmPiece.getNbWritten() >0 ||pePiece.getNbRequests() >0 ||pePiece.getSpeed() >0 ||pePiece.getReservedBy() !=null)
			return false;
		removePiece(pePiece, pieceNumber);
		return true;
	}
	
	/**
	 * Check if a piece's Speed is too fast for it to be getting new data
	 * and if a reserved pieced failed to get data within 120 seconds
	 */
	private void checkSpeedAndReserved()
	{
		if ( (mainloop_loop_count %MAINLOOP_FIVE_SECOND_INTERVAL) != 0 ){
			return;
		}
		
		final long now =SystemTime.getCurrentTime();
		final int nbPieces =_nbPieces;
		final PEPieceImpl[] pieces =pePieces;
		//for every piece
		for (int i =0; i <nbPieces; i++)
		{
			final PEPieceImpl pePiece =pieces[i];
			// these checks are only against pieces being downloaded yet needing requests still/again
			if (pePiece !=null)
			{
                final long timeSinceActivity =pePiece.getTimeSinceLastActivity();
				if (timeSinceActivity >4 *1000)
				{
                    final int oldSpeed =pePiece.getSpeed();
					// maybe piece's speed is too high for it to get new data
					if (oldSpeed >0)
                    {
                        final DiskManagerPiece dmPiece =dm_pieces[i];
                        if (dmPiece.isRequested() ||timeSinceActivity >29 *1000)
                            pePiece.setSpeed(0);
                        else
                        {
                            final long calcSpeed =((dmPiece.getNbWritten() *DiskManager.BLOCK_SIZE) /timeSinceActivity) -1;
                            if (calcSpeed <oldSpeed)
                                pePiece.setSpeed((int)(calcSpeed >0 ?calcSpeed :0));
                        }
                    } else if (timeSinceActivity >(120 *1000))
					{
						// has reserved piece gone stagnant?
						final String reservingPeer =pePiece.getReservedBy();
						if (reservingPeer !=null)
						{
							if (needsMD5CheckOnCompletion(i))
								badPeerDetected(reservingPeer);
							else
							{
								final PEPeerTransport pt =getTransportFromAddress(reservingPeer);
								if (pt !=null)
									closeAndRemovePeer(pt, "Reserved piece data timeout; 120 seconds", true);
							}
                            pePiece.setReservedBy(null);
						}
//						if (!piecePicker.isInEndGameMode())
//							pePiece.checkRequests();
						checkEmptyPiece(i);
					}
				}
			}
		}
	}

    private void checkInterested()
    {
        if ( (mainloop_loop_count %MAINLOOP_ONE_SECOND_INTERVAL) != 0 ){
            return;
        }
        
        if (lastNeededUndonePieceChange >=piecePicker.getNeededUndonePieceChange())
            return;
        
        lastNeededUndonePieceChange =piecePicker.getNeededUndonePieceChange();

        final List peer_transports =peer_transports_cow;
        for (int i =0; i <peer_transports.size(); i++)
        {
            PEPeerTransport peer =(PEPeerTransport)peer_transports.get(i);
            peer.checkInterested();
        }
    }


	/**
	 * Private method to process the results given by DiskManager's
	 * piece checking thread via asyncPieceChecked(..)
	 */
	private void 
	processPieceChecks() 
	{
		if ( piece_check_result_list.size() > 0 ){
			
			List pieces;
			
			// process complete piece results
			
			try{
				piece_check_result_list_mon.enter();
				
				pieces = new ArrayList( piece_check_result_list );
				
				piece_check_result_list.clear();
				
			}finally{
				
				piece_check_result_list_mon.exit();
			}
			
			Iterator it = pieces.iterator();
			
			while (it.hasNext()) {
				
				Object[]	data = (Object[])it.next();
				
	    		processPieceCheckResult((DiskManagerCheckRequest)data[0],((Integer)data[1]).intValue());
	    			
			}
		}
	}

	protected void
	checkRescan()
	{
		if ( rescan_piece_time == 0 ){
			
				// pending a piece completion
			
			return;
		}
		
		if ( next_rescan_piece == -1 ){
			
			if ( mainloop_loop_count % MAINLOOP_FIVE_SECOND_INTERVAL == 0 ){
			
				if ( adapter.isPeriodicRescanEnabled()){
					
					next_rescan_piece	= 0;
				}
			}
		}else{
			
			if ( mainloop_loop_count % MAINLOOP_TEN_MINUTE_INTERVAL == 0 ){

				if ( !adapter.isPeriodicRescanEnabled()){
					
					next_rescan_piece	= -1;
				}
			}
		}
		
		if ( next_rescan_piece == -1 ){
			
			return;
		}
		
			// delay as required
		
		long	now = SystemTime.getCurrentTime();
		
		if ( rescan_piece_time > now ){
			
			rescan_piece_time	= now;
		}
		
			// 250K/sec limit
		
		long	piece_size = disk_mgr.getPieceLength();
		
		long	millis_per_piece = piece_size / 250;
		
		if ( now - rescan_piece_time < millis_per_piece ){
			
			return;
		}
		
		while( next_rescan_piece != -1 ){
			
			int	this_piece = next_rescan_piece;
			
			next_rescan_piece++;
			
			if ( next_rescan_piece == _nbPieces ){
				
				next_rescan_piece	= -1;
			}
			
			if ( pePieces[this_piece] == null && !dm_pieces[this_piece].isDone()){
				
				DiskManagerCheckRequest	req = 
					disk_mgr.createCheckRequest(
						this_piece, 
						new Integer( CHECK_REASON_SCAN ));	
				
			   	if ( Logger.isEnabled()){
			   		
					Logger.log(
							new LogEvent(
								disk_mgr.getTorrent(), LOGID,
								"Rescanning piece " + this_piece ));
							
			   	}
			   	
				rescan_piece_time	= 0;	// mark as check piece in process
				
				try{
					disk_mgr.enqueueCheckRequest( req, this );
					
				}catch( Throwable e ){
					
					rescan_piece_time	= now;
					
					Debug.printStackTrace(e);
				}
				
				break;
			}
		}
	}
	
	/**
	 * This method checks if the downloading process is finished.
	 * 
	 */
	private void checkFinished(boolean start_of_day)
	{
		boolean all_pieces_done =disk_mgr.getRemaining() ==0;

		if (all_pieces_done)
		{
			seeding_mode =true;
			piecePicker.clearEndGameChunks();

			if (!start_of_day)
				adapter.setStateFinishing();

			_timeFinished =SystemTime.getCurrentTime();
			List peer_transports =peer_transports_cow;

			// remove previous snubbing
			for (int i =0; i <peer_transports.size(); i++ )
			{
				PEPeerTransport pc =(PEPeerTransport) peer_transports.get(i);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -