📄 nssocket_prio.cc
字号:
/*-------------------------------------------------------------------------*//* Priority Message Queues on Gnutella servents *//* Author: Qi He <http://www.cc.gatech.edu/~qhe> 01Aug2003 *//* $Revision:$ $Name:$ $Date:$ *//*-------------------------------------------------------------------------*/#include "nssocket_prio.h"using namespace std;static class PrioSocketClass: public TclClass {public: PrioSocketClass(): TclClass("Application/AdvwTcpApplication/NSSocket/PrioSocket") {} TclObject* create(int, const char*const*) { return (new PrioSocket()); }}class_priosocket;static class QueSocketClass: public TclClass {public: QueSocketClass(): TclClass("Application/AdvwTcpApplication/NSSocket/QueSocket") {} TclObject* create(int, const char*const*) { return (new QueSocket()); }}class_quesocket;QueSocket::QueSocket() { avail_len_ = 0; avail_pkt_ = 0;}int QueSocket::upcall_recv(PacketData *pktdata) { unsigned char *data = pktdata->data(); if (avail_len_ > RBUF_LIMIT) { return pktdata->size(); } PacketData *newcopy = (PacketData *)pktdata->copy(); PrioPkt_t newentry; newentry.read_ = FALSE; newentry.data_ = newcopy; pktqueue_.insert(pktqueue_.end(), newentry); avail_pkt_ ++; avail_len_ += pktdata->size(); PrioPktQueue::iterator pi; for(pi=pktqueue_.begin(); pi!=pktqueue_.end(); pi++) { int res = app_->upcall_recv(this, (PacketData *)(pi->data_), NULL); if(res==0) { break; } else { pi->read_ = TRUE; } } while(!pktqueue_.empty()) { pi = pktqueue_.begin(); if(pi->read_==TRUE) { avail_len_ -= pi->data_->size(); delete pi->data_; pktqueue_.erase(pi); avail_pkt_--; } else break; } return pktdata->size();}void QueSocket::recv() { PrioPktQueue::iterator pi; int consumed = FALSE; for(pi=pktqueue_.begin(); pi!=pktqueue_.end(); pi++) { int res = app_->upcall_recv(this, (PacketData *)(pi->data_), NULL); if(res==0) break; else pi->read_ = TRUE; } while(!pktqueue_.empty()) { pi = pktqueue_.begin(); if(pi->read_==TRUE) { avail_len_ -= pi->data_->size(); delete pi->data_; pktqueue_.erase(pi); avail_pkt_ --; consumed = TRUE; } else break; } if(consumed) ((SocketTcp *)agent_)->tcp_command_block_receive(0); }PrioSocket::PrioSocket() { avail_len_ = 0; avail_pkt_ = 0; isPrio_ = TRUE;}int PrioSocket::prio_drop(int type, int hops) { for(int i = NUM_PRIOLEVELS -1; i > type; i--) { for(int j= INIT_TTL; j>=0; j--) { if(!pktqueue_[i][j].empty()) { PrioPkt_t last = pktqueue_[i][j].back(); avail_pkt_ --; avail_len_ -= last.data_->size(); pktqueue_[i][j].pop_back(); return TRUE; } } } for(int j = INIT_TTL; j > hops ; j--) { if(!pktqueue_[type][j].empty()) { PrioPkt_t last = pktqueue_[type][j].back(); avail_pkt_ --; avail_len_ -= last.data_->size(); pktqueue_[type][j].pop_back(); return TRUE; } } return FALSE;}int PrioSocket::send(int len, PacketData *pktdata) { int res; if(state_ == CONNECTED) { PacketData *newcopy = (PacketData *)pktdata->copy(); res = ((SocketTcp *)agent_)->sendmsg(pktdata, len); if(res==-1||res==0) insert(newcopy); delete (newcopy); return len; } switch(state_) { case CONNECTING: case UNCONNECTED: errno_ = ENOTCONN; break; default: break; } if(pktdata!=NULL) { delete pktdata; } return len;}int PrioSocket::insert(PacketData *pktdata) { unsigned char *data = pktdata->data(); int len = pktdata->size(), index , hops; unsigned char *tdata = (unsigned char *)malloc(sizeof(unsigned char) * len); memcpy(tdata, data, len); int msgtype = ((GnutellaAgent *)app_)->msgHandle_->parse(len, tdata); free(tdata); if(msgtype <1) return len; if(msgtype < MSG_QUERYHIT) { index = 0; hops = 0; } index = msgtype - MSG_QUERYHIT + 1; hops = ((GnutellaAgent *)app_)->msgHandle_->header_.hops_; if (msgtype == MSG_BOOTCACHE_UPDATE) hops = 0; if(index<0 || index >= NUM_PRIOLEVELS) { printf("wrong index\n"); fflush(NULL); exit(1); } if(hops<0 || hops > INIT_TTL) { printf("wrong hops\n"); fflush(NULL); exit(1); } if(avail_pkt_ > PBUF_LIMIT) { if(!prio_drop(index, hops)) { return pktdata->size(); } } int done = FALSE; PrioPktQueue *qptr = &(pktqueue_[index][hops]); PacketData *newcopy = (PacketData *)pktdata->copy(); PrioPkt_t newentry; newentry.read_ = FALSE; newentry.data_ = newcopy; qptr->insert(qptr->end(), newentry); avail_pkt_ ++; avail_len_ += pktdata->size(); for(int i=0; i< NUM_PRIOLEVELS && !done; i++) { for(int j=0; j< INIT_TTL+1 && !done; j++) { PrioPktQueue::iterator pi; for(pi=pktqueue_[i][j].begin(); pi!=pktqueue_[i][j].end(); pi++) { if(pi->read_) { printf("warning 1:: data read but not deleted 1\n"); } PacketData *tdata = (PacketData *)(pi->data_)->copy(); int res = ((SocketTcp *)agent_)->sendmsg(tdata, tdata->size()); if(res<=0) { done = TRUE; break; } else { pi->read_ = TRUE; } } while(!pktqueue_[i][j].empty()) { pi = pktqueue_[i][j].begin(); if(pi->read_==TRUE) { avail_len_ -= pi->data_->size(); delete pi->data_; pktqueue_[i][j].erase(pi); avail_pkt_ --; } else break; } } } return pktdata->size();}void PrioSocket::upcall_send() { int done = FALSE; if(state_==CLOSING) return; state_ = CONNECTED; for(int i=0; i< NUM_PRIOLEVELS && !done; i++) { for(int j=0; j< INIT_TTL+1 && !done; j++) { PrioPktQueue::iterator pi; for(pi=pktqueue_[i][j].begin(); pi!=pktqueue_[i][j].end(); pi++) { if(pi->read_) { printf("warning 2:: data read but not deleted\n"); } PacketData *tdata = (PacketData *)(pi->data_)->copy(); int res = ((SocketTcp *)agent_)->sendmsg(tdata, tdata->size()); if(res<=0) { done = TRUE; break; } else { pi->read_ = TRUE; } } while(!pktqueue_[i][j].empty()) { pi = pktqueue_[i][j].begin(); if(pi->read_) { avail_len_ -= pi->data_->size(); delete pi->data_; pktqueue_[i][j].erase(pi); avail_pkt_ --; } else break; } } } if(app_) app_->upcall_send(this);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -