⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 macedon_tcp_transport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
📖 第 1 页 / 共 3 页
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met:////   * Redistributions of source code must retain the above copyright//     notice, this list of conditions and the following disclaimer.//   * Redistributions in binary form must reproduce the above copyright//     notice, this list of conditions and the following disclaimer in//     the documentation and/or other materials provided with the//     distribution.//   * Neither the names of Duke University nor The University of//     California, San Diego, nor the names of its contributors//     may be used to endorse or promote products derived from//     this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.#include <poll.h>#include <fcntl.h>#include <signal.h>#include <stdarg.h>   #include <math.h>   #include <stdio.h>   #include <strings.h>#include <sys/time.h>    #include <sys/uio.h>#include <pthread.h>#include "macedon.h"#include "string.h"#include <netinet/tcp.h>#include "scheduler.h"#include "ext.h"#include "tcp_auto_ext.h"extern int num_fds;extern int threads_created;extern int threads_joined;extern int threads_exited;extern int threads_cancelled;extern int threads_tfrc;extern int threads_timer;extern int threads_tcp;void macedon_tcp_transport::signal_error(int tcp_error, int ipaddr, int port, int msg_count) {  if(from_agent) {    //CK: We could include "deferred" as a signal.    macedon_tcp_error* deferred_error = new macedon_tcp_error(tcp_error, ipaddr, port, msg_count);    if(last_error == NULL) {      ASSERT(first_error == NULL);      last_error = deferred_error;      first_error = deferred_error;    } else {      last_error->next_error = deferred_error;      last_error = last_error->next_error;    }    return;  }  int did_lock = 0;  if(lock_held != 0) { ASSERT(lock_held == pthread_self()); unlock(); did_lock = 1;}  printf("macedon_tcp_transport::signal_error: signal_error %d on %.8x:%d msg_count=%d at %f\n",      tcp_error, ipaddr, port, msg_count, wall_clock());  struct transport_error_struct arg;  arg.transport_error = tcp_error;  arg.dest_addr = ipaddr;  arg.port = port;  arg.known_lost = msg_count;  if (receiver && error_handler) {    ( receiver->*error_handler)      (AUTOEXT_TRANSPORT_ERROR, (void*)&arg);  }  if(did_lock) { lock(); }   }void debug (int level, char *format, ... );macedon_tcp_transport::macedon_tcp_transport(int ipaddr, int port,     int queue_size): macedon_transport(ipaddr, port, queue_size), server_descriptor(- 1), waiting_counter(0), time_last_cleanup(0.0), virtual_time(0){  output_quiet = parameters.getint("quiet");  if (!output_quiet) {    printf("Using Adolfo TCP transport port %d\n", port);  }  bzero(&server_socket, sizeof(server_socket));  server_socket.sin_family = AF_INET;  server_socket.sin_port = htons(port);  server_socket.sin_addr.s_addr = here_addr_;  if ( (server_descriptor = socket(AF_INET, SOCK_STREAM, 0)) < 0) {    fprintf(stderr, "macedon_tcp_transport: failure creating server socket port %d: %s\n", port, strerror(errno));    terminate(31);  }  int yes = 1;	    setsockopt(server_descriptor, SOL_SOCKET, SO_REUSEADDR,      (int *)&yes, sizeof(int));  // printf("setsockopt TCP_NODELAY!\n");  // setsockopt(server_descriptor, IPPROTO_TCP, TCP_NODELAY, &yes,sizeof(int));  if (bind(server_descriptor, (struct sockaddr *) & server_socket,        sizeof (server_socket)) < 0) {    fprintf(stderr, "macedon_tcp_transport: failure binding server port %d", port, strerror(errno));    terminate(32);  }  num_fds++;  if (listen (server_descriptor, 1000) != 0) {    fprintf(stderr, "macedon_tcp_transport: failure listening on server port %d\n", port);    terminate(33);  }  keep_connecting = true;  pthread_cond_init(&connect_cond, NULL);  for(int i=0; i<NUM_CONNECT_THREADS; i++ ) {    pthread_t ctid;    if (pthread_create(&ctid, NULL, connecter, this) != 0) {      fprintf(stderr, "macedon_tcp_transport: pthread_create port %d %s", port, strerror(errno));      printf ("macedon_tcp_transport: Error in creating connect thread. port %d\n", port);      abort();      exit(81);    }  }  initialized = 1;  if (!output_quiet) {    printf("macedon_tcp_transport listening on port %d, fd %d this %x\n", port, server_descriptor, this);  }}macedon_tcp_transport::~macedon_tcp_transport(){}// ---------------------------------------------- // terminate// ---------------------------------------------- void  macedon_tcp_transport::terminate (int code){  int did_lock = 0;  if(lock_held != pthread_self()) { lock(); did_lock = 1;}  if (server_descriptor != -1) {    num_fds--;    ::shutdown( server_descriptor, SHUT_RDWR);    ::close(server_descriptor);    server_descriptor = -1;  }  macedon_connection* sample2 = macedon_connections;  while (sample2) {    while (sample2->send_queue) {      macedon_transport_work* work = sample2->pop_front();      //      delete work;    }    sample2 = sample2->next;    //    delete sample2;  }  if(did_lock) { unlock(); }  if (code)    exit(code);  else    pthread_exit(NULL);}// ---------------------------------------------- // reset_connection// ---------------------------------------------- int macedon_tcp_transport::reset_connection (macedon_connection *neighbor, int tcp_error){  ASSERT(lock_held==pthread_self());  int msg_count = 0;  if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnected)    return 1;#ifdef TRANSPORT_TRACE_CONNECT  printf ("macedon_tcp_transport: %f resetting %.8x port %d fd %d index %d status %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->descriptor, neighbor->index, neighbor->status, num_fds);#endif  if(neighbor->descriptor >= 0) {    num_fds--;  }  neighbor->close_descriptor();  neighbor->status = MACEDON_CONNECTION_STATUS_disconnected;  if (neighbor->send_queue) {    msg_count + 1; //TODO: May need more here.    neighbor->send_queue->running_buffer = neighbor->send_queue->stuff;      neighbor->send_queue->running_size = neighbor->send_queue->tot_size;  }  if (neighbor->recv_in_progress) {    delete neighbor->recv_in_progress;    neighbor->recv_in_progress = 0;  }  if(tcp_error && neighbor->index == 0) {    signal_error(tcp_error, (int)neighbor->get_destination(), port, msg_count);  }}// ---------------------------------------------- // abort_connection// ---------------------------------------------- int macedon_tcp_transport::transport_abort (macedon_connection *neighbor){  return reset_connection(neighbor, 0);}// ---------------------------------------------- // transport_disconnect// ---------------------------------------------- int macedon_tcp_transport::transport_disconnect (macedon_connection *neighbor){  ASSERT(lock_held==pthread_self());#if 0  reset_connection (neighbor);  printf ("macedon_tcp_transport: %f disconnected %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), neighbor->descriptor, num_fds);  return 0;#endif  if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnected ) {#ifdef TRANSPORT_TRACE_CONNECT    printf ("macedon_tcp_transport: %f %.8x %d index %d disconnecting but already disconnected %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->index, neighbor, neighbor->descriptor, num_fds);#endif    return 0;  }  if (!neighbor->send_queue && !neighbor->recv_in_progress) {    // Adolfo: could also delete the neighbor here    reset_connection (neighbor, NO_ERROR);#ifdef TRANSPORT_TRACE_CONNECT    printf ("macedon_tcp_transport: %f %.8x %d index %d disconnected %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->index, neighbor, neighbor->descriptor, num_fds);#endif  }  else  {  // work still needs to be done    neighbor->status = MACEDON_CONNECTION_STATUS_disconnecting;#ifdef TRANSPORT_TRACE_CONNECT    printf ("macedon_tcp_transport: %f %.8x %d index %d disconnecting %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->index, neighbor, neighbor->descriptor, num_fds);#endif  }  return 0;}// ---------------------------------------------- // transport_connect// ---------------------------------------------- int macedon_tcp_transport::transport_connect (macedon_connection *neighbor){  ASSERT(lock_held == pthread_self());  //lock();#ifdef TRANSPORT_TRACE  printf("macedon_tcp_transport: transport_connect called for destination %.8x at %f\n", neighbor->get_destination(), wall_clock());#endif  neighbor->status  = MACEDON_CONNECTION_STATUS_preconnecting;  neighbor->time_connect_initiated = wall_clock();  neighbor->virtual_time_initiated = virtual_time++;  pthread_cond_signal(&connect_cond);  //unlock();}// ---------------------------------------------- // connecter    // separate thread to do the connect// ---------------------------------------------- void *connecter (void *input){  macedon_tcp_transport *trans = (macedon_tcp_transport *)input;  trans->do_connect();}// ---------------------------------------------- // do_connect// ---------------------------------------------- void macedon_tcp_transport::do_connect(){  lock();  while (keep_connecting) {    int check_time = -1;    /*     double check_time = -1.0; */    macedon_connection* neighbor;    while ( (neighbor = find_macedon_connection_by_vtime(check_time)) != NULL) {      check_time = neighbor->virtual_time_initiated;      double entry=wall_clock();      struct sockaddr_in socket_address;      bzero(&socket_address, sizeof(socket_address));      socket_address.sin_family = AF_INET;      neighbor->neigh_socket_address.sin_port = htons(port);      socket_address.sin_port = htons(port);      socket_address.sin_addr.s_addr = neighbor->get_destination();      if (neighbor->descriptor < 0 ) {        if ( (neighbor->descriptor = socket(AF_INET, SOCK_STREAM, 0)) < 0) {          fprintf (stderr, "failure creating connect socket: %s\n", strerror(errno));          exit(90);        }      }      int descriptor =  neighbor->descriptor;      neighbor->status = MACEDON_CONNECTION_STATUS_connecting;      if (num_fds > MAX_FDS)        cleanup();      pthread_cond_signal(&connect_cond);      unlock();#ifdef TRANSPORT_TRACE_CONNECT      //if (!output_quiet) {        printf("macedon_tcp_transport: %f trying to connect to %.8x %d %x %d fds %d\n", wall_clock(), (int )socket_address.sin_addr.s_addr, ntohs((int )socket_address.sin_port), neighbor, descriptor, num_fds);      //}#endif      int myret = ::connect(descriptor, (struct sockaddr *)  &socket_address,          sizeof(socket_address));      lock();      if (myret != 0) {        if ( neighbor->status != MACEDON_CONNECTION_STATUS_connecting ) {#ifdef TRANSPORT_TRACE_CONNECT          //if (!output_quiet) {            printf ("connect failed to %.8x:%d  descriptor  %d %s time %f but neighbor status changed, so ignoring\n",	                 (int )socket_address.sin_addr.s_addr,                ntohs((int )socket_address.sin_port),                descriptor,                strerror(errno),                wall_clock());          //}#endif          continue;                 }#ifdef TRANSPORT_TRACE_CONNECT        //if (!output_quiet) {          printf ("connect failed to %.8x:%d  descriptor  %d %s time %f\n",	               (int )socket_address.sin_addr.s_addr,              ntohs((int )socket_address.sin_port),              descriptor,              strerror(errno),              wall_clock());        //}#endif        neighbor->status = MACEDON_CONNECTION_STATUS_disconnected; //Since connrefused, now discconnected.        neighbor->time_connect_initiated = entry; //Set initiated time to now to move to end of queue.        neighbor->virtual_time_initiated = virtual_time++;        //Clear out the send queue on a connect failed.        int msg_count = 0;        while (neighbor->send_queue) {          msg_count++;          macedon_transport_send_work* work = neighbor->send_queue;          neighbor->pop_front();          delete work;        }        signal_error(TCP_CONNECT_FAILED, (int )socket_address.sin_addr.s_addr, port, msg_count);      }      else {        if (descriptor == neighbor->descriptor             || neighbor->descriptor == -1) {          num_fds++;          neighbor->descriptor = descriptor;          int yes = 1;//  	  setsockopt(descriptor, IPPROTO_TCP, TCP_NODELAY, //  		     &yes,sizeof(int));          setsockopt(neighbor->descriptor, SOL_SOCKET, SO_REUSEADDR,              (int *)&yes, sizeof(int));            int minimum = 2*sizeof(int);          setsockopt(neighbor->descriptor, SOL_SOCKET, SO_RCVLOWAT,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -