⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer_agent.cc

📁 升级版的p2p文件共享linux环境ns2仿真软件
💻 CC
📖 第 1 页 / 共 5 页
字号:
     		if (gapp_->segmentID_==msgHandle_->payload_.queryhit_->segmentID_){			gapp_->update_segmentcache(msgHandle_->payload_.queryhit_->segmentID_,msgHandle_->header_.hops_);			gapp_->search(NULL);     			}     		}*/    break;  default:     debug_warning("WARNING: Unknown Message Type\n");    break;  };  msgHandle_->clear();  return pktdata->size();  }int GnutellaAgent::command(int argc, const char*const* argv) {  Tcl &tcl = Tcl::instance();  if(argc==3) {    if(strcmp(argv[1], "set-app")==0) {      GnutellaApp *app = (GnutellaApp *)tcl.lookup(argv[2]);      if(app!=NULL) {	gapp_ = app;	app_ = gapp_;	return TCL_OK;      }      return TCL_ERROR;    }  }    if(argc==2) {    if(strcmp(argv[1], "statistics")==0) {	printf("I will printf!!\n");      statistics();      return TCL_OK;    }  }  return TCL_ERROR;}void GnutellaAgent::statistics() {     if (!gapp_->isBootserver_) {       bkblock_->print(0);       fwblock_->print(1);       gapp_->degree_->print(2);       rcvRate_->print(3);     }}// APIs to GnutellaApp/* bootstrap request */void GnutellaAgent::Bootstrap(NodeAddr_t peer) {  Socket *cur=NULL;  SockMap_t::iterator si = lsocks_.find(peer);  if(si==lsocks_.end()) {   Tcl &tcl = Tcl::instance();#ifdef PDNS  tcl.evalf("[Simulator instance] create-sock %d %s -1 1 %d %d", app_->addr_, name(), use_prio_, rate_limit_);   cur = (Socket *)tcl.lookup(tcl.result());#else  tcl.evalf("[Simulator instance] create-sock %d %s -1 0 %d %d", app_->addr_, name(), use_prio_, rate_limit_);   cur = (Socket *)tcl.lookup(tcl.result());#endif  if(cur==NULL) {    debug_warning("WARNING: unable to create-sock to %d\n", peer);    return;  }  cur->connect(peer, LISTEN_PORT);  SockEntry *newentry = new SockEntry(cur, peer, SOCK_BOOT_WAIT);  lsocks_.insert(SockMap_t::value_type(peer, *newentry));  }}/* respond to a bootstrap request*/void GnutellaAgent::Bootstrap_Reply(NodeAddr_t peer, int cnt, NodeAddr_t *iplist) {  PacketData *content;  unsigned char *dataptr;  char bootstr[BOOTSTRAP_RESLEN];  int len;  SockMap_t::iterator si = lsocks_.find(peer);  if(si==lsocks_.end()) {    debug_warning("WARNING: error sending bootstrap through unconnected peer\n");    return;  }  len = sizeof(int) + sizeof(NodeAddr_t) * cnt + BOOTSTRAP_RESLEN;  content = new PacketData(len);  dataptr = content->data();  strcpy((char *)bootstr, GNUTELLA_BOOTSTRAP_RES);  memcpy(dataptr, bootstr, BOOTSTRAP_RESLEN);   dataptr = dataptr + BOOTSTRAP_RESLEN;  *((int *)dataptr) = cnt;  memcpy((void *)&dataptr[4], (void *)iplist, sizeof(NodeAddr_t)*cnt);   si->second.sock_->send(len, content);}/* request to connect to a peer*/int GnutellaAgent::Connect(NodeAddr_t peer, double timeout) {  Socket *cur=NULL;  PendingConnEntry *newconn=NULL;  SockMap_t::iterator si = lsocks_.find(peer);  PendingConns_t::iterator pi = conn_pending_.find(peer);  if(si!=lsocks_.end()) {    debug_info("existing socket from %d to %d not removed\n", app_->addr_, peer);    return FALSE;  }  if(pi==conn_pending_.end()) {    Tcl &tcl = Tcl::instance();#ifdef PDNS    tcl.evalf("[Simulator instance] create-sock %d %s -1 1 %d %d", gapp_->addr_, name(), use_prio_, rate_limit_);#else    tcl.evalf("[Simulator instance] create-sock %d %s -1 0 %d %d", gapp_->addr_, name(), use_prio_, rate_limit_);#endif    cur = (Socket *)tcl.lookup(tcl.result());    if(cur==NULL) {      debug_warning("WARNING: unable to create-sock to\n", peer);      return FALSE;    }    cur->connect(peer, LISTEN_PORT);    newconn = new PendingConnEntry(peer, timeout, NOW, NOT_CONNECTED);  } else    return FALSE;  //add to pending connection list  conn_pending_.insert(PendingConns_t::value_type(peer, *newconn));  if(conn_pending_.size()==1)    conn_timer_->resched(CONN_TIMEOUT);  return TRUE;}/* disconnect a peer (peers) */void GnutellaAgent::Disconnect(NodeAddr_t peer) {  SockMap_t::iterator i;  if(peer==-1) { //disconnect all    for(i=lsocks_.begin(); i != lsocks_.end(); i++) {      if(i->second.type_!=SOCK_BOOT_WAIT)	      i->second.sock_->close();    }    while(!lsocks_.empty()) {      i = lsocks_.begin();      lsocks_.erase(i);    }    return;  }  //disconnect a specific peer  i = lsocks_.find(peer);  if(i!=lsocks_.end()) {    i->second.sock_->close();    lsocks_.erase(i);  }}/* send PING to a peer */void GnutellaAgent::Ping(NodeAddr_t peer, int ttl) {  PacketData *pkt;  SockMap_t::iterator si = lsocks_.find(peer);  if(si==lsocks_.end()) {    debug_warning("WARNING: %d error pinging unconnected peer %d\n", app_->addr_, peer);    return;  }   pkt = msgHandle_->newpacket(NULL, PING, ttl, 0, 0, NULL);  if(pkt) {    si->second.sock_->send(DESC_HDRLEN, pkt);    PendingReqEntry *newquery = new PendingReqEntry(msgHandle_->header_.id_, NOW, REQ_PING);    pending_req_.insert(pending_req_.end(), *newquery);  }}/* respond to a PING with a specific descriptor ID */void GnutellaAgent::Pong(NodeAddr_t peer, int ttl, int cnt, NodeAddr_t *iplist, int *size, int *fnum, char *id) {  tPong pong;  PacketData *pkt=NULL;  SockMap_t::iterator si = lsocks_.find(peer);  if(si==lsocks_.end()) {    //sending PONG through unconnected peer could happen    return;  }    for(int i=0; i<cnt; i++) {    pong.port_ = 0;    pong.addr_ = iplist[i];    pong.file_shared_ = fnum[i];    pong.byte_shared_ = size[i];    pkt = msgHandle_->newpacket(id, PONG, INIT_TTL, 0, PONG_LEN, (char *)&pong);    if(pkt)       si->second.sock_->send(DESC_HDRLEN + PONG_LEN, pkt);  }}/* send a Query */void GnutellaAgent::Query(NodeAddr_t peer, Word_t minSpeed, char *search) {  SockMap_t::iterator si;  char *data;  PacketData *pkt=NULL;  int len = strlen(search) + 1;  int additional_len=0;  char *id = NULL;     data = (char *)malloc(len+sizeof(Word_t));  memcpy((void *)data, (void *)&minSpeed, sizeof(Word_t));  additional_len +=sizeof(Word_t);  if (len>1)  	strcpy(&data[additional_len], search);  nquery++;     if(peer==-1) { //send to all connected peers    for(si = lsocks_.begin(); si != lsocks_.end(); si++) {      pkt = msgHandle_->newpacket(id, QUERY, INIT_QUERY_TTL, 0, len + additional_len, data);      if(pkt) {	  si->second.sock_->send(len+additional_len+DESC_HDRLEN, pkt);	  if(id==NULL)	    id = msgHandle_->header_.id_;	}    }    PendingReqEntry *newquery = new PendingReqEntry(msgHandle_->header_.id_, NOW, REQ_QUERY);    pending_req_.insert(pending_req_.end(), *newquery);    free(data);    return;  }  //or, send to a specific peer  si = lsocks_.find(peer);  if(si==lsocks_.end()) {    debug_warning("WARNING: error sending Query to unconnected peer %d\n", peer);    free(data);    return;  }  pkt = msgHandle_->newpacket(NULL, QUERY, INIT_QUERY_TTL, 0, len + 2, data);    free(data);  if(pkt) {    si->second.sock_->send(len+2+DESC_HDRLEN, pkt);    PendingReqEntry *newquery = new PendingReqEntry(msgHandle_->header_.id_, NOW, REQ_QUERY);    pending_req_.insert(pending_req_.end(), *newquery);  }}/* respond to a Query with QueryHit */void GnutellaAgent::QueryHit(NodeAddr_t peer, Byte_t number, NodeAddr_t responder, int speed, char *results, char *id,int ttl) {  char *data;  PacketData *pkt=NULL;  tQueryHit queryhit;  int len, tlen, res=-1;  if(results)    len = strlen(results) + 1;  else    len = 0;  SockMap_t::iterator si = lsocks_.find(peer);  if(si==lsocks_.end()) {    // sending QueryHit through unconnected peer could happen    return;  }  queryhit.port_ = LISTEN_PORT;  queryhit.num_hits_ = number;  queryhit.addr_ = responder;  queryhit.speed_ = speed;  queryhit.segmentID_=*results;  if(!firewalled_)    data = (char *)malloc(QUERYHIT_FIXLEN + len);  else    data = (char *)malloc(QUERYHIT_FIXLEN + len + 16);  memcpy((void *)data, (void *)&queryhit, QUERYHIT_FIXLEN);  if(len>0)    memcpy((void *)&data[QUERYHIT_FIXLEN], (void *)results, len);    if(firewalled_)     memcpy((void *)&data[QUERYHIT_FIXLEN + len], mySrvId_, 16);  if(!firewalled_) 	tlen = QUERYHIT_FIXLEN + len;  else	tlen = QUERYHIT_FIXLEN + len + 16;//modify by zdh 04-03pkt = msgHandle_->newpacket(id, QUERYHIT, ttl, 0, tlen, data);  //pkt = msgHandle_->newpacket(id, QUERYHIT, INIT_TTL, 0, tlen, data);  if(pkt)    res = si->second.sock_->send(DESC_HDRLEN + tlen, pkt);  if(res!=-1)       squeryhit++;  free(data);}//called within GnutellaAgent/* compose and send LimeWire-like bootstrap request */void GnutellaAgent::lime_bootstrap(Socket *sock) {  PacketData *content;  unsigned char *dataptr;  char bootstr[BOOTSTRAP_LEN];  content = new PacketData(BOOTSTRAP_LEN-1);  dataptr = content->data();  strcpy((char *)bootstr, GNUTELLA_BOOTSTRAP);  memcpy(dataptr, bootstr, BOOTSTRAP_LEN-1);   sock->send(BOOTSTRAP_LEN-1, content);}/* agent send out cache update messages to connected bootstrap servers*/void GnutellaAgent::UpdateBootcache() {  for(SockMap_t::iterator si = lsocks_.begin(); si != lsocks_.end(); si++) {	if(si->second.type_==SOCK_BOOT) {  	  PacketData *content;	  unsigned char *dataptr;	  char bootstr[CACHEUPDATE_LEN];	  content = new PacketData(CACHEUPDATE_LEN-1);	  dataptr = content->data();	  strcpy((char *)bootstr, BOOTCACHE_UPDATE);	  memcpy(dataptr, bootstr, CACHEUPDATE_LEN-1); 	  si->second.sock_->send(CACHEUPDATE_LEN-1, content);	}  }}/* compose and send Gnutella Connection OK msg */void GnutellaAgent::gnutella_ok(Socket *sock) {  PacketData *content;  unsigned char *dataptr;  char connstr[CONNOK_LEN];  content = new PacketData(CONNOK_LEN-1);  dataptr = content->data();  strcpy((char *)connstr, GNUTELLA_OK);  memcpy(dataptr, connstr, CONNOK_LEN-1);   sock->send(CONNOK_LEN-1, content);}/* compose and send Gnutella connection rejection msg */void GnutellaAgent::gnutella_reject(Socket *sock) {  PacketData *content;  unsigned char *dataptr;  char connstr[CONNREJ_LEN];  content = new PacketData(CONNREJ_LEN-1);  dataptr = content->data();  strcpy((char *)connstr, GNUTELLA_REJ);  memcpy(dataptr, connstr, CONNREJ_LEN-1);   sock->send(CONNREJ_LEN-1, content);}/* compose and send Gnutella Connection Request msg */void GnutellaAgent::gnutella_req(Socket *sock) {  PacketData *content;  unsigned char *dataptr;  char connstr[CONN_LEN];  content = new PacketData(CONN_LEN-1);  dataptr = content->data();  strcpy((char *)connstr, GNUTELLA_CONN);  memcpy(dataptr, connstr, CONN_LEN-1);   sock->send(CONN_LEN-1, content);    if(conn_pending_.size()==1)    conn_timer_->resched(CONN_TIMEOUT);}/* Conn timeout handler, notify GnutellaApp of requests that have timed out*/void GnutellaAgent::conn_timeout() {  double ctime;    ctime = NOW;  for(PendingConns_t::iterator pi = conn_pending_.begin(); pi != conn_pending_.end(); pi++) {    if(ctime - pi->second.start_ > pi->second.timeout_ && pi->second.state_==TRANSPORT_CONNECTED)      gapp_->ConnectionTimeout(pi->second.peer_);  }}/* find the incoming socket matching a descriptor ID*/Socket *GnutellaAgent::find_desc(char *descid) {  for(DescMap_t::iterator di=desc_cache_.begin(); di !=desc_cache_.end(); di++) {    if(memcmp(descid, di->id_, 16)==0) {      return di->sock_;    }  };  return NULL;}/* find the outstanding Query matching a descriptor ID */int GnutellaAgent::find_query(char *descid, int poll) {  int cnt = -1;  double ctime = NOW, tmp, delay;  for(ReqList_t::iterator di=pending_req_.begin(); di !=pending_req_.end(); di++) {    if(di->type_==REQ_QUERY && memcmp(descid, di->id_, 16)==0) {      cnt = di->replycnt_;      if(poll) //poll only, do not update replycnt_	return cnt;      if(cnt==0) {	delay = ctime - di->tstamp_;	tmp = (double)nthru * Avg_Rsp + delay;	nthru++;	Avg_Rsp = tmp/(double)nthru;	debug_stat("QueryHit %d %d %d %d %f %f %d %d\n", nquery, nqueryhit, nthru, squeryhit, Avg_Rsp, delay, rquery, gapp_->lpeers_.size());	fflush(NULL);      }      di->replycnt_++;      //if(di->replycnt_ > 20) //remove Query if there are already many replies      //modify by zdh 04-04      if(di->replycnt_!=0)	pending_req_.erase(di);      return cnt;    }  };  return -1;}/* find the outstanding Ping matching a descriptor ID */int GnutellaAgent::find_ping(char *descid, int poll) {  int cnt = -1;  for(ReqList_t::iterator di=pending_req_.begin(); di !=pending_req_.end(); di++) {    if(di->type_==REQ_PING && memcmp(descid, di->id_, 16)==0) {      cnt = di->replycnt_;      if(!poll)	di->replycnt_++;      return cnt;    }  };  return -1;}/* forward a PING or Query message */int GnutellaAgent::forward(Socket *incoming, PacketData *data, desc_hdr *hdr) {  PacketData *newdata=NULL;  DescEntry* newentry=NULL;  int bcnt=0, totalout=0;  int fcnt=0;  for(SockMap_t::iterator si = lsocks_.begin(); si!=lsocks_.end(); si++) {    if(si->second.sock_!=incoming) {      totalout++;      if(si->second.sock_->blocked(SOCK_WRITE)) {	bcnt++;	fwblock_->increment();       }    }  }  for(SockMap_t::iterator si = lsocks_.begin(); si!=lsocks_.end(); si++) {    if(si->second.sock_!=incoming) {      if(!si->second.sock_->blocked(SOCK_WRITE)) {	newdata = new PacketData(*data);	si->second.sock_->send(data->size(), newdata);      }     }  }  newentry = new DescEntry(hdr->id_, incoming, NOW);  desc_cache_.insert(desc_cache_.end(), *newentry);  return FALSE;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -