📄 peer_agent.cc
字号:
gagent_->Pong((int)peer, INIT_TTL, cnt, addrs, sbytes, sfiles, id); if(sbytes) free(sbytes); if(sfiles) free(sfiles);}/* PONG reply received */void GnutellaApp::PongReply(NodeAddr_t peer, int ttl, char* payload) { tPong *pong = (tPong *)payload; if(lpeers_.size() < max_deg_ && pong->addr_!= addr_) { PeerMap_t::iterator pi = lpeers_.find(pong->addr_); if(pi==lpeers_.end()) { gagent_->Connect(pong->addr_, CONN_TIMEOUT); } } if(find_servent(pong->addr_)) return; if(pong->addr_!= addr_) { ServentRec *newsrv = new ServentRec(pong->port_, pong->addr_, pong->byte_shared_, pong->file_shared_); servent_cache_.push_back(*newsrv); while(servent_cache_.size() >= KNOWNCACHE_LIMIT) servent_cache_.pop_front(); }}/* Query Request received *///void GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) {//modify by zdh 04-04int GnutellaApp::QueryRequest(NodeAddr_t peer, int ttl, char* payload, char *id) { double hitprob=0; rquery++; if(isFreeloader_ || state_==PS_OFFLINE) return 0;//20060322 Jackie commented/* unsigned char nhit = (unsigned char)((float)rand()/(float)RAND_MAX *5.0); if(ac_) { int M = ac_->sys_->nFiles_; int fnum = atoi(payload); hitprob = (double)(M+1-fnum)/(double)(M+1); } double hit = (float)rand()/(float)RAND_MAX; //random probability of hit if(hitprob>0 && hit<hitprob || (hitprob==0 && hit<HIT_PROB)) */ //int fnum = atoi(payload); int fnum = *((Word_t *)payload); debug_info("Query Request receive by: %d from: %d,request segment id: %d,my buffer seg id:%d \n",addr_,peer,fnum,buffered_segment_[0].segment_id_); unsigned char nhit = 0; if (segmentIsBuffer(fnum)) { gagent_->QueryHit((int)peer, nhit, addr_, speed_, payload, id,ttl); return 1; } else return 0;}int GnutellaApp::segmentIsBuffer(int request_segmentID){return request_segmentID==buffered_segment_[0].segment_id_;}/* QueryHit received*/void GnutellaApp::QueryHitReply(NodeAddr_t peer, int ttl, char *payload) {tQueryHit queryhit;// add by zdh 04-03memcpy((void *)&queryhit, (void *)payload, QUERYHIT_FIXLEN);/*if (segmentID_==file_size_){ debug_info(" The transfer for node :%d is finished, altogether %d segments are transfered \n", gapp_->addr_,gapp_->file_size_); segmentID_++; }else{*/ //the current querying segment is hits //}if (segment_lable_[queryhit.segmentID_].searched_){ if (!segment_lable_[queryhit.segmentID_].received_) {debug_my("QueryHit received by %d from %d segment: %d hops : %d\n", addr_, peer,queryhit.segmentID_,ttl); segment_lable_[queryhit.segmentID_].received_=1; update_segmentcache(queryhit.segmentID_,ttl); //search(NULL); } } if(ac_ && state_==PS_ACTIVE) ac_->expire(NULL);}/* available for more connections? */int GnutellaApp::avail(int type) { if(lpeers_.size() < max_deg_) return TRUE; else return FALSE;}/* configure a list of bootstrap servers from a file */void GnutellaApp::setBootServer(FILE *file) { char line[20]; NodeAddr_t servent; BootServerRec *brec; while(fgets((char *)&line, 20, file)) {#ifdef PDNS Tcl &tcl = Tcl::instance(); tcl.evalf("[Simulator instance] convert-ipaddr %s\n", line); servent = atoi(tcl.result()); #else servent = atoi(line);#endif brec = new BootServerRec(servent, LISTEN_PORT); if (brec) bserver_list_.insert(bserver_list_.end(), *brec); }}/* find a servent in the servent cache */int GnutellaApp::find_servent(NodeAddr_t peer) { for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) { if(si->addr_==peer) return TRUE; } return FALSE;}/* remove a servent from the known servent cache */void GnutellaApp::remove_servent(NodeAddr_t peer) { for(ServentMap_t::iterator si = servent_cache_.begin(); si != servent_cache_.end(); si++) { if(si->addr_==peer) { servent_cache_.erase(si); return; } }}void GnutellaApp::update_segmentcache(Word_t segID,Byte_t hops) {int i,tmp,not_delay=0;//double delay=0; //for (i=0;i<buffer_size_;i++){ //add by zdh for (i=1;i<segID;i++){ if (segment_lable_[i].received_==0) {segment_lable_[segID].tdelay_=0; not_delay=1; tmp=i-1+nwindow; if(!segment_lable_[tmp].searched_) segmentID_=tmp; return; } } if(!not_delay){ for(i=1;i<segID;i++) { segment_lable_[i].Tdelay_=segment_lable_[i-1].Tdelay_+segment_lable_[i].tdelay_; //debug_my("segment_lable_[%d].Tdelay_: %f,segment_lable_[%d].tdelay_: %f\n",i,segment_lable_[i].Tdelay_,i,segment_lable_[i].tdelay_); } if (segID<=nwindow) segment_lable_[segID].tdelay_=hops-(start_up_time+segment_play_time*(segID-1)+segment_lable_[segID-1].Tdelay_); else segment_lable_[segID].tdelay_=hops+segment_play_time*(1-nwindow)-(segment_lable_[segID-1].Tdelay_-segment_lable_[segID-nwindow].Tdelay_); if (segment_lable_[segID].tdelay_<0) segment_lable_[segID].tdelay_=0; segment_lable_[segID].Tdelay_=segment_lable_[segID-1].Tdelay_+segment_lable_[segID].tdelay_; //debug_my("segment_lable_[%d].Tdelay_: %f,segment_lable_[%d].tdelay_: %f\n",segID,segment_lable_[segID].Tdelay_,segID,segment_lable_[segID].tdelay_); //debug_my("total_delay[%d]: %f segment_delay:%f,buffer time %f\n",segID,segment_lable_[segID].Tdelay_,segment_lable_[segID].tdelay_,buffered_segment_[0].buffer_time_); if ((buffered_segment_[0].buffer_time_<segment_lable_[segID].tdelay_) && (buffered_segment_[0].segment_id_!=segID)){ //if ((buffered_segment_[i].hops_<hops) && (buffered_segment_[i].segment_id_!=segID)) if (buffered_segment_[0].hops_!=0) debug_my(" Updates cache for node: %d old segment: %d old hops:%d old buffer time %f to new segment: %d,new hops: %d segment delay %f\n", addr_,buffered_segment_[0].segment_id_, buffered_segment_[0].hops_,buffered_segment_[0].buffer_time_,segID,hops,segment_lable_[segID].tdelay_); else debug_my(" Inserting cache for node: %d with segment: %d,hops: %d delay %f \n", addr_,segID,hops,segment_lable_[segID].tdelay_); buffered_segment_[0].segment_id_=segID; buffered_segment_[0].hops_=hops; buffered_segment_[0].buffer_time_=segment_lable_[segID].tdelay_; //for(i=segID+1; i<file_size_; i++) // segment_deadline_[i]=segment_deadline_[i]+segment_delay_; } segmentID_=segID+nwindow; } search(NULL); return ;}/* send an update to the bootstrap server */ void GnutellaApp::update_bootcache() { if(smpBoot_) return; gagent_->UpdateBootcache();}int GnutellaApp::command(int argc, const char*const* argv) { Tcl &tcl = Tcl::instance(); if(argc==2) { if(strcmp(argv[1], "start")==0) { join(); return TCL_OK; } if(strcmp(argv[1], "join")==0) { join(); return TCL_OK; } if(strcmp(argv[1], "leave")==0) { leave(); return TCL_OK; } if(strcmp(argv[1], "stop")==0) { stop(); return TCL_OK; } if(strcmp(argv[1], "statistics")==0) { stat(); //commented by zdh 04-03 //gagent_->statistics(); return TCL_OK; } } if(argc==3) { if(strcmp(argv[1], "set-agent")==0) { GnutellaAgent *tmp = (GnutellaAgent *)tcl.lookup(argv[2]); if(tmp) { gagent_ = tmp; agent_ = tmp; return TCL_OK; } return TCL_ERROR; } if(strcmp(argv[1], "search")==0) { search((char *)argv[2]); return TCL_OK; } if(strcmp(argv[1], "use-bootserver")==0) { FILE *file = fopen(argv[2], "r"); if(file) { setBootServer(file); return TCL_OK; } return TCL_ERROR; } if(strcmp(argv[1], "use-smpbootserver")==0) { SmpBootServer *tmp = (SmpBootServer *)tcl.lookup(argv[2]); if(tmp!=NULL) { bootSrv_ = tmp; smpBoot_ = TRUE; return TCL_OK; } return TCL_ERROR; } } if(argc==4) { if(strcmp(argv[1], "attach-peersys")==0) { PeerSys *tmp = (PeerSys *)tcl.lookup(argv[2]); if(tmp!=NULL) { ac_ = new ActivityController(tmp, this, atoi(argv[3])); return TCL_OK; } return TCL_ERROR; } } return TCL_ERROR;}/* Ping timer */PingTimer::PingTimer(GnutellaApp *app) { app_ = app;}void PingTimer::expire(Event *) { app_->ping();}/* Connection watchdog */WatchDog::WatchDog(GnutellaApp *app) { app_ = app;}void WatchDog::expire(Event *) { app_->connect(TRUE); app_->update_bootcache();}/* known servent record */ServentRec::ServentRec(Byte_t port, NodeAddr_t addr, int nbytes, int nfiles) { port_ = port; addr_ = addr; file_shared_ = nfiles; byte_shared_ = nbytes;}/**********************************************************************//**** Part III: Simple bootstrap server ****//* servent record maintained by bootstrap server */GServentRec::GServentRec(Byte_t port, NodeAddr_t addr, int nbytes, int nfiles, GnutellaApp *app) { port_ = port; addr_ = addr; app_ = app; file_shared_ = nfiles; byte_shared_ = nbytes;}SmpBootServer::SmpBootServer() {}void SmpBootServer::RemovePeer(NodeAddr_t peer, GnutellaApp *app) { GServentMap_t::iterator fi = servent_cache_.find(peer); if(fi!=servent_cache_.end()) { servent_cache_.erase(fi); }}/* called directly by a new servent for bootstrapping */BootstrapRes_t *SmpBootServer::BootstrapRequest(NodeAddr_t peer, GnutellaApp *app, int peertype) { BootstrapRes_t *res; int i = 0; int cnt = 0, asize = 0; double thresh, prob; for(GServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) { if(fi->second.app_->avail(peertype)) cnt++; } asize = cnt > MAX_BOOTREPLY? MAX_BOOTREPLY:cnt; if (cnt <= MAX_BOOTREPLY) thresh = 1; else thresh = (float)MAX_BOOTREPLY/(float)cnt; res = (BootstrapRes_t *)malloc(sizeof(BootstrapRes_t)); NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*asize); for(GServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) { if(!fi->second.app_->avail(peertype)) continue; prob = (double)rand()/(double)RAND_MAX; if(prob <= thresh) { addrs[i] = fi->second.addr_; i++; if(i>=asize) break; } } res->cnt_ = i; res->servents_ = addrs; if(peertype!=TYPE_LEAF) { GServentMap_t::iterator fi = servent_cache_.find(peer); if(fi==servent_cache_.end()) { GServentRec *newsrv; newsrv = new GServentRec(LISTEN_PORT, peer, 0, 0, app); servent_cache_.insert(GServentMap_t::value_type(peer, *newsrv)); } } return res;}/* simple bootstrap server in PDNS */PDNSBootServer::PDNSBootServer(NodeAddr_t addr): SmpBootServer() { addr_ = addr;}BootstrapRes_t *PDNSBootServer::BootstrapRequest(NodeAddr_t peer, GnutellaApp *app, int peertype) { BootstrapRes_t *res; int i = 0; int cnt = 0, asize = 0; double thresh, prob; cnt = pservent_cache_.size(); asize = cnt > MAX_BOOTREPLY? MAX_BOOTREPLY: cnt; if (cnt <= MAX_BOOTREPLY) thresh = 1; else thresh = (float)MAX_BOOTREPLY/(float)cnt; res = (BootstrapRes_t *)malloc(sizeof(BootstrapRes_t)); NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*asize); for(PDNSServentMap_t::iterator fi = pservent_cache_.begin(); fi != pservent_cache_.end(); ++fi) { prob = (double)rand()/(double)RAND_MAX; if(prob <= thresh) { addrs[i] = fi->second.addr_; i++; if(i>=asize) break; } } res->cnt_ = i; res->servents_ = addrs; if(peertype!=TYPE_LEAF) { PDNSServentMap_t::iterator fi = pservent_cache_.find(peer); if(fi==pservent_cache_.end()) { PDNSServentEntry *newsrv; newsrv = new PDNSServentEntry(LISTEN_PORT, peer); pservent_cache_.insert(PDNSServentMap_t::value_type(peer, *newsrv)); broadcast(ADD_PEER, peer); } } return res;}void PDNSBootServer::RemovePeer(NodeAddr_t peer) { PDNSServentMap_t::iterator fi = pservent_cache_.find(peer); if(fi!=pservent_cache_.end()) { pservent_cache_.erase(fi); broadcast(REM_PEER, peer); }}/* PDNSBootServer receives updates from boot servers in other physical nodes*/int PDNSBootServer::upcall_recv(Socket *sock, PacketData *pktdata, Handler *){ PDNSBootMsg *msg = NULL; NodeAddr_t peer; PDNSServentMap_t::iterator pi; PDNSServentEntry *newentry = NULL; if(pktdata==NULL) return 0; unsigned char *data = pktdata->data(); msg = parse(pktdata->size(), data); if(msg==NULL) return pktdata->size(); peer = msg->peer_; switch(msg->type_) { case ADD_PEER: pi=pservent_cache_.find(peer); if(pi==pservent_cache_.end()) { PDNSServentEntry *newentry = new PDNSServentEntry(BOOTSERVER_PORT, peer); pservent_cache_.insert(PDNSServentMap_t::value_type(peer, *newentry)); } else { //warning } break; case REM_PEER: pi=pservent_cache_.find(peer); if(pi!=pservent_cache_.end()) { pservent_cache_.erase(pi); } else { //warning } break; default: debug_warning("WARNING: unknown msg type %d\n", msg->type_); } free(msg); return pktdata->size();}/* parse the messages received from other boot ser
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -