📄 peer.cpp
          m_err_count++;
          if(arg_verbose) CONSOLE.Debug("err: %p (%d) choked request", this,
            m_err_count);
          if( stream.Send_State(M_CHOKE) < 0 ) return -1;
          // This will mess with the unchoke rotation (to this peer's
          // disadvantage), but otherwise we may spam them with choke msgs.
          m_unchoke_timestamp = m_last_timestamp;
        }
      }else{
        if( !m_requested ){
          m_requested = 1;
          if( stream.out_buffer.SetSize(BUF_DEF_SIZ +
              ((len < DEFAULT_SLICE_SIZE) ? DEFAULT_SLICE_SIZE : len)) < 0 )
            return -1;
          if( (!m_receive_time || BTCONTENT.pBF->IsFull()) &&
              now > m_unchoke_timestamp ){
            m_latency = (now <= m_unchoke_timestamp) ? 1 :
              now - m_unchoke_timestamp;
            if(arg_verbose) CONSOLE.Debug("%p latency is %d sec (request)",
              this, (int)m_latency);
          }
        }
        retval = reponse_q.Add(idx, off, len);
      }
      break;

    case M_PIECE:
      if( H_PIECE_LEN >= r ) return -1;
      m_receive_time = m_last_timestamp;
      // PieceDeliver handles the error determination & DL counting
      retval = PieceDeliver(r);
      break;

    case M_BITFIELD:
      if( (r - H_BASE_LEN) != bitfield.NBytes() || !bitfield.IsEmpty() )
        return -1;
      bitfield.SetReferBuffer(msgbuf + H_LEN + H_BASE_LEN);
      if(bitfield.IsFull()){
        if(arg_verbose) CONSOLE.Debug("%p is a seed", this);
        if(BTCONTENT.pBF->IsFull()) return -2;
        else{
          stream.out_buffer.SetSize(BUF_DEF_SIZ);
          if( !m_want_again ) m_want_again = 1;
        }
      }
      // This is needed in order to set our Interested state.
      retval = RequestCheck();  // fixed client stall
      break;

    case M_CANCEL:
      if(H_CANCEL_LEN != r) return -1;
      idx = get_nl(msgbuf + H_LEN + H_BASE_LEN);
      off = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN);
      len = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN * 2);
      if( reponse_q.Remove(idx,off,len) < 0 ){
        if( m_state.local_choked &&
            m_last_timestamp - m_unchoke_timestamp >
              (m_latency ? m_latency*2 : 60) ){
          m_err_count++;
          if(arg_verbose) CONSOLE.Debug("err: %p (%d) Bad cancel", this,
            m_err_count);
        }
      }else if( reponse_q.IsEmpty() && g_next_up == this )
        g_next_up = (btPeer *)0;
      break;

    default:
      if(arg_verbose) CONSOLE.Debug("Unknown message type %d from peer %p",
        (int)(msgbuf[H_LEN]), this);
    }  // switch

    if( retval >= 0 ) m_lastmsg = msgbuf[H_LEN];
  }
  return retval;
}

int btPeer::ReponseSlice()
{
  size_t len = 0;
  struct timespec nowspec;
  ssize_t retval;
  size_t idx,off;

  reponse_q.Pop(&idx,&off,&len);

  retval = BTCONTENT.ReadSlice(BTCONTENT.global_piece_buffer,idx,off,len);
  if( retval < 0 ) return -1;
  else if( retval ) Self.OntimeUL(0);  // delayed, read from disk

  size_t currentrate = CurrentUL();
  if(arg_verbose) CONSOLE.Debug("Sending %d/%d/%d to %p",
    (int)idx, (int)off, (int)len, this);

  // project the time to send another slice
  if( 0==currentrate ){  // don't know peer's rate; use best guess
    int rate = (int)(Self.RateUL());
    int unchoked = (int)(WORLD.GetUnchoked());
    if( unchoked < 1 ) unchoked = 1;
    if( 0==cfg_max_bandwidth_up ){
      if( 0==rate ) m_next_send_time = now;
      else m_next_send_time = now + len / (rate / unchoked);
    }else{
      m_next_send_time = now + len /
        ( ((int)cfg_max_bandwidth_up - rate > (int)cfg_max_bandwidth_up / unchoked) ?
            cfg_max_bandwidth_up - rate :
            (cfg_max_bandwidth_up + unchoked-1) / unchoked );
    }
  }else m_next_send_time = now + len /
    ( (currentrate < cfg_max_bandwidth_up || 0==cfg_max_bandwidth_up) ?
        currentrate : cfg_max_bandwidth_up );

  m_prefetch_time = (time_t)0;

  clock_gettime(CLOCK_REALTIME, &nowspec);
  retval = stream.Send_Piece(idx,off,BTCONTENT.global_piece_buffer,len);
  if( retval >= 0 ){
    WORLD.Upload();
    DataSended(len, nowspec.tv_sec + (double)(nowspec.tv_nsec)/1000000000);
    if( !m_want_again && BTCONTENT.pBF->IsFull() ) m_want_again = 1;
  }

  return (int)retval;
}

int btPeer::SendRequest()
{
  int first = 1;
  PSLICE ps = request_q.NextSend();

  if( m_req_out > cfg_req_queue_length ){
    if(arg_verbose)
      CONSOLE.Debug("ERROR@5: %p m_req_out underflow, resetting", this);
    m_req_out = 0;
  }
  if( ps && m_req_out < m_req_send ){
    if(arg_verbose){
      CONSOLE.Debug_n("");
      CONSOLE.Debug_n("Requesting #%d from %p (%d left, %d slots):",
        (int)(ps->index), this, (int)(request_q.Qsize()), (int)m_req_send);
    }
    for( int i=0; ps && m_req_out < m_req_send && i<5; ps = ps->next, i++ ){
      if( first && (!RateDL() ||
          0 >= (m_req_out+1) * ps->length / (double)RateDL() - m_latency) ){
        request_q.SetReqTime(ps, now);
        first = 0;
      }
      else request_q.SetReqTime(ps, (time_t)0);
      if(arg_verbose) CONSOLE.Debug_n(".");
      if(stream.Send_Request(ps->index,ps->offset,ps->length) < 0){
        return -1;
      }
      request_q.SetNextSend(ps->next);
      m_req_out++;
    }
    if(arg_verbose) CONSOLE.Debug_n("");
    m_receive_time = now;
  }
  return ( m_req_out < m_req_send ) ? RequestPiece() : 0;
}

int btPeer::CancelPiece()
{
  return CancelPiece(request_q.GetHead()->index);
}

int btPeer::CancelPiece(size_t idx)
{
  PSLICE ps = request_q.GetHead();
  PSLICE next;
  int cancel = 1;
  int retval;

  for( ; ps && ps->index != idx; ps=ps->next );  // find the piece

  for( ; ps; ps = next ){
    if( ps->index != idx ) break;
    if( ps == request_q.NextSend() ) cancel = 0;
    if( cancel ){
      if(arg_verbose) CONSOLE.Debug("Cancelling %d/%d/%d to %p",
        (int)(ps->index), (int)(ps->offset), (int)(ps->length), this);
      if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1;
      m_req_out--;
      if( m_req_out > cfg_req_queue_length ){
        if(arg_verbose)
          CONSOLE.Debug("ERROR@1: %p m_req_out underflow, resetting", this);
        m_req_out = 0;
      }
      m_cancel_time = now;
    }
    next = ps->next;
    request_q.Remove(ps->index, ps->offset, ps->length);
  }
  if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0;
  return 0;
}

int btPeer::CancelRequest(PSLICE ps)
{
  int retval;

  for( ; ps; ps = ps->next){
    if( ps == request_q.NextSend() ) break;
    if(arg_verbose) CONSOLE.Debug("Cancelling %d/%d/%d to %p",
      (int)(ps->index), (int)(ps->offset), (int)(ps->length), this);
    if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1;
    m_req_out--;
    if( m_req_out > cfg_req_queue_length ){
      if(arg_verbose)
        CONSOLE.Debug("ERROR@2: %p m_req_out underflow, resetting", this);
      m_req_out = 0;
    }
    m_cancel_time = now;
  }
  if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0;
  return 0;
}

int btPeer::CancelSliceRequest(size_t idx, size_t off, size_t len)
{
  PSLICE ps;
  int cancel = 1;
  int idxfound = 0;
  int retval;

  for(ps = request_q.GetHead(); ps; ps = ps->next){
    if( ps == request_q.NextSend() ) cancel = 0;
    if( idx == ps->index ){
      if( off == ps->offset && len == ps->length ){
        request_q.Remove(idx,off,len);
        if(cancel){
          if(arg_verbose) CONSOLE.Debug("Cancelling %d/%d/%d to %p",
            (int)idx, (int)off, (int)len, this);
          if(stream.Send_Cancel(idx,off,len) < 0) return -1;
          m_req_out--;
          if( m_req_out > cfg_req_queue_length ){
            if(arg_verbose)
              CONSOLE.Debug("ERROR@3: %p m_req_out underflow, resetting",this);
            m_req_out = 0;
          }
          if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0;
          m_cancel_time = now;
          // Don't call RequestCheck() here since that could cause the slice
          // we're cancelling to be dup'd from another peer.
        }
        break;
      }
      idxfound = 1;
    }else if( idxfound ) break;
  }
  return 0;
}

int btPeer::ReportComplete(size_t idx)
{
  int r;

  if( (r = BTCONTENT.APieceComplete(idx)) > 0 ){
    if(arg_verbose) CONSOLE.Debug("Piece #%d completed", (int)idx);
    WORLD.Tell_World_I_Have(idx);
    // We don't track request duplication accurately, so clean up just in case.
    WORLD.CancelPiece(idx);
    PENDINGQUEUE.Delete(idx);

    if( BTCONTENT.pBF->IsFull() ) WORLD.CloseAllConnectionToSeed();

    if( arg_file_to_download ){
      BitField tmpBitField = *BTCONTENT.pBF;
      tmpBitField.Except(*BTCONTENT.pBFilter);
      while( arg_file_to_download && tmpBitField.Count() >=
             BTCONTENT.getFilePieces(arg_file_to_download) ){
        // when the file is complete, we go after the next
        ++arg_file_to_download;
        BTCONTENT.SetFilter();
        tmpBitField = *BTCONTENT.pBF;
        tmpBitField.Except(*BTCONTENT.pBFilter);
      }
    }
  }else if( 0 == r ){  // hash check failed
    // Don't count an error against the peer in initial or endgame mode, since
    // some slices may have come from other peers.
    int dup = 0;
    if( BTCONTENT.pBF->Count() < 2 ||
        WORLD.Pieces_I_Can_Get() - BTCONTENT.pBF->Count() <
          WORLD.GetPeersCount() )
      dup = 1;
    else if( arg_file_to_download ){
      BitField afdBitField = *BTCONTENT.pBF;
      afdBitField.Except(*BTCONTENT.pBFilter);
      if( BTCONTENT.getFilePieces(arg_file_to_download) - afdBitField.Count() <
          WORLD.GetPeersCount() )
        dup = 1;
    }
    if( !dup ){
      m_err_count++;
      if(arg_verbose) CONSOLE.Debug("err: %p (%d) Bad complete", this,
        m_err_count);
      ResetDLTimer();  // set peer rate=0 so we don't favor for upload
    }
  }
  return r;
}

int btPeer::PieceDeliver(size_t mlen)
{
  size_t idx,off,len;
  char *msgbuf = stream.in_buffer.BasePointer();
  time_t t = (time_t)0;
  int f_requested = 0, f_success = 1, f_count = 1, f_want = 1;

  idx = get_nl(msgbuf + H_LEN + H_BASE_LEN);
  off = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN);
  len = mlen - H_PIECE_LEN;

  if( !request_q.IsEmpty() ){
    t = request_q.GetReqTime(idx,off,len);
    // Verify whether this is an outstanding request (not for error counting).
    PSLICE ps = request_q.GetHead();
    for( ; ps; ps = ps->next){
      if( ps == request_q.NextSend() ) break;
      if( idx==ps->index && off==ps->offset && len==ps->length ){
        f_requested = 1;
        break;
      }
    }
  }

  Self.StartDLTimer();

  if( f_requested ){
    if(arg_verbose) CONSOLE.Debug("Receiving piece %d/%d/%d from %p",
      (int)idx, (int)off, (int)len, this);
    if( !BTCONTENT.pBF->IsSet(idx) &&
        BTCONTENT.WriteSlice(msgbuf + H_LEN + H_PIECE_LEN,idx,off,len) < 0 ){
      CONSOLE.Warning(2, "warn, WriteSlice failed; is filesystem full?");
      f_success = 0;
      // Re-queue the request, unless WriteSlice triggered SeedOnly (then the
      // request is already in Pending).
      if( !WORLD.SeedOnly() ){
        // This removes only the first instance; re-queued request is safe.
        request_q.Remove(idx,off,len);
        m_req_out--;
        if( RequestSlice(idx,off,len) < 0 ){
          // At least it's still queued & will go to Pending at peer close.
          if( f_count ) DataRecved(len);
          return -1;
        }
      }
    }else{  // saved or had the data
      request_q.Remove(idx,off,len);
      m_req_out--;
      // Check for & cancel requests for this slice from other peers in initial
      // and endgame modes.
      int dup = 0;
      if( BTCONTENT.pBF->Count() < 2 ||
          WORLD.Pieces_I_Can_Get() - BTCONTENT.pBF->Count() <
            WORLD.GetPeersCount() )
        dup = 1;
      else if( arg_file_to_download ){
        BitField afdBitField = *BTCONTENT.pBF;
        afdBitField.Except(*BTCONTENT.pBFilter);
        if( BTCONTENT.getFilePieces(arg_file_to_download) -
              afdBitField.Count() < WORLD.GetPeersCount() )
          dup = 1;
      }
      if( dup ) WORLD.CancelSlice(idx, off, len);
      if( dup || WORLD.SeedOnly() ) PENDINGQUEUE.DeleteSlice(idx, off, len);
    }
  }else{  // not requested--not saved
    if( m_last_timestamp - m_cancel_time > (m_latency ? m_latency*2 : 60) ){
      m_err_count++;
      if(arg_verbose) CONSOLE.Debug("err: %p (%d) Unrequested piece %d/%d/%d",
        this, m_err_count, (int)idx, (int)off, (int)len);
      ResetDLTimer();  // set peer rate=0 so we don't favor for upload
      f_count = 0;
      f_want = 0;
    }else if(arg_verbose) CONSOLE.Debug("Unneeded piece %d/%d/%d from %p",
      (int)idx, (int)off, (int)len, this);
    f_success = 0;
  }
  if( !m_want_again && f_want ) m_want_again = 1;

  // Determine how many outstanding requests we should maintain, roughly:
  // (request turnaround latency) / (time to transmit one slice)
  if(t){
    m_latency = (m_last_timestamp <= t) ? 1 : m_last_timestamp - t;
    if(arg_verbose) CONSOLE.Debug("%p latency is %d sec (receive)", this,
      (int)m_latency);
    m_latency_timestamp = m_last_timestamp;
  }
  size_t rate;
  if( (rate = RateDL()) > len/20 && m_latency_timestamp ){
    // 20==RATE_INTERVAL from rate.cpp.  This is really just a check to see if
    // rate is measurable/usable.
    m_req_send = (size_t)( m_latency / (len / (double)rate) + 1 );
    if( m_req_send < 2 ) m_req_send = 2;

    // If latency increases, we will see this as a dlrate decrease.
    if( rate < m_prev_dlrate ) m_req_send++;
    else if( m_last_timestamp - m_latency_timestamp >= 30 &&
             m_req_out == m_req_send - 1 ){
      // Try to force latency measurement every 30 seconds.
      m_req_send--;
      m_latency_timestamp = m_last_timestamp;
    }
    m_prev_dlrate = rate;
  }else if (m_req_send < 5) m_req_send = 5;

  /* if piece download complete. */
  if( f_success && (request_q.IsEmpty() || !request_q.HasIdx(idx)) &&
      !BTCONTENT.pBF->IsSet(idx) ){
    // Above WriteSlice may have triggered SeedOnly.  If data was saved, slice
    // has been deleted from Pending.  If piece is incomplete, it's in Pending.
    if( !(WORLD.SeedOnly() && PENDINGQUEUE.Exist(idx)) && !ReportComplete(idx) )
      f_count = 0;
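The m_next_send_time computation in ReponseSlice() above paces uploads: when the peer's measured rate is unknown it falls back to either the leftover capacity under cfg_max_bandwidth_up or an even share per unchoked peer. Below is a minimal, self-contained sketch of that arithmetic, not the client's own code; the parameters are hypothetical stand-ins for CurrentUL(), Self.RateUL(), WORLD.GetUnchoked() and cfg_max_bandwidth_up, and a divide-by-zero guard has been added.

#include <ctime>
#include <cstdio>

// Sketch of the slice-pacing rule used in btPeer::ReponseSlice(): project when
// the next slice may be sent so upload bandwidth is spread across peers.
// All parameters are hypothetical stand-ins for the real member/global values.
static time_t ProjectNextSend(time_t now, int len, int peer_rate,
                              int my_rate, int unchoked, int cap)
{
  if( unchoked < 1 ) unchoked = 1;

  if( 0 == peer_rate ){             // peer's rate unknown; use a best guess
    if( 0 == cap ){                 // no upload limit configured
      int share = my_rate / unchoked;
      return share ? now + len / share : now;   // guard added in this sketch
    }
    // leftover capacity or an even share of the cap, whichever is larger
    int share = (cap - my_rate > cap / unchoked) ?
                cap - my_rate : (cap + unchoked - 1) / unchoked;
    return now + len / share;
  }
  // peer's measured rate, clipped to the configured upload cap
  return now + len / ((peer_rate < cap || 0 == cap) ? peer_rate : cap);
}

int main()
{
  time_t now = time(0);
  // 16 KB slice, peer rate unknown, we send 8 KB/s of a 32 KB/s cap to 4
  // unchoked peers: the 24 KB/s leftover wins, so the delay rounds down to 0 s.
  printf("delay = %ld s\n",
         (long)(ProjectNextSend(now, 16384, 0, 8192, 4, 32768) - now));
  return 0;
}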
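PieceDeliver() sizes the request pipeline as roughly (request turnaround latency) / (time to transmit one slice), with a floor of two outstanding requests. A sketch of that formula with hypothetical sample values (latency_sec, slice_len and dl_rate stand in for m_latency, len and RateDL()):

#include <cstdio>
#include <cstddef>

// Sketch of the pipeline-depth rule in btPeer::PieceDeliver():
//   m_req_send ~= latency / (len / rate) + 1, never less than 2.
static size_t PipelineDepth(double latency_sec, size_t slice_len, size_t dl_rate)
{
  size_t depth = (size_t)( latency_sec / (slice_len / (double)dl_rate) + 1 );
  return (depth < 2) ? 2 : depth;
}

int main()
{
  // e.g. 3 s turnaround latency, 16 KB slices arriving at 32 KB/s: one slice
  // takes 0.5 s to transmit, so keep about 7 requests outstanding.
  printf("depth = %zu\n", PipelineDepth(3.0, 16384, 32768));
  return 0;
}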
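ReportComplete() and PieceDeliver() share the same heuristic for deciding whether slices of a piece may have been requested from more than one peer (initial mode, endgame mode, or the tail of a per-file download), in which case a bad hash or duplicate slice is not blamed on this peer. A sketch of that predicate; the parameters are hypothetical stand-ins for BTCONTENT.pBF->Count(), WORLD.Pieces_I_Can_Get(), WORLD.GetPeersCount() and the piece counts behind arg_file_to_download / BTCONTENT.pBFilter.

#include <cstdio>
#include <cstddef>

// Sketch of the "requests may be duplicated" test applied before counting an
// error against a peer or cancelling slices from other peers.
static bool MayBeDuplicated(size_t have_pieces, size_t reachable_pieces,
                            size_t peers, bool file_mode,
                            size_t file_pieces, size_t file_have)
{
  if( have_pieces < 2 ) return true;                        // initial mode
  if( reachable_pieces - have_pieces < peers ) return true; // endgame mode
  if( file_mode && file_pieces - file_have < peers )        // per-file endgame
    return true;
  return false;
}

int main()
{
  // 3 pieces left but 5 peers connected: endgame, so don't blame one peer.
  printf("%s\n", MayBeDuplicated(97, 100, 5, false, 0, 0) ? "dup" : "unique");
  return 0;
}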