📄 peer_agent.cc
字号:
}/* route a PONG or QueryHit back to the initiator */int GnutellaAgent::backroute(PacketData *data, desc_hdr *header) { PacketData *newdata=NULL; Socket *sock = find_desc(header->id_); if(sock!=NULL) { SockMap_t::iterator si = lsocks_.find(sock->peer_.addr_); if(si!=lsocks_.end()) { if(sock->blocked(SOCK_WRITE)) { bkblock_->increment(); } newdata = new PacketData(*data); sock->send(data->size(), newdata); } } return FALSE;}/* Garbage collection of descriptor ID and pending connection */void GnutellaAgent::gc() { double curtime = NOW; DescMap_t::iterator di; while(!desc_cache_.empty()) { di = desc_cache_.begin(); if(curtime - di->tstamp_ > DESC_TIMEOUT) { desc_cache_.erase(di); } else break; }; ReqList_t::iterator qi; while(!pending_req_.empty()) { qi = pending_req_.begin(); if(curtime - qi->tstamp_ > DESC_TIMEOUT) { if(qi->replycnt_==0) { nforceremove++; } pending_req_.erase(qi); } else break; };}/* Gnutella Message Format Handler */GnutellaMsg::GnutellaMsg(NodeAddr_t addr) { bzero((void *)&header_, sizeof(header_)); payload_.pong_ = NULL; payload_.query_ = NULL; payload_.queryhit_ = NULL; payload_.push_ = NULL; addr_ = addr; seq1_ = seq2_ = seq3_ = 0; }/* clear after every use of GnutellaMsg::parse() */int GnutellaMsg::clear() { if(payload_.pong_) { free(payload_.pong_); payload_.pong_ = NULL; } if(payload_.query_) { if(payload_.query_->criteria_) free(payload_.query_->criteria_); payload_.query_->criteria_ = NULL; free(payload_.query_); payload_.query_ = NULL; } if(payload_.queryhit_) { free(payload_.queryhit_); payload_.query_ = NULL; } if(payload_.push_){ free(payload_.push_); payload_.push_ = NULL; } bzero((void *)&header_, sizeof(header_));}/* Gnutella Message Parsing */int GnutellaMsg::parse(int len, unsigned char *data) { char *teststring = (char *)malloc(len+1); void *temp=NULL; memcpy((char *)teststring, (char *)data, len); if(strcmp(GNUTELLA_BOOTSTRAP_RES, (char *)teststring)==0) { free(teststring); return MSG_BOOTSTRAP_RES; } teststring[len] = 0; if(strcmp(GNUTELLA_BOOTSTRAP, (char *)teststring)==0) { free(teststring); return MSG_BOOTSTRAP; } if(strcmp(GNUTELLA_CONN, (char *)teststring)==0) { free(teststring); return MSG_CONNREQ; } if(strcmp(ULTRA_CONN, (char *)teststring)==0) { free(teststring); return MSG_ULTRACONNREQ; } if(strcmp(LEAF_CONN, (char *)teststring)==0) { free(teststring); return MSG_LEAFCONNREQ; } if(strcmp(GNUTELLA_OK, (char *)teststring)==0) { free(teststring); return MSG_CONNOK; } if(strcmp(ULTRA_OK, (char *)teststring)==0) { free(teststring); return MSG_CONNOK; } if(strcmp(GNUTELLA_REJ, (char *)teststring)==0) { free(teststring); return MSG_CONNREJ; } if(strcmp(BOOTCACHE_UPDATE, (char *)teststring)==0) { free(teststring); return MSG_BOOTCACHE_UPDATE; } free(teststring); if(len < DESC_HDRLEN) return MSG_UNKNOWN; memcpy((void *)&header_, (void *)data, DESC_HDRLEN); data[18] = header_.hops_ + 1; switch(header_.payload_desc_) { case PING: return MSG_PING; case PONG: if(header_.length_ != PONG_LEN) return MSG_ILLFORMAT; temp = malloc(header_.length_); memcpy(temp, (void *)&data[DESC_HDRLEN], PONG_LEN); payload_.pong_ = (tPong *)temp; return MSG_PONG; case QUERY: if(header_.length_< 1) return MSG_ILLFORMAT; temp = malloc(sizeof(tQuery)); memcpy(temp, (void *)&data[DESC_HDRLEN], sizeof(Word_t)); payload_.query_ = (tQuery *)temp; payload_.query_->criteria_ = (char *)malloc(header_.length_ - sizeof(Word_t)); strcpy(payload_.query_->criteria_, (char *)&data[DESC_HDRLEN + sizeof(Word_t)]); return MSG_QUERY; case QUERYHIT: if(header_.length_ < QUERYHIT_FIXLEN) return MSG_ILLFORMAT; temp = malloc(sizeof(tQueryHit)); memcpy((void *)temp, (void *)&data[DESC_HDRLEN], header_.length_); payload_.queryhit_ = (tQueryHit *)temp; return MSG_QUERYHIT; default: return MSG_UNKNOWN; }; return MSG_UNKNOWN;}/* compose a PacketData with a Gnutella message as the payload *//* called with message type and length, as well as some optional parms */PacketData *GnutellaMsg::newpacket(char *id, Byte_t type, int ttl, int hops, int len, char *data) { PacketData *userdata = new PacketData(DESC_HDRLEN + len); unsigned char *dataptr = userdata->data(); if(dataptr == NULL) return NULL; header_.payload_desc_ = type; header_.TTL_ = ttl; header_.hops_ = hops; header_.length_ = len; if(id) memcpy((void *)&header_.id_, id, 16); else { id = new_descid(); memcpy((void *)&header_.id_, id, 16); free(id); } memcpy((void *)dataptr, (void *)&header_, DESC_HDRLEN); if(data && len>0) memcpy((void *)&dataptr[DESC_HDRLEN], data, len); return userdata;}/* generate a new unique descriptor ID */char *GnutellaMsg::new_descid() { char *newid = NULL; newid = (char*)malloc(16); memcpy((void *)newid, &addr_, 4); seq1_ ++; memcpy((void *)&newid[4], &seq1_, 4); memcpy((void *)&newid[8], &seq2_, 4); memcpy((void *)&newid[12], &seq3_, 4); if(seq1_ == INT_MAX -1) { seq1_ = 0; seq2_ ++; if(seq2_ == INT_MAX -1) { seq2_ = 0; seq3_ ++; } } return newid;}/* Garbage collection timeout handler*/void GC::expire(Event *) { agent_->gc(); resched(DESC_TIMEOUT/2);}/* Connection Timer */ConnTimer::ConnTimer(GnutellaAgent *agent) { agent_ = agent;}void ConnTimer::expire(Event *) { agent_->conn_timeout();}/* socket entry */SockEntry::SockEntry(Socket *sock, NodeAddr_t peer, Byte_t type) { sock_ = sock; peer_ = peer; type_ = type;}/**********************************************************************//**** Part II: GnutellaApp related ****//* UMASS P2P system model */PeerSys::PeerSys() { bind("nFiles_", &nFiles_); bind("nClasses_", &nClasses_); bind("alpha_", &alpha_); cdffile_ = NULL; init_file();}/* initialize the file popularity CDF structure */void PeerSys::init_file() { int i; double K=0.0; cdffile_ = (double *)malloc(sizeof(double)*nFiles_); for(i=0; i< nFiles_; i++) { K = K + 1.0/pow(i+1, alpha_); } K = 1.0/K; for(i=0; i<nFiles_; i++) { if(i==0) cdffile_[i] = K/pow(i+1, alpha_); else cdffile_[i] = K/pow(i+1, alpha_) + cdffile_[i-1]; }}/* initialize the class info based on a configuration file *//* configuration file default: classinfo.txt */void PeerSys::init_class(FILE *fh) { char line[100]; for(int i=0; i< nClasses_; i++) { ClassSpec_t *ncl = (ClassSpec_t *)malloc(sizeof(ClassSpec_t)); if(fgets((char *)&line, 100, fh)!=NULL) { sscanf(line, "%lf\t%lf\t%lf\t%d\n", &ncl->loff_, &ncl->lidle_, &ncl->poff_, &ncl->isFreeloader_); classes_.insert(ClassMap_t::value_type(i, *ncl)); } }}int PeerSys::command(int argc, const char*const* argv) { FILE *fh; if(strcmp(argv[1], "init-class")==0) { if(argc==2) fh = fopen("classinfo.txt", "r"); else fh = fopen(argv[2], "r"); if(fh==NULL) { fprintf(stderr, "PeerSys::init-class: unable to open class info file\n"); return TCL_ERROR; } init_class(fh); fclose(fh); return TCL_OK; } return TclObject::command(argc, argv);}/* Peer state: OUTSYS->INSYS->OUTSYS while INSYS, a peer could be OFFLINE, ONLINE and IDLE*//* ActivityController: *//* dynamically generates events corresponding to peer actions */ActivityController::ActivityController(PeerSys *sys, GnutellaApp *app, int cl) { ClassMap_t::iterator ci = sys->classes_.find(cl); if(ci==sys->classes_.end()) { fprintf(stderr, "WARNING: assigning to nonexistent class\n"); return; } if(ci!=sys->classes_.end()) { clSpec_ = &(ci->second); } sys_ = sys; app_ = app; state_ = &app_->state_; app_->setFreeloader(clSpec_->isFreeloader_); }/* change of status triggered by a timer scheduled during last change */void ActivityController::expire(Event *e) { double roff, rdelay; if(*state_==PS_ACTIVE) { //a certain probability of going offline from active searching roff = (double)rand()/(double)RAND_MAX; if(roff < clSpec_->poff_) { app_->setState(PS_OFFLINE); rdelay = (double)rand()/(double)RAND_MAX * clSpec_->loff_; resched(rdelay); } else { app_->setState(PS_IDLE); rdelay = (double)rand()/(double)RAND_MAX * clSpec_->lidle_; resched(rdelay); } } else { if(*state_==PS_IDLE) { app_->setState(PS_ACTIVE); gen_query(); resched(QUERY_TIMEOUT); } else { app_->bootstrap(); app_->setState(PS_IDLE); rdelay = (double)rand()/(double)RAND_MAX * clSpec_->lidle_; resched(rdelay); } }}/* generate a query based on file statistics of the system */void ActivityController::gen_query() { double rprob = (double)rand()/(double)RAND_MAX; char criteria[4]; for(int i=0; i<sys_->nFiles_; i++) { if (rprob < sys_->cdffile_[i]) { sprintf((char*)criteria, "%d", i); app_->search((char *)criteria); break; } }}/* returns the popularity of a file *//*double ActivityController::find_file(int fnum) { if (fnum >=sys_->nFiles_ || fnum < 0) return sys_->cdffile_[fnum]; else return -1;}*//*** base class: PeerApp ***/PeerApp::PeerApp(NodeAddr_t addr) { addr_ = addr; shared_Mb_ = (int)((float)rand()/(float)RAND_MAX * 500.0); shared_files_ = (int)((float)rand()*20.0/(float)RAND_MAX); speed_ = (int)((float)rand()/(float)RAND_MAX*1500.0); state_ = PS_OUTSYS; ac_ = NULL; isFreeloader_ = FALSE; bind("shared_Mb_", &shared_Mb_); bind("shared_files_", &shared_files_); bind("speed_", &speed_); bind("isFreeloader_", &isFreeloader_); srand((unsigned int)addr);}/**** GnutellaApp ****/GnutellaApp::GnutellaApp(NodeAddr_t addr): PeerApp(addr) { int i=1,i_buffer=0; unsigned int seed=0; struct timeval tv={0}; ping_timer_ = new PingTimer(this); watchDog_ = new WatchDog(this); isBootserver_ = FALSE; //20060321 Jackie modified segmentID_=0; buffer_size_=1; file_size_=5; nwindow=2; //add by zdh 04-03 segment_lable_=(desc_segment_ *)calloc(sizeof(desc_segment_)*(file_size_+1),1); for(i=0;i<(file_size_+1);i++){ segment_lable_[i].Tdelay_=0; segment_lable_[i].tdelay_=0; segment_lable_[i].searched_=0; segment_lable_[i].received_=0; } /* segment_deadline_=NULL; segment_deadline_ = (double *)malloc(sizeof(double)*file_size_); segment_deadline_[0]=0; for(; i<file_size_+1; i++){ if (i==1) segment_deadline_[i]=start_up_time; else segment_deadline_[i]=segment_deadline_[i-1]+segment_play_time; }*/ /*segment_delay_=NULL; segment_delay_=(double *)malloc(sizeof(double)*(file_size_+1)); // total_delay_=NULL; total_delay_= (double *)malloc(sizeof(double)*(file_size_+1)); total_delay_[0]=0; segment_received=NULL; segment_received=(int *)malloc(sizeof(int)*(file_size_+1)); for(i=1; i<file_size_+1; i++) segment_received[i]=0; */ if (rand()%4==0) i_buffer=1; buffered_segment_=(desc_cache *)calloc(sizeof(desc_cache)*buffer_size_,1); /*if (i_buffer==1){ if (gettimeofday(&tv,NULL)!=0){ printf("Error while getting time of day\n"); } seed=tv.tv_usec; srand(seed); }*/ //modify by zdh 04-03 //for (;i<buffer_size_;i++){ if (i_buffer==1){ buffered_segment_[0].segment_id_=rand()%(file_size_)+1; buffered_segment_[0].hops_=INIT_QUERY_TTL; //modify by zdh 06-04-03 buffered_segment_[0].buffer_time_=INIT_QUERY_TTL; } else{ buffered_segment_[0].segment_id_=0; buffered_segment_[0].hops_=0; //modify by zdh 06-04-03 buffered_segment_[0].buffer_time_=0; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -