📄 peer_agent.cc
字号:
/*-------------------------------------------------------------------------*//* Packet-level peer-to-peer and Gnutella simulator *//* Author: Qi He <http://www.cc.gatech.edu/~qhe> 01Aug2003 *//* $Revision:$ $Name:$ $Date:$ *//*-------------------------------------------------------------------------*/#include <stdarg.h>#include <stdlib.h>#include <sys/time.h>#include "random.h"#include "peer_agent.h"#include "ip.h"#include "math.h"#include "tcp-sock.h"using namespace std;static class PeerAppClass: public TclClass {public: PeerAppClass(): TclClass("PeerApp") {} TclObject* create(int argc, const char*const* argv) { return (new PeerApp(atoi(argv[4]))); }}class_peerapp;static class GnutellaAppClass: public TclClass {public: GnutellaAppClass(): TclClass("PeerApp/GnutellaApp") {} TclObject* create(int argc, const char*const* argv) { return (new GnutellaApp(atoi(argv[4]))); }}class_gnutellaapp;static class UltrapeerClass: public TclClass {public: UltrapeerClass(): TclClass("PeerApp/GnutellaApp/Ultrapeer") {} TclObject* create(int argc, const char*const* argv) { return (new Ultrapeer(atoi(argv[4]))); }}class_ultrapeer;static class LeafClass: public TclClass {public: LeafClass(): TclClass("PeerApp/GnutellaApp/Leaf") {} TclObject* create(int argc, const char*const* argv) { return (new Leaf(atoi(argv[4]))); }}class_leaf; static class PeerAgentClass: public TclClass {public: PeerAgentClass(): TclClass("SocketApp/PeerAgent") {} TclObject* create(int, const char*const* argv) { return (new PeerAgent(atoi(argv[4]))); }}class_peeragent;static class GnutellaAgentClass: public TclClass {public: GnutellaAgentClass(): TclClass("SocketApp/PeerAgent/GnutellaAgent") {} TclObject* create(int argc, const char*const* argv) { return (new GnutellaAgent(atoi(argv[4]))); }}class_gnutellaagent;static class UltraAgentClass: public TclClass {public: UltraAgentClass(): TclClass("SocketApp/PeerAgent/GnutellaAgent/UltraAgent") {} TclObject* create(int argc, const char*const* argv) { return (new UltraAgent(atoi(argv[4]))); }}class_ultraagent;static class LeafAgentClass: public TclClass {public: LeafAgentClass(): TclClass("SocketApp/PeerAgent/GnutellaAgent/LeafAgent") {} TclObject* create(int argc, const char*const* argv) { return (new LeafAgent(atoi(argv[4]))); }}class_leafagent;static class PeerSysClass: public TclClass {public: PeerSysClass(): TclClass("PeerSys") {} TclObject* create(int, const char*const* argv) { return (new PeerSys()); }}class_peersys;static class SmpBootServerClass: public TclClass {public: SmpBootServerClass(): TclClass("SocketApp/SmpBootServer") {} TclObject* create(int, const char*const* argv) { return (new SmpBootServer()); }}class_smpbootserver;static class PDNSBootServerClass: public TclClass {public: PDNSBootServerClass(): TclClass("SocketApp/SmpBootServer/PDNSBootServer") {} TclObject* create(int, const char*const* argv) { return (new PDNSBootServer(atoi(argv[4]))); }}class_pdnsbootserver;static void debug_info(char *format, ...) { va_list ap; if(getenv("GNUSIM_INFO")!=NULL) { va_start(ap, format); vfprintf(stdout, format, ap); va_end(ap); } fflush(NULL);};static void debug_my(char *format, ...) { va_list ap; //if(getenv("GNUSIM_INFO")!=NULL) { va_start(ap, format); vfprintf(stdout, format, ap); va_end(ap); //} fflush(NULL);};static void debug_warning(char *format, ...) { va_list ap; if(getenv("GNUSIM_WARNING")!=NULL) { va_start(ap, format); vfprintf(stderr, format, ap); va_end(ap); } fflush(NULL);};static void debug_stat(char *format, ...) { va_list ap; if(getenv("GNUSIM_NOSTAT")!=NULL) { va_start(ap, format); vfprintf(stderr, format, ap); va_end(ap); } fflush(NULL);};/* global variables used to keep track of some statistics */static int nquery = 0, nqueryhit = 0, nthru = 0, rquery = 0, squeryhit = 0;static long pongcnt = 0, nforceremove=0; static int Na1, Na2;static double Avg_Na1 = 0, Avg_Na2 = 0, Last_Chg1 = -1, Last_Chg2 = -1, TSysTime1 = 0, TSysTime2 = 0; static double pAvg_Na1 = 0, pAvg_Na2 = 0, pTime1=0, pTime2 = 0, Avg_Rsp = 0;#ifdef PDNSNsObject* GetLocalIP(ipaddr_t ipaddr);#endif/**********************************************************************//**** Part I: GnutellaAgent related ****//* base class PeerAgent */PeerAgent::PeerAgent(PeerApp *app) { app_ = app; firewalled_ = FALSE;}/**** GnutellaAgent ****/GnutellaAgent::GnutellaAgent(NodeAddr_t addr): PeerAgent(addr), rate_limit_(0), use_prio_(0) { conn_timer_ = new ConnTimer(this); msgHandle_ = new GnutellaMsg(addr); gc_ = new GC(this); init_id(); fwblock_ = new BasicStat(addr); bkblock_ = new BasicStat(addr); rcvRate_ = new BasicStat(addr); sec_ = 0; secRcv_ = 0; bind ("rate_limit_", &rate_limit_); bind ("use_prio_", &use_prio_);}GnutellaAgent::GnutellaAgent(GnutellaApp *app): PeerAgent(app) { conn_timer_ = new ConnTimer(this); app_ = app; gapp_ = (GnutellaApp *)app_; msgHandle_ = new GnutellaMsg(app_->addr_); init_id(); fwblock_ = new BasicStat(app_->addr_); bkblock_ = new BasicStat(app_->addr_); rcvRate_ = new BasicStat(app_->addr_); sec_ = 0; secRcv_ = 0; bind ("rate_limit_", &rate_limit_); bind ("use_prio_", &use_prio_);}void GnutellaAgent::init_id() {}// upcalls from NSSocket/* passive connection established */void GnutellaAgent::upcall_passconn(Socket *sock) { if(gapp_->isBootserver_) { NodeAddr_t src = sock->peer_.addr_; SockMap_t::iterator si = lsocks_.find(src); if(si==lsocks_.end()) { SockEntry *newentry = new SockEntry(sock, src, SOCK_BOOT); lsocks_.insert(SockMap_t::value_type(src, *newentry)); } }}/*outstanding socket connection request accepted */void GnutellaAgent::upcall_connected(Socket *sock) { NodeAddr_t src = sock->peer_.addr_; SockMap_t::iterator si = lsocks_.find(src); if(si!=lsocks_.end() && si->second.type_==SOCK_BOOT_WAIT) { si->second.type_ = SOCK_BOOT; lime_bootstrap(sock); return; } gnutella_req(sock); PendingConns_t::iterator centry = conn_pending_.find(sock->peer_.addr_); if(centry==conn_pending_.end()) { debug_warning("WARNING: upcall_connected() for non pending socket on %d\n", app_->addr_); return; } debug_info("upcall_connected() for pending socket with %d on %d\n", sock->peer_.addr_, app_->addr_); centry->second.state_ = TRANSPORT_CONNECTED;}/* socket being closed by peer */void GnutellaAgent::upcall_closing(Socket *sock) { SockMap_t::iterator si = lsocks_.find(sock->peer_.addr_); if(si!=lsocks_.end()) { SockEntry *cur; for(SockMap_t::iterator i=lsocks_.begin(); i!=lsocks_.end(); i++) { if(i->second.peer_ == sock->peer_.addr_) { lsocks_.erase(i); gapp_->ConnectionClosed(sock->peer_.addr_); break; } } }}/* socket ready for sending */void GnutellaAgent::upcall_send(Socket *sock) { SockMap_t::iterator si = lsocks_.find(sock->peer_.addr_); Socket *tsock=NULL; if(si!=lsocks_.end()) { SockEntry *cur; for(SockMap_t::iterator i=lsocks_.begin(); i!=lsocks_.end(); i++) { tsock = i->second.sock_; if(tsock != sock && !tsock->blocked(SOCK_READ)) { i->second.sock_->recv(); } } }}/* data packet received */int GnutellaAgent::upcall_recv(Socket *sock, PacketData *pktdata, Handler *) { char teststring[LEAFCONN_LEN]; unsigned char *data = pktdata->data(); NodeAddr_t src; int msg, res, cnt, block=FALSE, nhit; SockMap_t::iterator si; PendingConns_t::iterator pi; //continue only if peer is currently online //(bootstrap server is never in ONLINE state) if(app_->state_==PS_OFFLINE && !gapp_->isBootserver_) { msgHandle_->clear(); return pktdata->size(); } long cur = (long)NOW; if(sec_!=cur) { rcvRate_->increment2(secRcv_); sec_ = cur; secRcv_ = 0; } secRcv_ ++; src = sock->peer_.addr_; msg = msgHandle_->parse(pktdata->size(), data); //bootstrap server should only recv BOOTSTRAP msg if (gapp_->isBootserver_ && msg!=MSG_BOOTSTRAP && msg!=MSG_BOOTCACHE_UPDATE) { msgHandle_->clear(); return pktdata->size(); } switch(msg) { case MSG_BOOTSTRAP: gapp_->BootstrapRequest(src); break; case MSG_BOOTCACHE_UPDATE: gapp_->BootcacheUpdate(src); break; case MSG_BOOTSTRAP_RES: cnt = *((int *)(data + BOOTSTRAP_RESLEN)); gapp_->BootstrapResult(cnt, (NodeAddr_t *)(data + BOOTSTRAP_RESLEN + sizeof(int))); break; case MSG_CONNREQ: res = gapp_->ConnectionRequest(src, sock); /*if connection request granted by the app, add the socket to socket list*/ if(res) { SockMap_t::iterator si = lsocks_.find(src); if(si==lsocks_.end()) { SockEntry *newentry = new SockEntry(sock, src, SOCK_LEGACY); lsocks_.insert(SockMap_t::value_type(src, *newentry)); } else { debug_info("requested conn to %d from %d already exists\n", app_->addr_, src); } } break; case MSG_ULTRACONNREQ: res = gapp_->UltraConnRequest(src, sock); if(res) { SockMap_t::iterator si = lsocks_.find(src); if(si==lsocks_.end()) { SockEntry *newentry = new SockEntry(sock, src, SOCK_ULTRA); lsocks_.insert(SockMap_t::value_type(src, *newentry)); } else { debug_info("requested conn to %d from %d already exists\n", app_->addr_, src); } } break; case MSG_LEAFCONNREQ: res = gapp_->LeafConnRequest(src, sock); if(res) { SockMap_t::iterator si = lsocks_.find(src); if(si==lsocks_.end()) { SockEntry *newentry = new SockEntry(sock, src, SOCK_LEAF); lsocks_.insert(SockMap_t::value_type(src, *newentry)); } else { debug_info("requested conn to %d from %d already exists\n", app_->addr_, src); } } break; case MSG_CONNOK: for(PendingConns_t::iterator i = conn_pending_.begin(); i!=conn_pending_.end(); i++) { if(i->second.peer_==src) { conn_pending_.erase(i); break; } } si = lsocks_.find(src); if(si!=lsocks_.end()) { // this could happen when two nodes try to connect to each other // simultaneously // ConnReq(A->B) // ConnReq(B->A) // ConnOK (B->A), B lsocks_.add() // ConnOK (A->B), A lsocks_.add() } else { SockEntry *newentry = new SockEntry(sock, src, SOCK_LEGACY); lsocks_.insert(SockMap_t::value_type(src, *newentry)); } gapp_->ConnectSucceeded(src); break; case MSG_CONNREJ: pi = conn_pending_.find(src); if(pi!=conn_pending_.end()) { gapp_->ConnectionRejected(src); for(pi=conn_pending_.begin(); pi!=conn_pending_.end(); pi++) { if(pi->second.peer_==src) { conn_pending_.erase(pi); sock->close(); break; } } } break; case MSG_PING: if(find_desc(msgHandle_->header_.id_)!=NULL) break; //do not forward if DescID already seen if(find_ping(msgHandle_->header_.id_, 1)!=-1) break; //do not forward if it originates from me if(msgHandle_->header_.TTL_ > msgHandle_->header_.hops_) { block = forward(sock, pktdata, &(msgHandle_->header_)); if(block) { msgHandle_->clear(); return 0; } } gapp_->PingRequest(src, msgHandle_->header_.TTL_, msgHandle_->header_.id_); break; case MSG_PONG: // PONG has to be in descriptor cache or outstanding PING cache if(find_desc(msgHandle_->header_.id_)==NULL && find_ping(msgHandle_->header_.id_, 0)==-1) break; if(msgHandle_->header_.TTL_ > msgHandle_->header_.hops_) block = backroute(pktdata, &(msgHandle_->header_)); gapp_->PongReply(src, msgHandle_->header_.TTL_, (char *)msgHandle_->payload_.pong_); if(find_desc(msgHandle_->header_.id_)==NULL) { pongcnt++; if(pongcnt%100==0) debug_info("pong cnt %d\n",pongcnt); } break; case MSG_QUERY: msgHandle_->header_.hops_++; if(find_desc(msgHandle_->header_.id_)!=NULL) break; if(find_query(msgHandle_->header_.id_, 1)!=-1) break; //modify by zdh 04-04 if(gapp_->QueryRequest(src, msgHandle_->header_.hops_, (char *)msgHandle_->payload_.query_->criteria_, msgHandle_->header_.id_)) break; if(msgHandle_->header_.TTL_ > msgHandle_->header_.hops_) { block = forward(sock, pktdata, &(msgHandle_->header_)); if(block) { msgHandle_->clear(); return 0; } }//06-04-03 modified by zdh//gapp_->QueryRequest(src, msgHandle_->header_.hops_, (char *)msgHandle_->payload_.query_->criteria_, msgHandle_->header_.id_); //gapp_->QueryRequest(src, msgHandle_->header_.TTL_, (char *)msgHandle_->payload_.query_->criteria_, msgHandle_->header_.id_); break; case MSG_QUERYHIT: msgHandle_->header_.hops_++; if(find_desc(msgHandle_->header_.id_)==NULL && find_query(msgHandle_->header_.id_, 0)==-1) break; if(find_desc(msgHandle_->header_.id_)!=NULL && msgHandle_->header_.TTL_ > msgHandle_->header_.hops_) backroute(pktdata, &(msgHandle_->header_)); //modify by zdh 04-03 gapp_->QueryHitReply(src, msgHandle_->header_.hops_, (char *)msgHandle_->payload_.queryhit_); //gapp_->QueryHitReply(src, msgHandle_->header_.TTL_, (char *)msgHandle_->payload_.queryhit_); //we didn't forward the corresponding Query request //we must be receiving it because we initiate the Query message if(find_desc(msgHandle_->header_.id_)==NULL) nqueryhit++; //20060322 Jackie modified /* if (gapp_->segmentID_==gapp_->file_size_){ debug_info(" The transfer for node :%d is finished, altogether %d segments are transfered \n", gapp_->addr_,gapp_->file_size_); gapp_->segmentID_++; }else{ //the current querying segment is hits
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -