⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 peer_agent.cc

📁 升级版的p2p文件共享linux环境ns2仿真软件
💻 CC
📖 第 1 页 / 共 5 页
字号:
/*-------------------------------------------------------------------------*//* Packet-level peer-to-peer and Gnutella simulator                        *//* Author: Qi He <http://www.cc.gatech.edu/~qhe> 01Aug2003                 *//* $Revision:$ $Name:$ $Date:$                                             *//*-------------------------------------------------------------------------*/#include <stdarg.h>#include <stdlib.h>#include <sys/time.h>#include "random.h"#include "peer_agent.h"#include "ip.h"#include "math.h"#include "tcp-sock.h"using namespace std;static class PeerAppClass: public TclClass {public:  PeerAppClass(): TclClass("PeerApp") {}  TclObject* create(int argc, const char*const* argv) {    return (new PeerApp(atoi(argv[4])));  }}class_peerapp;static class GnutellaAppClass: public TclClass {public:  GnutellaAppClass(): TclClass("PeerApp/GnutellaApp") {}  TclObject* create(int argc, const char*const* argv) {    return (new GnutellaApp(atoi(argv[4])));  }}class_gnutellaapp;static class UltrapeerClass: public TclClass {public:  UltrapeerClass(): TclClass("PeerApp/GnutellaApp/Ultrapeer") {}  TclObject* create(int argc, const char*const* argv) {    return (new Ultrapeer(atoi(argv[4])));  }}class_ultrapeer;static class LeafClass: public TclClass {public:  LeafClass(): TclClass("PeerApp/GnutellaApp/Leaf") {}  TclObject* create(int argc, const char*const* argv) {    return (new Leaf(atoi(argv[4])));  }}class_leaf; static class PeerAgentClass: public TclClass {public:  PeerAgentClass(): TclClass("SocketApp/PeerAgent") {}  TclObject* create(int, const char*const* argv) {    return (new PeerAgent(atoi(argv[4])));  }}class_peeragent;static class GnutellaAgentClass: public TclClass {public:  GnutellaAgentClass(): TclClass("SocketApp/PeerAgent/GnutellaAgent") {}  TclObject* create(int argc, const char*const* argv) {    return (new GnutellaAgent(atoi(argv[4])));  }}class_gnutellaagent;static class UltraAgentClass: public TclClass {public:  UltraAgentClass(): TclClass("SocketApp/PeerAgent/GnutellaAgent/UltraAgent") {}  TclObject* create(int argc, const char*const* argv) {    return (new UltraAgent(atoi(argv[4])));  }}class_ultraagent;static class LeafAgentClass: public TclClass {public:  LeafAgentClass(): TclClass("SocketApp/PeerAgent/GnutellaAgent/LeafAgent") {}  TclObject* create(int argc, const char*const* argv) {    return (new LeafAgent(atoi(argv[4])));  }}class_leafagent;static class PeerSysClass: public TclClass {public:  PeerSysClass(): TclClass("PeerSys") {}  TclObject* create(int, const char*const* argv) {    return (new PeerSys());  }}class_peersys;static class SmpBootServerClass: public TclClass {public:  SmpBootServerClass(): TclClass("SocketApp/SmpBootServer") {}  TclObject* create(int, const char*const* argv) {    return (new SmpBootServer());  }}class_smpbootserver;static class PDNSBootServerClass: public TclClass {public:  PDNSBootServerClass(): TclClass("SocketApp/SmpBootServer/PDNSBootServer") {}  TclObject* create(int, const char*const* argv) {    return (new PDNSBootServer(atoi(argv[4])));  }}class_pdnsbootserver;static void debug_info(char *format, ...) {  va_list ap; if(getenv("GNUSIM_INFO")!=NULL) {    va_start(ap, format);    vfprintf(stdout, format, ap);    va_end(ap);  }  fflush(NULL);};static void debug_my(char *format, ...) {  va_list ap; //if(getenv("GNUSIM_INFO")!=NULL) {    va_start(ap, format);    vfprintf(stdout, format, ap);    va_end(ap);  //}  fflush(NULL);};static void debug_warning(char *format, ...) {  va_list ap;    if(getenv("GNUSIM_WARNING")!=NULL) {    va_start(ap, format);    vfprintf(stderr, format, ap);    va_end(ap);  }  fflush(NULL);};static void debug_stat(char *format, ...) {  va_list ap;    if(getenv("GNUSIM_NOSTAT")!=NULL) {    va_start(ap, format);    vfprintf(stderr, format, ap);    va_end(ap);  }  fflush(NULL);};/* global variables used to keep track of some statistics             */static int nquery = 0, nqueryhit = 0, nthru = 0, rquery = 0, squeryhit = 0;static long pongcnt = 0, nforceremove=0; static int Na1, Na2;static double Avg_Na1 = 0, Avg_Na2 = 0, Last_Chg1 = -1, Last_Chg2 = -1, TSysTime1 = 0, TSysTime2 = 0; static double pAvg_Na1 = 0, pAvg_Na2 = 0, pTime1=0, pTime2 = 0, Avg_Rsp = 0;#ifdef PDNSNsObject* GetLocalIP(ipaddr_t ipaddr);#endif/**********************************************************************//****                Part I: GnutellaAgent related                 ****//* base class PeerAgent */PeerAgent::PeerAgent(PeerApp *app) {  app_ = app;  firewalled_ = FALSE;}/****  GnutellaAgent ****/GnutellaAgent::GnutellaAgent(NodeAddr_t addr): PeerAgent(addr), rate_limit_(0), use_prio_(0) {  conn_timer_ = new ConnTimer(this);  msgHandle_ = new GnutellaMsg(addr);  gc_ = new GC(this);  init_id();  fwblock_  = new BasicStat(addr);  bkblock_ = new BasicStat(addr);  rcvRate_ = new BasicStat(addr);  sec_ = 0;  secRcv_ = 0;    bind ("rate_limit_", &rate_limit_);  bind ("use_prio_", &use_prio_);}GnutellaAgent::GnutellaAgent(GnutellaApp *app): PeerAgent(app) {  conn_timer_ = new ConnTimer(this);  app_ = app;  gapp_ = (GnutellaApp *)app_;  msgHandle_ = new GnutellaMsg(app_->addr_);  init_id();  fwblock_  = new BasicStat(app_->addr_);  bkblock_ = new BasicStat(app_->addr_);  rcvRate_ = new BasicStat(app_->addr_);  sec_ = 0;  secRcv_ = 0;    bind ("rate_limit_", &rate_limit_);  bind ("use_prio_", &use_prio_);}void GnutellaAgent::init_id() {}// upcalls from NSSocket/* passive connection established */void GnutellaAgent::upcall_passconn(Socket *sock) {  if(gapp_->isBootserver_) {    NodeAddr_t src = sock->peer_.addr_;    SockMap_t::iterator si = lsocks_.find(src);    if(si==lsocks_.end()) {	SockEntry *newentry = new SockEntry(sock, src, SOCK_BOOT);	lsocks_.insert(SockMap_t::value_type(src, *newentry));    }   }}/*outstanding socket connection request accepted */void GnutellaAgent::upcall_connected(Socket *sock) {  NodeAddr_t src = sock->peer_.addr_;  SockMap_t::iterator si = lsocks_.find(src);  if(si!=lsocks_.end() && si->second.type_==SOCK_BOOT_WAIT) {    si->second.type_ = SOCK_BOOT;    lime_bootstrap(sock);    return;  }  gnutella_req(sock);  PendingConns_t::iterator centry = conn_pending_.find(sock->peer_.addr_);  if(centry==conn_pending_.end()) {    debug_warning("WARNING: upcall_connected() for non pending socket on %d\n", app_->addr_);    return;   }   debug_info("upcall_connected() for pending socket with %d on %d\n", sock->peer_.addr_, app_->addr_);  centry->second.state_ = TRANSPORT_CONNECTED;}/* socket being closed by peer */void GnutellaAgent::upcall_closing(Socket *sock) {  SockMap_t::iterator si = lsocks_.find(sock->peer_.addr_);  if(si!=lsocks_.end()) {    SockEntry *cur;    for(SockMap_t::iterator i=lsocks_.begin(); i!=lsocks_.end(); i++) {      if(i->second.peer_ == sock->peer_.addr_) {	lsocks_.erase(i);	gapp_->ConnectionClosed(sock->peer_.addr_);	break;      }    }  }}/* socket ready for sending */void GnutellaAgent::upcall_send(Socket *sock) {  SockMap_t::iterator si = lsocks_.find(sock->peer_.addr_);  Socket *tsock=NULL;  if(si!=lsocks_.end()) {    SockEntry *cur;        for(SockMap_t::iterator i=lsocks_.begin(); i!=lsocks_.end(); i++) {      tsock = i->second.sock_;      if(tsock != sock && !tsock->blocked(SOCK_READ)) {	i->second.sock_->recv();      }    }  }}/* data packet received */int GnutellaAgent::upcall_recv(Socket *sock, PacketData *pktdata, Handler *) {   char teststring[LEAFCONN_LEN];  unsigned char *data = pktdata->data();  NodeAddr_t src;  int msg, res, cnt, block=FALSE, nhit;  SockMap_t::iterator si;  PendingConns_t::iterator pi;  //continue only if peer is currently online   //(bootstrap server is never in ONLINE state)  if(app_->state_==PS_OFFLINE && !gapp_->isBootserver_) {    msgHandle_->clear();    return pktdata->size();  }    long cur = (long)NOW;  if(sec_!=cur) {    rcvRate_->increment2(secRcv_);    sec_ = cur;    secRcv_ = 0;  }  secRcv_ ++;  src = sock->peer_.addr_;  msg = msgHandle_->parse(pktdata->size(), data);  //bootstrap server should only recv BOOTSTRAP msg  if (gapp_->isBootserver_ && msg!=MSG_BOOTSTRAP && msg!=MSG_BOOTCACHE_UPDATE) {	msgHandle_->clear();	return pktdata->size();	  }	  switch(msg) {  case MSG_BOOTSTRAP:    gapp_->BootstrapRequest(src);    break;  case MSG_BOOTCACHE_UPDATE:    gapp_->BootcacheUpdate(src);    break;  case MSG_BOOTSTRAP_RES:    cnt = *((int *)(data + BOOTSTRAP_RESLEN));    gapp_->BootstrapResult(cnt, (NodeAddr_t *)(data + BOOTSTRAP_RESLEN + sizeof(int)));    break;  case MSG_CONNREQ:    res = gapp_->ConnectionRequest(src, sock);    /*if connection request granted by the app, add the socket to socket list*/    if(res) {      SockMap_t::iterator si = lsocks_.find(src);      if(si==lsocks_.end()) {	SockEntry *newentry = new SockEntry(sock, src, SOCK_LEGACY);	lsocks_.insert(SockMap_t::value_type(src, *newentry));      } else {	debug_info("requested conn to %d from %d already exists\n", app_->addr_, src);      }    }          break;    case MSG_ULTRACONNREQ:    res = gapp_->UltraConnRequest(src, sock);    if(res) {      SockMap_t::iterator si = lsocks_.find(src);      if(si==lsocks_.end()) {	SockEntry *newentry = new SockEntry(sock, src, SOCK_ULTRA);	lsocks_.insert(SockMap_t::value_type(src, *newentry));      } else {	debug_info("requested conn to %d from %d already exists\n", app_->addr_, src);      }    }    break;  case MSG_LEAFCONNREQ:    res = gapp_->LeafConnRequest(src, sock);    if(res) {      SockMap_t::iterator si = lsocks_.find(src);      if(si==lsocks_.end()) {	SockEntry *newentry = new SockEntry(sock, src, SOCK_LEAF);	lsocks_.insert(SockMap_t::value_type(src, *newentry));      } else {	debug_info("requested conn to %d from %d already exists\n", app_->addr_, src);      }    }    break;  case MSG_CONNOK:    for(PendingConns_t::iterator i = conn_pending_.begin(); i!=conn_pending_.end(); i++) {      if(i->second.peer_==src) {	conn_pending_.erase(i);	break;      }    }    si = lsocks_.find(src);    if(si!=lsocks_.end()) {      // this could happen when two nodes try to connect to each other       // simultaneously      // ConnReq(A->B)      // ConnReq(B->A)      // ConnOK (B->A), B lsocks_.add()      // ConnOK (A->B), A lsocks_.add()    } else {      SockEntry *newentry = new SockEntry(sock, src, SOCK_LEGACY);      lsocks_.insert(SockMap_t::value_type(src, *newentry));    }    gapp_->ConnectSucceeded(src);     break;  case MSG_CONNREJ:    pi = conn_pending_.find(src);    if(pi!=conn_pending_.end()) {      gapp_->ConnectionRejected(src);      for(pi=conn_pending_.begin(); pi!=conn_pending_.end(); pi++) {	if(pi->second.peer_==src) {	  conn_pending_.erase(pi);	  	  sock->close();	  break;	}      }    }    break;  case MSG_PING:    if(find_desc(msgHandle_->header_.id_)!=NULL)       break;  //do not forward if DescID already seen    if(find_ping(msgHandle_->header_.id_, 1)!=-1)      break;  //do not forward if it originates from me    if(msgHandle_->header_.TTL_ > msgHandle_->header_.hops_) {      block = forward(sock, pktdata, &(msgHandle_->header_));      if(block) {	msgHandle_->clear();	return 0;      }    }    gapp_->PingRequest(src, msgHandle_->header_.TTL_, msgHandle_->header_.id_);     break;  case MSG_PONG:    // PONG has to be in descriptor cache or outstanding PING cache     if(find_desc(msgHandle_->header_.id_)==NULL && find_ping(msgHandle_->header_.id_, 0)==-1)      break;    if(msgHandle_->header_.TTL_ > msgHandle_->header_.hops_)       block = backroute(pktdata, &(msgHandle_->header_));    gapp_->PongReply(src, msgHandle_->header_.TTL_, (char *)msgHandle_->payload_.pong_);    if(find_desc(msgHandle_->header_.id_)==NULL) {      pongcnt++;      if(pongcnt%100==0)	debug_info("pong cnt %d\n",pongcnt);    }    break;  case MSG_QUERY:     msgHandle_->header_.hops_++;    if(find_desc(msgHandle_->header_.id_)!=NULL)      break;    if(find_query(msgHandle_->header_.id_, 1)!=-1)      break;	//modify by zdh 04-04	if(gapp_->QueryRequest(src, msgHandle_->header_.hops_, (char *)msgHandle_->payload_.query_->criteria_, msgHandle_->header_.id_))		break;    if(msgHandle_->header_.TTL_ > msgHandle_->header_.hops_) {      block = forward(sock, pktdata, &(msgHandle_->header_));      if(block) {	msgHandle_->clear();	return 0;      }    }//06-04-03 modified by zdh//gapp_->QueryRequest(src, msgHandle_->header_.hops_, (char *)msgHandle_->payload_.query_->criteria_, msgHandle_->header_.id_);    //gapp_->QueryRequest(src, msgHandle_->header_.TTL_, (char *)msgHandle_->payload_.query_->criteria_, msgHandle_->header_.id_);    break;  case MSG_QUERYHIT:  	msgHandle_->header_.hops_++;    if(find_desc(msgHandle_->header_.id_)==NULL && find_query(msgHandle_->header_.id_, 0)==-1)      break;    if(find_desc(msgHandle_->header_.id_)!=NULL && msgHandle_->header_.TTL_ > msgHandle_->header_.hops_)       backroute(pktdata, &(msgHandle_->header_));     //modify by zdh 04-03     gapp_->QueryHitReply(src, msgHandle_->header_.hops_, (char *)msgHandle_->payload_.queryhit_);    //gapp_->QueryHitReply(src, msgHandle_->header_.TTL_, (char *)msgHandle_->payload_.queryhit_);    //we didn't forward the corresponding Query request    //we must be receiving it because we initiate the Query message    if(find_desc(msgHandle_->header_.id_)==NULL)       nqueryhit++; //20060322 Jackie modified   /* if (gapp_->segmentID_==gapp_->file_size_){	 debug_info(" The transfer for node :%d  is finished, altogether %d segments are transfered \n", gapp_->addr_,gapp_->file_size_);	 gapp_->segmentID_++;     	}else{     		//the current querying segment is hits

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -