📄 piecepickerimpl.java
字号:
if ( pt == null || pt.getPeerState() != PEPeer.TRANSFERING ){
return( false );
}
final BitFlags peerHavePieces =pt.getAvailable();
if ( peerHavePieces ==null || peerHavePieces.nbSet <=0 ){
return( false );
}
final int peerSpeed =(int) pt.getStats().getDataReceiveRate() /1000; // how many KB/s has the peer has been sending
final int startI = peerHavePieces.start;
final int endI = peerHavePieces.end;
int piece_min_rta_index = -1;
int piece_min_rta_block = 0;
long piece_min_rta_time = Long.MAX_VALUE;
long now = SystemTime.getCurrentTime();
long my_next_block_eta = now + getNextBlockETAFromNow( pt );
for ( int i=startI; i <=endI; i++){
long piece_rta = provider_piece_rtas[i];
if ( peerHavePieces.flags[i] && startPriorities[i] == PRIORITY_REALTIME && piece_rta > 0 ){
if ( LOG_RTA ){
System.out.println( "findPiece: " + i + "/" + (piece_rta - now));
}
final DiskManagerPiece dmPiece =dmPieces[i];
if ( !dmPiece.isDownloadable()){
continue;
}
final PEPiece pePiece = pePieces[i];
if ( pePiece != null && pePiece.isDownloaded()){
continue;
}
Object realtime_data = null;
if ( piece_rta >= piece_min_rta_time ){
if ( LOG_RTA ){
System.out.println( " less urgent" );
}
// piece is less urgent than an already found one
}else if ( my_next_block_eta > piece_rta && !best_uploader ){
// only allocate if we have a chance of getting this block in time or we're
// the best uploader we've got
if ( LOG_RTA ){
System.out.println( " we're not fast enough" );
}
}else if ( pePiece == null || ( realtime_data = pePiece.getRealTimeData()) == null ){
if ( LOG_RTA ){
System.out.println( " alloc new" );
}
// no real-time block allocated yet
piece_min_rta_time = piece_rta;
piece_min_rta_index = i;
}else{
RealTimeData rtd = (RealTimeData)realtime_data;
// check the blocks to see if any are now lagging behind their ETA given current peer speed
List[] peer_requests = rtd.getRequests();
for (int j=0;j<peer_requests.length;j++){
if ( LOG_RTA ){
System.out.println( " block " + j );
}
if ( pePiece.isDownloaded( j ) || pePiece.isWritten( j )){
// this block is already downloaded, ignore
continue;
}
List block_peer_requests = peer_requests[j];
long best_eta = Long.MAX_VALUE;
boolean pt_already_present = false;
// tidy up existing request data
Iterator it = block_peer_requests.iterator();
while( it.hasNext()){
RealTimePeerRequest pr = (RealTimePeerRequest)it.next();
PEPeerTransport this_pt = pr.getPeer();
if ( this_pt.getPeerState() != PEPeer.TRANSFERING ){
if ( LOG_RTA ){
System.out.println( " peer dead" );
}
// peer's dead
it.remove();
continue;
}
DiskManagerReadRequest this_request = pr.getRequest();
int request_index = this_pt.getRequestIndex( this_request );
if ( request_index == -1 ){
if ( LOG_RTA ){
System.out.println( " request lost" );
}
// request's gone
it.remove();
continue;
}
if ( this_pt == pt ){
if ( LOG_RTA ){
System.out.println( " already req" );
}
pt_already_present = true;
break;
}
long this_up_bps = this_pt.getStats().getDataReceiveRate();
if ( this_up_bps < 1 ){
this_up_bps = 1;
}
int next_block_bytes = ( request_index + 1 ) * DiskManager.BLOCK_SIZE;
long this_peer_eta = now + (( next_block_bytes * 1000 ) / this_up_bps );
best_eta = Math.min( best_eta, this_peer_eta );
if ( LOG_RTA ){
System.out.println( " best_eta = " + ( best_eta - now ));
}
}
// if we've not already requested this piece
if ( !pt_already_present ){
// and there are no outstanding requests or outstanding requests are lagging
if ( block_peer_requests.size() == 0 ){
if ( LOG_RTA ){
System.out.println( " block has no requests and is better rta" );
}
piece_min_rta_time = piece_rta;
piece_min_rta_index = i;
piece_min_rta_block = j;
break; // earlier blocks always have priority
}else if ( best_eta > piece_rta ){
if ( LOG_RTA ){
System.out.println( " block is lagging" );
}
// if we can do better than existing best effort allocate
if ( my_next_block_eta < best_eta ){
if ( LOG_RTA ){
System.out.println( " I can do better!" );
}
piece_min_rta_time = piece_rta;
piece_min_rta_index = i;
piece_min_rta_block = j;
break; // earlier blocks always have priority
}
}
}
}
}
}
}
if ( piece_min_rta_index != -1 ){
PEPiece pePiece = pePieces[piece_min_rta_index];
if ( pePiece == null ){
// create piece manually
pePiece = new PEPieceImpl( pt.getManager(), dmPieces[piece_min_rta_index], peerSpeed >>1 );
// Assign the created piece to the pieces array.
peerControl.addPiece(pePiece, piece_min_rta_index);
pePiece.setResumePriority( PRIORITY_REALTIME );
if ( availability[piece_min_rta_index] <=globalMinOthers ){
nbRarestActive++;
}
}
RealTimeData rtd = (RealTimeData)pePiece.getRealTimeData();
if ( rtd == null ){
rtd = new RealTimeData( pePiece );
pePiece.setRealTimeData( rtd );
}
pePiece.getAndMarkBlock( pt, piece_min_rta_block );
DiskManagerReadRequest request = pt.request(piece_min_rta_index, piece_min_rta_block *DiskManager.BLOCK_SIZE, pePiece.getBlockSize(piece_min_rta_block));
if ( request != null ){
List real_time_requests = rtd.getRequests()[piece_min_rta_block];
real_time_requests.add( new RealTimePeerRequest( pt, request ));
if ( LOG_RTA ){
System.out.println( "RT Request: " + piece_min_rta_index + "/" + piece_min_rta_block + " -> " + pt.getIp() + "[tot=" + real_time_requests.size() + "]" );
}
pt.setLastPiece(piece_min_rta_index);
pePiece.setLastRequestedPeerSpeed( peerSpeed );
}
return( true );
}else{
return( false );
}
}
/**
* This method is the downloading core. It decides, for a given peer,
* which block should be requested. Here is the overall algorithm :
* 0. If there a FORCED_PIECE or reserved piece, that will be started/resumed if possible
* 1. Scan all the active pieces and find the rarest piece (and highest priority among equally rarest)
* that can possibly be continued by this peer, if any
* 2. While scanning the active pieces, develop a list of equally highest priority pieces
* (and equally rarest among those) as candidates for starting a new piece
* 3. If it can't find any piece, this means all pieces are
* already downloaded/full requested
* 4. Returns int[] pieceNumber, blockNumber if a request to be made is found,
* or null if none could be found
* @param pc PEPeerTransport to work with
*
* @return int with pieceNumberto be requested or -1 if no request could be found
*/
private final int getRequestCandidate(final PEPeerTransport pt )
{
if (pt ==null ||pt.getPeerState() !=PEPeer.TRANSFERING)
return -1;
final BitFlags peerHavePieces =pt.getAvailable();
if (peerHavePieces ==null ||peerHavePieces.nbSet <=0)
return -1;
// piece number and its block number that we'll try to DL
int reservedPieceNumber = pt.getReservedPieceNumber();
// If there's a piece seserved to this peer resume it and only it (if possible)
if ( reservedPieceNumber >=0 ){
PEPiece pePiece = pePieces[reservedPieceNumber];
if ( pePiece != null ){
String peerReserved = pePiece.getReservedBy();
if ( peerReserved != null && peerReserved.equals( pt.getIp())){
if ( peerHavePieces.flags[reservedPieceNumber] &&pePiece.isRequestable()){
return reservedPieceNumber;
}
}
}
// reserved piece is no longer valid, dump it
pt.setReservedPieceNumber(-1);
// clear the reservation if the piece is still allocated to the peer
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -