📄 peer_agent.cc
字号:
if(di->replycnt_>5) pending_req_.erase(di); return cnt; } }; return -1;}/* find the outstanding Ping matching a descriptor ID */int GnutellaAgent::find_ping(char *descid, int poll) { int cnt = -1; for(ReqList_t::iterator di=pending_req_.begin(); di !=pending_req_.end(); di++) { if(di->type_==REQ_PING && memcmp(descid, di->id_, 16)==0) { cnt = di->replycnt_; if(!poll) di->replycnt_++; return cnt; } }; return -1;}/* forward a PING or Query message */int GnutellaAgent::forward(Socket *incoming, PacketData *data, desc_hdr *hdr) { PacketData *newdata=NULL; DescEntry* newentry=NULL; int bcnt=0, totalout=0; int fcnt=0; for(SockMap_t::iterator si = lsocks_.begin(); si!=lsocks_.end(); si++) { if(si->second.sock_!=incoming) { totalout++; if(si->second.sock_->blocked(SOCK_WRITE)) { bcnt++; fwblock_->increment(); } } } for(SockMap_t::iterator si = lsocks_.begin(); si!=lsocks_.end(); si++) { if(si->second.sock_!=incoming) { if(!si->second.sock_->blocked(SOCK_WRITE)) { newdata = new PacketData(*data); si->second.sock_->send(data->size(), newdata); } } } newentry = new DescEntry(hdr->id_, incoming, NOW); desc_cache_.insert(desc_cache_.end(), *newentry); return FALSE;}/* route a PONG or QueryHit back to the initiator */int GnutellaAgent::backroute(PacketData *data, desc_hdr *header) { PacketData *newdata=NULL; Socket *sock = find_desc(header->id_); if(sock!=NULL) { SockMap_t::iterator si = lsocks_.find(sock->peer_.addr_); if(si!=lsocks_.end()) { if(sock->blocked(SOCK_WRITE)) { bkblock_->increment(); } newdata = new PacketData(*data); sock->send(data->size(), newdata); } } return FALSE;}/* Garbage collection of descriptor ID and pending connection */void GnutellaAgent::gc() { double curtime = NOW; DescMap_t::iterator di; while(!desc_cache_.empty()) { di = desc_cache_.begin(); if(curtime - di->tstamp_ > DESC_TIMEOUT) { desc_cache_.erase(di); } else break; }; ReqList_t::iterator qi; while(!pending_req_.empty()) { qi = pending_req_.begin(); if(curtime - qi->tstamp_ > DESC_TIMEOUT) { if(qi->replycnt_==0) { nforceremove++; } pending_req_.erase(qi); } else break; };}/* Gnutella Message Format Handler */GnutellaMsg::GnutellaMsg(NodeAddr_t addr) { bzero((void *)&header_, sizeof(header_)); payload_.pong_ = NULL; payload_.query_ = NULL; payload_.queryhit_ = NULL; payload_.push_ = NULL; addr_ = addr; seq1_ = seq2_ = seq3_ = 0; }/* clear after every use of GnutellaMsg::parse() */int GnutellaMsg::clear() { if(payload_.pong_) { free(payload_.pong_); payload_.pong_ = NULL; } if(payload_.query_) { if(payload_.query_->criteria_) free(payload_.query_->criteria_); payload_.query_->criteria_ = NULL; free(payload_.query_); payload_.query_ = NULL; } if(payload_.queryhit_) { free(payload_.queryhit_); payload_.query_ = NULL; } if(payload_.push_){ free(payload_.push_); payload_.push_ = NULL; } bzero((void *)&header_, sizeof(header_));}/* Gnutella Message Parsing */int GnutellaMsg::parse(int len, unsigned char *data) { char *teststring = (char *)malloc(len+1); void *temp=NULL; memcpy((char *)teststring, (char *)data, len); if(strcmp(GNUTELLA_BOOTSTRAP_RES, (char *)teststring)==0) { free(teststring); return MSG_BOOTSTRAP_RES; } teststring[len] = 0; if(strcmp(GNUTELLA_BOOTSTRAP, (char *)teststring)==0) { free(teststring); return MSG_BOOTSTRAP; } if(strcmp(GNUTELLA_CONN, (char *)teststring)==0) { free(teststring); return MSG_CONNREQ; } if(strcmp(ULTRA_CONN, (char *)teststring)==0) { free(teststring); return MSG_ULTRACONNREQ; } if(strcmp(LEAF_CONN, (char *)teststring)==0) { free(teststring); return MSG_LEAFCONNREQ; } if(strcmp(GNUTELLA_OK, (char *)teststring)==0) { free(teststring); return MSG_CONNOK; } if(strcmp(ULTRA_OK, (char *)teststring)==0) { free(teststring); return MSG_CONNOK; } if(strcmp(GNUTELLA_REJ, (char *)teststring)==0) { free(teststring); return MSG_CONNREJ; } if(strcmp(BOOTCACHE_UPDATE, (char *)teststring)==0) { free(teststring); return MSG_BOOTCACHE_UPDATE; } free(teststring); if(len < DESC_HDRLEN) return MSG_UNKNOWN; memcpy((void *)&header_, (void *)data, DESC_HDRLEN); data[18] = header_.hops_ + 1; header_.hops_++; switch(header_.payload_desc_) { case PING: return MSG_PING; case PONG: if(header_.length_ != PONG_LEN) return MSG_ILLFORMAT; temp = malloc(header_.length_); memcpy(temp, (void *)&data[DESC_HDRLEN], PONG_LEN); payload_.pong_ = (tPong *)temp; return MSG_PONG; case QUERY: if(header_.length_< 1) return MSG_ILLFORMAT; temp = malloc(sizeof(tQuery)); memcpy(temp, (void *)&data[DESC_HDRLEN], sizeof(Word_t)); payload_.query_ = (tQuery *)temp; payload_.query_->criteria_ = (char *)malloc(header_.length_ - sizeof(Word_t)); //strcpy(payload_.query_->criteria_, (char *)&data[DESC_HDRLEN + sizeof(Word_t)]); memcpy((void *)payload_.query_->criteria_,(void *)&data[DESC_HDRLEN + sizeof(Word_t)],sizeof(tquery)); return MSG_QUERY; case QUERYHIT: if(header_.length_ < QUERYHIT_FIXLEN) return MSG_ILLFORMAT; temp = malloc(sizeof(tQueryHit)); memcpy((void *)temp, (void *)&data[DESC_HDRLEN], header_.length_); payload_.queryhit_ = (tQueryHit *)temp; return MSG_QUERYHIT; default: return MSG_UNKNOWN; }; return MSG_UNKNOWN;}/* compose a PacketData with a Gnutella message as the payload *//* called with message type and length, as well as some optional parms */PacketData *GnutellaMsg::newpacket(char *id, Byte_t type, int ttl, int hops, int len, char *data) { PacketData *userdata = new PacketData(DESC_HDRLEN + len); unsigned char *dataptr = userdata->data(); if(dataptr == NULL) return NULL; header_.payload_desc_ = type; header_.TTL_ = ttl; header_.hops_ = hops; header_.length_ = len; if(id) memcpy((void *)&header_.id_, id, 16); else { id = new_descid(); memcpy((void *)&header_.id_, id, 16); free(id); } memcpy((void *)dataptr, (void *)&header_, DESC_HDRLEN); if(data && len>0) memcpy((void *)&dataptr[DESC_HDRLEN], data, len); return userdata;}/* generate a new unique descriptor ID */char *GnutellaMsg::new_descid() { char *newid = NULL; newid = (char*)malloc(16); memcpy((void *)newid, &addr_, 4); seq1_ ++; memcpy((void *)&newid[4], &seq1_, 4); memcpy((void *)&newid[8], &seq2_, 4); memcpy((void *)&newid[12], &seq3_, 4); if(seq1_ == INT_MAX -1) { seq1_ = 0; seq2_ ++; if(seq2_ == INT_MAX -1) { seq2_ = 0; seq3_ ++; } } return newid;}/* Garbage collection timeout handler*/void GC::expire(Event *) { agent_->gc(); resched(DESC_TIMEOUT/2);}/* Connection Timer */ConnTimer::ConnTimer(GnutellaAgent *agent) { agent_ = agent;}void ConnTimer::expire(Event *) { agent_->conn_timeout();}/* socket entry */SockEntry::SockEntry(Socket *sock, NodeAddr_t peer, Byte_t type) { sock_ = sock; peer_ = peer; type_ = type;}/**********************************************************************//**** Part II: GnutellaApp related ****//* UMASS P2P system model */PeerSys::PeerSys() { bind("nFiles_", &nFiles_); bind("nClasses_", &nClasses_); bind("alpha_", &alpha_); cdffile_ = NULL; nFiles_=SEGMENT_NUM; init_file();}/* initialize the file popularity CDF structure */void PeerSys::init_file() { int i; double K=0.0,j=0; cdffile_ = (double *)malloc(sizeof(double)*nFiles_); //cdffile_ = (double *)malloc(sizeof(double)*20); for(i=0; i< nFiles_; i++) { //for(i=0; i< 20; i++) { K = K + 1.0/pow(i+1, alpha_); } K = 1.0/K; for(i=0; i<nFiles_; i++) { // for(i=0; i< 20; i++) { //if(i==0) cdffile_[i] = K/pow(i+1, alpha_); j=j+cdffile_[i] ; debug_my("cdffile_[%d] : %f\n",i,cdffile_[i]); //printf("nFiles is %d",nFiles_); //else //cdffile_[i] = K/pow(i+1, alpha_) + cdffile_[i-1]; } debug_my("total popularity if %f\n",j);}/* initialize the class info based on a configuration file *//* configuration file default: classinfo.txt */void PeerSys::init_class(FILE *fh) { char line[100]; for(int i=0; i< nClasses_; i++) { ClassSpec_t *ncl = (ClassSpec_t *)malloc(sizeof(ClassSpec_t)); if(fgets((char *)&line, 100, fh)!=NULL) { sscanf(line, "%lf\t%lf\t%lf\t%d\n", &ncl->loff_, &ncl->lidle_, &ncl->poff_, &ncl->isFreeloader_); classes_.insert(ClassMap_t::value_type(i, *ncl)); } }}int PeerSys::command(int argc, const char*const* argv) { FILE *fh; if(strcmp(argv[1], "init-class")==0) { if(argc==2) fh = fopen("classinfo.txt", "r"); else fh = fopen(argv[2], "r"); if(fh==NULL) { fprintf(stderr, "PeerSys::init-class: unable to open class info file\n"); return TCL_ERROR; } init_class(fh); fclose(fh); return TCL_OK; } return TclObject::command(argc, argv);}/* Peer state: OUTSYS->INSYS->OUTSYS while INSYS, a peer could be OFFLINE, ONLINE and IDLE*//* ActivityController: *//* dynamically generates events corresponding to peer actions */ActivityController::ActivityController(PeerSys *sys, GnutellaApp *app, int cl) { ClassMap_t::iterator ci = sys->classes_.find(cl); if(ci==sys->classes_.end()) { fprintf(stderr, "WARNING: assigning to nonexistent class\n"); return; } if(ci!=sys->classes_.end()) { clSpec_ = &(ci->second); } sys_ = sys; app_ = app; state_ = &app_->state_; app_->setFreeloader(clSpec_->isFreeloader_); first=0;}/* change of status triggered by a timer scheduled during last change */void ActivityController::expire(Event *e) { double roff, rdelay; if(*state_==PS_ACTIVE) { //if(app_->finish_search||app_->queried){//add by zdh 04-10 if(app_->finish_search||!app_->searched){ //a certain probability of going offline from active searching roff = (double)rand()/(double)RAND_MAX; /*if(roff < clSpec_->poff_) { app_->setState(PS_OFFLINE); rdelay = (double)rand()/(double)RAND_MAX * clSpec_->loff_; resched(rdelay); } else {*/ app_->setState(PS_IDLE); rdelay = (double)rand()/(double)RAND_MAX * clSpec_->lidle_; resched(rdelay); //} } else {//add by zdh 04-10 gen_query(); resched(QUERY_TIMEOUT); } } else { if(*state_==PS_IDLE) { if (first){ app_->setState(PS_ACTIVE); gen_query(); //add by zdh 04-06 //resched(QUERY_TIMEOUT+rdelay); resched(QUERY_TIMEOUT); } else { //rdelay = (app_->addr_%MAX_DEGREE*3)+(app_->addr_/MAX_DEGREE); // rdelay=0; // rdelay=(double)rand()/(double)RAND_MAX*150; rdelay=app_->delay; resched(rdelay); first=1; } } else { app_->bootstrap(); app_->setState(PS_IDLE); rdelay = (double)rand()/(double)RAND_MAX * clSpec_->lidle_; resched(rdelay); } }}/* generate a query based on file statistics of the system */void ActivityController::gen_query( ) { char criteria[4]; //if((app_->queried==0)&&(!queryone)){ // if(app_->queried==0){ //queryone=1; //app_->queried=1; //if (rprob < sys_->cdffile_[i]) { //sprintf((char*)criteria, "%d", i); //} //else //debug_my("%d queried is %d\n",app_->addr_,app_->queried);//if(!queryone&&!app_->queried||app_->end_of_search){if(app_->addr_==33) return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -