📄 peer.cpp
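// NOTE: this excerpt opens partway through the peer handshake handler
// (apparently btPeer::HandShake() in the ctorrent/dtorrent sources): the
// remote handshake has already been read and its reserved bytes copied, and
// the code below compares the rest of it against our own shake buffer.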
    memcpy(stream.in_buffer.BasePointer()+20, BTCONTENT.GetShakeBuffer()+20, 8);
  }

  if( memcmp(stream.in_buffer.BasePointer(), BTCONTENT.GetShakeBuffer(), 48) != 0 ){
    if(arg_verbose){
      CONSOLE.Debug_n("");
      CONSOLE.Debug_n("mine: 0x");
      for(int i=0; i<48; i++)
        CONSOLE.Debug_n("%2.2hx",
          (unsigned short)(unsigned char)(BTCONTENT.GetShakeBuffer()[i]));
      CONSOLE.Debug_n("");
      CONSOLE.Debug_n("peer: 0x");
      for(int i=0; i<48; i++)
        CONSOLE.Debug_n("%2.2hx",
          (unsigned short)(unsigned char)(stream.in_buffer.BasePointer()[i]));
    }
    return -1;
  }

  memcpy(id, stream.in_buffer.BasePointer()+48, PEER_ID_LEN);
  if(arg_verbose){
    TextPeerID((unsigned char *)(stream.in_buffer.BasePointer()+48), txtid);
    CONSOLE.Debug("Peer %p ID: %s", this, txtid);
  }

  // ignore peer id verify
  if( !BTCONTENT.pBF->IsEmpty() ){
    char *bf = new char[BTCONTENT.pBF->NBytes()];
#ifndef WINDOWS
    if( !bf ) return -1;
#endif
    BTCONTENT.pBF->WriteToBuffer(bf);
    r = stream.Send_Bitfield(bf, BTCONTENT.pBF->NBytes());
    delete []bf;
  }

  if( r >= 0 ){
    if( stream.in_buffer.PickUp(68) < 0 ) return -1;
    m_status = P_SUCCESS;
    m_retried = 0;  // allow reconnect attempt

    // When seeding, new peer starts at the end of the line.
    if( BTCONTENT.Seeding() ){  // i am seed
      // Allow resurrected peer to resume its place in line.
      if( 0 == m_unchoke_timestamp ) m_unchoke_timestamp = now;
      m_connect_seed = 1;
    }
    if( stream.HaveMessage() ) return RecvModule();
  }
  return r;
}

int btPeer::Send_ShakeInfo()
{
  return stream.Send_Buffer((char*)BTCONTENT.GetShakeBuffer(), 68);
}

int btPeer::NeedWrite(int limited)
{
  int yn = 0;
  size_t r;

  if( stream.out_buffer.Count() )
    yn = 1;  // data in buffer to send
  else if( P_CONNECTING == m_status )
    yn = 1;  // peer is connecting
  else if( WORLD.IsPaused() )
    yn = 0;  // paused--no up/download allowed
  else if( !m_state.local_choked && !reponse_q.IsEmpty() && !limited )
    yn = 1;  // can upload a slice
  else if( !m_state.remote_choked && m_state.local_interested &&
           request_q.IsEmpty() && !m_standby )
    yn = 1;  // can request a new piece
  else if( request_q.NextSend() && m_req_out < m_req_send &&
           (m_req_out < 2 || !(r = RateDL()) ||
            1 >= (m_req_out+1) * request_q.GetRequestLen() / (double)r - m_latency) )
    yn = 1;  // can send a queued request

  return yn;
}

int btPeer::NeedRead(int limited)
{
  int yn = 1;

  if( P_SUCCESS == m_status && stream.PeekMessage(M_PIECE) &&
      ((g_next_dn && g_next_dn != this) || limited) ){
    yn = 0;
  }
  return yn;
}

int btPeer::CouldReponseSlice()
{
  // If the entire buffer isn't big enough, go ahead and let the put resize it.
  if( !m_state.local_choked &&
      (stream.out_buffer.LeftSize() >=
         H_LEN + H_PIECE_LEN + reponse_q.GetRequestLen() ||
       stream.out_buffer.Count() + stream.out_buffer.LeftSize() <
         H_LEN + H_PIECE_LEN + reponse_q.GetRequestLen()) )
    return 1;
  return 0;
}

int btPeer::AreYouOK()
{
  m_f_keepalive = 1;
  return stream.Send_Keepalive();
}

int btPeer::RecvModule()
{
  ssize_t r = 0;

  if( stream.PeekMessage(M_PIECE) ){
    if( !g_next_dn || g_next_dn == this ){
      int limited = WORLD.BandWidthLimitDown(Self.LateDL());
      if( !limited ){
        if( g_next_dn ) g_next_dn = (btPeer *)0;
        r = stream.Feed(&rate_dl);  // feed full amount (can download)
//      if(r>=0) CONSOLE.Debug("%p fed piece, now has %d bytes", this, r);
        Self.OntimeDL(0);
      }else if( !g_next_dn ){
        if(arg_verbose) CONSOLE.Debug("%p waiting for DL bandwidth", this);
        g_next_dn = this;
      }
    }  // else deferring DL, unless limited.
  }else if( !stream.HaveMessage() ){  // could have been called post-handshake
    r = stream.Feed(BUF_DEF_SIZ, &rate_dl);
//  if(r>=0) CONSOLE.Debug("%p fed, now has %d bytes (msg=%d)",
//    this, r, (int)(stream.PeekMessage()));
  }

  if( r < 0 ){
    if(arg_verbose)
      CONSOLE.Debug("%p: %s", this, (r==-2) ? "remote closed" : strerror(errno));
    return -1;
  }

  while( r = stream.HaveMessage() ){
    if( r < 0 ) return -1;
    if( (r = MsgDeliver()) == -2 ){
      if(arg_verbose) CONSOLE.Debug("%p seed<->seed detected", this);
      m_want_again = 0;
    }
    if( r < 0 || stream.PickMessage() < 0 ) return -1;
  }
  return 0;
}

int btPeer::SendModule()
{
  if( stream.out_buffer.Count() && stream.Flush() < 0 ){
    if(arg_verbose) CONSOLE.Debug("%p: %s", this, strerror(errno));
    return -1;
  }

  if( !reponse_q.IsEmpty() && CouldReponseSlice() ){
    int limited = WORLD.BandWidthLimitUp(Self.LateUL());
    if( !g_next_up || g_next_up == this ){
      if( !limited ){
        if( g_next_up ) g_next_up = (btPeer *)0;
        StartULTimer();
        Self.StartULTimer();
        if( ReponseSlice() < 0 ) return -1;
        Self.OntimeUL(0);
      }else if( !g_next_up ){
        if(arg_verbose) CONSOLE.Debug("%p waiting for UL bandwidth", this);
        g_next_up = this;
        if( g_defer_up ) g_defer_up = 0;
      }
    }else if( !limited ){
      if(arg_verbose) CONSOLE.Debug("%p deferring UL to %p", this, g_next_up);
      if( !g_defer_up ) g_defer_up = 1;
      WORLD.Defer();
    }
  }else if( g_next_up == this ) g_next_up = (btPeer *)0;

  return (!m_state.remote_choked) ? RequestCheck() : 0;
}

// Prevent a peer object from holding g_next_up when it's not ready to write.
int btPeer::CheckSendStatus()
{
  if( g_next_up == this && !WORLD.BandWidthLimitUp(Self.LateUL()) ){
    if(arg_verbose){
      CONSOLE.Debug("%p is not write-ready", this);
      if( g_defer_up ) CONSOLE.Debug("%p skipped UL", this);
    }
    g_next_up = (btPeer *)0;
  }
  return g_next_up ? 1 : 0;
}

/* Detect if a peer ignored, discarded, or lost my request and we're waiting
   for a piece that may never arrive. */
int btPeer::HealthCheck()
{
  if( BTCONTENT.IsFull() ){
    // Catch seeders who suppress HAVE and don't disconnect other seeders,
    // or who just sit there and waste a connection.
    if( m_health_time <= now - 300 ){
      m_health_time = now;
      if( !m_state.remote_interested ){
        if( m_bad_health ) return -1;
        m_bad_health = 1;
      }else m_bad_health = 0;
    }
  }else if( m_health_time <= now - 60 ){
    m_health_time = now;
    if( !m_state.remote_choked && m_req_out ){
      size_t allowance = !m_latency ? 150 : ((m_latency < 60) ? 60 : m_latency);
      if( m_receive_time < now - 2*allowance ){
        // if a repeat occurrence, get rid of the peer
        if( m_bad_health || PeerError(2, "unresponsive") < 0 ) return -1;
        m_bad_health = 1;
        if(arg_verbose)
          CONSOLE.Debug("%p unresponsive; resetting request queue", this);
        int retval = CancelRequest();
        PutPending();
        return (retval < 0) ? -1 : 0;
      }else if( m_receive_time < now - allowance ){
        CONSOLE.Debug("%p unresponsive; sending keepalive", this);
        AreYouOK();  // keepalive--may stimulate the connection
      }else m_bad_health = 0;
    }else m_bad_health = 0;
  }
  return 0;
}

// This handles peers that suppress HAVE messages so that we don't always think
// that they're empty. If we've sent the peer an amount of data equivalent to
// two pieces, assume that they now have at least one complete piece.
int btPeer::IsEmpty() const
{
  return ( bitfield.IsEmpty() && TotalUL() < BTCONTENT.GetPieceLength()*2 ) ?
    1 : 0;
}

void btPeer::PutPending()
{
  if( !request_q.IsEmpty() ){
    if( PENDINGQUEUE.Pending(&request_q) != 0 ) WORLD.RecalcDupReqs();
    WORLD.UnStandby();
  }
  m_req_out = 0;
}

int btPeer::NeedPrefetch() const
{
  if( P_SUCCESS == m_status &&
      ( Is_Local_UnChoked() ||
        (!BTCONTENT.IsFull() && Is_Remote_UnChoked() &&
         m_prefetch_completion < 2 && request_q.LastSlice()) ) )
    return 1;
  else return 0;
}

// Call NeedPrefetch() first, which checks additional conditions!
void btPeer::Prefetch(time_t deadline)
{
  size_t rd, ru;
  size_t idx, off, len;
  time_t predict, next_chance;

  if( !BTCONTENT.IsFull() && Is_Remote_UnChoked() &&
      m_prefetch_completion < 2 && request_q.LastSlice() &&
      (rd=RateDL()) > 0 && request_q.Peek(&idx, &off, &len)==0 &&
      m_last_timestamp + len / rd < now + WORLD.GetUnchokeInterval() &&
      Self.RateDL() > 0 &&
      m_last_timestamp + len / rd < now +
        (cfg_cache_size*1024*1024 - BTCONTENT.GetPieceLength(idx)) /
          Self.RateDL() ){
    switch( BTCONTENT.CachePrep(idx) ){
    case -1:  // don't prefetch
      m_prefetch_completion = 2;
      break;
    case 0:  // ready, no data flushed
      if( m_prefetch_completion || off==0 ){
        if( off+len < BTCONTENT.GetPieceLength(idx) )
          BTCONTENT.ReadSlice(NULL, idx, off+len,
            BTCONTENT.GetPieceLength(idx)-off-len);
        m_prefetch_completion = 2;
      }else{
        BTCONTENT.ReadSlice(NULL, idx, 0, off);
        if( off+len < BTCONTENT.GetPieceLength(idx) ) m_prefetch_completion = 1;
        else m_prefetch_completion = 2;
      }
      break;
    case 1:  // data was flushed (time used)
      break;
    }
  }
  else if( Is_Local_UnChoked() && reponse_q.Peek(&idx, &off, &len) == 0 ){
    if( cfg_max_bandwidth_up )
      next_chance = (time_t)( Self.LastSendTime() +
        (double)(Self.LastSizeSent()) / cfg_max_bandwidth_up );
    else next_chance = now;

    if( g_next_up ){
      if( g_next_up != this ){
        // deferral pending; we'll get another chance to prefetch
        return;
      }else m_next_send_time = next_chance;  // I am the next sender
    }
    if( m_next_send_time < next_chance ) predict = next_chance;
    else predict = m_next_send_time;

    // Don't prefetch if it will expire from cache before being sent.
    if( predict < deadline &&
        (0==(rd = Self.RateDL()) ||
         predict <= now + cfg_cache_size*1024*1024 / rd) ){
      // This allows re-prefetch if it might have expired from the cache.
      if( !m_prefetch_time ||
          (0==rd && 0==(ru = Self.RateUL())) ||
          now - m_prefetch_time > BTCONTENT.CacheSize() / (rd + ru) ){
        BTCONTENT.ReadSlice(NULL, idx, off, len);
        m_prefetch_time = now;
      }
    }
  }
}

int btPeer::PeerError(int weight, const char *message)
{
  int old_count = m_err_count;

  m_err_count += weight;
  if( m_err_count < 0 ) m_err_count = 0;
  if( arg_verbose && (weight > 0 || old_count > 0) )
    CONSOLE.Debug("%p error %+d (%d) %s", this, weight, m_err_count, message);

  if( m_err_count >= 16 ){
    m_want_again = 0;
    return -1;
  }else return 0;
}

void btPeer::dump()
{
  struct sockaddr_in sin;

  GetAddress(&sin);
  CONSOLE.Print("%s: %d -> %d:%d %llud:%lluu", inet_ntoa(sin.sin_addr),
    bitfield.Count(),
    Is_Remote_UnChoked() ? 1 : 0,
    request_q.IsEmpty() ? 0 : 1,
    (unsigned long long)TotalDL(), (unsigned long long)TotalUL());
}
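The pipelining test in NeedWrite() is the least obvious expression above: another request is queued only while fewer than m_req_send are outstanding, and only if fewer than two are outstanding, the download rate is still unknown, or the estimated time to drain the pipeline with one more request added (queued bytes divided by the measured rate), less the observed latency, stays within about one second. The standalone sketch below restates that arithmetic; the helper name and the sample numbers are illustrative assumptions, not part of peer.cpp.

// Standalone sketch (not part of peer.cpp): the request-pipelining test from
// btPeer::NeedWrite(), isolated with hypothetical parameter names.
#include <cstddef>
#include <cstdio>

// Decide whether one more outstanding request should be queued to a peer.
//   req_out   - requests already outstanding (m_req_out)
//   req_limit - maximum pipeline depth (m_req_send)
//   slice_len - bytes per requested slice (request_q.GetRequestLen())
//   rate_dl   - measured download rate from the peer in bytes/sec, 0 = unknown
//   latency   - measured request/response latency in seconds (m_latency)
static bool can_send_another_request(unsigned req_out, unsigned req_limit,
                                     size_t slice_len, size_t rate_dl,
                                     double latency)
{
  if( req_out >= req_limit ) return false;        // pipeline already full
  if( req_out < 2 || rate_dl == 0 ) return true;  // not enough data to estimate
  // Time to receive everything outstanding plus one more request, minus the
  // latency already in flight; keep the backlog within about one second.
  double drain_time = (req_out + 1) * slice_len / (double)rate_dl;
  return drain_time - latency <= 1.0;
}

int main()
{
  // Hypothetical figures: 4 of 8 requests outstanding, 16 KB slices,
  // 64 KB/s download rate, 1 s latency -> 1.25 s - 1 s = 0.25 s, so queue one.
  std::printf("%s\n", can_send_another_request(4, 8, 16384, 65536, 1.0)
                        ? "send another request" : "hold off");
  return 0;
}

Returning true while the rate is still zero lets the pipeline grow until enough data has arrived to produce an estimate, which is what the !(r = RateDL()) clause does in the original expression.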