⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pepeercontrolimpl.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        }
        if (cancel)
        {   // cancel any matching outstanding download requests
            // For all connections cancel the request
            List peer_transports =peer_transports_cow;
            for (int i =0; i <peer_transports.size(); i++)
            {
                PEPeerTransport connection =(PEPeerTransport) peer_transports.get(i);
                DiskManagerReadRequest dmr =disk_mgr.createReadRequest(pieceNumber, offset, dmPiece.getBlockSize(blockNumber));
                connection.sendCancel(dmr);
            }
        }
    }
	
	
//	/**
//	 * This method is only called when a block is received after the initial request expired,
//	 * but the data has not yet been fulfilled by any other peer, so we use the block data anyway
//	 * instead of throwing it away, and cancel any outstanding requests for that block that might have
//	 * been sent after initial expiry.
//	 */
//	public void writeBlockAndCancelOutstanding(int pieceNumber, int offset, DirectByteBuffer data,PEPeer sender) {
//        final int blockNumber =offset /DiskManager.BLOCK_SIZE;
//        final DiskManagerPiece dmPiece =dm_pieces[pieceNumber];
//        if (dmPiece.isWritten(blockNumber))
//        {
//            data.returnToPool();
//            return;
//        }
//		DiskManagerWriteRequest request =disk_mgr.createWriteRequest(pieceNumber, offset, data, sender);
//		disk_mgr.enqueueWriteRequest(request, this);
//
//		// cancel any matching outstanding download requests
//		List peer_transports =peer_transports_cow;
//		for (int i =0; i <peer_transports.size(); i++)
//		{
//			PEPeerTransport connection =(PEPeerTransport) peer_transports.get(i);
//			DiskManagerReadRequest dmr =disk_mgr.createReadRequest(pieceNumber, offset, dmPiece.getBlockSize(blockNumber));
//			connection.sendCancel(dmr);
//		}
//	}
	
	
	public boolean isWritten(int piece_number, int offset)
	{
		return dm_pieces[piece_number].isWritten(offset /DiskManager.BLOCK_SIZE);
	}

	public boolean checkBlock(int pieceNumber, int offset, int length) {
		return disk_mgr.checkBlockConsistency(pieceNumber, offset, length);
	}

	public boolean checkBlock(int pieceNumber, int offset, DirectByteBuffer data) {
		return disk_mgr.checkBlockConsistency(pieceNumber, offset, data);
	}
	
	public int getAvailability(int pieceNumber)
	{
		return piecePicker.getAvailability(pieceNumber); 
	}
	
	public void havePiece(int pieceNumber, int pieceLength, PEPeer pcOrigin) {
		piecePicker.addHavePiece(pieceNumber);
		_stats.haveNewPiece(pieceLength);
		
		if(superSeedMode) {
			superSeedPieces[pieceNumber].peerHasPiece(pcOrigin);
			if(pieceNumber == pcOrigin.getUniqueAnnounce()) {
				pcOrigin.setUniqueAnnounce(-1);
				superSeedModeNumberOfAnnounces--;
			}      
		}
		int availability =piecePicker.getAvailability(pieceNumber) -1;
		if (availability < 4) {
			if (dm_pieces[pieceNumber].isDone())
				availability--;
			if (availability <= 0)
				return;
			//for all peers
			
			List	peer_transports = peer_transports_cow;
			
			for (int i = peer_transports.size() - 1; i >= 0; i--) {
				PEPeerTransport pc = (PEPeerTransport) peer_transports.get(i);
				if (pc !=pcOrigin &&pc.getPeerState() ==PEPeer.TRANSFERING &&pc.isPieceAvailable(pieceNumber))
					((PEPeerStatsImpl)pc.getStats()).statisticalSentPiece(pieceLength / availability);
			}
		}
	}
	
	public int getPieceLength(int pieceNumber) {
		if (pieceNumber ==_nbPieces -1)
			return disk_mgr.getLastPieceLength();
		return disk_mgr.getPieceLength();
	}

	public int 
	getNbPeers() 
	{
		return _peers;
	}
	
	public int getNbSeeds() 
	{
		return _seeds;
	}
	
	public int getNbRemoteConnections() 
	{
		return _remotes;
	}
	
  public long getLastRemoteConnectionTime()
  {
	  return( last_remote_time );
  }
  
	public PEPeerManagerStats getStats() {
		return _stats;
	}
	
	
	
	/**
	 * Returns the ETA time in seconds.
	 * If the returned time is 0, the download is complete.
	 * If the returned time is negative, the download
	 * is complete and it took -xxx time to complete.
	 */
	public long 
	getETA() 
	{	
		long	now = SystemTime.getCurrentTime();
		
		if ( now < last_eta_calculation || now - last_eta_calculation > 900 ){
			
			long dataRemaining = disk_mgr.getRemainingExcludingDND();
	
			if ( dataRemaining > 0 ){
				
				int writtenNotChecked = 0;
				
				for (int i = 0; i < _nbPieces; i++)
				{
					if (dm_pieces[i].isInteresting()){
						writtenNotChecked +=dm_pieces[i].getNbWritten() *DiskManager.BLOCK_SIZE;
					}
				}
			
				dataRemaining = dataRemaining - writtenNotChecked;
			
				if  (dataRemaining < 0 ){
					
					dataRemaining	= 0;
				}
			}
			
			long	result;
			
			if (dataRemaining == 0) {
				long timeElapsed = (_timeFinished - _timeStarted)/1000;
				//if time was spent downloading....return the time as negative
				if(timeElapsed > 1){
					result = timeElapsed * -1;
				}else{
					result = 0;
				}
			}else{
			
				long averageSpeed = _averageReceptionSpeed.getAverage();
				long lETA = (averageSpeed == 0) ? Constants.INFINITY_AS_INT : dataRemaining / averageSpeed;
				// stop the flickering of ETA from "Finished" to "x seconds" when we are 
				// just about complete, but the data rate is jumpy.
				if (lETA == 0)
					lETA = 1;
				result = lETA;
			}
		
			last_eta				= result;
			last_eta_calculation	= now;
		}
		
		return( last_eta );
	}
	
	
	
	private void
	addToPeerTransports(
		PEPeerTransport		peer )
	{
		boolean added = false;
		
		try{
			peer_transports_mon.enter();
			
			if( peer_transports_cow.contains( peer ) ){
				Debug.out( "Transport added twice" );
				return;  //we do not want to close it
			}
			
			if( is_running ) {
				//copy-on-write semantics
				ArrayList new_peer_transports = new ArrayList(peer_transports_cow.size() +1);
				
				new_peer_transports.addAll( peer_transports_cow );
				
				new_peer_transports.add( peer );
				
				peer_transports_cow = new_peer_transports;
				
				added = true;
			}
		}
		finally{
			peer_transports_mon.exit();
		}
		
		if( added ) {
      if ( peer.isIncoming()){
	      long	connect_time = SystemTime.getCurrentTime();
	        
	      if ( connect_time > last_remote_time ){
	        	
	       	last_remote_time = connect_time;
	      }
      }
      
			peerAdded( peer ); 
		}
		else {
			peer.closeConnection( "PeerTransport added when manager not running" );
		}
	}
	
	
//	the peer calls this method itself in closeConnection() to notify this manager
	public void peerConnectionClosed( PEPeerTransport peer ) {
		boolean	connection_found = false;
		
		try{
			peer_transports_mon.enter();
			
			if( peer_transports_cow.contains( peer )) {
				
				ArrayList new_peer_transports = new ArrayList( peer_transports_cow );
				
				new_peer_transports.remove(peer);
				
				peer_transports_cow = new_peer_transports;
				
				connection_found  = true;
			}
		}
		finally{
			peer_transports_mon.exit();
		}
		
		if ( connection_found ){
    	if( peer.getPeerState() != PEPeer.DISCONNECTED ) {
    		System.out.println( "peer.getPeerState() != PEPeer.DISCONNECTED: " +peer.getPeerState() );
    	}
    	
			peerRemoved( peer );  //notify listeners
		}
	}
	
	
	
	
	public void 
	peerAdded(
		PEPeer pc) 
	{
		adapter.addPeer(pc);  //async downloadmanager notification
		
		//sync peermanager notification
		ArrayList peer_manager_listeners = peer_manager_listeners_cow;
		
		for( int i=0; i < peer_manager_listeners.size(); i++ ) {
      		((PEPeerManagerListener)peer_manager_listeners.get(i)).peerAdded( this, pc );
		}

	}
	
	
	public void 
	peerRemoved(
		PEPeer pc) 
	{
		int piece = pc.getUniqueAnnounce();
		if(piece != -1 && superSeedMode ) {
			superSeedModeNumberOfAnnounces--;
			superSeedPieces[piece].peerLeft();
		}
		
		adapter.removePeer(pc);  //async downloadmanager notification
    	
		//sync peermanager notification
		ArrayList peer_manager_listeners = peer_manager_listeners_cow;
		
		for( int i=0; i < peer_manager_listeners.size(); i++ ) {
      		((PEPeerManagerListener)peer_manager_listeners.get(i)).peerRemoved( this, pc );
		}
	}
	
	/** Don't pass a null to this method.
     * @param piece PEPiece invoked; notifications of it's invocation need to be done
     * @param pieceNumber of the PEPiece 
	 */
	public void addPiece(PEPiece piece, int pieceNumber)
	{
		pePieces[pieceNumber] =(PEPieceImpl)piece;
		adapter.addPiece(piece);
	}
	
    /** Sends messages to listeners that the piece is no longer active.
     * The piece will be null upon return
     * @param pePiece PEPiece to remove
     * @param pieceNumber int
     */
	public void removePiece(PEPiece pePiece, int pieceNumber) {
        adapter.removePiece(pePiece);
        pePieces[pieceNumber] =null;
	}
	
	public String getElapsedTime() {
		return TimeFormatter.format((SystemTime.getCurrentTime() - _timeStarted) / 1000);
	}
	
//	Returns time started in ms
	public long getTimeStarted() {
		return _timeStarted;
	}
	
	public long getTimeStartedSeeding() {
		return _timeStartedSeeding;
	}
	
	private byte[] computeMd5Hash(DirectByteBuffer buffer)
	{
		BrokenMd5Hasher md5 =new BrokenMd5Hasher();
		md5.reset();
		int position =buffer.position(DirectByteBuffer.SS_DW);
		md5.update(buffer.getBuffer(DirectByteBuffer.SS_DW));
		buffer.position(DirectByteBuffer.SS_DW, position);
		ByteBuffer md5Result =ByteBuffer.allocate(16);
		md5Result.position(0);
		md5.finalDigest(md5Result);

		byte[] result =new byte[16];
		md5Result.position(0);
		for (int i =0; i <result.length; i++ )
		{
			result[i] =md5Result.get();
		}

		return result;
	}

	private void MD5CheckPiece(PEPiece piece, boolean correct)
	{
		String[] writers =piece.getWriters();
		int offset =0;
		for (int i =0; i <writers.length; i++ )
		{
			int length =piece.getBlockSize(i);
			String peer =writers[i];
			if (peer !=null)
			{
				DirectByteBuffer buffer =disk_mgr.readBlock(piece.getPieceNumber(), offset, length);

				if (buffer !=null)
				{
					byte[] hash =computeMd5Hash(buffer);
					buffer.returnToPool();
					buffer =null;
					piece.addWrite(i, peer, hash, correct);
				}
			}
			offset +=length;
		}
	}

	public void checkCompleted(DiskManagerCheckRequest request, boolean passed)
	{
		try
		{
			piece_check_result_list_mon.enter();
			piece_check_result_list.add(new Object[]{request, new Integer(passed ?1 :0)});
		} finally
		{
			piece_check_result_list_mon.exit();
		}
	}

	public void checkCancelled(DiskManagerCheckRequest request)
	{
		try
		{
			piece_check_result_list_mon.enter();
			piece_check_result_list.add(new Object[]{request, new Integer(2)});

		} finally
		{
			piece_check_result_list_mon.exit();
		}
	}

	public void checkFailed(DiskManagerCheckRequest request, Throwable cause)
	{
		try
		{
			piece_check_result_list_mon.enter();
			piece_check_result_list.add(new Object[]{request, new Integer(0)});

		} finally
		{
			piece_check_result_list_mon.exit();
		}
	}

	public boolean needsMD5CheckOnCompletion(int pieceNumber)
	{
		PEPieceImpl piece = pePieces[pieceNumber];    
		if (piece == null)
        {
			return false;
        }
		return piece.getPieceWrites().size() > 0;
	}
	
	private void processPieceCheckResult(DiskManagerCheckRequest request, int outcome)
	{
		int check_type =((Integer) request.getUserData()).intValue();

		try
		{
			final int pieceNumber =request.getPieceNumber();

			// System.out.println( "processPieceCheckResult(" + _finished + "/" + recheck_on_completion + "):" + pieceNumber +
			// "/" + piece + " - " + result );

			// passed = 1, failed = 0, cancelled = 2

			if (check_type ==CHECK_REASON_COMPLETE)
			{ // this is a recheck, so don't send HAVE msgs
				if (outcome ==0)
				{
					// piece failed; restart the download afresh
					Debug.out("Piece #" +pieceNumber +" failed final re-check. Re-downloading...");

					if (!restart_initiated)
					{
						restart_initiated =true;
						adapter.restartDownload(false);
					}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -