📄 peer.cpp
字号:
m_req_send = (int)( m_latency / (len / (double)RateDL()) + 1 ); m_req_send = (m_req_send < 2) ? 2 : m_req_send; // If latency increases, we will see this as a dlrate decrease. if( RateDL() < m_prev_dlrate ) m_req_send++; else if( m_last_timestamp - m_latency_timestamp >= 30 && // Try to force latency measurement every 30 seconds. m_req_out == m_req_send - 1 ){ m_req_send--; m_latency_timestamp = m_last_timestamp; } m_prev_dlrate = RateDL(); }else if (m_req_send < 5) m_req_send = 5; if( requested ) m_req_out--; /* if piece download complete. */ return ( request_q.IsEmpty() || !request_q.HasIdx(idx) ) ? ReportComplete(idx) : RequestCheck();}int btPeer::RequestCheck(){ if( BTCONTENT.pBF->IsFull() ){ if( bitfield.IsFull() ){ return -2; } return SetLocal(M_NOT_INTERESTED); } if( Need_Remote_Data() ){ if(!m_state.local_interested && SetLocal(M_INTERESTED) < 0) return -1; if( !m_state.remote_choked ){ if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) fprintf(stderr, "ERROR@4: %p m_req_out underflow, resetting\n", this); m_req_out = 0; } if( request_q.IsEmpty() && RequestPiece() < 0 ) return -1; else if( m_req_out < m_req_send && (m_req_out < 2 || !RateDL() || 1 >= (m_req_out+1) * request_q.GetRequestLen() / (double)RateDL() - m_latency) // above formula is to try to allow delay between sending batches of reqs && SendRequest() < 0 ) return -1; } }else if(m_state.local_interested && SetLocal(M_NOT_INTERESTED) < 0) return -1; if(!request_q.IsEmpty()) StartDLTimer(); else StopDLTimer(); return 0;}void btPeer::CloseConnection(){ if(arg_verbose) fprintf(stderr, "%p closed\n", this); if( P_FAILED != m_status ){ m_status = P_FAILED; stream.Close(); if( !request_q.IsEmpty() ) PENDINGQUEUE.Pending(&request_q); } if( g_next_up == this ) g_next_up = (btPeer *)0; if( g_next_dn == this ) g_next_dn = (btPeer *)0;}int btPeer::HandShake(){ char txtid[PEER_ID_LEN*2+3]; ssize_t r = stream.Feed(); if( r < 0 ){// if(arg_verbose) fprintf(stderr, "hs: r<0 (%d)\n", r); return -1; } else if( r < 68 ){ if(r >= 21){ // Ignore 8 reserved bytes following protocol ID. if( memcmp(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, (r<28) ? r-20 : 8) != 0 ){ if(arg_verbose){ fprintf( stderr, "\npeer %p gave 0x", this); for(int i=20; i<r && i<27; i++) fprintf(stderr, "%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); fprintf( stderr, " as reserved bytes (partial)\n" ); } memcpy(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, (r<28) ? r-20 : 8); } } if(r && memcmp(stream.in_buffer.BasePointer(),BTCONTENT.GetShakeBuffer(), (r<48) ? r : 48) != 0){ if(arg_verbose){ fprintf(stderr, "\nmine: 0x"); for(int i=0; i<r && i<48; i++) fprintf(stderr, "%2.2hx", (unsigned short)(unsigned char)(BTCONTENT.GetShakeBuffer()[i])); fprintf(stderr, "\npeer: 0x"); for(int i=0; i<r && i<48; i++) fprintf(stderr, "%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); fprintf(stderr, "\n"); if( r>48 ){ TextPeerID((unsigned char *)(stream.in_buffer.BasePointer()+48), txtid); fprintf(stderr, "peer is %s\n", txtid); } } return -1; } return 0; } // If the reserved bytes differ, make them the same. // If they mean anything important, the handshake is likely to fail anyway. if( memcmp(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, 8) != 0 ){ if(arg_verbose){ fprintf(stderr, "\npeer %p gave 0x", this); for(int i=20; i<27; i++) fprintf(stderr, "%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); fprintf( stderr, " as reserved bytes\n" ); } memcpy(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, 8); } if( memcmp(stream.in_buffer.BasePointer(), BTCONTENT.GetShakeBuffer(),48) != 0 ){ if(arg_verbose){ fprintf(stderr, "\nmine: 0x"); for(int i=0; i<48; i++) fprintf(stderr, "%2.2hx", (unsigned short)(unsigned char)(BTCONTENT.GetShakeBuffer()[i])); fprintf(stderr, "\npeer: 0x"); for(int i=0; i<48; i++) fprintf(stderr, "%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); fprintf(stderr, "\n"); } return -1; } memcpy(id, stream.in_buffer.BasePointer()+48, PEER_ID_LEN); if(arg_verbose){ TextPeerID((unsigned char *)(stream.in_buffer.BasePointer()+48), txtid); fprintf(stderr, "Peer %p ID: %s\n", this, txtid); } // ignore peer id verify if( !BTCONTENT.pBF->IsEmpty()){ char *bf = new char[BTCONTENT.pBF->NBytes()];#ifndef WINDOWS if(!bf) return -1;#endif BTCONTENT.pBF->WriteToBuffer(bf); r = stream.Send_Bitfield(bf,BTCONTENT.pBF->NBytes()); delete []bf; } if( r >= 0){ if( stream.in_buffer.PickUp(68) < 0 ) return -1; m_status = P_SUCCESS; } return r;}int btPeer::Send_ShakeInfo(){ return stream.Send_Buffer((char*)BTCONTENT.GetShakeBuffer(),68);}int btPeer::BandWidthLimitUp(){ if( cfg_max_bandwidth_up <= 0 ) return 0; return ((Self.RateUL()) >= cfg_max_bandwidth_up) ? 1:0;}int btPeer::BandWidthLimitDown(){ if( cfg_max_bandwidth_down <= 0 ) return 0; return ((Self.RateDL()) >= cfg_max_bandwidth_down) ? 1:0;}int btPeer::NeedWrite(){ int yn = 0; if( m_standby && WORLD.Endgame() ){ if(arg_verbose) fprintf(stderr, "%p un-standby (endgame)\n", this); m_standby = 0; } if( stream.out_buffer.Count() || // data need send in buffer. // can upload a slice (!reponse_q.IsEmpty() && CouldReponseSlice() && !BandWidthLimitUp()) || ( (request_q.NextSend() && m_req_out < m_req_send && (m_req_out < 2 || !RateDL() || 1 >= (m_req_out+1) * request_q.GetRequestLen() / (double)RateDL() - m_latency)) // can send queued request || (request_q.IsEmpty() && !m_state.remote_choked && m_state.local_interested && !m_standby) // can request a new piece ) // ok to send requests || P_CONNECTING == m_status ){ // peer is connecting yn = 1; if( g_next_up==this && g_defer_up ){ if(arg_verbose) fprintf(stderr, "%p skipped UL\n", this); g_next_up = (btPeer *)0; } } return yn;}int btPeer::NeedRead(){ int yn = 1; if( !request_q.IsEmpty() && BandWidthLimitDown() ) yn = 0; else if( g_next_dn==this && g_defer_dn ){ if(arg_verbose) fprintf(stderr, "%p skipped DL\n", this); g_next_dn = (btPeer *)0; } return yn;}int btPeer::CouldReponseSlice(){ if(!m_state.local_choked && (stream.out_buffer.LeftSize() > reponse_q.GetRequestLen() + 4 * 1024 )) return 1; return 0;}int btPeer::AreYouOK(){ m_f_keepalive = 1; return stream.Send_Keepalive();}int btPeer::RecvModule(){ int f_peer_closed = 0; ssize_t r; if ( 64 < m_err_count ) return -1; if( request_q.IsEmpty() || !BandWidthLimitDown() ){ if ( request_q.IsEmpty() || !g_next_dn || g_next_dn==this ){ if( g_next_dn ) g_next_dn = (btPeer *)0; r = stream.Feed(); if( r < 0 && r != -2 ) return -1; else if ( r == -2 ) f_peer_closed = 1; r = stream.HaveMessage(); for( ; r;){ if( r < 0 ) return -1; if(MsgDeliver() < 0 || stream.PickMessage() < 0) return -1; r = stream.HaveMessage(); } }else{ if(arg_verbose) fprintf(stderr, "%p deferring DL to %p\n", this, g_next_dn); if( !g_defer_dn ) g_defer_dn = 1; } }else if( !g_next_dn ){ if(arg_verbose) fprintf(stderr, "%p waiting for DL bandwidth\n", this); g_next_dn = this; if( g_defer_dn ) g_defer_dn = 0; } return f_peer_closed ? -1 : 0;}int btPeer::SendModule(){ if( stream.out_buffer.Count() && stream.Flush() < 0) return -1; if( !reponse_q.IsEmpty() && CouldReponseSlice() ) { if( !BandWidthLimitUp() ){ if( !g_next_up || g_next_up==this ){ if( g_next_up ) g_next_up = (btPeer *)0; StartULTimer(); Self.StartULTimer(); if( ReponseSlice() < 0 ) return -1; }else{ if(arg_verbose) fprintf(stderr, "%p deferring UL to %p\n", this, g_next_up); if( !g_defer_up ) g_defer_up = 1; } }else if( !g_next_up ){ if(arg_verbose) fprintf(stderr, "%p waiting for UL bandwidth\n", this); g_next_up = this; if( g_defer_up ) g_defer_up = 0; } }else if( g_next_up == this ) g_next_up = (btPeer *)0; return (!m_state.remote_choked) ? RequestCheck() : 0;}// Prevent a peer object from holding g_next_up when it's not ready to write.void btPeer::CheckSendStatus(){ if( g_next_up == this && !BandWidthLimitUp() ){ if(arg_verbose) fprintf(stderr, "%p is not write-ready\n", this); g_next_up = (btPeer *)0; }}/* Detect if a peer ignored, discarded, or lost my request and we're waiting for a piece that may never arrive. */int btPeer::HealthCheck(time_t now){ if( m_health_time <= now - 60 ){ m_health_time = now; if( !m_state.remote_choked && m_req_out && m_receive_time < now - (!m_latency ? 300 : ((m_latency < 30) ? 60 : 2*m_latency)) ){ // if a repeat occurrence, get rid of the peer if( m_bad_health ) return -1; m_bad_health = 1; if(arg_verbose) fprintf(stderr, "%p unresponsive; resetting request queue\n", this); PSLICE ps = request_q.GetHead(); int retval = CancelRequest(ps); PENDINGQUEUE.Pending(&request_q); return (retval < 0) ? -1 : RequestCheck(); } else m_bad_health = 0; } return 0;}// This handles peers that suppress HAVE messages so that we don't always think// that they're empty. If we've sent the peer an amount of data equivalent to// two pieces, assume that they now have at least one complete piece.int btPeer::IsEmpty() const{ return ( bitfield.IsEmpty() && TotalUL() < BTCONTENT.GetPieceLength()*2 ) ? 1:0;}void btPeer::dump(){ struct sockaddr_in sin; GetAddress(&sin); printf("%s: %d -> %d:%d %lud:%lud\n", inet_ntoa(sin.sin_addr), bitfield.Count(), Is_Remote_UnChoked() ? 1 : 0, request_q.IsEmpty() ? 0 : 1, (unsigned long)TotalDL(), (unsigned long)TotalUL());}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -