⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nssocket_prio.cc

📁 升级版的p2p文件共享linux环境ns2仿真软件
💻 CC
字号:
/*-------------------------------------------------------------------------*//* Priority Message Queues on Gnutella servents                            *//* Author: Qi He <http://www.cc.gatech.edu/~qhe> 01Aug2003                 *//* $Revision:$ $Name:$ $Date:$                                             *//*-------------------------------------------------------------------------*/#include "nssocket_prio.h"using namespace std;static class PrioSocketClass: public TclClass {public:  PrioSocketClass(): TclClass("Application/AdvwTcpApplication/NSSocket/PrioSocket") {}  TclObject* create(int, const char*const*) {    return (new PrioSocket());  }}class_priosocket;static class QueSocketClass: public TclClass {public:  QueSocketClass(): TclClass("Application/AdvwTcpApplication/NSSocket/QueSocket") {}  TclObject* create(int, const char*const*) {    return (new QueSocket());  }}class_quesocket;QueSocket::QueSocket() {  avail_len_ = 0;  avail_pkt_ = 0;}int QueSocket::upcall_recv(PacketData *pktdata) {  unsigned char *data = pktdata->data();  if (avail_len_ > RBUF_LIMIT) {    return pktdata->size();  }  PacketData *newcopy = (PacketData *)pktdata->copy();  PrioPkt_t newentry;  newentry.read_ = FALSE;  newentry.data_ = newcopy;  pktqueue_.insert(pktqueue_.end(), newentry);  avail_pkt_ ++;  avail_len_ +=  pktdata->size();    PrioPktQueue::iterator pi;  for(pi=pktqueue_.begin(); pi!=pktqueue_.end(); pi++) {    int res = app_->upcall_recv(this, (PacketData *)(pi->data_), NULL);    if(res==0) {      break;    }    else {      pi->read_ = TRUE;     }  }  while(!pktqueue_.empty()) {    pi = pktqueue_.begin();    if(pi->read_==TRUE) {      avail_len_ -= pi->data_->size();      delete pi->data_;      pktqueue_.erase(pi);      avail_pkt_--;    } else      break;  }  return pktdata->size();}void QueSocket::recv() {  PrioPktQueue::iterator pi;  int consumed = FALSE;  for(pi=pktqueue_.begin(); pi!=pktqueue_.end(); pi++) {    int res = app_->upcall_recv(this, (PacketData *)(pi->data_), NULL);    if(res==0)       break;    else       pi->read_ = TRUE;   }  while(!pktqueue_.empty()) {    pi = pktqueue_.begin();    if(pi->read_==TRUE) {      avail_len_ -= pi->data_->size();      delete pi->data_;      pktqueue_.erase(pi);      avail_pkt_ --;      consumed = TRUE;    } else      break;  }  if(consumed)    ((SocketTcp *)agent_)->tcp_command_block_receive(0); }PrioSocket::PrioSocket() {  avail_len_ = 0;  avail_pkt_ = 0;  isPrio_ = TRUE;}int PrioSocket::prio_drop(int type, int hops) {  for(int i = NUM_PRIOLEVELS -1; i > type; i--) {    for(int j= INIT_TTL; j>=0; j--) {      if(!pktqueue_[i][j].empty()) {	PrioPkt_t last = pktqueue_[i][j].back();	avail_pkt_ --;	avail_len_ -= last.data_->size();	pktqueue_[i][j].pop_back();	return TRUE;      }    }  }  for(int j = INIT_TTL; j > hops ; j--) {    if(!pktqueue_[type][j].empty()) {      PrioPkt_t last = pktqueue_[type][j].back();      avail_pkt_ --;      avail_len_ -= last.data_->size();      pktqueue_[type][j].pop_back();      return TRUE;    }  }  return FALSE;}int PrioSocket::send(int len, PacketData *pktdata) {  int res;  if(state_ == CONNECTED) {    PacketData *newcopy = (PacketData *)pktdata->copy();    res = ((SocketTcp *)agent_)->sendmsg(pktdata, len);    if(res==-1||res==0)       insert(newcopy);    delete (newcopy);    return len;  }  switch(state_) {  case CONNECTING:  case UNCONNECTED:    errno_ = ENOTCONN;    break;      default:     break;  }  if(pktdata!=NULL) {    delete pktdata;  }  return len;}int PrioSocket::insert(PacketData *pktdata) {  unsigned char *data = pktdata->data();  int len = pktdata->size(), index , hops;  unsigned char *tdata = (unsigned char *)malloc(sizeof(unsigned char) * len);  memcpy(tdata, data, len);  int msgtype = ((GnutellaAgent *)app_)->msgHandle_->parse(len, tdata);  free(tdata);  if(msgtype <1)	return len;  if(msgtype < MSG_QUERYHIT) {      index = 0;      hops = 0;   }  index = msgtype - MSG_QUERYHIT + 1;  hops = ((GnutellaAgent *)app_)->msgHandle_->header_.hops_;  if (msgtype == MSG_BOOTCACHE_UPDATE) 	hops = 0;  if(index<0 || index >= NUM_PRIOLEVELS) {    printf("wrong index\n");    fflush(NULL);    exit(1);  }  if(hops<0 || hops > INIT_TTL) {    printf("wrong hops\n");    fflush(NULL);    exit(1);  }  if(avail_pkt_ > PBUF_LIMIT) {    if(!prio_drop(index, hops)) {      return pktdata->size();    }  }  int done = FALSE;  PrioPktQueue *qptr = &(pktqueue_[index][hops]);  PacketData *newcopy = (PacketData *)pktdata->copy();  PrioPkt_t newentry;  newentry.read_ = FALSE;  newentry.data_ = newcopy;  qptr->insert(qptr->end(), newentry);  avail_pkt_ ++;  avail_len_ +=  pktdata->size();  for(int i=0; i< NUM_PRIOLEVELS && !done; i++) {    for(int j=0; j< INIT_TTL+1 && !done; j++) {      PrioPktQueue::iterator pi;      for(pi=pktqueue_[i][j].begin(); pi!=pktqueue_[i][j].end(); pi++) {	if(pi->read_) {	  printf("warning 1:: data read but not deleted 1\n");	}	PacketData *tdata = (PacketData *)(pi->data_)->copy();	int res = ((SocketTcp *)agent_)->sendmsg(tdata, tdata->size());	if(res<=0) {	  done = TRUE;	  break;	} else {	  pi->read_ = TRUE;	}      }      while(!pktqueue_[i][j].empty()) {	pi = pktqueue_[i][j].begin();	if(pi->read_==TRUE) {	  avail_len_ -= pi->data_->size();	  delete pi->data_;	  pktqueue_[i][j].erase(pi);	  avail_pkt_ --;	} else	  break;      }    }  }    return pktdata->size();}void PrioSocket::upcall_send() {  int done = FALSE;  if(state_==CLOSING)    return;  state_ = CONNECTED;  for(int i=0; i< NUM_PRIOLEVELS && !done; i++) {    for(int j=0; j< INIT_TTL+1 && !done; j++) {      PrioPktQueue::iterator pi;      for(pi=pktqueue_[i][j].begin(); pi!=pktqueue_[i][j].end(); pi++) {	if(pi->read_) {	  printf("warning 2:: data read but not deleted\n");	}	PacketData *tdata = (PacketData *)(pi->data_)->copy();	int res = ((SocketTcp *)agent_)->sendmsg(tdata, tdata->size());	if(res<=0) {	  done = TRUE;	  break;	} else {	  pi->read_ = TRUE;	}      }      while(!pktqueue_[i][j].empty()) {	pi = pktqueue_[i][j].begin();	if(pi->read_)  {	  avail_len_ -= pi->data_->size();	  delete pi->data_;	  pktqueue_[i][j].erase(pi);	  avail_pkt_ --;	} else	break;	      }    }  }  if(app_)	app_->upcall_send(this);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -