⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer_agent.cc

📁 P2P文件共享基于ns2的仿真代码
💻 CC
📖 第 1 页 / 共 5 页
字号:
  BootstrapResult(res->cnt_, res->servents_); // stat_my("node %d bootstrap result is %d\n",addr_, res->cnt_);  if(res->servents_)    free(res->servents_);  free(res);}/* tries to connect to some known servents */void GnutellaApp::connect(int trybootserver) {  int cnt=0, success, limit;  limit = max_deg_ - lpeers_.size();  for (ServentMap_t::const_iterator i = servent_cache_.begin(); cnt < limit && i != servent_cache_.end(); ++i ) {    NodeAddr_t peer = i->addr_;    PeerMap_t::iterator pi = lpeers_.find(peer);    if(pi==lpeers_.end()) {      success = gagent_->Connect(peer, CONN_TIMEOUT);      if(success)      	cnt++;    }  }  if (trybootserver && !smpBoot_ && cnt < limit)   //not enough known servents    bootstrap();}/* periodically send PING to connected peers */void GnutellaApp::ping(int first) {  double ctime;    ctime = NOW;  for(PeerMap_t::iterator i = lpeers_.begin(); i != lpeers_.end(); ++i) {    if(ctime - i->second.lstamp_ > ping_interval_) {      gagent_->Ping(i->second.peer_, INIT_TTL);      i->second.lstamp_ = ctime;    }  }  ping_timer_->resched(ping_interval_);  }// upcalls from GnutellaAgent/* Connection request to a peer succeeded */void GnutellaApp::ConnectSucceeded(NodeAddr_t peer) {  PeerMap_t::iterator pi = lpeers_.find(peer);  if(pi==lpeers_.end()) {    PeerEntry *newentry = new PeerEntry(peer, NOW, FALSE);    lpeers_.insert(PeerMap_t::value_type(peer, *newentry));    degree_->increment(lpeers_.size());    if(lpeers_.size()==1) {	if(state_ == PS_OUTSYS)    state_ = PS_OFFLINE;  if(state_ == PS_OFFLINE) {    setState(PS_IDLE);    if(ac_)      ac_->expire(NULL);  } //	ping_timer_->resched(ping_interval_);//delete by zdh 06-03 	watchDog_->resched(watch_interval_);	if(gagent_->gc_ && gagent_->gc_->status()==TIMER_IDLE) 	  gagent_->gc_->resched(DESC_TIMEOUT/2);    }  }/*if(lpeers_.size()>=3){  }*/debug_my("node %d have %d peers\n",addr_,peer);}/* Bootstrap request received*/void GnutellaApp::BootstrapRequest(NodeAddr_t peer) {  debug_info("bootstrap request received from %d\n", peer);  if(isBootserver_) {   int i=0, size = servent_cache_.size();   int cnt = MAX_BOOTREPLY>size ? size: MAX_BOOTREPLY;   NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*cnt);   for(ServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) {     addrs[i] = fi->addr_;     i++;     if(i>=MAX_BOOTREPLY)	break;   }   gagent_->Bootstrap_Reply(peer, cnt, addrs);   if(find_servent(peer))	remove_servent(peer);     int prob = (int)((double)rand()/(double)RAND_MAX * 100.0);          if(prob < PUBLISH_PROB) {       ServentRec *newsrv;       newsrv = new ServentRec(LISTEN_PORT, peer, 0, 0);       servent_cache_.push_back(*newsrv);     }     while(servent_cache_.size() > KNOWNCACHE_LIMIT)       servent_cache_.pop_front();   free(addrs);  }}/* Bootstrap results received */void GnutellaApp::BootstrapResult(int cnt, NodeAddr_t *res) {  int i;    for(i=0; i<cnt; i++) {    NodeAddr_t cur = res[i];	//stat_my("node %d's peer is %d\n",addr_,res[i]);    if(!find_servent(cur) && cur != addr_) {      ServentRec *newsrv;      newsrv = new ServentRec(LISTEN_PORT, cur, 0, 0);      servent_cache_.push_back(*newsrv);    }  }  connect(FALSE);  ping(TRUE);}/* Bootstrap results received */void GnutellaApp::BootcacheUpdate(NodeAddr_t peer) {   if (!isBootserver_)	return;   if(!find_servent(peer))	return;   remove_servent(peer);   ServentRec *newsrv;   newsrv = new ServentRec(LISTEN_PORT, peer, 0, 0);   servent_cache_.push_back(*newsrv);}/* connection request received*/int GnutellaApp::ConnectionRequest(NodeAddr_t peer, Socket *sock) {  PeerMap_t::iterator i = lpeers_.find(peer);  if(avail(TYPE_ANY) && i==lpeers_.end()) {    gagent_->gnutella_ok(sock);    PeerEntry *newentry = new PeerEntry(peer, NOW, FALSE);    lpeers_.insert(PeerMap_t::value_type(peer, *newentry));    degree_->increment(lpeers_.size());	debug_my("node %d have %d peers\n",addr_,peer);    if(lpeers_.size()==1) {      if(state_ == PS_OUTSYS)	state_ = PS_OFFLINE;      if(state_ == PS_OFFLINE) {	setState(PS_IDLE);	if(ac_)	  ac_->expire(NULL);      }//	ping_timer_->resched(ping_interval_);//delete by zdh 06-03	watchDog_->resched(watch_interval_);    }	//add by zdh 04-10/*	if(lpeers_.size()>=3){  if(state_ == PS_OUTSYS)    state_ = PS_OFFLINE;  if(state_ == PS_OFFLINE) {    setState(PS_IDLE);    if(ac_)      ac_->expire(NULL);  }}*/debug_my("node %d have %d peers\n",addr_,lpeers_.size());    if(!find_servent(peer)) {      ServentRec *srec = new ServentRec(LISTEN_PORT, peer, 0, 0);      servent_cache_.push_back(*srec);    }    return 1;  }  else {    gagent_->gnutella_reject(sock);    return 0;  }}/* connection request rejected */void GnutellaApp::ConnectionRejected(NodeAddr_t peer) {  debug_info("connection request from %d to %d rejected\n", addr_, peer);  if(find_servent(peer))     remove_servent(peer);}/* connection failed, probably at network level */void GnutellaApp::ConnectionFailed(NodeAddr_t peer) {  debug_info("connection to %d failed\n", peer);  if(find_servent(peer))    remove_servent(peer);}/* connection request timed out */void GnutellaApp::ConnectionTimeout(NodeAddr_t peer) {  debug_info("connection from %d to %d timed out\n", addr_, peer);  if(find_servent(peer))    remove_servent(peer);}/* connection closed by peer */void GnutellaApp::ConnectionClosed(NodeAddr_t peer) {  debug_info("connection closed by peer %d\n", peer);  for(PeerMap_t::iterator i=lpeers_.begin(); i != lpeers_.end(); ++i) {    if(i->second.peer_ == peer) {      lpeers_.erase(i);      degree_->increment(lpeers_.size());      return;    }  }}/* PING request received */void GnutellaApp::PingRequest(NodeAddr_t peer, int ttl, char *id) {//return calculate popularity(pop[20] and peers)debug_info("PingRequest from %d to %d hops %d\n", peer, addr_, ttl);int cnt = file_size_+1;float *addrs = (float *)malloc(sizeof(float)*cnt);for(int i=0;i<20;i++)	addrs[i]=pop[i];addrs[20]=(float)peers_size;gagent_->Pong((int)peer, INIT_TTL, cnt, addrs, id);if(addrs)     free(addrs);  /*int i=0;   int cnt = servent_cache_.size();   debug_info("PingRequest from %d to %d\n", peer, addr_);   int *sbytes = (int *)malloc(sizeof(int)*cnt);   int *sfiles = (int *)malloc(sizeof(int)*cnt);   NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*cnt);   for(ServentMap_t::iterator si = servent_cache_.begin(); si!=servent_cache_.end(); si++) {     sbytes[i] = si->byte_shared_;     addrs[i] = si->addr_;     sfiles[i] = si->file_shared_;     i++;   }   gagent_->Pong((int)peer, INIT_TTL, cnt, addrs, sbytes, sfiles, id);   if(sbytes)     free(sbytes);   if(sfiles)     free(sfiles);*/ //modify by zdh 06-03     }/* PONG reply received */void GnutellaApp::PongReply(NodeAddr_t peer, int ttl, char* payload) {  tPong *pong = (tPong *)payload;  if(lpeers_.size() < max_deg_ && pong->addr_!= addr_) {    PeerMap_t::iterator pi = lpeers_.find(pong->addr_);    if(pi==lpeers_.end()) {      gagent_->Connect(pong->addr_, CONN_TIMEOUT);    }  }  if(find_servent(pong->addr_))    return;  if(pong->addr_!= addr_) {    ServentRec *newsrv = new ServentRec(pong->port_, pong->addr_, pong->byte_shared_, pong->file_shared_);    servent_cache_.push_back(*newsrv);    while(servent_cache_.size() >= KNOWNCACHE_LIMIT)        servent_cache_.pop_front();  }    }/* Query Request received *///void GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) {//modify by zdh 04-04int GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) {  double hitprob=0;  char payloadn;  rquery++;   if(isFreeloader_ || state_==PS_OFFLINE)     return 0;//20060322 Jackie commented/*  unsigned char nhit = (unsigned char)((float)rand()/(float)RAND_MAX *5.0);    if(ac_) {    int M = ac_->sys_->nFiles_;    int fnum = atoi(payload);    hitprob = (double)(M+1-fnum)/(double)(M+1);  }  double hit = (float)rand()/(float)RAND_MAX; //random probability of hit    if(hitprob>0 && hit<hitprob || (hitprob==0 && hit<HIT_PROB))  */  tquery *query; int fnum ; Byte_t cached_seg; query=(tquery*)payload; //memcpy((void *)&query, (void *)payload, sizeof(tquery)); fnum=query->segment_id; cached_seg=query->buffered_seg; if(cached_seg!=0) 	if(segment_lable_[cached_seg].minHop>ttl){		for(int j=1;j<=SEGMENT_NUM;j++){			if((segment_lable_[j].minHop!=16)&&(segment_lable_[j].source==query->addr)&&(j!=query->segment_id)){					segment_lable_[j].minHop=16;					//stat_my("seg %d replaced by seg %d \n",j,query->segment_id);					break;				}		}		segment_lable_[cached_seg].minHop=ttl;		segment_lable_[cached_seg].source=query->addr;		//stat_my("node %d receive query from %d cache %d hop %d\n",addr_,query->addr,query->buffered_seg,ttl); 		}// int fnum = *((Word_t *)payload); //stat_my("Query Request receive by: %d from: %d,request segment id: %d,hops %d, my buffer seg id:%d \n",addr_,peer,fnum,ttl,buffered_segment_[0].segment_id_); unsigned char nhit = 0; if(ttl==1){ 	req_1[fnum-1]=req_1[fnum-1]+1;	req_total=req_total+1;	peers_size_n=peers_size_n+query->peers;	for(int j=0;j<20;j++){		/*req_n[j]=req_n[j]+query->req[j];		req_total_n=req_total_n+query->req[j];*/		pop_n[j]=pop_n[j]+query->pop[j];		//stat_my("node %d pop_n[%d]:%f\n",addr_, j,pop_n[j]);		}	//stat_my("node %d req_1[%d]: %d, req-total: %d\n",addr_,fnum-1,req_1[fnum-1],req_total); 	}/*for(int i=calculated_num;i<=SEGMENT_NUM;i++){	if(segment_lable_[i].received_){	update_segmentcache(i, segment_lable_[i].hops_);	segment_lable_[i].stopped=1;		}	else{		calculated_num=i;		break;		}}*/  if (segmentIsBuffer(fnum)){  	buffered_segment_[0].hit_num=buffered_segment_[0].hit_num+1;	buffered_segment_[0].hit_ttl_sum+=ttl;  	sprintf(&payloadn,"%d",fnum);    gagent_->QueryHit((int)peer, nhit, addr_, speed_, &payloadn, id,ttl); 	return 1;  	}  else return 0;}int GnutellaApp::segmentIsBuffer(int request_segmentID){if(addr_==33)return 1;elsereturn request_segmentID==buffered_segment_[0].segment_id_;}/* QueryHit received*/void GnutellaApp::QueryHitReply(NodeAddr_t peer, int ttl, char *payload) {tQueryHit queryhit;int i=end_of_search-nwindow+1;// add by zdh 04-03memcpy((void *)&queryhit, (void *)payload, QUERYHIT_FIXLEN);stat_my("QueryHit received by %d from %d segment: %d hops : %d\n", addr_, queryhit.addr_,queryhit.segmentID_,ttl);if(!segment_lable_[queryhit.segmentID_].received_)					{segmentID_=queryhit.segmentID_+1;					segment_lable_[queryhit.segmentID_].received_=1;									segment_lable_[queryhit.segmentID_].hops_=ttl;					segment_lable_[queryhit.segmentID_].tdelay_=ttl-queryhit.segmentID_;					if(segment_lable_[queryhit.segmentID_].tdelay_<0)						segment_lable_[queryhit.segmentID_].tdelay_=0;					if(segment_lable_[queryhit.segmentID_].tdelay_==0){						progressing++;						if (queryhit.segmentID_==end_of_search)						stat_my(" node %d progressing num is: %d\n", addr_,progressing);						}					else if(progressing!=0){						stat_my(" node %d progressing num is: %d\n", addr_,progressing);						progressing=0;						}										update_segmentcache(queryhit.segmentID_, ttl);					   if(ac_ && state_==PS_ACTIVE&&!segment_lable_[segmentID_].searched_)					   	  ac_->resched(QUERY_INTERVAL);					}						if((queryhit.segmentID_>end_of_search-nwindow)&&(queryhit.segmentID_<=end_of_search)){			for(;i<=end_of_search;i++)				if(!segment_lable_[i].received_)					break;				if(i>end_of_search)					{finish_search=1;						queried=1;						stat();					}			}			// add by zdh 04-10			//queried=1;			//queryone=1;			//search(NULL);/*if(ac_ && state_==PS_ACTIVE)    ac_->expire(NULL);	*/   // ac_->gen_query();   }/* available for more connections? */int GnutellaApp::avail(int type) {  if(lpeers_.size() < max_deg_)    return TRUE;  else    return FALSE;}/* configure a list of bootstrap servers from a file */void GnutellaApp::setBootServer(FILE *file) {  char line[20];  NodeAddr_t servent;  BootServerRec *brec;  while(fgets((char *)&line, 20, file)) {#ifdef PDNS    Tcl &tcl = Tcl::instance();    tcl.evalf("[Simulator instance] convert-ipaddr %s\n", line);    servent = atoi(tcl.result()); #else    servent = atoi(line);#endif    brec = new BootServerRec(servent, LISTEN_PORT);    if (brec)       bserver_list_.insert(bserver_list_.end(), *brec);  }}/* find a servent in the servent cache */int GnutellaApp::find_servent(NodeAddr_t peer) {  for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) {    if(si->addr_==peer)      return TRUE;  }  return FALSE;}/* remove a servent from the known servent cache */void GnutellaApp::remove_servent(NodeAddr_t peer) {  for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) {    if(si->addr_==peer) {      servent_cache_.erase(si);      return;    }  }}//update_segmentcache for nwindow buffer delay replacement algrithm/*void GnutellaApp::update_segmentcache(Word_t segID,Byte_t hops) {int i,j,tmp,not_delay=0;			if (segID<=WINDOW)				segment_lable_[segID].tdelay_=segment_lable_[segID].hops_-(start_up_time+segment_play_time*(segID-1)+segment_lable_[segID-1].Tdelay_);	else		segment_lable_[segID].tdelay_=segment_lable_[segID].hops_+segment_play_time*(1-nwindow)-(segment_lable_[segID-1].Tdelay_-segment_lable_[segID-WINDOW].Tdelay_);		if (segment_lable_[segID].tdelay_<0)		segment_lable_[segID].tdelay_=0;		segment_lable_[segID].Tdelay_=segment_lable_[segID-1].Tdelay_+segment_lable_[segID].tdelay_;		stat_my("node %d segment %d tdelay is %f,Tdelay is %f\n",addr_,segID,segment_lable_[segID].tdelay_,segment_lable_[segID].Tdelay_);	if (buffered_segment_[0].buffer_time_<segment_lable_[segID].tdelay_) {			if (buffered_segment_[0].hops_!=0)			stat_my(" Updates cache for node: 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -