⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 macedon_udp_transport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met:////   * Redistributions of source code must retain the above copyright//     notice, this list of conditions and the following disclaimer.//   * Redistributions in binary form must reproduce the above copyright//     notice, this list of conditions and the following disclaimer in//     the documentation and/or other materials provided with the//     distribution.//   * Neither the names of Duke University nor The University of//     California, San Diego, nor the names of its contributors//     may be used to endorse or promote products derived from//     this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.#include <poll.h>#include <fcntl.h>#include <signal.h>#include <stdarg.h>   #include <math.h>   #include <stdio.h>   #include <strings.h>#include <sys/time.h>    #include <sys/uio.h>#include <pthread.h>#include "macedon.h"#include "string.h"#include <netinet/tcp.h>#include "scheduler.h"extern int num_fds;extern int threads_created;extern int threads_joined;extern int threads_exited;extern int threads_cancelled;extern int threads_tfrc;extern int threads_timer;extern int threads_tcp;macedon_udp_transport::macedon_udp_transport(int ipaddr, int port, 			     int queue_size)  : macedon_transport(ipaddr, port, queue_size), server_descriptor(- 1), time_last_cleanup(0.0){  if (!parameters.getint("quiet")) {    printf("Using Adolfo UDP transport port %d\n", port);  }    bzero(&server_socket, sizeof(server_socket));  server_socket.sin_family = AF_INET;  server_socket.sin_port = htons(port);  server_socket.sin_addr.s_addr = ipaddr;  if ( (server_descriptor = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {    fprintf(stderr, "macedon_udp_transport: failure creating server socket port %d %s\n", port, strerror(errno));    terminate(31);  }  int yes = 1;	    setsockopt(server_descriptor, SOL_SOCKET, SO_REUSEADDR,	     (int *)&yes, sizeof(int));    if (bind(server_descriptor, (struct sockaddr *) & server_socket,	   sizeof (server_socket)) < 0) {    fprintf(stderr, "macedon_udp_transport: failure binding server port %d %s\n", port, strerror(errno));    terminate(32);  }  num_fds++;  initialized = 1;  if (!parameters.getint("quiet")) {    printf("macedon_udp_transport: macedon_udp_transport bound on port %x, fd %d this %x\n", port, server_descriptor, this);  }}macedon_udp_transport::~macedon_udp_transport(){}// ---------------------------------------------- // terminate// ---------------------------------------------- void  macedon_udp_transport::terminate (int code){  int did_lock = 0;  if(lock_held != pthread_self()) { lock(); did_lock = 1;}  if (server_descriptor != -1) {    num_fds--;    ::shutdown( server_descriptor, SHUT_RDWR);    ::close(server_descriptor);    server_descriptor = -1;  }  if(did_lock) { unlock(); }    fflush(NULL);  abort();  if (code)    exit(code);  else    pthread_exit(NULL);}// ---------------------------------------------- // transport_disconnect// ---------------------------------------------- int macedon_udp_transport::transport_disconnect (macedon_connection *neighbor){  lock();#ifdef TRANSPORT_TRACE  printf("macedon_udp_transport: transport_disconnect called for destination %.8x\n", neighbor->get_destination());#endif  neighbor->status = MACEDON_CONNECTION_STATUS_disconnected;  unlock();}// ---------------------------------------------- // transport_connect// ---------------------------------------------- int macedon_udp_transport::transport_connect (macedon_connection *neighbor){  ASSERT(lock_held == pthread_self());  //lock();#ifdef TRANSPORT_TRACE  printf("macedon_udp_transport: transport_connect called for destination %.8x\n", neighbor->get_destination());#endif  neighbor->status  = MACEDON_CONNECTION_STATUS_connected;  neighbor->time_last_used = wall_clock();  //unlock();}// ---------------------------------------------- // send_more// ---------------------------------------------- int macedon_udp_transport::send_more (macedon_connection *neighbor){  ASSERT(lock_held == pthread_self());#ifdef TRANSPORT_TRACE  printf ("macedon_udp_transport: send destination %x status %d \n", neighbor->get_destination(), neighbor->status);#endif  if (!neighbor->send_queue) {    return 1;  }    if (neighbor->status != MACEDON_CONNECTION_STATUS_connected) {    transport_connect(neighbor);  }    int sent = 0;  int descriptor = server_descriptor;  while (neighbor->send_queue) {    macedon_transport_send_work* work = neighbor->send_queue;#ifdef TRANSPORT_TRACE    printf ("macedon_udp_transport: send going for %d bytes to %x \n", work-> running_size, neighbor->get_destination());    //    dump_hex(work->running_buffer, work->running_size);#endif    int * dude = (int *) (work->stuff);//      if (*dude > 4000) {//        printf("Problem with header size when sending %d\n", dude);//        exit(86);//      }    server_socket.sin_addr.s_addr = neighbor->get_destination();    sent = sendto (descriptor, work->running_buffer, work->running_size, 0, 		   (const sockaddr *)&server_socket, sizeof(server_socket));     int error = errno;    neighbor->time_last_used = wall_clock();    if (sent < 0 && error != EAGAIN) {      //perror("macedon_udp_transport: ");      printf ("macedon_udp_transport: send error on socket to %x fd %d buf_addr %x size %d sockaddr %x :%s\n", neighbor->get_destination(), descriptor, work->running_buffer, work->running_size, &server_socket, strerror(error));      terminate (46);    }#ifdef TRANSPORT_TRACE   printf ("macedon_udp_transport: send sent %d bytes to %x \n", sent, neighbor->get_destination());#endif    if (sent > 0) {      neighbor->bytes_sent += sent;      work->running_buffer += sent;      work->running_size -= sent;      if (work->running_size == 0) {  // done sending this work        neighbor->items_sent ++;        neighbor->pop_front();        delete work;      }    } else if(error == EAGAIN) {#ifdef TRANSPORT_TRACE     printf ("macedon_udp_transport: discarding message to %x \n", sent, neighbor->get_destination());#endif      if (work->running_size == 0) {  // done sending this work        neighbor->items_sent ++;        neighbor->pop_front();        delete work;      }    }  }#ifdef TRANSPORT_TRACE  printf ("macedon_udp_transport: send done destination %x items left %d\n", neighbor->get_destination(), neighbor->send_queue_items);#endif    return 0;}// ---------------------------------------------- // wait_for_work // ---------------------------------------------- void macedon_udp_transport::wait_for_work  (){ #ifdef TRANSPORT_TRACE  //  printf("macedon_udp_transport: waiting for work\n");#endif  // fflush(stdout);  lock ();  waiting_structure[0].fd = server_descriptor;  waiting_structure[0].events =  POLLIN | POLLPRI;  waiting_structure[0].revents = 0;  unlock ();#ifdef TRANSPORT_TRACE  //  printf("macedon_udp_transport: polling on 1 descriptor\n");#endif  //  fflush(stdout);  int status = poll (waiting_structure, 1, 500);  if (status < 0 && errno != EINTR ) {    perror("Polling error: \n");    terminate(101);  }}// ---------------------------------------------- // do_work // ---------------------------------------------- void macedon_udp_transport::do_work (){  if (wall_clock() - time_last_cleanup > CLEANUP_TIME) {    cleanup();    time_last_cleanup = wall_clock();    return;  }  if ((waiting_structure[0].revents & POLLERR) ||      (waiting_structure[0].revents & POLLNVAL) ||      (waiting_structure[0].revents & POLLHUP)) {#ifdef TRANSPORT_TRACE    printf ("macedon_udp_transport: error/hup on fd %d revents: %x\n", waiting_structure[0].fd, waiting_structure[0].revents);#endif    terminate(49);  }  else if (waiting_structure[0].revents & POLLIN ||	   waiting_structure[0].revents & POLLPRI ||	   waiting_structure[0].revents & POLLOUT) {    actual_work();  }	  else  if (waiting_structure[0].revents ) {    fprintf (stderr, "fd %d revents: %x\n", waiting_structure[0].fd, waiting_structure[0].revents);  }	}// ---------------------------------------------- // cleanup// ---------------------------------------------- void macedon_udp_transport::cleanup(){  lock();    double curtime = wall_clock();  macedon_connection* sample4 = macedon_connections;  macedon_connection* oldest = sample4;  double oldest_time;  if (sample4)    oldest_time = sample4->time_last_used;  while (sample4) {      int nuke=0;    if (sample4->time_last_used < oldest_time) {      oldest = sample4;      oldest_time = sample4->time_last_used;    }    if ( curtime > sample4->time_last_used + TRANSPORT_TIMEOUT ) {      if (sample4->status == MACEDON_CONNECTION_STATUS_connected) {	if (!sample4->send_queue) {	  if (!sample4->recv_in_progress) {	    // nuke = 1;	  }	  else {	    if (sample4->recv_in_progress->field_bytes ==0) {	      // disable nuking for now	      //	      nuke = 1;	      delete sample4->recv_in_progress;	      sample4->recv_in_progress = 0;	    }// 	    else // 	      printf("Wanted to nuke %.8x but could not cause recv queue %x\n", sample4->get_destination(), sample4->recv_in_progress);	  }	}// 	else//	  printf("Wanted to nuke %.8x but could not cause send queue %d %x\n", sample4->get_destination(), sample4->send_queue_items, sample4->send_queue);      }    }    if (nuke) {// #ifdef TRANSPORT_TRACE_CONNECT      printf("macedon_udp_transport: %f nuking %.8x %x %x %x %f fds %d\n", wall_clock(), sample4->get_destination(), port, sample4->send_queue, sample4->recv_in_progress, sample4->time_last_used, num_fds);// #endif    }    sample4 = sample4->next;  }    if (num_fds > MAX_FDS) {    printf("macedon_udp_transport: %f nuking oldest %.8x %f %x fds %d\n",  wall_clock(), oldest->get_destination(), oldest_time, oldest, num_fds);  }  unlock();}// ---------------------------------------------- // actual_work// ---------------------------------------------- void macedon_udp_transport::actual_work (){  lock ();  recv_more (NULL);  unlock ();  return;}// ---------------------------------------------- // recv_more// ---------------------------------------------- int macedon_udp_transport::recv_more (macedon_connection *neighbor){  ASSERT(lock_held == pthread_self());  // neighbor passed in is NULL always for UDP  static int inst_count=0;  inst_count ++;  macedon_transport_recv_work *recv_work;  int descriptor = server_descriptor;  try {    recv_work = new macedon_transport_recv_work();    //	printf("alloced recv work element %x\n", recv_work);  }  catch (...) {    printf("Exception: out of mem allocing new recv work\n");    exit(97);  }  int flags;  if (-1 == (flags = fcntl (descriptor, F_GETFL, 0)))    flags = 0;  fcntl (descriptor, F_SETFL, flags | O_NONBLOCK);  recv_work->stuff = (unsigned char *)malloc( MACEDON_MAX_UDP_SIZE );  socklen_t arg = sizeof(server_socket);  int bread = (int)::recvfrom(descriptor, ((char*)recv_work->stuff), MACEDON_MAX_UDP_SIZE, 0, (struct sockaddr *) &server_socket, &arg);  if (bread <= 0 && errno != EWOULDBLOCK) { // there was nothing to read    delete recv_work;    return 1;  }  int hsize = 2 * sizeof(int);  if (bread < hsize) {  // read error or did not read a complete header    printf("macedon_udp_transport: receive error on fd %d %.8x %s\n", neighbor->descriptor, neighbor->get_destination(), strerror(errno));    delete recv_work;    terminate(50);    return 1;  }  recv_work->header_size = *((int *)recv_work->stuff);  recv_work->data_size = *((int *)(recv_work->stuff+sizeof(int)));  int offset = 0;  recv_work->tot_size = recv_work->header_size + recv_work->data_size;  unsigned char *temp = (unsigned char *)malloc(recv_work->tot_size);  bcopy(recv_work->stuff+2*sizeof(int)+offset, temp, recv_work->tot_size);  ::free(recv_work->stuff);  recv_work->stuff = temp;#ifdef TRANSPORT_TRACE  printf ("macedon_udp_transport: read all the data, %d bytes with %d header (%d this time), buff is %x\n", recv_work->tot_size, recv_work->header_size, bread, recv_work->stuff);  fflush(stdout);#endif  neighbor = find_macedon_connection (server_socket.sin_addr.s_addr, 0);  if (!neighbor)    neighbor = new_macedon_connection (server_socket.sin_addr.s_addr, (int)server_socket.sin_port, 0);  neighbor->items_received++;  int mydest = neighbor->get_destination();  neighbor->time_last_used = wall_clock();  unlock();  if (receiver && recv_work->tot_size) {    ( receiver->*receive_handler)      (mydest,       recv_work->stuff, recv_work->header_size,       recv_work->stuff+recv_work->header_size, recv_work->tot_size-recv_work->header_size);  }  lock();  delete recv_work;  return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -