📄 peer_agent.cc
字号:
if(app_->buffered_segment_[0].hops_==INIT_QUERY_TTL+1) return;int tmp=app_->end_of_search>app_->nwindow?app_->nwindow:app_->end_of_search;if(!app_->queried){ queryone=1; if (app_->segmentID_==1){ //stat_my("node %d gen_query\n",app_->addr_); //for(int i=1;i<app_->nwindow+1;i++){ for(int i=1;i<tmp+1;i++){ sprintf((char*)criteria, "%d", i); app_->search((char *)criteria); //app_->update_segmentcache(i-1, app_->segment_lable_[i-1].hops_); //app_->segment_lable_[i-1].stopped=1; } app_->segmentID_=tmp; } else { // if(app_->segment_lable_[app_->segmentID_].searched_) for(int i=1;i<=app_->segmentID_;i++){ if(!app_->segment_lable_[i].received_){ sprintf((char*)criteria, "%d", i); app_->search((char *)criteria); } } else { for(int i=app_->segmentID_-app_->nwindow;i<app_->segmentID_;i++){ app_->segmentID_=i+app_->nwindow; if(app_->segment_lable_[i].received_&&(app_->segmentID_<=app_->end_of_search)){ sprintf((char*)criteria, "%d", app_->segmentID_); app_->search((char *)criteria); //app_->update_segmentcache(i, app_->segment_lable_[i].hops_); //app_->segment_lable_[i].stopped=1; } else {app_->segmentID_=app_->segmentID_-1; if(app_->segmentID_>app_->end_of_search) app_->segmentID_=app_->end_of_search; break; } } } } } }/* returns the popularity of a file *//*double ActivityController::find_file(int fnum) { if (fnum >=sys_->nFiles_ || fnum < 0) return sys_->cdffile_[fnum]; else return -1;}*//*** base class: PeerApp ***/PeerApp::PeerApp(NodeAddr_t addr) { addr_ = addr; shared_Mb_ = (int)((float)rand()/(float)RAND_MAX * 500.0); shared_files_ = (int)((float)rand()*20.0/(float)RAND_MAX); speed_ = (int)((float)rand()/(float)RAND_MAX*1500.0); state_ = PS_OUTSYS; ac_ = NULL; isFreeloader_ = FALSE; bind("shared_Mb_", &shared_Mb_); bind("shared_files_", &shared_files_); bind("speed_", &speed_); bind("isFreeloader_", &isFreeloader_); srand((unsigned int)addr);}/**** GnutellaApp ****/GnutellaApp::GnutellaApp(NodeAddr_t addr): PeerApp(addr) { int i,i_buffer=0; unsigned int seed=0; struct timeval tv={0}; ping_timer_ = new PingTimer(this); watchDog_ = new WatchDog(this); boottimer_ = new BootTimer(this); isBootserver_ = FALSE; //20060321 Jackie modified segmentID_=1; buffer_size_=1; //file_size_=5; file_size_=SEGMENT_NUM; searched=0; calculated_num=1; max_delay_time=0; nwindow=1; progressing=0; //add by zdh 04-03 segment_lable_=(desc_segment_ *)calloc(sizeof(desc_segment_)*(file_size_+1),1); for(i=0;i<(file_size_+1);i++){ segment_lable_[i].Tdelay_=0; segment_lable_[i].tdelay_=0; segment_lable_[i].searched_=0; segment_lable_[i].received_=0; segment_lable_[i].hops_=0; segment_lable_[i].printed_=0; segment_lable_[i].minHop=16; } //add by zdh 04-06 //joined=0; queried=0; cached=0; /*if (rand()%5==0) i_buffer=1; */ buffered_segment_=(desc_cache *)calloc(sizeof(desc_cache)*buffer_size_,1); /*if (i_buffer==1){ if (gettimeofday(&tv,NULL)!=0){ printf("Error while getting time of day\n"); } seed=tv.tv_usec; srand(seed); }*/ //modify by zdh 04-03 //for (;i<buffer_size_;i++){ /*if (i_buffer==1&&segment_cached<file_size_){ //buffered_segment_[0].segment_id_=rand()%(file_size_)+1; buffered_segment_[0].segment_id_=segment_cached%(file_size_)+1; buffered_segment_[0].hops_=INIT_QUERY_TTL; //modify by zdh 06-04-03 buffered_segment_[0].buffer_time_=INIT_QUERY_TTL; segment_cached++; } else{*/ //} //} //printf("Initializing node %d with segment id buffered: %d \n",addr,buffered_segment_[0].segment_id_); //bind("file_size_", &file_size_); bind("isBootserver_", &isBootserver_); bind("max_deg_", &max_deg_); ping_interval_ = PING_INTERVAL; watch_interval_ = CONN_INTERVAL; boot_interval_=BOOT_INTERVAL; max_deg_ = MAX_DEGREE; bootSrv_ = NULL; smpBoot_ = FALSE; update_enable=1; degree_ = new BasicStat(addr); //add by zdh 04-10 /* for(i=0;i<4;i++){ ServentRec *newsrv = new ServentRec(LISTEN_PORT, addr_+i+1, 0, 0); servent_cache_.push_back(*newsrv); } // join();*/ //add by zdh 04-10 finish_search=0; //add by zdh 06-05 }//06-04-03add by zdhvoid GnutellaApp::stat() { stat_my("node %d cache segment %d hops %d buffer time %f \n",addr_,buffered_segment_[0].segment_id_,buffered_segment_[0].hops_,buffered_segment_[0].buffer_time_); //for(int i=1;i<file_size_+1;i++) for(int i=1;i<=file_size_;i++) { if (segment_lable_[i].received_&&!segment_lable_[i].printed_) { stat_my("segment %d delay: %f hops %d\n",i,segment_lable_[i].tdelay_,segment_lable_[i].hops_); segment_lable_[i].printed_=1; } else continue; } stat_my("end of search is: %d, peers number: %d\n",end_of_search,lpeers_.size()); stat_my("rquery: %d, nquery: %d, squeryhit: %d, nqueryhit:%d\n",rquery,nquery,squeryhit,nqueryhit);// for(PeerMap_t::iterator i = lpeers_.begin(); i != lpeers_.end(); ++i) //stat_my("node %d's peer is %d\n",addr_,i->second.peer_); //stat_my(" node %d progressing num is: %d\n", addr_,progressing); }/* user command interface *//* join: outsys->insys */void GnutellaApp::join() {//if (!joined){ //if(buffered_segment_[0].segment_id_!=0){ nnode=nnode+1; if(addr_==80){ debug_my("node %d with segment id buffered: %d \n",addr_,buffered_segment_[0].segment_id_); //add by zdh 04-06 //join(); queried=1; } /*if((nnode%8==0)&&(nnode/8<=20)){ buffered_segment_[0].segment_id_=nnode/8; buffered_segment_[0].hops_=INIT_QUERY_TTL+1; //modify by zdh 06-04-03 buffered_segment_[0].buffer_time_=INIT_QUERY_TTL+1; stat_my("node %d cache segment %d\n",addr_,buffered_segment_[0].segment_id_); cached=1; //if (nnode/8==20) // nnode=0; } else{*/ buffered_segment_[0].segment_id_=0; buffered_segment_[0].hops_=0; //modify by zdh 06-04-03 buffered_segment_[0].buffer_time_=0; //} ping_interval_ = PING_INTERVAL; //bootstrap(); bootSrv_->BootstrapRequest0(addr_, this, TYPE_LEGACY); boottimer_->resched(boot_interval_); //joined=1;//}/*double rprob = (double)rand()/(double)RAND_MAX/file_size_; int j=0; for( j=0; j<ac_->sys_->nFiles_; j++) { if (rprob > ac_->sys_->cdffile_[j]){end_of_search=j+1; // debug_my("rprob is : %f\n",rprob); break; } } if(j==ac_->sys_->nFiles_) end_of_search=j;*/ FILE *fh=fopen("pop.txt","r"); fseek(fh,sizeof(Word_t)*addr_,0); fread(&end_of_search,sizeof(Word_t),1,fh); stat_my("%d end of search is %d\n",addr_,end_of_search); //printf("%d ",m); fclose(fh); fh=fopen("delay.txt","r"); fseek(fh,sizeof(Word_t)*addr_,0); fread(&delay,sizeof(Word_t),1,fh); stat_my("%d delay is %d\n",addr_,delay); //printf("%d ",m); fclose(fh); //end_of_search=file_size_; peers_size=1; peers_size_n=0; for(int i=0;i<20;i++){ //req_n[i]=1; req_1[i]=0; pop[i]=ac_->sys_->cdffile_[i]; } req_total=0; //req_total_n=20;}/* leave: insys->outsys */void GnutellaApp::leave() { if(state_!= PS_OUTSYS) setState(PS_OFFLINE); if(ping_timer_ && ping_timer_->status_==TIMER_PENDING) ping_timer_->cancel(); if(watchDog_ && watchDog_->status_==TIMER_PENDING) watchDog_->cancel(); if(ac_ && ac_->status_==TIMER_PENDING) ac_->cancel();}void GnutellaApp::stop() { if(state_ != PS_OUTSYS) setState(PS_OFFLINE); if(ping_timer_ && ping_timer_->status_==TIMER_PENDING) ping_timer_->cancel(); if(watchDog_ && watchDog_->status_==TIMER_PENDING) watchDog_->cancel(); if(ac_ && ac_->status_==TIMER_PENDING) ac_->cancel();}void GnutellaApp::disconnect(NodeAddr_t node) { gagent_->Disconnect(node); if(node==-1) { lpeers_.clear(); if(smpBoot_) { if(bootSrv_) bootSrv_->RemovePeer(addr_, this); } }else { PeerMap_t::iterator pi = lpeers_.find(node); if(pi!=lpeers_.end()) { lpeers_.erase(pi); } } degree_->increment(lpeers_.size());}/* share files */void GnutellaApp::share() {}/* maintenance of peer relationship */void GnutellaApp::maintenance() {}/* initiate a query */void GnutellaApp::search(char *criteria) {//20060321 Jackie modifiedchar *tmp;Word_t segment_search;int i=atoi(criteria);tquery query;searched=1;int n=0;query.segment_id=i;for(int j=0;j<20;j++){query.pop[j]=pop[j];}query.peers=peers_size;query.buffered_seg=buffered_segment_[0].segment_id_;query.addr=addr_;gagent_->Query(-1,100,(char *)&query);stat_my(" Node: %d is querying the %d segment, \n", addr_,i);segment_lable_[i].searched_=1;//segment_lable_[i].search_time_=NOW;}/* GnutellaApp internal operations *//* proactively change its own status */void GnutellaApp::setState(PeerState_t state) { PeerState_t old_state = state_; double tmp, ctime = NOW; state_ = state; //the following code is for membership accounting purpose if(state_ == PS_OFFLINE && old_state != PS_OFFLINE) { if(!isFreeloader_) { if(Last_Chg1 !=-1) { tmp = (ctime - Last_Chg1) * (double)Na1 + Avg_Na1 * TSysTime1 ; TSysTime1 += (ctime - Last_Chg1); Avg_Na1 = tmp/TSysTime1; /* periodic averaging */ tmp = (ctime - Last_Chg1) * (double)Na1 + pAvg_Na1 * pTime1; pTime1 += (ctime - Last_Chg1); pAvg_Na1 = tmp/pTime1; Na1--; }else { debug_warning("WARNING: shouldn't get here (setState:1)\n"); } Last_Chg1 = ctime; } else { if(Last_Chg2 !=-1) { tmp = (ctime - Last_Chg2) * (double)Na2 + Avg_Na2 * TSysTime2 ; TSysTime2 += (ctime - Last_Chg2); Avg_Na2 = tmp/TSysTime2; /* periodic averaging */ tmp = (ctime - Last_Chg2) * (double)Na2 + pAvg_Na1 * pTime2; pTime2 += (ctime - Last_Chg2); pAvg_Na2 = tmp/pTime2; Na2 --; }else { debug_warning("WARNING: shouldn't get here (setState:2)\n"); } Last_Chg2 = ctime; } if(ping_timer_ && ping_timer_->status_==TIMER_PENDING) ping_timer_->cancel(); if(watchDog_ && watchDog_->status_==TIMER_PENDING) watchDog_->cancel(); //disconnect() is required for correct functioning though disconnect(-1); debug_stat("Membership %d\t%d\n", Na1, Na2); gagent_->bkblock_->offline(); gagent_->fwblock_->offline(); gagent_->rcvRate_->offline(); degree_->offline(); fflush(NULL); } if(state_ != PS_OFFLINE && old_state == PS_OFFLINE) { if(!isFreeloader_) { if(Last_Chg1 !=-1) { tmp = (ctime - Last_Chg1 ) * (double)Na1 + Avg_Na1 * TSysTime1 ; TSysTime1 += (ctime - Last_Chg1); Avg_Na1 = tmp/TSysTime1; tmp = (ctime - Last_Chg1) * (double)Na1 + pAvg_Na1 * pTime1; pTime1 += (ctime - Last_Chg1); pAvg_Na1 = tmp/pTime1; } Last_Chg1 = ctime; }else { if(Last_Chg2 !=-1) { tmp = (ctime - Last_Chg2 ) * (double)Na2 + Avg_Na2 * TSysTime2 ; TSysTime2 += (ctime - Last_Chg2); Avg_Na2 = tmp/TSysTime2; tmp = (ctime - Last_Chg2) * (double)Na2 + pAvg_Na1 * pTime2; pTime2 += (ctime - Last_Chg2); pAvg_Na2 = tmp/pTime2; } Last_Chg2 = ctime; } if(!isFreeloader_) Na1 ++; else Na2 ++; if(ping_timer_) ping_timer_->resched(ping_interval_); if(watchDog_) watchDog_->resched(watch_interval_); debug_stat("Membership %d\t%d\n", Na1, Na2); gagent_->bkblock_->online(); gagent_->fwblock_->online(); gagent_->rcvRate_->online(); degree_->online(); fflush(NULL); } if(pTime1 > SMP_INTERVAL) { debug_info("Membership non-freeloader %f\t%d\t%d\n", pAvg_Na1, Na1, Na2); pAvg_Na1 = 0; pTime1 = 0; } if(pTime2 > SMP_INTERVAL) { debug_info("Membership freeloader %f\t%d\t%d\n", pAvg_Na2, Na1, Na2); pAvg_Na2 = 0; pTime2 = 0; }}/* bootstrap */void GnutellaApp::bootstrap() { if(smpBoot_) { if(bootSrv_) smp_bootstrap(); return; } if(bserver_list_.size()>0) { BootServerRec bi = bserver_list_.front(); gagent_->Bootstrap(bi.addr_); bserver_list_.push_back(bi); bserver_list_.pop_front(); }}/* use SmpBootServer for bootstrapping */void GnutellaApp::smp_bootstrap() { BootstrapRes_t *res; //20060321 Jackie modified if (file_size_<=0) return ; res = bootSrv_->BootstrapRequest1(addr_, this, TYPE_LEGACY); if(res == NULL) {debug_info("bootstrap result is NULL"); return; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -