📄 peer_agent.cc
字号:
if (gapp_->segmentID_==msgHandle_->payload_.queryhit_->segmentID_){ gapp_->update_segmentcache(msgHandle_->payload_.queryhit_->segmentID_,msgHandle_->header_.hops_); gapp_->search(NULL); } }*/ break; default: debug_warning("WARNING: Unknown Message Type\n"); break; }; msgHandle_->clear(); return pktdata->size(); }int GnutellaAgent::command(int argc, const char*const* argv) { Tcl &tcl = Tcl::instance(); if(argc==3) { if(strcmp(argv[1], "set-app")==0) { GnutellaApp *app = (GnutellaApp *)tcl.lookup(argv[2]); if(app!=NULL) { gapp_ = app; app_ = gapp_; return TCL_OK; } return TCL_ERROR; } } if(argc==2) { if(strcmp(argv[1], "statistics")==0) { printf("I will printf!!\n"); statistics(); return TCL_OK; } } return TCL_ERROR;}void GnutellaAgent::statistics() { if (!gapp_->isBootserver_) { bkblock_->print(0); fwblock_->print(1); gapp_->degree_->print(2); rcvRate_->print(3); }}// APIs to GnutellaApp/* bootstrap request */void GnutellaAgent::Bootstrap(NodeAddr_t peer) { Socket *cur=NULL; SockMap_t::iterator si = lsocks_.find(peer); if(si==lsocks_.end()) { Tcl &tcl = Tcl::instance();#ifdef PDNS tcl.evalf("[Simulator instance] create-sock %d %s -1 1 %d %d", app_->addr_, name(), use_prio_, rate_limit_); cur = (Socket *)tcl.lookup(tcl.result());#else tcl.evalf("[Simulator instance] create-sock %d %s -1 0 %d %d", app_->addr_, name(), use_prio_, rate_limit_); cur = (Socket *)tcl.lookup(tcl.result());#endif if(cur==NULL) { debug_warning("WARNING: unable to create-sock to %d\n", peer); return; } cur->connect(peer, LISTEN_PORT); SockEntry *newentry = new SockEntry(cur, peer, SOCK_BOOT_WAIT); lsocks_.insert(SockMap_t::value_type(peer, *newentry)); }}/* respond to a bootstrap request*/void GnutellaAgent::Bootstrap_Reply(NodeAddr_t peer, int cnt, NodeAddr_t *iplist) { PacketData *content; unsigned char *dataptr; char bootstr[BOOTSTRAP_RESLEN]; int len; SockMap_t::iterator si = lsocks_.find(peer); if(si==lsocks_.end()) { debug_warning("WARNING: error sending bootstrap through unconnected peer\n"); return; } len = sizeof(int) + sizeof(NodeAddr_t) * cnt + BOOTSTRAP_RESLEN; content = new PacketData(len); dataptr = content->data(); strcpy((char *)bootstr, GNUTELLA_BOOTSTRAP_RES); memcpy(dataptr, bootstr, BOOTSTRAP_RESLEN); dataptr = dataptr + BOOTSTRAP_RESLEN; *((int *)dataptr) = cnt; memcpy((void *)&dataptr[4], (void *)iplist, sizeof(NodeAddr_t)*cnt); si->second.sock_->send(len, content);}/* request to connect to a peer*/int GnutellaAgent::Connect(NodeAddr_t peer, double timeout) { Socket *cur=NULL; PendingConnEntry *newconn=NULL; SockMap_t::iterator si = lsocks_.find(peer); PendingConns_t::iterator pi = conn_pending_.find(peer); if(si!=lsocks_.end()) { debug_info("existing socket from %d to %d not removed\n", app_->addr_, peer); return FALSE; } if(pi==conn_pending_.end()) { Tcl &tcl = Tcl::instance();#ifdef PDNS tcl.evalf("[Simulator instance] create-sock %d %s -1 1 %d %d", gapp_->addr_, name(), use_prio_, rate_limit_);#else tcl.evalf("[Simulator instance] create-sock %d %s -1 0 %d %d", gapp_->addr_, name(), use_prio_, rate_limit_);#endif cur = (Socket *)tcl.lookup(tcl.result()); if(cur==NULL) { debug_warning("WARNING: unable to create-sock to\n", peer); return FALSE; } cur->connect(peer, LISTEN_PORT); newconn = new PendingConnEntry(peer, timeout, NOW, NOT_CONNECTED); } else return FALSE; //add to pending connection list conn_pending_.insert(PendingConns_t::value_type(peer, *newconn)); if(conn_pending_.size()==1) conn_timer_->resched(CONN_TIMEOUT); return TRUE;}/* disconnect a peer (peers) */void GnutellaAgent::Disconnect(NodeAddr_t peer) { SockMap_t::iterator i; if(peer==-1) { //disconnect all for(i=lsocks_.begin(); i != lsocks_.end(); i++) { if(i->second.type_!=SOCK_BOOT_WAIT) i->second.sock_->close(); } while(!lsocks_.empty()) { i = lsocks_.begin(); lsocks_.erase(i); } return; } //disconnect a specific peer i = lsocks_.find(peer); if(i!=lsocks_.end()) { i->second.sock_->close(); lsocks_.erase(i); }}/* send PING to a peer */void GnutellaAgent::Ping(NodeAddr_t peer, int ttl) { PacketData *pkt; SockMap_t::iterator si = lsocks_.find(peer); if(si==lsocks_.end()) { debug_warning("WARNING: %d error pinging unconnected peer %d\n", app_->addr_, peer); return; } pkt = msgHandle_->newpacket(NULL, PING, ttl, 0, 0, NULL); if(pkt) { si->second.sock_->send(DESC_HDRLEN, pkt); PendingReqEntry *newquery = new PendingReqEntry(msgHandle_->header_.id_, NOW, REQ_PING); pending_req_.insert(pending_req_.end(), *newquery); }}/* respond to a PING with a specific descriptor ID */void GnutellaAgent::Pong(NodeAddr_t peer, int ttl, int cnt, NodeAddr_t *iplist, int *size, int *fnum, char *id) { tPong pong; PacketData *pkt=NULL; SockMap_t::iterator si = lsocks_.find(peer); if(si==lsocks_.end()) { //sending PONG through unconnected peer could happen return; } for(int i=0; i<cnt; i++) { pong.port_ = 0; pong.addr_ = iplist[i]; pong.file_shared_ = fnum[i]; pong.byte_shared_ = size[i]; pkt = msgHandle_->newpacket(id, PONG, INIT_TTL, 0, PONG_LEN, (char *)&pong); if(pkt) si->second.sock_->send(DESC_HDRLEN + PONG_LEN, pkt); }}/* send a Query */void GnutellaAgent::Query(NodeAddr_t peer, Word_t minSpeed, char *search) { SockMap_t::iterator si; char *data; PacketData *pkt=NULL; int len = strlen(search) + 1; int additional_len=0; char *id = NULL; data = (char *)malloc(len+sizeof(Word_t)); memcpy((void *)data, (void *)&minSpeed, sizeof(Word_t)); additional_len +=sizeof(Word_t); if (len>1) strcpy(&data[additional_len], search); nquery++; if(peer==-1) { //send to all connected peers for(si = lsocks_.begin(); si != lsocks_.end(); si++) { pkt = msgHandle_->newpacket(id, QUERY, INIT_QUERY_TTL, 0, len + additional_len, data); if(pkt) { si->second.sock_->send(len+additional_len+DESC_HDRLEN, pkt); if(id==NULL) id = msgHandle_->header_.id_; } } PendingReqEntry *newquery = new PendingReqEntry(msgHandle_->header_.id_, NOW, REQ_QUERY); pending_req_.insert(pending_req_.end(), *newquery); free(data); return; } //or, send to a specific peer si = lsocks_.find(peer); if(si==lsocks_.end()) { debug_warning("WARNING: error sending Query to unconnected peer %d\n", peer); free(data); return; } pkt = msgHandle_->newpacket(NULL, QUERY, INIT_QUERY_TTL, 0, len + 2, data); free(data); if(pkt) { si->second.sock_->send(len+2+DESC_HDRLEN, pkt); PendingReqEntry *newquery = new PendingReqEntry(msgHandle_->header_.id_, NOW, REQ_QUERY); pending_req_.insert(pending_req_.end(), *newquery); }}/* respond to a Query with QueryHit */void GnutellaAgent::QueryHit(NodeAddr_t peer, Byte_t number, NodeAddr_t responder, int speed, char *results, char *id,int ttl) { char *data; PacketData *pkt=NULL; tQueryHit queryhit; int len, tlen, res=-1; if(results) len = strlen(results) + 1; else len = 0; SockMap_t::iterator si = lsocks_.find(peer); if(si==lsocks_.end()) { // sending QueryHit through unconnected peer could happen return; } queryhit.port_ = LISTEN_PORT; queryhit.num_hits_ = number; queryhit.addr_ = responder; queryhit.speed_ = speed; queryhit.segmentID_=*results; if(!firewalled_) data = (char *)malloc(QUERYHIT_FIXLEN + len); else data = (char *)malloc(QUERYHIT_FIXLEN + len + 16); memcpy((void *)data, (void *)&queryhit, QUERYHIT_FIXLEN); if(len>0) memcpy((void *)&data[QUERYHIT_FIXLEN], (void *)results, len); if(firewalled_) memcpy((void *)&data[QUERYHIT_FIXLEN + len], mySrvId_, 16); if(!firewalled_) tlen = QUERYHIT_FIXLEN + len; else tlen = QUERYHIT_FIXLEN + len + 16;//modify by zdh 04-03pkt = msgHandle_->newpacket(id, QUERYHIT, ttl, 0, tlen, data); //pkt = msgHandle_->newpacket(id, QUERYHIT, INIT_TTL, 0, tlen, data); if(pkt) res = si->second.sock_->send(DESC_HDRLEN + tlen, pkt); if(res!=-1) squeryhit++; free(data);}//called within GnutellaAgent/* compose and send LimeWire-like bootstrap request */void GnutellaAgent::lime_bootstrap(Socket *sock) { PacketData *content; unsigned char *dataptr; char bootstr[BOOTSTRAP_LEN]; content = new PacketData(BOOTSTRAP_LEN-1); dataptr = content->data(); strcpy((char *)bootstr, GNUTELLA_BOOTSTRAP); memcpy(dataptr, bootstr, BOOTSTRAP_LEN-1); sock->send(BOOTSTRAP_LEN-1, content);}/* agent send out cache update messages to connected bootstrap servers*/void GnutellaAgent::UpdateBootcache() { for(SockMap_t::iterator si = lsocks_.begin(); si != lsocks_.end(); si++) { if(si->second.type_==SOCK_BOOT) { PacketData *content; unsigned char *dataptr; char bootstr[CACHEUPDATE_LEN]; content = new PacketData(CACHEUPDATE_LEN-1); dataptr = content->data(); strcpy((char *)bootstr, BOOTCACHE_UPDATE); memcpy(dataptr, bootstr, CACHEUPDATE_LEN-1); si->second.sock_->send(CACHEUPDATE_LEN-1, content); } }}/* compose and send Gnutella Connection OK msg */void GnutellaAgent::gnutella_ok(Socket *sock) { PacketData *content; unsigned char *dataptr; char connstr[CONNOK_LEN]; content = new PacketData(CONNOK_LEN-1); dataptr = content->data(); strcpy((char *)connstr, GNUTELLA_OK); memcpy(dataptr, connstr, CONNOK_LEN-1); sock->send(CONNOK_LEN-1, content);}/* compose and send Gnutella connection rejection msg */void GnutellaAgent::gnutella_reject(Socket *sock) { PacketData *content; unsigned char *dataptr; char connstr[CONNREJ_LEN]; content = new PacketData(CONNREJ_LEN-1); dataptr = content->data(); strcpy((char *)connstr, GNUTELLA_REJ); memcpy(dataptr, connstr, CONNREJ_LEN-1); sock->send(CONNREJ_LEN-1, content);}/* compose and send Gnutella Connection Request msg */void GnutellaAgent::gnutella_req(Socket *sock) { PacketData *content; unsigned char *dataptr; char connstr[CONN_LEN]; content = new PacketData(CONN_LEN-1); dataptr = content->data(); strcpy((char *)connstr, GNUTELLA_CONN); memcpy(dataptr, connstr, CONN_LEN-1); sock->send(CONN_LEN-1, content); if(conn_pending_.size()==1) conn_timer_->resched(CONN_TIMEOUT);}/* Conn timeout handler, notify GnutellaApp of requests that have timed out*/void GnutellaAgent::conn_timeout() { double ctime; ctime = NOW; for(PendingConns_t::iterator pi = conn_pending_.begin(); pi != conn_pending_.end(); pi++) { if(ctime - pi->second.start_ > pi->second.timeout_ && pi->second.state_==TRANSPORT_CONNECTED) gapp_->ConnectionTimeout(pi->second.peer_); }}/* find the incoming socket matching a descriptor ID*/Socket *GnutellaAgent::find_desc(char *descid) { for(DescMap_t::iterator di=desc_cache_.begin(); di !=desc_cache_.end(); di++) { if(memcmp(descid, di->id_, 16)==0) { return di->sock_; } }; return NULL;}/* find the outstanding Query matching a descriptor ID */int GnutellaAgent::find_query(char *descid, int poll) { int cnt = -1; double ctime = NOW, tmp, delay; for(ReqList_t::iterator di=pending_req_.begin(); di !=pending_req_.end(); di++) { if(di->type_==REQ_QUERY && memcmp(descid, di->id_, 16)==0) { cnt = di->replycnt_; if(poll) //poll only, do not update replycnt_ return cnt; if(cnt==0) { delay = ctime - di->tstamp_; tmp = (double)nthru * Avg_Rsp + delay; nthru++; Avg_Rsp = tmp/(double)nthru; debug_stat("QueryHit %d %d %d %d %f %f %d %d\n", nquery, nqueryhit, nthru, squeryhit, Avg_Rsp, delay, rquery, gapp_->lpeers_.size()); fflush(NULL); } di->replycnt_++; //if(di->replycnt_ > 20) //remove Query if there are already many replies //modify by zdh 04-04 if(di->replycnt_!=0) pending_req_.erase(di); return cnt; } }; return -1;}/* find the outstanding Ping matching a descriptor ID */int GnutellaAgent::find_ping(char *descid, int poll) { int cnt = -1; for(ReqList_t::iterator di=pending_req_.begin(); di !=pending_req_.end(); di++) { if(di->type_==REQ_PING && memcmp(descid, di->id_, 16)==0) { cnt = di->replycnt_; if(!poll) di->replycnt_++; return cnt; } }; return -1;}/* forward a PING or Query message */int GnutellaAgent::forward(Socket *incoming, PacketData *data, desc_hdr *hdr) { PacketData *newdata=NULL; DescEntry* newentry=NULL; int bcnt=0, totalout=0; int fcnt=0; for(SockMap_t::iterator si = lsocks_.begin(); si!=lsocks_.end(); si++) { if(si->second.sock_!=incoming) { totalout++; if(si->second.sock_->blocked(SOCK_WRITE)) { bcnt++; fwblock_->increment(); } } } for(SockMap_t::iterator si = lsocks_.begin(); si!=lsocks_.end(); si++) { if(si->second.sock_!=incoming) { if(!si->second.sock_->blocked(SOCK_WRITE)) { newdata = new PacketData(*data); si->second.sock_->send(data->size(), newdata); } } } newentry = new DescEntry(hdr->id_, incoming, NOW); desc_cache_.insert(desc_cache_.end(), *newentry); return FALSE;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -