⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nsport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met:////   * Redistributions of source code must retain the above copyright//     notice, this list of conditions and the following disclaimer.//   * Redistributions in binary form must reproduce the above copyright//     notice, this list of conditions and the following disclaimer in//     the documentation and/or other materials provided with the//     distribution.//   * Neither the names of Duke University nor The University of//     California, San Diego, nor the names of its contributors//     may be used to endorse or promote products derived from//     this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE./*To add new protocol you must:declare the offsetset the offsetadd to demultiplex statement*/#include "nsport.h"#include "scheduler.h"#include "packet.h"#include "config.h"#include "macedon.h"#define MAXBUFFSIZE 3000Scheduler *globalsched;int nsport_headersize;char mymaddr[] = "225.0.1.2";  // being deprecatedMACEDON_Agent *globalbase;extern int big_hdr_size;init_function init_array_actual[1025];init_function *init_array = init_array_actual;static MACEDON_Agent *nsport_create_agent(int type){  MACEDON_Agent *temp;  switch (type)    {    default:      if(init_array[type] != NULL) {        temp = init_array[type]();      } else {        printf("No protocol registered for type %d!!!\n", type);	exit(43);      }      break;    }  return temp;}macedon_Agent *macedon_init(int source, int agent_type){  void *arg;  int ret;  pthread_attr_t attr;  MACEDON_Agent *my_macedon;  MACEDON_Agent *my_base;  hdr_cmn::offset_ = 0;  hdr_ip::offset_ = sizeof(hdr_cmn);  load_protocols();  globalsched = new Scheduler();  if (!parameters.isset("quiet")) {    printf("Initializing agent of type %d\n", agent_type);    printf("Agent pid %d ppid %d\n", getpid(), getppid());  }  my_macedon = nsport_create_agent(agent_type);      if (!parameters.isset("quiet")) {    printf("Initialized agent of type %d %x\n", agent_type, my_macedon);  }  fflush(stdout);  my_base = my_macedon;  while (my_base)    {      globalbase = my_base;#ifdef NSPORT_COMPRESS_ADDRESS      if (my_base->realaddr == source)	{	  if ( my_base->realport != NSPORT_PORT )	    {	      printf("The source node must use port of %d\n", NSPORT_PORT);	      exit(4);	    }	}      int realport = htons((short unsigned int)NSPORT_PORT);            my_base->source_ = convert_address(source, realport);#else      my_base->source_ = source;#endif            nsport_headersize = sizeof(hdr_cmn)+	sizeof(hdr_ip)+	big_hdr_size;       if (!parameters.isset("quiet")) {	printf("Set source %x %x\n", my_base, my_base->source_);	printf("cmn is %d, ip is %d, mine for protocol %d is %d\n", sizeof(hdr_cmn), sizeof(hdr_ip), agent_type, big_hdr_size);      }            my_base = my_base->base_agent;    }  if (!parameters.isset("quiet")) {    printf("Allocated MACEDON agent %x with base %x\n", my_macedon, globalbase);    printf("Initializing transports for lowest agent.\n");  }    // the rest of the stuff is only done for the top-level agent#ifdef RWLOCK  pthread_rwlock_init(&(globalbase->agentrwlock), NULL);  #else  pthread_mutexattr_t    attrm;  pthread_mutexattr_settype(&attrm, PTHREAD_MUTEX_ERRORCHECK);  //    pthread_mutexattr_settype(&attrm, PTHREAD_MUTEX_RECURSIVE);  pthread_mutex_init(&(globalbase->agentlock), &attrm);#endif  if(sysconf(_SC_THREADS)==-1){        printf("Threads not supported\n");    exit(69);   }//  pthread_attr_init( &attr );//  arg = (void *)globalbase;//  if ((ret = pthread_create(&(my_macedon->childtid), //			   &attr, nsport_input, arg)) != 0)//    {//     //      printf("Error %d in creating init thread.\n", ret);//      perror("pthread_create");//      exit(77);//    }  //  printf("created init thread %d\n", my_macedon->childtid);  extern int threads_created;  extern int threads_joined;  extern int threads_cancelled;  extern int threads_exited;  extern int threads_timer;  extern int threads_tfrc;  extern int threads_tcp;  threads_created++;#ifdef SCHED_TRACE  printf("Number of threads is %d,  cr %d, join %d, exit %d, can %d time %d, tfrc %d, tcp %d\n", 	 threads_created-threads_joined, 	 threads_created, threads_joined,	 threads_exited, threads_cancelled, 	 threads_timer, threads_tfrc, threads_tcp);#endif  char command[] = "join";  char *argv[2];    argv[1]=command;    if (!parameters.getint("quiet")) {    printf("Doing join for MACEDON agent %x with base %x\n", my_macedon, globalbase);  }  my_macedon->command(2, argv);    if (!parameters.getint("quiet")) {    printf("Join complete.\n");  }  return my_macedon;}//void *//nsport_input(void *myarg)//{//  socklen_t arg;//  int readsize;//  unsigned char *mybuf;//  struct sockaddr_in othersocket;//  struct ip_mreq multireq;//  pthread_attr_t attr;//  void *thread_arg;//  int ret;//  pthread_t kid_tid;//  MACEDON_Agent *my_agent = (MACEDON_Agent *)myarg;////  if (!parameters.getint("quiet")) {//    printf("Input called with MACEDON agent of %x\n", my_agent);//  }////  multireq.imr_multiaddr.s_addr = inet_addr(mymaddr);//  multireq.imr_interface.s_addr = INADDR_ANY;////  /*if (setsockopt(my_agent->multisockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,//                 &multireq, sizeof(multireq)) < 0)//    {//      printf("Can't join multicast group for %x %x.\n", inet_addr(mymaddr), //	     multireq.imr_multiaddr.s_addr);//      perror ("Can't join multicast group ");//      exit(14);//    }//  else {//    if (!parameters.getint("quiet")) {//      printf("Joined multicast group %x.\n",  multireq.imr_multiaddr.s_addr);//    }//  }////  if (setsockopt(my_agent->sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,//                 &multireq, sizeof(multireq)) < 0)//    {//      printf("Can't join multicast group for %x %x.\n", inet_addr(mymaddr), //	     multireq.imr_multiaddr.s_addr);//      perror ("Can't join multicast group  on ordinary socket ");//      exit(14);//    }//  else {//    if (!parameters.getint("quiet")) {//      printf("Joined multicast group %x.\n",  multireq.imr_multiaddr.s_addr);//    }//  }*/////  pthread_attr_init( &attr );//  thread_arg = (void *)my_agent;////   if ((ret = pthread_create(&(kid_tid), //// 			   &attr, nsport_multiread, thread_arg)) != 0)////     {////       printf("Error %d in creating multicast read thread.\n", ret);////       perror("pthread_create");////       exit(78);////     }//  //  printf("created multicast read thread %d\n", kid_tid);//  while (1)//    {//      mybuf = (unsigned char *)malloc(NSPORT_MAX_UDP_SIZE);//      if (mybuf == 0) {//	printf("Malloc exception 2!\n");//	exit(72);//      }//      arg = sizeof(othersocket);//      if ((readsize=recvfrom(my_agent->sockfd, mybuf, NSPORT_MAX_UDP_SIZE, 0, //			     (struct sockaddr *) &othersocket,//			     &arg)) == -1)//	{//	  printf("Recvfrom failure.\n");//	  exit(19);//	}//      extern int global_exit;//      if (global_exit == 1)//	pthread_exit(NULL);//      //#if NSPORT_DEBUG//      //           printf("Received packet of size %d from %x\n",readsize,othersocket.sin_addr.s_addr); //      //#endif//      nsport_readbuf(mybuf, othersocket.sin_addr.s_addr, othersocket.sin_port, my_agent);//    } ////  if (!parameters.getint("quiet")) {//    printf("Done receiving.\n");//  }//}////////void *////nsport_multiread(void *myarg)////{////  unsigned char *mybuf;////  struct sockaddr_in othersocket;////  socklen_t arg;////  int readsize;////  MACEDON_Agent *my_agent = (MACEDON_Agent *)myarg;////////  printf("Multiread called with MACEDON agent of %x\n", my_agent);////////  while (1)////    {////      mybuf = (unsigned char *)malloc(NSPORT_MAX_UDP_SIZE);////      if (mybuf == 0) {////	printf("Malloc exception 2!\n");////	exit(72);////      }////      arg = sizeof(othersocket);////      if ((readsize=recvfrom(my_agent->multisockfd, mybuf, ////			     NSPORT_MAX_UDP_SIZE, 0, ////			     (struct sockaddr *) &othersocket,////			     &arg)) == -1)////	{////	  printf("Recvfrom failure.\n");////	  exit(19);////	}////      //#if NSPORT_DEBUG////      printf("Received multicast packet of size %d from %x\n",readsize,othersocket.sin_addr.s_addr); ////   //#endif////      nsport_readbuf(mybuf, othersocket.sin_addr.s_addr, othersocket.sin_port, my_agent);////    } ////}////void nsport_readbuf(unsigned char *mybuf, int from, int fromport, MACEDON_Agent *my_agent)//{//  Packet *pkt;//  hdr_ip* recv_hdrip;//  hdr_cmn* recv_hdrcmn;//  struct macedon_fields *recv_mf; //  my_agent->Lock();//#if NSPORT_DEBUG > 1	//  printf("Locked for receive %d \n",pthread_self());//#endif////  pkt = new Packet();////  pkt->bits_ = mybuf;//  mybuf = 0;//  recv_hdrip   = (hdr_ip *)(pkt->bits_ + hdr_ip::offset_);//#ifdef NSPORT_HACK_MODELNET////    from &= ~(htonl(0x00800000)); /* unset majic bit *///#endif//#ifdef NSPORT_COMPRESS_ADDRESS//  recv_hdrip->src_.addr_ = convert_address(from, fromport);//#else////  recv_hdrip->src_.addr_ = from;//#endif //  recv_hdrcmn = (hdr_cmn *)pkt->bits_;//  pkt->hdrlen_ = recv_hdrcmn->hdrsize_;//  if (recv_hdrcmn->size_)//    {//      pkt->data_ = (unsigned char*)(pkt->bits_ + recv_hdrcmn->hdrsize_);//      pkt->size_=recv_hdrcmn->size_;//      //      printf("Received %d data bytes.\n", recv_hdrcmn->size_);//    }//  recv_mf = my_agent->getmacedonfields( pkt ); //#ifdef MACEDON_TRACE_SEND_RECEIVE//  printf("Received packet from %x type %d time %f data_size %d data: ", from, recv_mf->mh_type_, Scheduler::instance().clock(), pkt->size_);//  if (pkt->size_ > 0) {//    // struct macedon_fields *temp_mf = (struct macedon_fields *)pkt->data_; //    // printf("%d\n", temp_mf->mh_type_);////     int tmplen = pkt->size_;////     unsigned int *tmpdata  = (unsigned int *)(pkt->data_);////     while (tmplen) ////       {//// 	printf("%.8x ", *tmpdata);//// 	tmpdata = tmpdata++;//// 	tmplen-=sizeof(unsigned int);////       }//    printf("\n");//    //    printf("%s\n", pkt->data_);//  }//  else//    printf("\n");//#endif ////  nsport_demux(pkt, my_agent);//  pkt = 0;//  my_agent->Unlock();//#if NSPORT_DEBUG > 1	//  printf("Received unlocked %d \n",pthread_self());//#endif//}////void //nsport_demux(Packet *pkt, MACEDON_Agent *my_agent)//{//  my_agent->recv(pkt, 0); //  //if (my_agent->pt_type >= PT_final)//  if (0)//    {//      printf("unknown packet type %d\n",my_agent->pt_type);//      pkt->data_ = 0;//      Packet::free(pkt);  //    }      ////}////voidbind(char *trash1, void *trash2){  // do nothing  return;}TclClass::TclClass(char *trash){}intconvert_address(int addr, int port){#ifdef NSPORT_COMPRESS_ADDRESS  int retaddr;  int checkaddr;  int maskaddr = 0x000000FF;    checkaddr = (addr >> 24) & maskaddr;    if (checkaddr >= 224      && checkaddr <= 239)    // multicast address    retaddr = addr;  else    {//        port = port << 16;            retaddr = addr & NSPORT_MASK_PORT;      retaddr = retaddr | port;    }#if NSPORT_DEBUGprintf("Converted address %x with port %x to %x\n", addr, port, retaddr);#endif  return retaddr;#endif}intextract_port(int addr){#ifdef NSPORT_COMPRESS_ADDRESS  int port;  int checkaddr;  int maskaddr = 0x000000FF;  checkaddr = (addr >> 24) & maskaddr;  if (checkaddr >= 224      && checkaddr <= 239)    // multicast address    port = NSPORT_PORT;  else    {//      port = addr << 16;    port = addr;    port &= NSPORT_DEMASK_PORT;    }#if NSPORT_DEBUG  printf("Extracted port value of %x from addr %x\n", port, addr);#endif  return (port);#endif}void sure_sleep( int amount)  {    struct timeval timeout;    int now, original;    int left= amount;    original = (int)Scheduler::instance().clock();    do {      timeout.tv_sec= left;      timeout.tv_usec= 0;      select(0,NULL,NULL,NULL,&timeout);      now = (int)Scheduler::instance().clock();      printf("slept for %d seconds\n", now - original);      left -= now - original;      original = now;    }     while (left > 0);}double sure_sleep( double amount)  {#if NSPORT_DEBUG > 1      printf("asked to sleep for %f seconds\n",  amount);#endif      if (amount == (1.0e300 * 1.0e100))	{	  printf("refusing to sleeping until the end of time\n");	  fflush(stdout);	  abort();	  exit( 77);	}    struct timeval timeout;    double now, original;    double left = amount;    original = Scheduler::instance().clock();    do       {      timeout.tv_sec= (int)left;      timeout.tv_usec= (int)((left - timeout.tv_sec)*1000000);      int code =select(0,NULL,NULL,NULL,&timeout);      if (code < 0)	{#if NSPORT_DEBUG > 0	  printf("error in select %s \n",strerror(errno));#endif	}      pthread_testcancel();      now = Scheduler::instance().clock();      left -= now - original;#if NSPORT_DEBUG > 1      printf("slept for %f seconds, left %f \n", now - original, left);#endif      original = now;    }     while (left > 0.02);#if NSPORT_DEBUG > 1      printf("done sleeping, left=%lf\n", left);#endif      return left;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -