⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 macedon_tcp_transport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
📖 第 1 页 / 共 3 页
字号:
              (int *)&minimum, sizeof(int));  //  	  int bufsize = 2621440;	      //  	  setsockopt(neighbor->descriptor, SOL_SOCKET, SO_RCVBUF,//  		     (int *)&bufsize, sizeof(int));  //  	  setsockopt(neighbor->descriptor, SOL_SOCKET, SO_SNDBUF,//  		     (int *)&bufsize, sizeof(int));  		            //            int bs = 0;          //            setsockopt(neighbor->descriptor, SOL_SOCKET, SO_LINGER,          //                (int *)&bs, sizeof(bs));          //            int bs = 16384;          //            setsockopt(neighbor->descriptor, SOL_SOCKET, SO_SNDBUF,          //                (int *)&bs, sizeof(bs));          //  	  int new_len;           //  	  int arglen = sizeof(new_len);           //  	  if (-1 == getsockopt(neighbor->descriptor, SOL_SOCKET, SO_SNDBUF, &new_len,(socklen_t*)&arglen))           //  	    perror("getsockopt");           //  	  printf("setsb %d\n", new_len);           // setsockopt(neighbor->descriptor, IPPROTO_TCP, TCP_NODELAY, &yes,sizeof(int));      #ifdef TRANSPORT_TRACE_CONNECT          //if (!output_quiet) {            printf ("macedon_tcp_transport: %f connected to %.8x:%d descriptor %d transport port %d time %f fds %d overall %f\n", wall_clock(), neighbor->get_destination(), ntohs((int)socket_address.sin_port), neighbor->descriptor, port, wall_clock()-entry, num_fds, wall_clock() - neighbor->time_connect_initiated);          //}#endif          // shouldn't need to, but clean queues          if (neighbor->send_queue) {            neighbor->send_queue->running_buffer = neighbor->send_queue->stuff;              neighbor->send_queue->running_size = neighbor->send_queue->tot_size;          }          if (neighbor->recv_in_progress) {            delete neighbor->recv_in_progress;            neighbor->recv_in_progress = 0;          }          neighbor->status = MACEDON_CONNECTION_STATUS_connected;        }        else {   // must have been accepted in the meantime          ::shutdown(descriptor,  SHUT_RDWR);          ::close( descriptor);        }      }    }#ifdef TRANSPORT_TRACE_LOCK    printf("macedon_tcp_transport: %x cond_wait (release lock) on %x by transport %x held_by %d\n",pthread_self(),&pool_lock,this,lock_held);#endif    ASSERT(lock_held == pthread_self());    lock_held = 0;    pthread_cond_wait(&connect_cond, &pool_lock);#ifdef TRANSPORT_TRACE_LOCK    printf("macedon_tcp_transport: %x cond_wait done (acquire lock) on %x by transport %x held_by %d\n",pthread_self(),&pool_lock,this,lock_held);#endif    lock_held = pthread_self();  }  unlock();  return;}// ---------------------------------------------- // send_more// ---------------------------------------------- int macedon_tcp_transport::send_more (macedon_connection *neighbor){  ASSERT(lock_held == pthread_self());#ifdef TRANSPORT_TRACE  printf ("macedon_tcp_transport: send destination %.8x status %d port %d index %d\n", neighbor->get_destination(), neighbor->status, port, neighbor->index);  fflush(stdout);#endif  if (!neighbor->send_queue) {    return 1;  }  if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnecting)    neighbor->status = MACEDON_CONNECTION_STATUS_connected;  else if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnected ) {    transport_connect (neighbor);    return 1;  }  else if (neighbor->status == MACEDON_CONNECTION_STATUS_connecting       || neighbor->status == MACEDON_CONNECTION_STATUS_preconnecting) {    return 1;  }  //if(from_agent) return 0;  int sent = 0;  int descriptor = neighbor->descriptor;  int keep_sending=1;  while (keep_sending && neighbor->send_queue) {    macedon_transport_send_work* work = neighbor->send_queue;#ifdef TRANSPORT_TRACE    printf ("macedon_tcp_transport: send going for %d bytes to %.8x port %d index %d\n", work-> running_size, neighbor->get_destination(), port, neighbor->index);    //    dump_hex(work->running_buffer, work->running_size);#endif    int flags = 0;    if (-1 == (flags = fcntl (descriptor, F_GETFL, 0)))      flags = 0;    fcntl (descriptor, F_SETFL, flags | O_NONBLOCK);        int * dude = (int *) (work->stuff);    //      if (*dude > 4000) {    //        printf("Problem with header size when sending %d\n", dude);    //        exit(86);    //      }    sent = write (descriptor, work->running_buffer, work->running_size);     neighbor->time_last_used = wall_clock();    if (sent < 0 && errno!=EWOULDBLOCK) {      printf ("macedon_tcp_transport: send error on socket to %.8x port %d index %d fd %d: %s\n", neighbor->get_destination(), port, neighbor->index, neighbor->descriptor, strerror(errno));      reset_connection (neighbor, TCP_ERROR_SENDING);      //transport_connect(neighbor);      return 1;    }    else if (sent < 0 && errno==EWOULDBLOCK)      keep_sending = 0;#ifdef TRANSPORT_TRACE    printf ("macedon_tcp_transport: send sent %d bytes to %.8x %d index %d\n", sent, neighbor->get_destination(), port, index);#endif    if (sent > 0) {      neighbor->bytes_sent += sent;      work->running_buffer += sent;      work->running_size -= sent;      if (work->running_size == 0) {  // done sending this work        neighbor->items_sent ++;        neighbor->pop_front();        delete work;      }    }  }#ifdef TRANSPORT_TRACE  printf ("macedon_tcp_transport: send done destination %.8x port %d index %d items left %d\n", neighbor->get_destination(), port, neighbor->index, neighbor->send_queue_items);#endif  return 0;}// ---------------------------------------------- // wait_for_work // ---------------------------------------------- void macedon_tcp_transport::wait_for_work  (){#ifdef TRANSPORT_TRACE  //  printf("macedon_tcp_transport: waiting for work\n");#endif  // fflush(stdout);  lock ();  //we reserve slot zero for the listening socket  waiting_counter = 0;  waiting_structure[waiting_counter].fd = server_descriptor;  waiting_structure[waiting_counter].events =  POLLIN;#ifdef TRANSPORT_TRACE      printf ("macedon_tcp_transport: waiting_counter %d server_descriptor %d\n", waiting_counter,server_descriptor);#endif  macedon_connection* sample2 = macedon_connections;  while (sample2) {      if (sample2->status == MACEDON_CONNECTION_STATUS_connected ||        sample2->status == MACEDON_CONNECTION_STATUS_disconnecting) {      waiting_counter ++;      if (waiting_counter > sizeof( waiting_structure)/sizeof(struct pollfd )) {        fprintf (stderr, "counter exceeding maximum number of file descriptors: %d\n", waiting_counter);        terminate(100);      }#ifdef TRANSPORT_TRACE      printf ("macedon_tcp_transport: counter %d sample2->descriptor %d destination %.8x\n", waiting_counter,sample2->descriptor, sample2->neigh_socket_address.sin_addr.s_addr);#endif      waiting_structure[waiting_counter].fd =  sample2->descriptor;      if (sample2->send_queue)   // I have stuff to send for him        waiting_structure[waiting_counter].events = POLLIN | POLLOUT | POLLPRI;      else        waiting_structure[waiting_counter].events = POLLIN | POLLPRI;      if (waiting_structure[waiting_counter].fd >= MAXIMUM_DESCRIPTORS) {        printf("Exception: too big a file descriptor %d\n", waiting_structure[waiting_counter].fd);        exit(91);      }    }    else {#ifdef TRANSPORT_TRACE      printf("skip %.8x %d %d\n", sample2->get_destination(), sample2->status, sample2->descriptor);#endif    }    sample2 = sample2->next;  }  unlock ();#ifdef TRANSPORT_TRACE  //  printf("macedon_tcp_transport: polling on %d descriptors\n", waiting_counter+1);#endif  //  fflush(stdout);  int status = poll (waiting_structure, waiting_counter+1, 500);  if (status < 0 && errno != EINTR ) {    perror("Polling error: \n");    terminate(101);  }}// ---------------------------------------------- // accept_connections// ---------------------------------------------- void macedon_tcp_transport::accept_connections (){  int keep_accepting = 1;  struct sockaddr_in client;  socklen_t client_length = sizeof(sockaddr_in);  int flags = 0;  if (-1 == (flags = fcntl (server_descriptor, F_GETFL, 0)))    flags = 0;  fcntl (server_descriptor, F_SETFL, flags | O_NONBLOCK);    while (keep_accepting) {    if (num_fds > MAX_FDS)      cleanup();    int descriptor = ::accept( server_descriptor, (sockaddr *) & client, &client_length);    if (descriptor < 0) {      if (errno != EWOULDBLOCK) {    // error        printf ("Failure accepting a macedon_connection: %s\n", strerror(errno));        exit(93);      }      else   // no harm no foul        keep_accepting = 0;    }    else {      int yes = 1; //       setsockopt(descriptor, IPPROTO_TCP, TCP_NODELAY, //  		 &yes,sizeof(int));      int minimum = 4;	            setsockopt(descriptor, SOL_SOCKET, SO_RCVLOWAT,          (int *)&minimum, sizeof(int));  //        int bufsize = 2621440;	      //        setsockopt(descriptor, SOL_SOCKET, SO_RCVBUF,//            (int *)&bufsize, sizeof(int));  //        setsockopt(descriptor, SOL_SOCKET, SO_SNDBUF,//            (int *)&bufsize, sizeof(int));        //       int bs = 0;      //       setsockopt(descriptor, SOL_SOCKET, SO_LINGER,      //  		 (int *)&bs, sizeof(bs));      //        int bs = 16384;      //        setsockopt(descriptor, SOL_SOCKET, SO_SNDBUF,      //  		 (int *)&bs, sizeof(bs));        int side = (int)client.sin_addr.s_addr;      if ((side & htonl(0xff000000)) == htonl(0x0a000000))      {        //this is the Modelnet hack        side &= ~(htonl(0x00800000));       }      lock ();      macedon_connection* neighbor = find_macedon_connection (side, 0);      if (neighbor && neighbor->status != MACEDON_CONNECTION_STATUS_disconnected) {        if (neighbor->status == MACEDON_CONNECTION_STATUS_preconnecting             || here_addr_ < side) {           // HACK this is probably a bug, what happens if the connection is in connecting state, this is a potential race condition          // deterministically pick which connection to keep so we don't bounce back and forth#ifdef TRANSPORT_TRACE_CONNECT          printf ("macedon_tcp_transport: %f peer already exists to %.8x %d, state %d, reusing connection %x, entry %x fds %d\n", wall_clock(), side, port, neighbor->status, this, neighbor, num_fds);#endif          //reset_connection(neighbor, TCP_PEER_ALREADY_EXISTS);          signal_error(TCP_PEER_ALREADY_EXISTS, (int)neighbor->get_destination(), port, 0);          if(neighbor->status == MACEDON_CONNECTION_STATUS_preconnecting) {            //nothing special to do.#ifdef TRANSPORT_TRACE            printf("macedon_tcp_transport: peer %.8x %d was in preconnecting state, just assign new socket to it.\n", side, port);#endif          } else if(neighbor->status == MACEDON_CONNECTION_STATUS_connecting) {            //If it is connecting, we need to close the descriptor            neighbor->close_descriptor();#ifdef TRANSPORT_TRACE            printf("macedon_tcp_transport: peer %.8x %d was in connecting state, close connecting socket.\n", side, port);#endif          } else {            //In this case -- shift this neighbor to a new index to finish delivering data.            int index = find_available_index(neighbor->get_destination());            neighbor->index = index;#ifdef TRANSPORT_TRACE            printf("macedon_tcp_transport: peer %.8x %d was in connected state, assign new index %d and disconnect transport.\n", side, port, index);#endif            transport_disconnect(neighbor);#ifdef NEW_BW_FILTER            bandwidth_time_filter& bw = neighbor->get_filter();#else            bandwidth_filter& bw = neighbor->get_filter();#endif            neighbor = new_macedon_connection (side, (int)client.sin_port, 0);            neighbor->set_filter(bw);          }        }        else if (neighbor->descriptor != -1             && neighbor->status != MACEDON_CONNECTION_STATUS_disconnected) {#ifdef TRANSPORT_TRACE_CONNECT          printf ("macedon_tcp_transport: %f peer already exists to %.8x %d, keeping the old one %x entry %x fds %d\n", wall_clock(), side, port, this, neighbor, num_fds);#endif          int index;          macedon_connection* error_connection = new_macedon_connection_available_index(side, (int)client.sin_port, &index);          num_fds++;          printf("macedon_tcp_transport: creating error connection to %.8x, index=%d fd=%d\n", neighbor->neigh_socket_address.sin_addr.s_addr, index, descriptor);          error_connection->status = MACEDON_CONNECTION_STATUS_connected;          error_connection->descriptor = descriptor;          signal_error(TCP_PEER_ALREADY_EXISTS_OTHER, (int)neighbor->get_destination(), port, 0); //we have no idea how much data could have been lost          //::shutdown(descriptor,  SHUT_RDWR);          //::close( descriptor);          unlock(); //NOTE: Unlock before send, because send will re-lock          send(side, index, NULL, 0, NULL, 0);          continue;        }      }      else if (!neighbor) {        neighbor = new_macedon_connection (side, (int)client.sin_port, 0);      }      num_fds++;      neighbor->status = MACEDON_CONNECTION_STATUS_connected;      neighbor->descriptor = descriptor;      // shouldn't need to, but clean queues      if (neighbor->send_queue) {        neighbor->send_queue->running_buffer = neighbor->send_queue->stuff;          neighbor->send_queue->running_size = neighbor->send_queue->tot_size;      }      if (neighbor->recv_in_progress) {        delete neighbor->recv_in_progress;        neighbor->recv_in_progress = 0;      }      unlock ();#ifdef TRANSPORT_TRACE_CONNECT      //if (!output_quiet) {        printf ("macedon_tcp_transport: %f accepted a macedon_connection from %.8x:%d fd %d my port %d this %x entry %x fds %d\n", wall_clock(), side, client.sin_port, descriptor, port, this, neighbor, num_fds);      //}#endif      fflush(stdout);    }  }}// ---------------------------------------------- // do_work // ---------------------------------------------- void macedon_tcp_transport::do_work (){  while(first_error != NULL) {    ASSERT(from_agent == 0);    signal_error(first_error->tcp_error, first_error->ipaddr, first_error->port, first_error->msg_count);    delete first_error;    first_error = first_error->next_error;    if(first_error == NULL) {      last_error == NULL;    }  }  //REB: what's up with this??  if (wall_clock() - time_last_cleanup > CLEANUP_TIME) {    cleanup();    time_last_cleanup = wall_clock();    return;  }  for (int examine = 0; examine  <= waiting_counter ; examine++ ) {    if (examine == 0) {      //did we get any new macedon_connections?      if (waiting_structure[examine].revents & POLLIN)       {        accept_connections();    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -