peer.cpp

来自「最经典的bittorrent协议的实现的源码」· C++ 代码 · 共 1,243 行 · 第 1/3 页

CPP
1,243
字号
#include "peer.h"

#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <sys/time.h>

#include "btstream.h"
#include "./btcontent.h"
#include "./msgencode.h"
#include "./peerlist.h"
#include "./btconfig.h"
#include "bttime.h"
#include "console.h"

#if !defined(HAVE_CLOCK_GETTIME) || !defined(HAVE_SNPRINTF)
#include "compat.h"
#endif

// Convert a peer ID to a printable string.
//
// Leading bytes that are printable and non-whitespace are copied verbatim.
// From the first byte that is not, the text "0x" is emitted once and that
// byte plus every remaining byte is written as two upper-case hex digits.
//
// peerid: PEER_ID_LEN bytes of raw peer ID.
// txtid:  output buffer; the result is NUL-terminated.  No length is passed
//         in, so the caller must provide room for the worst case (first byte
//         unprintable): 2 + 2*PEER_ID_LEN characters plus the terminator.
// Returns 0 always.
int TextPeerID(const unsigned char *peerid, char *txtid)
{
  int i, j;

  for(i=j=0; i < PEER_ID_LEN; i++){
    // i==j holds only while every byte so far was copied verbatim.
    if( i==j && isprint(peerid[i]) && !isspace(peerid[i]) )
      txtid[j++] = peerid[i];
    else{
      if(i==j){ sprintf(txtid+j, "0x"); j+=2; }  // hex prefix, emitted once
      snprintf(txtid+j, 3, "%.2X", (int)(peerid[i]));
      j += 2;
    }
  }
  txtid[j] = '\0';

  return 0;
}

/* g_next_up is used to rotate uploading.  If we have the opportunity to
   upload to a peer but skip it due to bw limiting, the var is set to point to
   that peer and it will be given priority at the next opportunity.
   g_next_dn is similar, but for downloading.
   g_defer_up is used to let the g_next peer object know if it skipped, as the
   socket could go non-ready if other messages are sent while waiting for bw.
*/
btPeer *btPeer::g_next_up = (btPeer *)0;
btPeer *btPeer::g_next_dn = (btPeer *)0;
unsigned char btPeer::g_defer_up = 0;

btBasic Self;

// Replace only the IP address part of the stored socket address.
void btBasic::SetIp(struct sockaddr_in addr)
{
  memcpy(&m_sin.sin_addr,&addr.sin_addr,sizeof(struct in_addr));
}

// Replace the whole stored socket address with addr.
void btBasic::SetAddress(struct sockaddr_in addr)
{
  memcpy(&m_sin,&addr,sizeof(struct sockaddr_in));
}

// Compare IP addresses only (the rest of the sockaddr_in is ignored).
// Returns 1 if addr has the same IP address as this object, 0 otherwise.
int btBasic::IpEquiv(struct sockaddr_in addr)
{
//  CONSOLE.Print("IpEquiv: %s <=> %s", inet_ntoa(m_sin.sin_addr),
//    inet_ntoa(addr.sin_addr));
  return (memcmp(&m_sin.sin_addr,&addr.sin_addr,sizeof(struct in_addr)) == 0) ?
    1 : 0;
}

// Does the remote peer want something that we can give it?
// Returns 1 if the peer has declared interest, its bitfield is not full, and
// either our content is full or our bitfield has a piece set that the peer's
// does not; returns 0 otherwise.
int btPeer::Need_Local_Data() const
{
  if( m_state.remote_interested && !bitfield.IsFull()){
    if( BTCONTENT.IsFull() ) return 1; // i am seed
    BitField tmpBitfield = *BTCONTENT.pBF;  // what I have
    tmpBitfield.Except(bitfield);           // minus what peer has
    return tmpBitfield.IsEmpty() ? 0 : 1;
  }
  return 0;
}

// Does the remote peer have something that we want?
// Returns 0 when BTCONTENT reports Seeding(); 1 when the peer's bitfield is
// full and every piece has been checked; otherwise 1 only if the peer has a
// checked piece that we neither have nor filter out.
int btPeer::Need_Remote_Data() const
{
  if( BTCONTENT.Seeding() ) return 0;

  if( bitfield.IsFull() &&
      BTCONTENT.CheckedPieces() >= BTCONTENT.GetNPieces() ) return 1;

  BitField tmpBitfield = bitfield;                // what peer has
  tmpBitfield.Except(*BTCONTENT.pBF);             // what I have
  tmpBitfield.Except(*BTCONTENT.pBMasterFilter);  // what I don't want
  tmpBitfield.And(*BTCONTENT.pBChecked);          // what I've checked
  return tmpBitfield.IsEmpty() ? 0 : 1;
}

// Construct a peer in the P_CONNECTING state: choked in both directions,
// not interested in either direction, all timers based on the current `now`.
btPeer::btPeer()
{
  m_f_keepalive = 0;
  m_status = P_CONNECTING;
  m_unchoke_timestamp = (time_t)0;
  m_last_timestamp = m_next_send_time = now;
  m_state.remote_choked = m_state.local_choked = 1;
  m_state.remote_interested = m_state.local_interested = 0;

  m_err_count = 0;
  // GetNPieces() is one past the last valid index, i.e. "no cached piece".
  m_cached_idx = BTCONTENT.GetNPieces();
  m_standby = 0;
  m_req_send = 5;
  m_req_out = 0;
  m_latency = 0;
  m_prev_dlrate = 0;
  m_health_time = m_receive_time = m_choketime = m_last_timestamp;
  m_cancel_time = m_latency_timestamp = (time_t)0;
  m_bad_health = 0;
  m_want_again = m_connect = m_retried = 0;
  m_connect_seed = 0;
  m_prefetch_time = (time_t)0;
  m_requested = 0;

  rate_dl.SetSelf(Self.DLRatePtr());
  rate_ul.SetSelf(Self.ULRatePtr());
}

// Carry over rate statistics, the last unchoke time and the retry flag from
// another peer object.
void btPeer::CopyStats(btPeer *peer)
{
  SetDLRate(peer->GetDLRate());
  SetULRate(peer->GetULRate());
  m_unchoke_timestamp = peer->GetLastUnchokeTime();
  m_retried = peer->Retried(); // don't try to reconnect over & over
}

// Change our side of the choke/interest state and notify the peer.
//
// s: one of M_CHOKE, M_UNCHOKE, M_INTERESTED, M_NOT_INTERESTED.
// Returns 0 without sending anything if the state already matches (or, for
// M_INTERESTED, if BTCONTENT reports Seeding()).  Returns -1 for any other
// value of s, if cancelling the outstanding request fails, or when choking a
// peer that made no request since it was unchoked while our content is full.
// Otherwise returns the result of stream.Send_State(s).
int btPeer::SetLocal(unsigned char s)
{
  switch(s){
  case M_CHOKE:
    if( m_state.local_choked ) return 0;
    m_unchoke_timestamp = now;  // stamped on both choke and unchoke
//  if(arg_verbose) CONSOLE.Debug("Choking %p", this);
    if(arg_verbose) CONSOLE.Debug("Choking %p (D=%lluMB@%dK/s)", this,
      (unsigned long long)TotalDL() >> 20, (int)(RateDL() >> 10));
    m_state.local_choked = 1;
    if( g_next_up == this ) g_next_up = (btPeer *)0;
    if( !reponse_q.IsEmpty()) reponse_q.Empty();
    StopULTimer();
    if( !m_requested && BTCONTENT.IsFull() ){
      // hasn't sent a request since unchoke
      if(arg_verbose) CONSOLE.Debug("%p inactive", this);
      return -1;
    }
    m_requested = 0;
    break;
  case M_UNCHOKE:
    if( !reponse_q.IsEmpty() ) StartULTimer();
    if( !m_state.local_choked ) return 0;
    m_unchoke_timestamp = now;
//  if(arg_verbose) CONSOLE.Debug("Unchoking %p", this);
    if(arg_verbose) CONSOLE.Debug("Unchoking %p (D=%lluMB@%dK/s)", this,
      (unsigned long long)TotalDL() >> 20, (int)(RateDL() >> 10));
    m_state.local_choked = 0;
    // No data is queued, so rate cannot delay sending.
    m_next_send_time = now;
    break;
  case M_INTERESTED:
    if( BTCONTENT.Seeding() ) return 0;
    m_standby = 0;
    if( m_state.local_interested ) return 0;
    if(arg_verbose) CONSOLE.Debug("Interested in %p", this);
    m_state.local_interested = 1;
    break;
  case M_NOT_INTERESTED:
    if( !m_state.local_interested ) return 0;
    if(arg_verbose) CONSOLE.Debug("Not interested in %p", this);
    m_state.local_interested = 0;
    if( !request_q.IsEmpty() ){
      if( CancelRequest(request_q.GetHead()) < 0 ) return -1;
      request_q.Empty();
    }
    break;
  default:
    return -1;			// BUG ???
  }
  return stream.Send_State(s);
}

// Choose a piece to request from this peer and queue/send the request.
//
// Sources are tried in this order:
//   1. a request set handed back by PENDINGQUEUE.ReAssign();
//   2. the piece index cached from an earlier HAVE message;
//   3. while we hold no piece at all: duplicate a piece another peer is
//      already fetching (initial-piece mode);
//   4. a piece this peer has that we need and nobody has been asked for;
//   5. if everything is already requested: duplicate a request (endgame), or
//      take over the request of the peer WORLD.Who_Can_Abandon() selects.
//
// Returns 0 if nothing was queued, -1 if a request-queue operation failed,
// otherwise the result of SendRequest() (or of SetLocal(M_NOT_INTERESTED)
// when the peer turns out to have nothing we need).
int btPeer::RequestPiece()
{
  size_t idx;
  BitField tmpBitfield, *pfilter;

  size_t qsize = request_q.Qsize();
  // Number of request slices in one piece (integer division).
  size_t psize = BTCONTENT.GetPieceLength() / cfg_req_slice_size;

  // See if there's room in the queue for a new piece.
  // Also, don't queue another piece if we still have a full piece queued.
  // NOTE(review): if cfg_req_queue_length is unsigned and qsize can exceed
  // it, the subtraction wraps; the qsize >= psize test only covers that when
  // psize <= cfg_req_queue_length -- confirm against the config limits.
  if( cfg_req_queue_length - qsize < psize || qsize >= psize ){
    m_req_send = m_req_out;   // don't come back until you receive something.
    return 0;
  }

  tmpBitfield = bitfield;
  tmpBitfield.Except(*BTCONTENT.pBMasterFilter);
  if( PENDINGQUEUE.ReAssign(&request_q, tmpBitfield) ){
    if(arg_verbose) CONSOLE.Debug("Assigning to %p from Pending", this);
    return SendRequest();
  }

  if( m_cached_idx < BTCONTENT.CheckedPieces() && !BTCONTENT.pBF->IsEmpty() ){
    // A HAVE msg already selected what we want from this peer
    // but ignore it in initial-piece mode.
    idx = m_cached_idx;
    m_cached_idx = BTCONTENT.GetNPieces();  // consume the cached index
    if( !BTCONTENT.pBF->IsSet(idx) &&
        (!BTCONTENT.GetFilter() || !BTCONTENT.GetFilter()->IsSet(idx)) &&
        !PENDINGQUEUE.Exist(idx) &&
        !WORLD.AlreadyRequested(idx) ){
      if(arg_verbose) CONSOLE.Debug("Assigning #%d to %p", (int)idx, this);
      return (request_q.CreateWithIdx(idx) < 0) ? -1 : SendRequest();
    }
  }  // If we didn't want the cached piece, select another.

  if( BTCONTENT.pBF->IsEmpty() ){
    // If we don't have a complete piece yet, try to get one that's already
    // in progress.  (Initial-piece mode)
    pfilter = BTCONTENT.GetFilter();
    do{
      tmpBitfield = bitfield;
      if( pfilter ){
        tmpBitfield.Except(*pfilter);
        pfilter = BTCONTENT.GetNextFilter(pfilter);
      }
    }while( pfilter && tmpBitfield.IsEmpty() );

    // Passing GetNPieces() as the index argument is distinct from the 0
    // passed on the endgame path below.
    idx = WORLD.What_Can_Duplicate(tmpBitfield, this, BTCONTENT.GetNPieces());
    if( idx < BTCONTENT.GetNPieces() ){
      if(arg_verbose) CONSOLE.Debug("Want to dup #%d to %p", (int)idx, this);
      btPeer *peer = WORLD.WhoHas(idx);
      if(peer){
        if(arg_verbose) CONSOLE.Debug("Duping: %p to %p (#%d)",
          peer, this, (int)idx);
        if(request_q.CopyShuffle(&peer->request_q, idx) < 0) return -1;
        else return SendRequest();
      }
    }else if(arg_verbose) CONSOLE.Debug("Nothing to dup to %p", this);
  }

  // Doesn't have a piece that's already in progress--choose another.
  pfilter = BTCONTENT.GetFilter();
  do{
    tmpBitfield = bitfield;
    tmpBitfield.Except(*BTCONTENT.pBF);
    if( pfilter ){
      tmpBitfield.Except(*pfilter);
      pfilter = BTCONTENT.GetNextFilter(pfilter);
    }
    // Don't go after pieces we might already have (but don't know yet)
    tmpBitfield.And(*BTCONTENT.pBChecked);
    // tmpBitfield tells what we need from this peer...
  }while( pfilter && tmpBitfield.IsEmpty() );
  if( tmpBitfield.IsEmpty() ){
    // We don't need anything from the peer.  How'd we get here?
    return SetLocal(M_NOT_INTERESTED);
  }

  BitField tmpBitfield2 = tmpBitfield;
  WORLD.CheckBitField(tmpBitfield2);
  // [tmpBitfield2] ...that we haven't requested from anyone.
  if( tmpBitfield2.IsEmpty() ){
    // Everything this peer has that I want, I've already requested.
    if( WORLD.Endgame() ){  // OK to duplicate a request.
//    idx = tmpBitfield.Random();
      idx = 0;  // flag for Who_Can_Duplicate()
      // NOTE(review): tmpBitfield2 is empty on this path, so the candidate
      // set handed over starts empty; presumably What_Can_Duplicate() builds
      // its own set in this mode -- confirm in peerlist.cpp.
      BitField tmpBitfield3 = tmpBitfield2;
      idx = WORLD.What_Can_Duplicate(tmpBitfield3, this, idx);
      if( idx < BTCONTENT.GetNPieces() ){
        if(arg_verbose) CONSOLE.Debug("Want to dup #%d to %p",
          (int)idx, this);
        btPeer *peer = WORLD.WhoHas(idx);
        if(peer){
          if(arg_verbose) CONSOLE.Debug("Duping: %p to %p (#%d)",
            peer, this, (int)idx);
          if(request_q.CopyShuffle(&peer->request_q, idx) < 0) return -1;
          else return SendRequest();
        }
      }else if(arg_verbose) CONSOLE.Debug("Nothing to dup to %p", this);
    }else{  // not endgame mode
      btPeer *peer = WORLD.Who_Can_Abandon(this); // slowest choice
      if(peer){
        // Cancel a request to the slowest peer & request it from this one.
        if(arg_verbose) CONSOLE.Debug("Reassigning %p to %p (#%d)",
          peer, this, (int)(peer->request_q.GetRequestIdx()));
        // RequestQueue class "moves" rather than "copies" in assignment!
        if( request_q.Copy(&peer->request_q) < 0 ) return -1;
        if(peer->CancelPiece() < 0 || peer->RequestCheck() < 0)
          peer->CloseConnection();
        return SendRequest();
      }else if( BTCONTENT.CheckedPieces() >= BTCONTENT.GetNPieces() ){
        if(arg_verbose) CONSOLE.Debug("%p standby", this);
        m_standby = 1;  // nothing to do at the moment
      }
    }
  }else{
    // Request something that we haven't requested yet (most common case).
    // Try to make it something that has good trade value.
    BitField tmpBitfield3 = tmpBitfield2;
    WORLD.FindValuedPieces(tmpBitfield3, this, BTCONTENT.pBF->IsEmpty());
    // Fall back to the unfiltered set if no candidate survived.
    if( tmpBitfield3.IsEmpty() ) tmpBitfield3 = tmpBitfield2;
    idx = tmpBitfield3.Random();
    if(arg_verbose) CONSOLE.Debug("Assigning #%d to %p", (int)idx, this);
    return (request_q.CreateWithIdx(idx) < 0) ? -1 : SendRequest();
  }
  return 0;
}

int btPeer::MsgDeliver()
{
  size_t r,idx,off,len;
  int retval = 0;

  char *msgbuf = stream.in_buffer.BasePointer();

  r = get_nl(msgbuf);

  // Don't require keepalives if we're receiving other messages.
m_last_timestamp = now;  if( 0 == r ){    if( !m_f_keepalive ) if( stream.Send_Keepalive() < 0 ) return -1;    m_f_keepalive = 0;    return 0;  }else{    char msg = msgbuf[H_LEN];    switch(msg){    case M_CHOKE:      if(H_BASE_LEN != r) return -1;      if(arg_verbose) CONSOLE.Debug("%p choked me", this);      if( m_lastmsg == M_UNCHOKE && m_last_timestamp <= m_choketime+1 ){        m_err_count+=2;        if(arg_verbose) CONSOLE.Debug("err: %p (%d) Choke oscillation",          this, m_err_count);      }      m_choketime = m_last_timestamp;      m_state.remote_choked = 1;      StopDLTimer();      if( g_next_dn == this ) g_next_dn = (btPeer *)0;      if( !request_q.IsEmpty() ){        m_req_out = 0;        PENDINGQUEUE.Pending(&request_q);      }      break;    case M_UNCHOKE:      if(H_BASE_LEN != r) return -1;      if(arg_verbose) CONSOLE.Debug("%p unchoked me", this);      if( m_lastmsg == M_CHOKE && m_last_timestamp <= m_choketime+1 ){        m_err_count+=2;        if(arg_verbose) CONSOLE.Debug("err: %p (%d) Choke oscillation",          this, m_err_count);      }      m_choketime = m_last_timestamp;      m_state.remote_choked = 0;      retval = RequestCheck();      break;    case M_INTERESTED:      if(H_BASE_LEN != r) return -1;      if(arg_verbose) CONSOLE.Debug("%p is interested", this);      m_state.remote_interested = 1;      if( Need_Local_Data() ) WORLD.UnchokeIfFree(this);      break;    case M_NOT_INTERESTED:      if(H_BASE_LEN != r) return -1;      if(arg_verbose) CONSOLE.Debug("%p is not interested", this);      m_state.remote_interested = 0;      /* remove peer's reponse queue */      if( !reponse_q.IsEmpty()) reponse_q.Empty();      /* if I've been seed for a while, nobody should be uninterested */      if( BTCONTENT.IsFull() && BTCONTENT.GetSeedTime() - now >= 300 )         return -2;      break;    case M_HAVE:      if(H_HAVE_LEN != r) return -1;      idx = get_nl(msgbuf + H_LEN + H_BASE_LEN);      if( idx >= BTCONTENT.GetNPieces() || 
bitfield.IsSet(idx) ) return -1;      bitfield.Set(idx);      if( bitfield.IsFull() ){        if( BTCONTENT.IsFull() ) return -2;        else stream.out_buffer.SetSize(BUF_DEF_SIZ);      }      if( !BTCONTENT.pBF->IsSet(idx) && !BTCONTENT.pBMasterFilter->IsSet(idx) ){        if( m_cached_idx >= BTCONTENT.GetNPieces() || m_standby ||            (!BTCONTENT.GetFilter() || !BTCONTENT.GetFilter()->IsSet(idx)) )          m_cached_idx = idx;        if(arg_verbose && m_standby) CONSOLE.Debug("%p un-standby", this);        m_standby = 0;      }      // see if we're Interested now      if(!m_standby) retval = RequestCheck();      break;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?