📄 peer_agent.cc
字号:
//} //printf("Initializing node %d with segment id buffered: %d \n",addr,buffered_segment_[0].segment_id_); //bind("file_size_", &file_size_); if(buffered_segment_[0].segment_id_!=0) debug_my("node %d with segment id buffered: %d \n",addr,buffered_segment_[0].segment_id_); bind("isBootserver_", &isBootserver_); bind("max_deg_", &max_deg_); ping_interval_ = PING_INTERVAL; watch_interval_ = CONN_INTERVAL; max_deg_ = MAX_DEGREE; bootSrv_ = NULL; smpBoot_ = FALSE; degree_ = new BasicStat(addr);}//06-04-03add by zdhvoid GnutellaApp::stat() { debug_my("node %d cache segment %d hops %d buffer time %f \n",addr_,buffered_segment_[0].segment_id_,buffered_segment_[0].hops_,buffered_segment_[0].buffer_time_); for(int i=1;i<file_size_+1;i++) debug_my("segment[%d] delay: %f\n",i,segment_lable_[i].tdelay_);}/* user command interface *//* join: outsys->insys */void GnutellaApp::join() { ping_interval_ = PING_INTERVAL; bootstrap();}/* leave: insys->outsys */void GnutellaApp::leave() { if(state_!= PS_OUTSYS) setState(PS_OFFLINE); if(ping_timer_ && ping_timer_->status_==TIMER_PENDING) ping_timer_->cancel(); if(watchDog_ && watchDog_->status_==TIMER_PENDING) watchDog_->cancel(); if(ac_ && ac_->status_==TIMER_PENDING) ac_->cancel();}void GnutellaApp::stop() { if(state_ != PS_OUTSYS) setState(PS_OFFLINE); if(ping_timer_ && ping_timer_->status_==TIMER_PENDING) ping_timer_->cancel(); if(watchDog_ && watchDog_->status_==TIMER_PENDING) watchDog_->cancel(); if(ac_ && ac_->status_==TIMER_PENDING) ac_->cancel();}void GnutellaApp::disconnect(NodeAddr_t node) { gagent_->Disconnect(node); if(node==-1) { lpeers_.clear(); if(smpBoot_) { if(bootSrv_) bootSrv_->RemovePeer(addr_, this); } }else { PeerMap_t::iterator pi = lpeers_.find(node); if(pi!=lpeers_.end()) { lpeers_.erase(pi); } } degree_->increment(lpeers_.size());}/* share files */void GnutellaApp::share() {}/* maintenance of peer relationship */void GnutellaApp::maintenance() {}/* initiate a query */void GnutellaApp::search(char *criteria) {//20060321 Jackie modifiedchar tmp[10]={0};//only the first invoke has non_null cirteriaif (criteria){//add by zdh 04-04 for(int i=0;i<(file_size_+1);i++){ segment_lable_[i].Tdelay_=0; segment_lable_[i].tdelay_=0; segment_lable_[i].searched_=0; segment_lable_[i].received_=0; } //commented by zdh //segmentID_= *(Word_t *)criteria ; //if (segmentID_==0||segmentID_>(file_size_+1)) for(segmentID_=1;segmentID_<nwindow+1;segmentID_++) {segment_lable_[segmentID_].searched_=1; memcpy(tmp,&segmentID_,sizeof(segmentID_));debug_my(" Node: %d is querying the %d segment, \n", addr_,segmentID_); gagent_->Query(-1, 100, tmp);} segmentID_=nwindow; /*segmentID_=2; memcpy(tmp,&segmentID_,sizeof(segmentID_));debug_my(" Node: %d is querying the %d segment, \n", addr_,segmentID_); gagent_->Query(-1, 100, tmp);*/ }else {//last_segment=segmentID_; //segmentID_++;if (segmentID_>file_size_) return;// ADD by zdh 04-04segment_lable_[segmentID_].searched_=1;memcpy(tmp,&segmentID_,sizeof(segmentID_));debug_my(" Node: %d is querying the %d segment, \n", addr_,segmentID_); gagent_->Query(-1, 100, tmp);}}/* GnutellaApp internal operations *//* proactively change its own status */void GnutellaApp::setState(PeerState_t state) { PeerState_t old_state = state_; double tmp, ctime = NOW; state_ = state; //the following code is for membership accounting purpose if(state_ == PS_OFFLINE && old_state != PS_OFFLINE) { if(!isFreeloader_) { if(Last_Chg1 !=-1) { tmp = (ctime - Last_Chg1) * (double)Na1 + Avg_Na1 * TSysTime1 ; TSysTime1 += (ctime - Last_Chg1); Avg_Na1 = tmp/TSysTime1; /* periodic averaging */ tmp = (ctime - Last_Chg1) * (double)Na1 + pAvg_Na1 * pTime1; pTime1 += (ctime - Last_Chg1); pAvg_Na1 = tmp/pTime1; Na1--; }else { debug_warning("WARNING: shouldn't get here (setState:1)\n"); } Last_Chg1 = ctime; } else { if(Last_Chg2 !=-1) { tmp = (ctime - Last_Chg2) * (double)Na2 + Avg_Na2 * TSysTime2 ; TSysTime2 += (ctime - Last_Chg2); Avg_Na2 = tmp/TSysTime2; /* periodic averaging */ tmp = (ctime - Last_Chg2) * (double)Na2 + pAvg_Na1 * pTime2; pTime2 += (ctime - Last_Chg2); pAvg_Na2 = tmp/pTime2; Na2 --; }else { debug_warning("WARNING: shouldn't get here (setState:2)\n"); } Last_Chg2 = ctime; } if(ping_timer_ && ping_timer_->status_==TIMER_PENDING) ping_timer_->cancel(); if(watchDog_ && watchDog_->status_==TIMER_PENDING) watchDog_->cancel(); //disconnect() is required for correct functioning though disconnect(-1); debug_stat("Membership %d\t%d\n", Na1, Na2); gagent_->bkblock_->offline(); gagent_->fwblock_->offline(); gagent_->rcvRate_->offline(); degree_->offline(); fflush(NULL); } if(state_ != PS_OFFLINE && old_state == PS_OFFLINE) { if(!isFreeloader_) { if(Last_Chg1 !=-1) { tmp = (ctime - Last_Chg1 ) * (double)Na1 + Avg_Na1 * TSysTime1 ; TSysTime1 += (ctime - Last_Chg1); Avg_Na1 = tmp/TSysTime1; tmp = (ctime - Last_Chg1) * (double)Na1 + pAvg_Na1 * pTime1; pTime1 += (ctime - Last_Chg1); pAvg_Na1 = tmp/pTime1; } Last_Chg1 = ctime; }else { if(Last_Chg2 !=-1) { tmp = (ctime - Last_Chg2 ) * (double)Na2 + Avg_Na2 * TSysTime2 ; TSysTime2 += (ctime - Last_Chg2); Avg_Na2 = tmp/TSysTime2; tmp = (ctime - Last_Chg2) * (double)Na2 + pAvg_Na1 * pTime2; pTime2 += (ctime - Last_Chg2); pAvg_Na2 = tmp/pTime2; } Last_Chg2 = ctime; } if(!isFreeloader_) Na1 ++; else Na2 ++; if(ping_timer_) ping_timer_->resched(ping_interval_); if(watchDog_) watchDog_->resched(watch_interval_); debug_stat("Membership %d\t%d\n", Na1, Na2); gagent_->bkblock_->online(); gagent_->fwblock_->online(); gagent_->rcvRate_->online(); degree_->online(); fflush(NULL); } if(pTime1 > SMP_INTERVAL) { debug_info("Membership non-freeloader %f\t%d\t%d\n", pAvg_Na1, Na1, Na2); pAvg_Na1 = 0; pTime1 = 0; } if(pTime2 > SMP_INTERVAL) { debug_info("Membership freeloader %f\t%d\t%d\n", pAvg_Na2, Na1, Na2); pAvg_Na2 = 0; pTime2 = 0; }}/* bootstrap */void GnutellaApp::bootstrap() { if(smpBoot_) { if(bootSrv_) smp_bootstrap(); return; } if(bserver_list_.size()>0) { BootServerRec bi = bserver_list_.front(); gagent_->Bootstrap(bi.addr_); bserver_list_.push_back(bi); bserver_list_.pop_front(); }}/* use SmpBootServer for bootstrapping */void GnutellaApp::smp_bootstrap() { BootstrapRes_t *res; //20060321 Jackie modified if (file_size_<=0) return ; res = bootSrv_->BootstrapRequest(addr_, this, TYPE_LEGACY); if(res == NULL) return; BootstrapResult(res->cnt_, res->servents_); if(res->servents_) free(res->servents_); free(res);}/* tries to connect to some known servents */void GnutellaApp::connect(int trybootserver) { int cnt=0, success, limit; limit = max_deg_ - lpeers_.size(); for (ServentMap_t::const_iterator i = servent_cache_.begin(); cnt < limit && i != servent_cache_.end(); ++i ) { NodeAddr_t peer = i->addr_; PeerMap_t::iterator pi = lpeers_.find(peer); if(pi==lpeers_.end()) { success = gagent_->Connect(peer, CONN_TIMEOUT); if(success) cnt++; } } if (trybootserver && !smpBoot_ && cnt < limit) //not enough known servents bootstrap();}/* periodically send PING to connected peers */void GnutellaApp::ping() { double ctime; ctime = NOW; for(PeerMap_t::iterator i = lpeers_.begin(); i != lpeers_.end(); ++i) { if(ctime - i->second.lstamp_ > ping_interval_) { gagent_->Ping(i->second.peer_, INIT_TTL); i->second.lstamp_ = ctime; } } ping_timer_->resched(ping_interval_); }// upcalls from GnutellaAgent/* Connection request to a peer succeeded */void GnutellaApp::ConnectSucceeded(NodeAddr_t peer) { PeerMap_t::iterator pi = lpeers_.find(peer); if(pi==lpeers_.end()) { PeerEntry *newentry = new PeerEntry(peer, NOW, FALSE); lpeers_.insert(PeerMap_t::value_type(peer, *newentry)); degree_->increment(lpeers_.size()); if(lpeers_.size()==1) { ping_timer_->resched(ping_interval_); watchDog_->resched(watch_interval_); if(gagent_->gc_ && gagent_->gc_->status()==TIMER_IDLE) gagent_->gc_->resched(DESC_TIMEOUT/2); } } if(state_ == PS_OUTSYS) state_ = PS_OFFLINE; if(state_ == PS_OFFLINE) { setState(PS_IDLE); if(ac_) ac_->expire(NULL); }}/* Bootstrap request received*/void GnutellaApp::BootstrapRequest(NodeAddr_t peer) { debug_info("bootstrap request received from %d\n", peer); if(isBootserver_) { int i=0, size = servent_cache_.size(); int cnt = MAX_BOOTREPLY>size ? size: MAX_BOOTREPLY; NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*cnt); for(ServentMap_t::iterator fi = servent_cache_.begin(); fi != servent_cache_.end(); ++fi) { addrs[i] = fi->addr_; i++; if(i>=MAX_BOOTREPLY) break; } gagent_->Bootstrap_Reply(peer, cnt, addrs); if(find_servent(peer)) remove_servent(peer); int prob = (int)((double)rand()/(double)RAND_MAX * 100.0); if(prob < PUBLISH_PROB) { ServentRec *newsrv; newsrv = new ServentRec(LISTEN_PORT, peer, 0, 0); servent_cache_.push_back(*newsrv); } while(servent_cache_.size() > KNOWNCACHE_LIMIT) servent_cache_.pop_front(); free(addrs); }}/* Bootstrap results received */void GnutellaApp::BootstrapResult(int cnt, NodeAddr_t *res) { int i; for(i=0; i<cnt; i++) { NodeAddr_t cur = res[i]; if(!find_servent(cur) && cur != addr_) { ServentRec *newsrv; newsrv = new ServentRec(LISTEN_PORT, cur, 0, 0); servent_cache_.push_back(*newsrv); } } connect(FALSE);}/* Bootstrap results received */void GnutellaApp::BootcacheUpdate(NodeAddr_t peer) { if (!isBootserver_) return; if(!find_servent(peer)) return; remove_servent(peer); ServentRec *newsrv; newsrv = new ServentRec(LISTEN_PORT, peer, 0, 0); servent_cache_.push_back(*newsrv);}/* connection request received*/int GnutellaApp::ConnectionRequest(NodeAddr_t peer, Socket *sock) { PeerMap_t::iterator i = lpeers_.find(peer); if(avail(TYPE_ANY) && i==lpeers_.end()) { gagent_->gnutella_ok(sock); PeerEntry *newentry = new PeerEntry(peer, NOW, FALSE); lpeers_.insert(PeerMap_t::value_type(peer, *newentry)); degree_->increment(lpeers_.size()); if(lpeers_.size()==1) { if(state_ == PS_OUTSYS) state_ = PS_OFFLINE; if(state_ == PS_OFFLINE) { setState(PS_IDLE); if(ac_) ac_->expire(NULL); } ping_timer_->resched(ping_interval_); watchDog_->resched(watch_interval_); } if(!find_servent(peer)) { ServentRec *srec = new ServentRec(LISTEN_PORT, peer, 0, 0); servent_cache_.push_back(*srec); } return 1; } else { gagent_->gnutella_reject(sock); return 0; }}/* connection request rejected */void GnutellaApp::ConnectionRejected(NodeAddr_t peer) { debug_info("connection request from %d to %d rejected\n", addr_, peer); if(find_servent(peer)) remove_servent(peer);}/* connection failed, probably at network level */void GnutellaApp::ConnectionFailed(NodeAddr_t peer) { debug_info("connection to %d failed\n", peer); if(find_servent(peer)) remove_servent(peer);}/* connection request timed out */void GnutellaApp::ConnectionTimeout(NodeAddr_t peer) { debug_info("connection from %d to %d timed out\n", addr_, peer); if(find_servent(peer)) remove_servent(peer);}/* connection closed by peer */void GnutellaApp::ConnectionClosed(NodeAddr_t peer) { debug_info("connection closed by peer %d\n", peer); for(PeerMap_t::iterator i=lpeers_.begin(); i != lpeers_.end(); ++i) { if(i->second.peer_ == peer) { lpeers_.erase(i); degree_->increment(lpeers_.size()); return; } }}/* PING request received */void GnutellaApp::PingRequest(NodeAddr_t peer, int ttl, char *id) { int i=0; int cnt = servent_cache_.size(); debug_info("PingRequest from %d to %d\n", peer, addr_); int *sbytes = (int *)malloc(sizeof(int)*cnt); int *sfiles = (int *)malloc(sizeof(int)*cnt); NodeAddr_t *addrs = (NodeAddr_t *)malloc(sizeof(NodeAddr_t)*cnt); for(ServentMap_t::iterator si = servent_cache_.begin(); si!=servent_cache_.end(); si++) { sbytes[i] = si->byte_shared_; addrs[i] = si->addr_; sfiles[i] = si->file_shared_; i++; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -