⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer_agent.cc

📁 升级版的p2p文件共享linux环境ns2仿真软件
💻 CC
📖 第 1 页 / 共 5 页
字号:
   gagent_->Pong((int)peer, INIT_TTL, cnt, addrs, sbytes, sfiles, id);   if(sbytes)     free(sbytes);   if(sfiles)     free(sfiles);}/* PONG reply received */void GnutellaApp::PongReply(NodeAddr_t peer, int ttl, char* payload) {  tPong *pong = (tPong *)payload;  if(lpeers_.size() < max_deg_ && pong->addr_!= addr_) {    PeerMap_t::iterator pi = lpeers_.find(pong->addr_);    if(pi==lpeers_.end()) {      gagent_->Connect(pong->addr_, CONN_TIMEOUT);    }  }  if(find_servent(pong->addr_))    return;  if(pong->addr_!= addr_) {    ServentRec *newsrv = new ServentRec(pong->port_, pong->addr_, pong->byte_shared_, pong->file_shared_);    servent_cache_.push_back(*newsrv);    while(servent_cache_.size() >= KNOWNCACHE_LIMIT)        servent_cache_.pop_front();  }}/* Query Request received *///void GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) {//modify by zdh 04-04int GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) {  double hitprob=0;  rquery++;   if(isFreeloader_ || state_==PS_OFFLINE)     return 0;//20060322 Jackie commented/*  unsigned char nhit = (unsigned char)((float)rand()/(float)RAND_MAX *5.0);    if(ac_) {    int M = ac_->sys_->nFiles_;    int fnum = atoi(payload);    hitprob = (double)(M+1-fnum)/(double)(M+1);  }  double hit = (float)rand()/(float)RAND_MAX; //random probability of hit    if(hitprob>0 && hit<hitprob || (hitprob==0 && hit<HIT_PROB))  */   //int fnum = atoi(payload); int fnum = *((Word_t *)payload); debug_info("Query Request receive by: %d from: %d,request segment id: %d,my buffer seg id:%d \n",addr_,peer,fnum,buffered_segment_[0].segment_id_); unsigned char nhit = 0;  if (segmentIsBuffer(fnum))  	{    gagent_->QueryHit((int)peer, nhit, addr_, speed_, payload, id,ttl); 	return 1;  	}  else return 0;}int GnutellaApp::segmentIsBuffer(int request_segmentID){return request_segmentID==buffered_segment_[0].segment_id_;}/* QueryHit received*/void GnutellaApp::QueryHitReply(NodeAddr_t peer, int ttl, char *payload) {tQueryHit queryhit;// add by zdh 04-03memcpy((void *)&queryhit, (void *)payload, QUERYHIT_FIXLEN);/*if (segmentID_==file_size_){	 debug_info(" The transfer for node :%d  is finished, altogether %d segments are transfered \n", gapp_->addr_,gapp_->file_size_);	 segmentID_++;     	}else{*/     		//the current querying segment is hits     		     		//}if (segment_lable_[queryhit.segmentID_].searched_){			if (!segment_lable_[queryhit.segmentID_].received_)				{debug_my("QueryHit received by %d from %d segment: %d hops : %d\n", addr_, peer,queryhit.segmentID_,ttl);				segment_lable_[queryhit.segmentID_].received_=1;			update_segmentcache(queryhit.segmentID_,ttl);			//search(NULL);				}     			}  if(ac_ && state_==PS_ACTIVE)    ac_->expire(NULL);}/* available for more connections? */int GnutellaApp::avail(int type) {  if(lpeers_.size() < max_deg_)    return TRUE;  else    return FALSE;}/* configure a list of bootstrap servers from a file */void GnutellaApp::setBootServer(FILE *file) {  char line[20];  NodeAddr_t servent;  BootServerRec *brec;  while(fgets((char *)&line, 20, file)) {#ifdef PDNS    Tcl &tcl = Tcl::instance();    tcl.evalf("[Simulator instance] convert-ipaddr %s\n", line);    servent = atoi(tcl.result()); #else    servent = atoi(line);#endif    brec = new BootServerRec(servent, LISTEN_PORT);    if (brec)       bserver_list_.insert(bserver_list_.end(), *brec);  }}/* find a servent in the servent cache */int GnutellaApp::find_servent(NodeAddr_t peer) {  for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) {    if(si->addr_==peer)      return TRUE;  }  return FALSE;}/* remove a servent from the known servent cache */void GnutellaApp::remove_servent(NodeAddr_t peer) {  for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) {    if(si->addr_==peer) {      servent_cache_.erase(si);      return;    }  }}void GnutellaApp::update_segmentcache(Word_t segID,Byte_t hops) {int i,tmp,not_delay=0;//double delay=0;	//for (i=0;i<buffer_size_;i++){	//add by zdh	for (i=1;i<segID;i++){		if (segment_lable_[i].received_==0)			{segment_lable_[segID].tdelay_=0;		not_delay=1;		tmp=i-1+nwindow;			if(!segment_lable_[tmp].searched_)				segmentID_=tmp;				return;			}		}	if(!not_delay){		for(i=1;i<segID;i++)		{	segment_lable_[i].Tdelay_=segment_lable_[i-1].Tdelay_+segment_lable_[i].tdelay_;	//debug_my("segment_lable_[%d].Tdelay_: %f,segment_lable_[%d].tdelay_: %f\n",i,segment_lable_[i].Tdelay_,i,segment_lable_[i].tdelay_);		}	if (segID<=nwindow)		segment_lable_[segID].tdelay_=hops-(start_up_time+segment_play_time*(segID-1)+segment_lable_[segID-1].Tdelay_);	else		segment_lable_[segID].tdelay_=hops+segment_play_time*(1-nwindow)-(segment_lable_[segID-1].Tdelay_-segment_lable_[segID-nwindow].Tdelay_);		if (segment_lable_[segID].tdelay_<0)		segment_lable_[segID].tdelay_=0;		segment_lable_[segID].Tdelay_=segment_lable_[segID-1].Tdelay_+segment_lable_[segID].tdelay_;	//debug_my("segment_lable_[%d].Tdelay_: %f,segment_lable_[%d].tdelay_: %f\n",segID,segment_lable_[segID].Tdelay_,segID,segment_lable_[segID].tdelay_);				//debug_my("total_delay[%d]: %f segment_delay:%f,buffer time %f\n",segID,segment_lable_[segID].Tdelay_,segment_lable_[segID].tdelay_,buffered_segment_[0].buffer_time_);	if ((buffered_segment_[0].buffer_time_<segment_lable_[segID].tdelay_) && (buffered_segment_[0].segment_id_!=segID)){	//if ((buffered_segment_[i].hops_<hops) && (buffered_segment_[i].segment_id_!=segID))		if (buffered_segment_[0].hops_!=0)			debug_my(" Updates cache for node: %d  old segment: %d  old hops:%d  old buffer time %f to new segment: %d,new hops: %d segment delay %f\n", addr_,buffered_segment_[0].segment_id_, buffered_segment_[0].hops_,buffered_segment_[0].buffer_time_,segID,hops,segment_lable_[segID].tdelay_);		else			debug_my(" Inserting cache for node: %d  with segment: %d,hops: %d delay %f \n", addr_,segID,hops,segment_lable_[segID].tdelay_);		buffered_segment_[0].segment_id_=segID;		buffered_segment_[0].hops_=hops;		buffered_segment_[0].buffer_time_=segment_lable_[segID].tdelay_;					//for(i=segID+1; i<file_size_; i++)		//	segment_deadline_[i]=segment_deadline_[i]+segment_delay_;					}	segmentID_=segID+nwindow;			}	search(NULL);						return ;}/* send an update to the bootstrap server */ void GnutellaApp::update_bootcache() {  if(smpBoot_)	return;  gagent_->UpdateBootcache();}int GnutellaApp::command(int argc, const char*const* argv) {  Tcl &tcl = Tcl::instance();  if(argc==2) {    if(strcmp(argv[1], "start")==0) {      join();      return TCL_OK;    }    if(strcmp(argv[1], "join")==0) {      join();      return TCL_OK;    }    if(strcmp(argv[1], "leave")==0) {      leave();      return TCL_OK;    }    if(strcmp(argv[1], "stop")==0) {      stop();      return TCL_OK;    }    if(strcmp(argv[1], "statistics")==0) {		stat();		//commented by zdh 04-03      //gagent_->statistics();      return TCL_OK;    }  }  if(argc==3) {    if(strcmp(argv[1], "set-agent")==0) {      GnutellaAgent *tmp = (GnutellaAgent *)tcl.lookup(argv[2]);      if(tmp) {	gagent_ = tmp;	agent_ = tmp; 	return TCL_OK;      }      return TCL_ERROR;    }    if(strcmp(argv[1], "search")==0) {      search((char *)argv[2]);      return TCL_OK;    }    if(strcmp(argv[1], "use-bootserver")==0) {      FILE *file = fopen(argv[2], "r");      if(file) {	setBootServer(file);	return TCL_OK;      }      return TCL_ERROR;    }    if(strcmp(argv[1], "use-smpbootserver")==0) {      SmpBootServer *tmp = (SmpBootServer *)tcl.lookup(argv[2]);      if(tmp!=NULL) {	bootSrv_ = tmp;	smpBoot_ = TRUE;	return TCL_OK;      }      return TCL_ERROR;    }  }    if(argc==4) {    if(strcmp(argv[1], "attach-peersys")==0) {      PeerSys *tmp = (PeerSys *)tcl.lookup(argv[2]);      if(tmp!=NULL) {	ac_ = new ActivityController(tmp, this, atoi(argv[3]));	return TCL_OK;      }      return TCL_ERROR;    }  }  return TCL_ERROR;}/* Ping timer */PingTimer::PingTimer(GnutellaApp *app) {  app_ = app;}void PingTimer::expire(Event *) {  app_->ping();}/* Connection watchdog */WatchDog::WatchDog(GnutellaApp *app) {  app_ = app;}void WatchDog::expire(Event *) {  app_->connect(TRUE);  app_->update_bootcache();}/* known servent record */ServentRec::ServentRec(Byte_t port, NodeAddr_t addr, int nbytes, int nfiles) {  port_ = port;  addr_ = addr;  file_shared_ = nfiles;  byte_shared_ = nbytes;}/**********************************************************************//****            Part III: Simple bootstrap server                 ****//* servent record maintained by bootstrap server */GServentRec::GServentRec(Byte_t port, NodeAddr_t addr, int nbytes, int nfiles, GnutellaApp *app) {  port_ = port;  addr_ = addr;  app_ = app;  file_shared_ = nfiles;  byte_shared_ = nbytes;}SmpBootServer::SmpBootServer() {}void SmpBootServer::RemovePeer(NodeAddr_t peer, GnutellaApp *app) {  GServentMap_t::iterator fi = servent_cache_.find(peer);  if(fi!=servent_cache_.end()) {    servent_cache_.erase(fi);  }}/* called directly by a new servent for bootstrapping */BootstrapRes_t *SmpBootServer::BootstrapRequest(NodeAddr_t peer, GnutellaApp *app, int peertype) {  BootstrapRes_t *res;  int i = 0;  int cnt = 0, asize = 0;   double thresh, prob;  for(GServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) {    if(fi->second.app_->avail(peertype))      cnt++;  }  asize = cnt > MAX_BOOTREPLY? MAX_BOOTREPLY:cnt;  if (cnt <= MAX_BOOTREPLY)    thresh = 1;  else    thresh = (float)MAX_BOOTREPLY/(float)cnt;  res = (BootstrapRes_t *)malloc(sizeof(BootstrapRes_t));  NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*asize);  for(GServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) {    if(!fi->second.app_->avail(peertype))      continue;    prob = (double)rand()/(double)RAND_MAX;    if(prob <= thresh) {            addrs[i] = fi->second.addr_;      i++;      if(i>=asize)	break;    }  }  res->cnt_ = i;  res->servents_ = addrs;  if(peertype!=TYPE_LEAF) {    GServentMap_t::iterator fi = servent_cache_.find(peer);    if(fi==servent_cache_.end()) {      GServentRec *newsrv;      newsrv = new GServentRec(LISTEN_PORT, peer, 0, 0, app);      servent_cache_.insert(GServentMap_t::value_type(peer, *newsrv));    }  }  return res;}/* simple bootstrap server in PDNS */PDNSBootServer::PDNSBootServer(NodeAddr_t addr): SmpBootServer() {  addr_ = addr;}BootstrapRes_t *PDNSBootServer::BootstrapRequest(NodeAddr_t peer, GnutellaApp *app, int peertype) {    BootstrapRes_t *res;  int i = 0;  int cnt = 0, asize = 0;   double thresh, prob;  cnt = pservent_cache_.size();  asize = cnt > MAX_BOOTREPLY? MAX_BOOTREPLY: cnt;  if (cnt <= MAX_BOOTREPLY)    thresh = 1;  else    thresh = (float)MAX_BOOTREPLY/(float)cnt;  res = (BootstrapRes_t *)malloc(sizeof(BootstrapRes_t));  NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*asize);  for(PDNSServentMap_t::iterator fi = pservent_cache_.begin(); fi != pservent_cache_.end(); ++fi) {    prob = (double)rand()/(double)RAND_MAX;    if(prob <= thresh) {            addrs[i] = fi->second.addr_;      i++;      if(i>=asize)	break;    }  }  res->cnt_ = i;  res->servents_ = addrs;  if(peertype!=TYPE_LEAF) {    PDNSServentMap_t::iterator fi = pservent_cache_.find(peer);    if(fi==pservent_cache_.end()) {      PDNSServentEntry *newsrv;      newsrv = new PDNSServentEntry(LISTEN_PORT, peer);      pservent_cache_.insert(PDNSServentMap_t::value_type(peer, *newsrv));      broadcast(ADD_PEER, peer);    }  }  return res;}void PDNSBootServer::RemovePeer(NodeAddr_t peer) {  PDNSServentMap_t::iterator fi = pservent_cache_.find(peer);  if(fi!=pservent_cache_.end()) {    pservent_cache_.erase(fi);    broadcast(REM_PEER, peer);  }}/* PDNSBootServer receives updates from boot servers in other physical nodes*/int PDNSBootServer::upcall_recv(Socket *sock, PacketData *pktdata, Handler *){  PDNSBootMsg *msg = NULL;  NodeAddr_t peer;  PDNSServentMap_t::iterator pi;  PDNSServentEntry *newentry = NULL;  if(pktdata==NULL)    return 0;  unsigned char *data = pktdata->data();  msg = parse(pktdata->size(), data);  if(msg==NULL)     return pktdata->size();  peer = msg->peer_;  switch(msg->type_) {  case ADD_PEER:    pi=pservent_cache_.find(peer);    if(pi==pservent_cache_.end()) {      PDNSServentEntry *newentry = new PDNSServentEntry(BOOTSERVER_PORT, peer);      pservent_cache_.insert(PDNSServentMap_t::value_type(peer, *newentry));    } else {      //warning    }        break;  case REM_PEER:    pi=pservent_cache_.find(peer);    if(pi!=pservent_cache_.end()) {      pservent_cache_.erase(pi);    } else {      //warning    }    break;    default:      debug_warning("WARNING: unknown msg type %d\n", msg->type_);  }    free(msg);  return pktdata->size();}/* parse the messages received from other boot ser

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -