📄 peer_agent.cc
字号:
BootstrapResult(res->cnt_, res->servents_); // stat_my("node %d bootstrap result is %d\n",addr_, res->cnt_); if(res->servents_) free(res->servents_); free(res);}/* tries to connect to some known servents */void GnutellaApp::connect(int trybootserver) { int cnt=0, success, limit; limit = max_deg_ - lpeers_.size(); for (ServentMap_t::const_iterator i = servent_cache_.begin(); cnt < limit && i != servent_cache_.end(); ++i ) { NodeAddr_t peer = i->addr_; PeerMap_t::iterator pi = lpeers_.find(peer); if(pi==lpeers_.end()) { success = gagent_->Connect(peer, CONN_TIMEOUT); if(success) cnt++; } } if (trybootserver && !smpBoot_ && cnt < limit) //not enough known servents bootstrap();}/* periodically send PING to connected peers */void GnutellaApp::ping(int first) { double ctime; ctime = NOW; for(PeerMap_t::iterator i = lpeers_.begin(); i != lpeers_.end(); ++i) { if(ctime - i->second.lstamp_ > ping_interval_) { gagent_->Ping(i->second.peer_, INIT_TTL); i->second.lstamp_ = ctime; } } ping_timer_->resched(ping_interval_); }// upcalls from GnutellaAgent/* Connection request to a peer succeeded */void GnutellaApp::ConnectSucceeded(NodeAddr_t peer) { PeerMap_t::iterator pi = lpeers_.find(peer); if(pi==lpeers_.end()) { PeerEntry *newentry = new PeerEntry(peer, NOW, FALSE); lpeers_.insert(PeerMap_t::value_type(peer, *newentry)); degree_->increment(lpeers_.size()); if(lpeers_.size()==1) { if(state_ == PS_OUTSYS) state_ = PS_OFFLINE; if(state_ == PS_OFFLINE) { setState(PS_IDLE); if(ac_) ac_->expire(NULL); } // ping_timer_->resched(ping_interval_);//delete by zdh 06-03 watchDog_->resched(watch_interval_); if(gagent_->gc_ && gagent_->gc_->status()==TIMER_IDLE) gagent_->gc_->resched(DESC_TIMEOUT/2); } }/*if(lpeers_.size()>=3){ }*/debug_my("node %d have %d peers\n",addr_,peer);}/* Bootstrap request received*/void GnutellaApp::BootstrapRequest(NodeAddr_t peer) { debug_info("bootstrap request received from %d\n", peer); if(isBootserver_) { int i=0, size = servent_cache_.size(); int cnt = MAX_BOOTREPLY>size ? size: MAX_BOOTREPLY; NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*cnt); for(ServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) { addrs[i] = fi->addr_; i++; if(i>=MAX_BOOTREPLY) break; } gagent_->Bootstrap_Reply(peer, cnt, addrs); if(find_servent(peer)) remove_servent(peer); int prob = (int)((double)rand()/(double)RAND_MAX * 100.0); if(prob < PUBLISH_PROB) { ServentRec *newsrv; newsrv = new ServentRec(LISTEN_PORT, peer, 0, 0); servent_cache_.push_back(*newsrv); } while(servent_cache_.size() > KNOWNCACHE_LIMIT) servent_cache_.pop_front(); free(addrs); }}/* Bootstrap results received */void GnutellaApp::BootstrapResult(int cnt, NodeAddr_t *res) { int i; for(i=0; i<cnt; i++) { NodeAddr_t cur = res[i]; //stat_my("node %d's peer is %d\n",addr_,res[i]); if(!find_servent(cur) && cur != addr_) { ServentRec *newsrv; newsrv = new ServentRec(LISTEN_PORT, cur, 0, 0); servent_cache_.push_back(*newsrv); } } connect(FALSE); ping(TRUE);}/* Bootstrap results received */void GnutellaApp::BootcacheUpdate(NodeAddr_t peer) { if (!isBootserver_) return; if(!find_servent(peer)) return; remove_servent(peer); ServentRec *newsrv; newsrv = new ServentRec(LISTEN_PORT, peer, 0, 0); servent_cache_.push_back(*newsrv);}/* connection request received*/int GnutellaApp::ConnectionRequest(NodeAddr_t peer, Socket *sock) { PeerMap_t::iterator i = lpeers_.find(peer); if(avail(TYPE_ANY) && i==lpeers_.end()) { gagent_->gnutella_ok(sock); PeerEntry *newentry = new PeerEntry(peer, NOW, FALSE); lpeers_.insert(PeerMap_t::value_type(peer, *newentry)); degree_->increment(lpeers_.size()); debug_my("node %d have %d peers\n",addr_,peer); if(lpeers_.size()==1) { if(state_ == PS_OUTSYS) state_ = PS_OFFLINE; if(state_ == PS_OFFLINE) { setState(PS_IDLE); if(ac_) ac_->expire(NULL); }// ping_timer_->resched(ping_interval_);//delete by zdh 06-03 watchDog_->resched(watch_interval_); } //add by zdh 04-10/* if(lpeers_.size()>=3){ if(state_ == PS_OUTSYS) state_ = PS_OFFLINE; if(state_ == PS_OFFLINE) { setState(PS_IDLE); if(ac_) ac_->expire(NULL); }}*/debug_my("node %d have %d peers\n",addr_,lpeers_.size()); if(!find_servent(peer)) { ServentRec *srec = new ServentRec(LISTEN_PORT, peer, 0, 0); servent_cache_.push_back(*srec); } return 1; } else { gagent_->gnutella_reject(sock); return 0; }}/* connection request rejected */void GnutellaApp::ConnectionRejected(NodeAddr_t peer) { debug_info("connection request from %d to %d rejected\n", addr_, peer); if(find_servent(peer)) remove_servent(peer);}/* connection failed, probably at network level */void GnutellaApp::ConnectionFailed(NodeAddr_t peer) { debug_info("connection to %d failed\n", peer); if(find_servent(peer)) remove_servent(peer);}/* connection request timed out */void GnutellaApp::ConnectionTimeout(NodeAddr_t peer) { debug_info("connection from %d to %d timed out\n", addr_, peer); if(find_servent(peer)) remove_servent(peer);}/* connection closed by peer */void GnutellaApp::ConnectionClosed(NodeAddr_t peer) { debug_info("connection closed by peer %d\n", peer); for(PeerMap_t::iterator i=lpeers_.begin(); i != lpeers_.end(); ++i) { if(i->second.peer_ == peer) { lpeers_.erase(i); degree_->increment(lpeers_.size()); return; } }}/* PING request received */void GnutellaApp::PingRequest(NodeAddr_t peer, int ttl, char *id) {//return calculate popularity(pop[20] and peers)debug_info("PingRequest from %d to %d hops %d\n", peer, addr_, ttl);int cnt = file_size_+1;float *addrs = (float *)malloc(sizeof(float)*cnt);for(int i=0;i<20;i++) addrs[i]=pop[i];addrs[20]=(float)peers_size;gagent_->Pong((int)peer, INIT_TTL, cnt, addrs, id);if(addrs) free(addrs); /*int i=0; int cnt = servent_cache_.size(); debug_info("PingRequest from %d to %d\n", peer, addr_); int *sbytes = (int *)malloc(sizeof(int)*cnt); int *sfiles = (int *)malloc(sizeof(int)*cnt); NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*cnt); for(ServentMap_t::iterator si = servent_cache_.begin(); si!=servent_cache_.end(); si++) { sbytes[i] = si->byte_shared_; addrs[i] = si->addr_; sfiles[i] = si->file_shared_; i++; } gagent_->Pong((int)peer, INIT_TTL, cnt, addrs, sbytes, sfiles, id); if(sbytes) free(sbytes); if(sfiles) free(sfiles);*/ //modify by zdh 06-03 }/* PONG reply received */void GnutellaApp::PongReply(NodeAddr_t peer, int ttl, char* payload) { tPong *pong = (tPong *)payload; if(lpeers_.size() < max_deg_ && pong->addr_!= addr_) { PeerMap_t::iterator pi = lpeers_.find(pong->addr_); if(pi==lpeers_.end()) { gagent_->Connect(pong->addr_, CONN_TIMEOUT); } } if(find_servent(pong->addr_)) return; if(pong->addr_!= addr_) { ServentRec *newsrv = new ServentRec(pong->port_, pong->addr_, pong->byte_shared_, pong->file_shared_); servent_cache_.push_back(*newsrv); while(servent_cache_.size() >= KNOWNCACHE_LIMIT) servent_cache_.pop_front(); } }/* Query Request received *///void GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) {//modify by zdh 04-04int GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) { double hitprob=0; char payloadn; rquery++; if(isFreeloader_ || state_==PS_OFFLINE) return 0;//20060322 Jackie commented/* unsigned char nhit = (unsigned char)((float)rand()/(float)RAND_MAX *5.0); if(ac_) { int M = ac_->sys_->nFiles_; int fnum = atoi(payload); hitprob = (double)(M+1-fnum)/(double)(M+1); } double hit = (float)rand()/(float)RAND_MAX; //random probability of hit if(hitprob>0 && hit<hitprob || (hitprob==0 && hit<HIT_PROB)) */ tquery *query; int fnum ; Byte_t cached_seg; query=(tquery*)payload; //memcpy((void *)&query, (void *)payload, sizeof(tquery)); fnum=query->segment_id; cached_seg=query->buffered_seg; if(cached_seg!=0) if(segment_lable_[cached_seg].minHop>ttl){ for(int j=1;j<=SEGMENT_NUM;j++){ if((segment_lable_[j].minHop!=16)&&(segment_lable_[j].source==query->addr)&&(j!=query->segment_id)){ segment_lable_[j].minHop=16; //stat_my("seg %d replaced by seg %d \n",j,query->segment_id); break; } } segment_lable_[cached_seg].minHop=ttl; segment_lable_[cached_seg].source=query->addr; //stat_my("node %d receive query from %d cache %d hop %d\n",addr_,query->addr,query->buffered_seg,ttl); }// int fnum = *((Word_t *)payload); //stat_my("Query Request receive by: %d from: %d,request segment id: %d,hops %d, my buffer seg id:%d \n",addr_,peer,fnum,ttl,buffered_segment_[0].segment_id_); unsigned char nhit = 0; if(ttl==1){ req_1[fnum-1]=req_1[fnum-1]+1; req_total=req_total+1; peers_size_n=peers_size_n+query->peers; for(int j=0;j<20;j++){ /*req_n[j]=req_n[j]+query->req[j]; req_total_n=req_total_n+query->req[j];*/ pop_n[j]=pop_n[j]+query->pop[j]; //stat_my("node %d pop_n[%d]:%f\n",addr_, j,pop_n[j]); } //stat_my("node %d req_1[%d]: %d, req-total: %d\n",addr_,fnum-1,req_1[fnum-1],req_total); }/*for(int i=calculated_num;i<=SEGMENT_NUM;i++){ if(segment_lable_[i].received_){ update_segmentcache(i, segment_lable_[i].hops_); segment_lable_[i].stopped=1; } else{ calculated_num=i; break; }}*/ if (segmentIsBuffer(fnum)){ buffered_segment_[0].hit_num=buffered_segment_[0].hit_num+1; buffered_segment_[0].hit_ttl_sum+=ttl; sprintf(&payloadn,"%d",fnum); gagent_->QueryHit((int)peer, nhit, addr_, speed_, &payloadn, id,ttl); return 1; } else return 0;}int GnutellaApp::segmentIsBuffer(int request_segmentID){if(addr_==33)return 1;elsereturn request_segmentID==buffered_segment_[0].segment_id_;}/* QueryHit received*/void GnutellaApp::QueryHitReply(NodeAddr_t peer, int ttl, char *payload) {tQueryHit queryhit;int i=end_of_search-nwindow+1;// add by zdh 04-03memcpy((void *)&queryhit, (void *)payload, QUERYHIT_FIXLEN);stat_my("QueryHit received by %d from %d segment: %d hops : %d\n", addr_, queryhit.addr_,queryhit.segmentID_,ttl);if(!segment_lable_[queryhit.segmentID_].received_) {segmentID_=queryhit.segmentID_+1; segment_lable_[queryhit.segmentID_].received_=1; segment_lable_[queryhit.segmentID_].hops_=ttl; segment_lable_[queryhit.segmentID_].tdelay_=ttl-queryhit.segmentID_; if(segment_lable_[queryhit.segmentID_].tdelay_<0) segment_lable_[queryhit.segmentID_].tdelay_=0; if(segment_lable_[queryhit.segmentID_].tdelay_==0){ progressing++; if (queryhit.segmentID_==end_of_search) stat_my(" node %d progressing num is: %d\n", addr_,progressing); } else if(progressing!=0){ stat_my(" node %d progressing num is: %d\n", addr_,progressing); progressing=0; } update_segmentcache(queryhit.segmentID_, ttl); if(ac_ && state_==PS_ACTIVE&&!segment_lable_[segmentID_].searched_) ac_->resched(QUERY_INTERVAL); } if((queryhit.segmentID_>end_of_search-nwindow)&&(queryhit.segmentID_<=end_of_search)){ for(;i<=end_of_search;i++) if(!segment_lable_[i].received_) break; if(i>end_of_search) {finish_search=1; queried=1; stat(); } } // add by zdh 04-10 //queried=1; //queryone=1; //search(NULL);/*if(ac_ && state_==PS_ACTIVE) ac_->expire(NULL); */ // ac_->gen_query(); }/* available for more connections? */int GnutellaApp::avail(int type) { if(lpeers_.size() < max_deg_) return TRUE; else return FALSE;}/* configure a list of bootstrap servers from a file */void GnutellaApp::setBootServer(FILE *file) { char line[20]; NodeAddr_t servent; BootServerRec *brec; while(fgets((char *)&line, 20, file)) {#ifdef PDNS Tcl &tcl = Tcl::instance(); tcl.evalf("[Simulator instance] convert-ipaddr %s\n", line); servent = atoi(tcl.result()); #else servent = atoi(line);#endif brec = new BootServerRec(servent, LISTEN_PORT); if (brec) bserver_list_.insert(bserver_list_.end(), *brec); }}/* find a servent in the servent cache */int GnutellaApp::find_servent(NodeAddr_t peer) { for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) { if(si->addr_==peer) return TRUE; } return FALSE;}/* remove a servent from the known servent cache */void GnutellaApp::remove_servent(NodeAddr_t peer) { for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) { if(si->addr_==peer) { servent_cache_.erase(si); return; } }}//update_segmentcache for nwindow buffer delay replacement algrithm/*void GnutellaApp::update_segmentcache(Word_t segID,Byte_t hops) {int i,j,tmp,not_delay=0; if (segID<=WINDOW) segment_lable_[segID].tdelay_=segment_lable_[segID].hops_-(start_up_time+segment_play_time*(segID-1)+segment_lable_[segID-1].Tdelay_); else segment_lable_[segID].tdelay_=segment_lable_[segID].hops_+segment_play_time*(1-nwindow)-(segment_lable_[segID-1].Tdelay_-segment_lable_[segID-WINDOW].Tdelay_); if (segment_lable_[segID].tdelay_<0) segment_lable_[segID].tdelay_=0; segment_lable_[segID].Tdelay_=segment_lable_[segID-1].Tdelay_+segment_lable_[segID].tdelay_; stat_my("node %d segment %d tdelay is %f,Tdelay is %f\n",addr_,segID,segment_lable_[segID].tdelay_,segment_lable_[segID].Tdelay_); if (buffered_segment_[0].buffer_time_<segment_lable_[segID].tdelay_) { if (buffered_segment_[0].hops_!=0) stat_my(" Updates cache for node:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -