📄 peer.cpp
字号:
case M_NOT_INTERESTED: if(r != H_BASE_LEN){return -1;} if(arg_verbose) fprintf(stderr, "%p is not interested\n", this); m_state.remote_interested = 0; StopULTimer(); /* remove peer's reponse queue */ if( !reponse_q.IsEmpty()) reponse_q.Empty(); break; case M_HAVE: if(H_HAVE_LEN != r){return -1;} idx = get_nl(msgbuf + 5); if( idx >= BTCONTENT.GetNPieces() || bitfield.IsSet(idx)) return -1; bitfield.Set(idx); if( bitfield.IsFull() && BTCONTENT.pBF->IsFull() ){ return -2; } if( !BTCONTENT.pBF->IsSet(idx) && !BTCONTENT.pBFilter->IsSet(idx) ){ m_cached_idx = idx; if(arg_verbose && m_standby) fprintf(stderr, "%p un-standby\n", this); m_standby = 0; } // if( !BTCONTENT.pBF->IsSet(idx) ) m_cached_idx = idx; // see if we're Interested now if(!m_standby) retval = RequestCheck(); break; case M_REQUEST: if(H_REQUEST_LEN != r || !m_state.remote_interested){ return -1; } idx = get_nl(msgbuf + 5); if( !BTCONTENT.pBF->IsSet(idx) ) return -1; off = get_nl(msgbuf + 9); len = get_nl(msgbuf + 13); if( !reponse_q.IsValidRequest(idx, off, len) ) return -1; if( m_state.local_choked ){ if( (m_latency && m_last_timestamp - m_unchoke_timestamp > m_latency) || (!m_latency && m_last_timestamp - m_unchoke_timestamp > 60) ){ m_err_count++; if(arg_verbose) fprintf(stderr,"err: %p (%d) choked request\n", this, m_err_count); if( stream.Send_State(M_CHOKE) < 0 ) return -1; // This will mess with the unchoke rotation (to this peer's // disadvantage), but otherwise we may spam them with choke msgs. m_unchoke_timestamp = m_last_timestamp; } }else retval = reponse_q.Add(idx, off, len); break; case M_PIECE: m_receive_time = m_last_timestamp; if( request_q.IsEmpty() || !m_state.local_interested){ m_err_count++; if(arg_verbose) fprintf(stderr,"err: %p (%d) Unwanted piece\n", this, m_err_count); }else retval = PieceDeliver(r); break; case M_BITFIELD: if( (r - 1) != bitfield.NBytes() || !bitfield.IsEmpty()) return -1; bitfield.SetReferBuffer(msgbuf + 5); if(bitfield.IsFull()){ if(arg_verbose) fprintf(stderr, "%p is a seed\n", this); if(BTCONTENT.pBF->IsFull()) return -2; } //This is needed in order to set our Interested state retval = RequestCheck(); // fixed client stall break; case M_CANCEL: if(r != H_CANCEL_LEN || !m_state.remote_interested) return -1; idx = get_nl(msgbuf + 5); off = get_nl(msgbuf + 9); len = get_nl(msgbuf + 13); if( reponse_q.Remove(idx,off,len) < 0 ){ m_err_count++; if(arg_verbose) fprintf(stderr, "err: %p (%d) Bad cancel\n", this, m_err_count); }else{ if( reponse_q.IsEmpty() ) StopULTimer(); if( reponse_q.IsEmpty() || !CouldReponseSlice() ){ if( g_next_up == this ) g_next_up = (btPeer *)0; } } break; default: if(arg_verbose) fprintf(stderr, "Unknown message type %u from peer %p\n", msgbuf[4], this); } // switch if( retval >= 0 ) m_lastmsg = msgbuf[4]; } return retval;}int btPeer::ReponseSlice(){ size_t len = 0; reponse_q.Peek((size_t*) 0,(size_t*) 0, &len); if(len && stream.out_buffer.LeftSize() <= (len + 13 + 3 * 1024)) stream.Flush(); if(len && stream.out_buffer.LeftSize() > (len + 13 + 3 * 1024)){ size_t idx,off; reponse_q.Pop(&idx,&off,(size_t *) 0); if(BTCONTENT.ReadSlice(BTCONTENT.global_piece_buffer,idx,off,len) != 0 ){ return -1; } Self.DataSended(len); DataSended(len); if(arg_verbose) fprintf(stderr, "Sending %d/%d/%d to %p\n", (int)idx, (int)off, (int)len, this); return stream.Send_Piece(idx,off,BTCONTENT.global_piece_buffer,len); } return 0;}int btPeer::SendRequest(){ int first = 1; PSLICE ps = request_q.NextSend(); if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) fprintf(stderr, "ERROR@5: %p m_req_out underflow, resetting\n", this); m_req_out = 0; } if( ps && m_req_out < m_req_send ){ if(arg_verbose) fprintf(stderr, "Requesting #%u from %p (%d left, %d slots):", ps->index, this, request_q.Qsize(), m_req_send); for( int i=0; ps && m_req_out < m_req_send && i<5; ps = ps->next, i++ ){ if( first && (!RateDL() || 0 >= (m_req_out+1) * ps->length / (double)RateDL() - m_latency) ){ request_q.SetReqTime(ps, now); first = 0; } else request_q.SetReqTime(ps, (time_t)0); if(arg_verbose) fprintf(stderr, "."); if(stream.Send_Request(ps->index,ps->offset,ps->length) < 0){ return -1; } request_q.SetNextSend(ps->next); m_req_out++; } if(arg_verbose) fprintf(stderr, "\n"); m_receive_time = now; } return ( m_req_out < m_req_send ) ? RequestPiece() : 0;}int btPeer::CancelPiece(){ PSLICE ps = request_q.GetHead(); size_t idx; int cancel = 1; int retval; idx = ps->index; for( ; ps; ps = ps->next){ if( ps->index != idx ) break; if( ps == request_q.NextSend() ) cancel = 0; if( cancel ){ if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1; m_req_out--; if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) fprintf(stderr, "ERROR@1: %p m_req_out underflow, resetting\n", this); m_req_out = 0; } } request_q.Remove(ps->index, ps->offset, ps->length); } if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0; return 0;}int btPeer::CancelRequest(PSLICE ps){ int retval; for( ; ps; ps = ps->next){ if( ps == request_q.NextSend() ) break; if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1; m_req_out--; if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) fprintf(stderr, "ERROR@2: %p m_req_out underflow, resetting\n", this); m_req_out = 0; } } if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0; return 0;}int btPeer::CancelSliceRequest(size_t idx, size_t off, size_t len){ PSLICE ps; int cancel = 1; int idxfound = 0; int retval; for(ps = request_q.GetHead() ; ps; ps = ps->next){ if( ps == request_q.NextSend() ) cancel = 0; if( idx == ps->index ){ if( off == ps->offset && len == ps->length ){ if( request_q.Remove(idx,off,len) < 0 ){ m_err_count++; if(arg_verbose) fprintf(stderr,"err: %p (%d) Bad CS remove\n", this, m_err_count); } if(cancel){ if(stream.Send_Cancel(idx,off,len) < 0) return -1; m_req_out--; if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) fprintf(stderr, "ERROR@3: %p m_req_out underflow, resetting\n", this); m_req_out = 0; } if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0; // Don't call RequestCheck() here since that could cause the slice // we're cancelling to be dup'd from another peer. return 0; } break; } idxfound = 1; }else if( idxfound ) break; } return 0;}int btPeer::ReportComplete(size_t idx){ if( BTCONTENT.APieceComplete(idx) ){ if(arg_verbose) fprintf(stderr, "Piece #%u completed\n", idx); WORLD.Tell_World_I_Have(idx); PENDINGQUEUE.Delete(idx); if( BTCONTENT.pBF->IsFull() ){ ResetDLTimer(); WORLD.CloseAllConnectionToSeed(); } if( arg_file_to_download ){ BitField tmpBitField = *BTCONTENT.pBF; tmpBitField.Except(*BTCONTENT.pBFilter); while( arg_file_to_download && tmpBitField.Count() >= BTCONTENT.getFilePieces(arg_file_to_download) ){ //when the file is complete, we go after the next ++arg_file_to_download; BTCONTENT.FlushCache(); BTCONTENT.SetFilter(); tmpBitField = *BTCONTENT.pBF; tmpBitField.Except(*BTCONTENT.pBFilter); } WORLD.CheckInterest(); } }else{ m_err_count++; if(arg_verbose) fprintf(stderr, "err: %p (%d) Bad complete\n", this, m_err_count); } return (P_FAILED == m_status) ? -1 : RequestCheck();}int btPeer::PieceDeliver(size_t mlen){ size_t idx,off,len; char *msgbuf = stream.in_buffer.BasePointer(); time_t t; int dup = 0, requested = 1; idx = get_nl(msgbuf + 5); off = get_nl(msgbuf + 9); len = mlen - 9; if(arg_verbose) fprintf(stderr, "Receiving piece %d/%d/%d from %p\n", (int)idx, (int)off, (int)len, this); t = request_q.GetReqTime(idx,off,len); PSLICE ps = request_q.GetHead(); if( request_q.NextSend() ) for( ; ps; ps = ps->next){ if( ps == request_q.NextSend() ){ requested = 0; break; } if( idx==ps->index && off==ps->offset && len==ps->length ) break; } if( request_q.Remove(idx,off,len) < 0 ){ m_err_count++; if(arg_verbose) fprintf(stderr, "err: %p (%d) Bad remove\n", this, m_err_count); return 0; } if(BTCONTENT.WriteSlice((char*)(msgbuf + 13),idx,off,len) < 0){ fprintf(stderr, "warn, WriteSlice failed; is filesystem full?\n"); return 0; } Self.StartDLTimer(); Self.DataRecved(len); DataRecved(len); // Check for & cancel requests for this slice from other peers in initial // and endgame modes. if( BTCONTENT.pBF->Count() < 2 || WORLD.Pieces_I_Can_Get() - BTCONTENT.pBF->Count() < WORLD.TotalPeers() ) dup = 1; else if( arg_file_to_download ){ BitField afdBitField = *BTCONTENT.pBF; afdBitField.Except(*BTCONTENT.pBFilter); if( BTCONTENT.getFilePieces(arg_file_to_download) - afdBitField.Count() < WORLD.TotalPeers() ) dup = 1; } if( dup ){ WORLD.CancelSlice(idx, off, len); PENDINGQUEUE.DeleteSlice(idx, off, len); } // Determine how many outstanding requests we should maintain, roughly: // (request turnaround latency) / (time to transmit one slice) if(t){ m_latency = (m_last_timestamp <= t) ? 1 : m_last_timestamp - t; if(arg_verbose) fprintf(stderr, "%p latency is %d sec\n", this, (int)m_latency); m_latency_timestamp = m_last_timestamp; } if( RateDL() > len/20 ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -