📄 peer.cpp
字号:
int btPeer::MsgDeliver(){ size_t r,idx,off,len; int retval = 0; char *msgbuf = stream.in_buffer.BasePointer(); r = get_nl(msgbuf); // Don't require keepalives if we're receiving other messages. m_last_timestamp = now; if( 0 == r ){ if( !m_f_keepalive ) if( stream.Send_Keepalive() < 0 ) return -1; m_f_keepalive = 0; return 0; }else{ char msg = msgbuf[H_LEN]; switch(msg){ case M_CHOKE: if(H_BASE_LEN != r) return -1; if(arg_verbose) CONSOLE.Debug("%p choked me", this); if( m_lastmsg == M_UNCHOKE && m_last_timestamp <= m_choketime+1 ){ if( PeerError(2, "Choke oscillation") < 0 ) return -1; } m_choketime = m_last_timestamp; m_state.remote_choked = 1; StopDLTimer(); if( g_next_dn == this ) g_next_dn = (btPeer *)0; if( !request_q.IsEmpty() ){ BTCONTENT.pBMultPeer->Set(request_q.GetRequestIdx()); PutPending(); } m_cancel_time = now; break; case M_UNCHOKE: if(H_BASE_LEN != r) return -1; if(arg_verbose) CONSOLE.Debug("%p unchoked me", this); if( m_lastmsg == M_CHOKE && m_last_timestamp <= m_choketime+1 ){ if( PeerError(2, "Choke oscillation") < 0 ) return -1; } m_choketime = m_last_timestamp; m_state.remote_choked = 0; m_standby = 0; if( !stream.PeekNextMessage(M_CHOKE) ){ m_prefetch_completion = 0; retval = RequestCheck(); } break; case M_INTERESTED: if(H_BASE_LEN != r) return -1; if(arg_verbose) CONSOLE.Debug("%p is interested", this); m_state.remote_interested = 1; if( Need_Local_Data() ) WORLD.UnchokeIfFree(this); break; case M_NOT_INTERESTED: if(H_BASE_LEN != r) return -1; if(arg_verbose) CONSOLE.Debug("%p is not interested", this); m_state.remote_interested = 0; /* remove peer's reponse queue */ if( !reponse_q.IsEmpty()) reponse_q.Empty(); /* if I've been seed for a while, nobody should be uninterested */ if( BTCONTENT.IsFull() && BTCONTENT.GetSeedTime() - now >= 300 ) return -2; break; case M_HAVE: if(H_HAVE_LEN != r) return -1; idx = get_nl(msgbuf + H_LEN + H_BASE_LEN); if( idx >= BTCONTENT.GetNPieces() || bitfield.IsSet(idx) ) return -1; bitfield.Set(idx); if( bitfield.IsFull() ){ if( BTCONTENT.IsFull() ) return -2; else stream.out_buffer.SetSize(BUF_DEF_SIZ); } if( !BTCONTENT.pBF->IsSet(idx) && !BTCONTENT.pBMasterFilter->IsSet(idx) ){ if( m_cached_idx >= BTCONTENT.GetNPieces() || m_standby || (!BTCONTENT.GetFilter() || !BTCONTENT.GetFilter()->IsSet(idx)) ) m_cached_idx = idx; if(arg_verbose && m_standby) CONSOLE.Debug("%p un-standby", this); m_standby = 0; } // see if we're Interested now if(!m_standby) retval = RequestCheck(); break; case M_REQUEST: if(H_REQUEST_LEN != r || !m_state.remote_interested) return -1; idx = get_nl(msgbuf + H_LEN + H_BASE_LEN); if( !BTCONTENT.pBF->IsSet(idx) ) return -1; off = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN); len = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN * 2); if(arg_verbose) CONSOLE.Debug("%p is requesting %d/%d/%d", this, (int)idx, (int)off, (int)len); if( !reponse_q.IsValidRequest(idx, off, len) ) return -1; if( m_state.local_choked ){ if( m_last_timestamp - m_unchoke_timestamp > (m_latency ? (m_latency*2) : 60) ){ if( PeerError(1, "choked request") < 0 ) return -1; if( stream.Send_State(M_CHOKE) < 0 ) return -1; // This will mess with the unchoke rotation (to this peer's // disadvantage), but otherwise we may spam them with choke msgs. m_unchoke_timestamp = m_last_timestamp; } }else{ if( !m_requested ){ m_requested = 1; if( stream.out_buffer.SetSize(BUF_DEF_SIZ + (len < DEFAULT_SLICE_SIZE) ? DEFAULT_SLICE_SIZE : len) < 0 ) return -1; if( (!m_receive_time || BTCONTENT.Seeding()) && now > m_unchoke_timestamp ){ m_latency = (now <= m_unchoke_timestamp) ? 1 : (now - m_unchoke_timestamp); if(arg_verbose) CONSOLE.Debug("%p latency is %d sec (request)", this, (int)m_latency); } } retval = reponse_q.Add(idx, off, len); } break; case M_PIECE: if( H_PIECE_LEN >= r ) return -1; m_receive_time = m_last_timestamp; // PieceDeliver handles the error determination & DL counting retval = PieceDeliver(r); break; case M_BITFIELD: if( (r - H_BASE_LEN) != bitfield.NBytes() || !bitfield.IsEmpty() ) return -1; bitfield.SetReferBuffer(msgbuf + H_LEN + H_BASE_LEN); if(bitfield.IsFull()){ if(arg_verbose) CONSOLE.Debug("%p is a seed (bitfield is full)", this); if(BTCONTENT.IsFull()) return -2; else{ stream.out_buffer.SetSize(BUF_DEF_SIZ); if( !m_want_again ) m_want_again = 1; } }else if(arg_verbose){ if( bitfield.IsEmpty() ) CONSOLE.Debug("%p bitfield is empty", this); else CONSOLE.Debug("%p bitfield has %d%%", this, 100 * bitfield.Count() / BTCONTENT.GetNPieces()); } // This is needed in order to set our Interested state. retval = RequestCheck(); // fixed client stall break; case M_CANCEL: if(H_CANCEL_LEN != r) return -1; idx = get_nl(msgbuf + H_LEN + H_BASE_LEN); off = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN); len = get_nl(msgbuf + H_LEN + H_BASE_LEN + H_INT_LEN * 2); if( reponse_q.Remove(idx,off,len) < 0 ){ if( m_state.local_choked && m_last_timestamp - m_unchoke_timestamp > (m_latency ? (m_latency*2) : 60) ){ if( PeerError(1, "Bad cancel") < 0 ) return -1; } }else if( reponse_q.IsEmpty() && g_next_up == this ) g_next_up = (btPeer *)0; break; default: if(arg_verbose) CONSOLE.Debug("Unknown message type %d from peer %p", (int)msg, this); } // switch if( retval >= 0 ) m_lastmsg = msg; } return retval;}int btPeer::ReponseSlice(){ size_t len = 0; struct timespec nowspec; ssize_t retval; size_t idx,off; reponse_q.Pop(&idx,&off,&len); if( BTCONTENT.global_buffer_size < len ){ delete []BTCONTENT.global_piece_buffer; BTCONTENT.global_piece_buffer = new char[len]; BTCONTENT.global_buffer_size = BTCONTENT.global_piece_buffer ? len : 0; } retval = BTCONTENT.ReadSlice(BTCONTENT.global_piece_buffer,idx,off,len); if( retval < 0 ) return -1; else if( retval && cfg_cache_size ) Self.OntimeUL(0); // disk read delay // If not using cache, need to always allow time for a disk read. size_t currentrate = CurrentUL(); if(arg_verbose) CONSOLE.Debug("Sending %d/%d/%d to %p", (int)idx, (int)off, (int)len, this); // project the time to send another slice if( 0==currentrate ){ // don't know peer's rate; use best guess // These are "int" for signed calculations below. int rate = (int)(Self.RateUL()); int unchoked = (int)(WORLD.GetUnchoked()); // can't be 0 here if( cfg_max_bandwidth_up < unchoked || cfg_max_bandwidth_up <= rate ){ if( rate < unchoked || rate < (unchoked*len)/3600 ) m_next_send_time = now; else m_next_send_time = now + len / (rate / unchoked); }else{ m_next_send_time = now + len / ( ((int)cfg_max_bandwidth_up - rate > (int)cfg_max_bandwidth_up / unchoked) ? (cfg_max_bandwidth_up - rate) : ((cfg_max_bandwidth_up + unchoked-1) / unchoked) ); } }else m_next_send_time = now + len / ( (currentrate < cfg_max_bandwidth_up || 0==cfg_max_bandwidth_up) ? currentrate : cfg_max_bandwidth_up ); m_prefetch_time = (time_t)0; clock_gettime(CLOCK_REALTIME, &nowspec); retval = stream.Send_Piece(idx,off,BTCONTENT.global_piece_buffer,len); if( retval >= 0 ){ WORLD.Upload(); DataSended(len, nowspec.tv_sec + (double)(nowspec.tv_nsec)/1000000000); if( !m_want_again && BTCONTENT.Seeding() ) m_want_again = 1; } else if(arg_verbose) CONSOLE.Debug("%p: %s", this, strerror(errno)); return (int)retval;}int btPeer::SendRequest(){ int first = 1; PSLICE ps = request_q.NextSend(); if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) CONSOLE.Debug("ERROR@5: %p m_req_out underflow, resetting", this); m_req_out = 0; } if( ps && m_req_out < m_req_send ){ if(arg_verbose){ CONSOLE.Debug_n(""); CONSOLE.Debug_n("Requesting #%d from %p (%d left, %d slots):", (int)(ps->index), this, (int)(request_q.Qsize()), (int)m_req_send); } for( int i=0; ps && m_req_out < m_req_send && i<5; ps = ps->next, i++ ){ if( first && (!RateDL() || 0 >= (m_req_out+1) * ps->length / (double)RateDL() - m_latency) ){ request_q.SetReqTime(ps, now); first = 0; } else request_q.SetReqTime(ps, (time_t)0); if(arg_verbose) CONSOLE.Debug_n("."); if(stream.Send_Request(ps->index,ps->offset,ps->length) < 0){ return -1; } m_last_req_piece = ps->index; request_q.SetNextSend(ps->next); m_req_out++; } if(arg_verbose) CONSOLE.Debug_n(""); m_receive_time = now; } return ( m_req_out < m_req_send && !m_standby ) ? RequestPiece() : 0;}int btPeer::CancelPiece(size_t idx){ PSLICE ps = request_q.GetHead(); PSLICE next; int cancel = 1; for( ; ps && ps->index != idx; ps=ps->next ){ // find the piece if( ps == request_q.NextSend() ) cancel = 0; } if( !ps ) return 0; for( ; ps; ps = next ){ if( ps->index != idx ) break; if( ps == request_q.NextSend() ) cancel = 0; if( cancel ){ if(arg_verbose) CONSOLE.Debug("Cancelling %d/%d/%d to %p", (int)(ps->index), (int)(ps->offset), (int)(ps->length), this); if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1; m_req_out--; if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) CONSOLE.Debug("ERROR@1: %p m_req_out underflow, resetting", this); m_req_out = 0; } m_cancel_time = now; } next = ps->next; request_q.Remove(ps->index, ps->offset, ps->length); } if( request_q.IsEmpty() ){ StopDLTimer(); m_standby = 0; } if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0; return 1;}int btPeer::CancelRequest(){ PSLICE ps; int retval; ps = request_q.GetHead(); for( ; ps; ps = ps->next){ if( ps == request_q.NextSend() ) break; if(arg_verbose) CONSOLE.Debug("Cancelling %d/%d/%d to %p", (int)(ps->index), (int)(ps->offset), (int)(ps->length), this); if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1; m_req_out--; if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) CONSOLE.Debug("ERROR@2: %p m_req_out underflow, resetting", this); m_req_out = 0; } m_cancel_time = now; } if( !m_req_out && g_next_dn == this ) g_next_dn = (btPeer *)0; return 0;}int btPeer::CancelSliceRequest(size_t idx, size_t off, size_t len){ PSLICE ps; int cancel = 1;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -