📄 peer.cpp
字号:
} // Don't count the slice in our DL total if it was unsolicited or bad. // (We don't owe the swarm any UL for such data.) if( f_count ) DataRecved(len); return (P_FAILED == m_status) ? -1 : RequestCheck();}// This is for re-requesting unsuccessful slices.// Use RequestPiece for normal request queueing.int btPeer::RequestSlice(size_t idx,size_t off,size_t len){ int r; r = request_q.Requeue(idx,off,len); if( r < 0 ) return -1; else if( r ){ if(stream.Send_Request(idx,off,len) < 0){ return -1; } m_req_out++; m_receive_time = now; } return 0;}int btPeer::RequestCheck(){ if( BTCONTENT.pBF->IsFull() || WORLD.IsPaused() ) return SetLocal(M_NOT_INTERESTED); if( Need_Remote_Data() && !WORLD.SeedOnly() ){ if(!m_state.local_interested && SetLocal(M_INTERESTED) < 0) return -1; if( !m_state.remote_choked ){ if( m_req_out > cfg_req_queue_length ){ if(arg_verbose) CONSOLE.Debug("ERROR@4: %p m_req_out underflow, resetting", this); m_req_out = 0; } if( request_q.IsEmpty() && RequestPiece() < 0 ) return -1; else if( m_req_out < m_req_send && (m_req_out < 2 || !RateDL() || 1 >= (m_req_out+1) * request_q.GetRequestLen() / (double)RateDL() - m_latency) // above formula is to try to allow delay between sending batches of reqs && SendRequest() < 0 ) return -1; } }else if(m_state.local_interested && SetLocal(M_NOT_INTERESTED) < 0) return -1; if(!request_q.IsEmpty()) StartDLTimer(); else StopDLTimer(); return 0;}void btPeer::CloseConnection(){ if(arg_verbose) CONSOLE.Debug("%p closed", this); if( P_FAILED != m_status ){ m_status = P_FAILED; StopDLTimer(); StopULTimer(); stream.Close(); if( !request_q.IsEmpty() ) PENDINGQUEUE.Pending(&request_q); } if( g_next_up == this ) g_next_up = (btPeer *)0; if( g_next_dn == this ) g_next_dn = (btPeer *)0;}int btPeer::HandShake(){ char txtid[PEER_ID_LEN*2+3]; ssize_t r = stream.Feed(); if( r < 0 ){// if(arg_verbose) CONSOLE.Debug("hs: r<0 (%d)", r); return -1; } else if( r < 68 ){ if(r >= 21){ // Ignore 8 reserved bytes following protocol ID. if( memcmp(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, (r<28) ? r-20 : 8) != 0 ){ if(arg_verbose){ CONSOLE.Debug_n(""); CONSOLE.Debug_n("peer %p gave 0x", this); for(int i=20; i<r && i<27; i++) CONSOLE.Debug_n("%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); CONSOLE.Debug_n(" as reserved bytes (partial)"); } memcpy(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, (r<28) ? r-20 : 8); } } if(r && memcmp(stream.in_buffer.BasePointer(),BTCONTENT.GetShakeBuffer(), (r<48) ? r : 48) != 0){ if(arg_verbose){ CONSOLE.Debug_n(""); CONSOLE.Debug_n("mine: 0x"); for(int i=0; i<r && i<48; i++) CONSOLE.Debug_n("%2.2hx", (unsigned short)(unsigned char)(BTCONTENT.GetShakeBuffer()[i])); CONSOLE.Debug_n(""); CONSOLE.Debug_n("peer: 0x"); for(int i=0; i<r && i<48; i++) CONSOLE.Debug_n("%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); if( r>48 ){ TextPeerID((unsigned char *)(stream.in_buffer.BasePointer()+48), txtid); CONSOLE.Debug("peer is %s", txtid); } } return -1; } return 0; } // If the reserved bytes differ, make them the same. // If they mean anything important, the handshake is likely to fail anyway. if( memcmp(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, 8) != 0 ){ if(arg_verbose){ CONSOLE.Debug_n(""); CONSOLE.Debug_n("peer %p gave 0x", this); for(int i=20; i<27; i++) CONSOLE.Debug_n("%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); CONSOLE.Debug_n(" as reserved bytes" ); } memcpy(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, 8); } if( memcmp(stream.in_buffer.BasePointer(), BTCONTENT.GetShakeBuffer(),48) != 0 ){ if(arg_verbose){ CONSOLE.Debug_n(""); CONSOLE.Debug_n("mine: 0x"); for(int i=0; i<48; i++) CONSOLE.Debug_n("%2.2hx", (unsigned short)(unsigned char)(BTCONTENT.GetShakeBuffer()[i])); CONSOLE.Debug_n(""); CONSOLE.Debug_n("peer: 0x"); for(int i=0; i<48; i++) CONSOLE.Debug_n("%2.2hx", (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i])); } return -1; } memcpy(id, stream.in_buffer.BasePointer()+48, PEER_ID_LEN); if(arg_verbose){ TextPeerID((unsigned char *)(stream.in_buffer.BasePointer()+48), txtid); CONSOLE.Debug("Peer %p ID: %s", this, txtid); } // ignore peer id verify if( !BTCONTENT.pBF->IsEmpty()){ char *bf = new char[BTCONTENT.pBF->NBytes()];#ifndef WINDOWS if(!bf) return -1;#endif BTCONTENT.pBF->WriteToBuffer(bf); r = stream.Send_Bitfield(bf,BTCONTENT.pBF->NBytes()); delete []bf; } if( r >= 0){ if( stream.in_buffer.PickUp(68) < 0 ) return -1; m_status = P_SUCCESS; m_retried = 0; // allow reconnect attempt // When seeding, new peer starts at the end of the line. if( BTCONTENT.pBF->IsFull() ){ // i am seed // Allow resurrected peer to resume its place in line. if( 0 == m_unchoke_timestamp ) m_unchoke_timestamp = now; m_connect_seed = 1; } } return r;}int btPeer::Send_ShakeInfo(){ return stream.Send_Buffer((char*)BTCONTENT.GetShakeBuffer(),68);}int btPeer::NeedWrite(){ int yn = 0; size_t r; if( m_standby && WORLD.Endgame() ){ if(arg_verbose) CONSOLE.Debug("%p un-standby (endgame)", this); m_standby = 0; } if( stream.out_buffer.Count() ) yn = 1; // data in buffer to send else if( P_CONNECTING == m_status ) yn = 1; // peer is connecting else if( WORLD.IsPaused() ) yn = 0; // paused--no up/download allowed else if( !m_state.local_choked && !reponse_q.IsEmpty() && !WORLD.BandWidthLimitUp(Self.LateUL()) ) yn = 1; //can upload a slice else if( !m_state.remote_choked && m_state.local_interested && request_q.IsEmpty() && !m_standby ) yn = 1; // can request a new piece else if( request_q.NextSend() && m_req_out < m_req_send && (m_req_out < 2 || !(r = RateDL()) || 1 >= (m_req_out+1) * request_q.GetRequestLen() / (double)r - m_latency) ) yn = 1; // can send a queued request return yn;}int btPeer::NeedRead(){ int yn = 1; if( P_SUCCESS == m_status && M_PIECE == stream.PeekMessage() && ((g_next_dn && g_next_dn != this) || WORLD.BandWidthLimitDown(Self.LateDL())) ){ yn = 0; } return yn;}int btPeer::CouldReponseSlice(){ // If the entire buffer isn't big enough, go ahead and let the put resize it. if( !m_state.local_choked && (stream.out_buffer.LeftSize() >= H_LEN + H_PIECE_LEN + reponse_q.GetRequestLen() || stream.out_buffer.Count() + stream.out_buffer.LeftSize() < H_LEN + H_PIECE_LEN + reponse_q.GetRequestLen()) ) return 1; return 0;}int btPeer::AreYouOK(){ m_f_keepalive = 1; return stream.Send_Keepalive();}int btPeer::RecvModule(){ ssize_t r = 0; if ( 32 <= m_err_count ){ m_want_again = 0; return -1; } if( M_PIECE == stream.PeekMessage() ){ if( !g_next_dn || g_next_dn==this ){ int limited = WORLD.BandWidthLimitDown(Self.LateDL()); if( !limited ){ if( g_next_dn ) g_next_dn = (btPeer *)0; r = stream.Feed(&rate_dl); // feed full amount (can download)// if(r>=0) CONSOLE.Debug("%p fed piece, now has %d bytes", this, r); } else if( limited && !g_next_dn ){ if(arg_verbose) CONSOLE.Debug("%p waiting for DL bandwidth", this); g_next_dn = this; } } // else deferring DL, unless limited. }else{ r = stream.Feed(BUF_DEF_SIZ, &rate_dl);// if(r>=0) CONSOLE.Debug("%p fed, now has %d bytes (msg=%d)",// this, r, (int)(stream.PeekMessage())); } if( r < 0 ) return -1; while( r = stream.HaveMessage() ){ if( r < 0 ) return -1; if( (r = MsgDeliver()) == -2 ){ if(arg_verbose) CONSOLE.Debug("%p seed<->seed detected", this); m_want_again = 0; } if( r < 0 || stream.PickMessage() < 0 ) return -1; } return 0;}int btPeer::SendModule(){ if( stream.out_buffer.Count() && stream.Flush() < 0) return -1; if( !reponse_q.IsEmpty() && CouldReponseSlice() ){ int limited = WORLD.BandWidthLimitUp(Self.LateUL()); if( !g_next_up || g_next_up==this ){ if( !limited ){ if( g_next_up ) g_next_up = (btPeer *)0; StartULTimer(); Self.StartULTimer(); if( ReponseSlice() < 0 ) return -1; } else if( limited && !g_next_up ){ if(arg_verbose) CONSOLE.Debug("%p waiting for UL bandwidth", this); g_next_up = this; if( g_defer_up ) g_defer_up = 0; } }else if( !limited ){ if(arg_verbose) CONSOLE.Debug("%p deferring UL to %p", this, g_next_up); if( !g_defer_up ) g_defer_up = 1; WORLD.Defer(); } }else if( g_next_up == this ) g_next_up = (btPeer *)0; return (!m_state.remote_choked) ? RequestCheck() : 0;}// Prevent a peer object from holding g_next_up when it's not ready to write.int btPeer::CheckSendStatus(){ if( g_next_up == this && !WORLD.BandWidthLimitUp(Self.LateUL()) ){ if(arg_verbose){ CONSOLE.Debug("%p is not write-ready", this); if( g_defer_up ) CONSOLE.Debug("%p skipped UL", this); } g_next_up = (btPeer *)0; } return g_next_up ? 1 : 0;}/* Detect if a peer ignored, discarded, or lost my request and we're waiting for a piece that may never arrive. */int btPeer::HealthCheck(){ if( BTCONTENT.pBF->IsFull() ){ // Catch seeders who suppress HAVE and don't disconnect other seeders, // or who just sit there and waste a connection. if( m_health_time <= now - 300 ){ m_health_time = now; if( !m_state.remote_interested ){ if( m_bad_health ) return -1; m_bad_health = 1; } else m_bad_health = 0; } }else if( m_health_time <= now - 60 ){ m_health_time = now; if( !m_state.remote_choked && m_req_out && m_receive_time < now - (!m_latency ? 300 : ((m_latency < 30) ? 60 : 2*m_latency)) ){ // if a repeat occurrence, get rid of the peer if( m_bad_health ) return -1; m_bad_health = 1; if(arg_verbose) CONSOLE.Debug("%p unresponsive; resetting request queue", this); int retval = CancelRequest(request_q.GetHead()); PENDINGQUEUE.Pending(&request_q); m_req_out = 0; return (retval < 0) ? -1 : RequestCheck(); } else m_bad_health = 0; } return 0;}// This handles peers that suppress HAVE messages so that we don't always think// that they're empty. If we've sent the peer an amount of data equivalent to// two pieces, assume that they now have at least one complete piece.int btPeer::IsEmpty() const{ return ( bitfield.IsEmpty() && TotalUL() < BTCONTENT.GetPieceLength()*2 ) ? 1:0;}int btPeer::PutPending(){ int retval = 0; if( !request_q.IsEmpty() ){ retval = CancelRequest(request_q.GetHead()); PENDINGQUEUE.Pending(&request_q); } m_req_out = 0; return retval;}void btPeer::Prefetch(time_t deadline){ size_t idx, off, len; time_t predict, next_chance; if( reponse_q.Peek(&idx, &off, &len) == 0 ){ if( cfg_max_bandwidth_up ) next_chance = (time_t)( Self.LastSendTime() + (double)(Self.LastSizeSent()) / cfg_max_bandwidth_up ); else next_chance = now; if( g_next_up ){ if( g_next_up != this ){ // deferral pending; we'll get another chance to prefetch return; }else m_next_send_time = next_chance; // I am the next sender } if( m_next_send_time < next_chance ) predict = next_chance; else predict = m_next_send_time; // Don't prefetch if it will expire from cache before being sent. size_t rd, ru; if( predict < deadline && (0==(rd = Self.RateDL()) || predict <= now + cfg_cache_size*1024*1024 / rd) ){ // This allows re-prefetch if it might have expired from the cache. if( !m_prefetch_time || (0==rd && 0==(ru = Self.RateUL())) || now - m_prefetch_time > BTCONTENT.CacheSize() / (rd + ru) ){ BTCONTENT.ReadSlice(NULL, idx, off, len); m_prefetch_time = now; } } }}void btPeer::dump(){ struct sockaddr_in sin; GetAddress(&sin); CONSOLE.Print("%s: %d -> %d:%d %lud:%luu", inet_ntoa(sin.sin_addr), bitfield.Count(), Is_Remote_UnChoked() ? 1 : 0, request_q.IsEmpty() ? 0 : 1, (unsigned long)TotalDL(), (unsigned long)TotalUL());}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -