📄 macedon_tcp_transport.cc
字号:
//Copyright (c) 2004, Charles Killian, Adolfo Rodriguez, Dejan Kostic, Sooraj Bhat, and Amin Vahdat//All rights reserved.////Redistribution and use in source and binary forms, with or without//modification, are permitted provided that the following conditions are met://// * Redistributions of source code must retain the above copyright// notice, this list of conditions and the following disclaimer.// * Redistributions in binary form must reproduce the above copyright// notice, this list of conditions and the following disclaimer in// the documentation and/or other materials provided with the// distribution.// * Neither the names of Duke University nor The University of// California, San Diego, nor the names of its contributors// may be used to endorse or promote products derived from// this software without specific prior written permission.////THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"//AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE//IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE//DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE//FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL//DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR//SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER//CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,//OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE//USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.#include <poll.h>#include <fcntl.h>#include <signal.h>#include <stdarg.h> #include <math.h> #include <stdio.h> #include <strings.h>#include <sys/time.h> #include <sys/uio.h>#include <pthread.h>#include "macedon.h"#include "string.h"#include <netinet/tcp.h>#include "scheduler.h"#include "ext.h"#include "tcp_auto_ext.h"extern int num_fds;extern int threads_created;extern int threads_joined;extern int threads_exited;extern int threads_cancelled;extern int threads_tfrc;extern int threads_timer;extern int threads_tcp;void macedon_tcp_transport::signal_error(int tcp_error, int ipaddr, int port, int msg_count) { if(from_agent) { //CK: We could include "deferred" as a signal. macedon_tcp_error* deferred_error = new macedon_tcp_error(tcp_error, ipaddr, port, msg_count); if(last_error == NULL) { ASSERT(first_error == NULL); last_error = deferred_error; first_error = deferred_error; } else { last_error->next_error = deferred_error; last_error = last_error->next_error; } return; } int did_lock = 0; if(lock_held != 0) { ASSERT(lock_held == pthread_self()); unlock(); did_lock = 1;} printf("macedon_tcp_transport::signal_error: signal_error %d on %.8x:%d msg_count=%d at %f\n", tcp_error, ipaddr, port, msg_count, wall_clock()); struct transport_error_struct arg; arg.transport_error = tcp_error; arg.dest_addr = ipaddr; arg.port = port; arg.known_lost = msg_count; if (receiver && error_handler) { ( receiver->*error_handler) (AUTOEXT_TRANSPORT_ERROR, (void*)&arg); } if(did_lock) { lock(); } }void debug (int level, char *format, ... );macedon_tcp_transport::macedon_tcp_transport(int ipaddr, int port, int queue_size): macedon_transport(ipaddr, port, queue_size), server_descriptor(- 1), waiting_counter(0), time_last_cleanup(0.0), virtual_time(0){ output_quiet = parameters.getint("quiet"); if (!output_quiet) { printf("Using Adolfo TCP transport port %d\n", port); } bzero(&server_socket, sizeof(server_socket)); server_socket.sin_family = AF_INET; server_socket.sin_port = htons(port); server_socket.sin_addr.s_addr = here_addr_; if ( (server_descriptor = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "macedon_tcp_transport: failure creating server socket port %d: %s\n", port, strerror(errno)); terminate(31); } int yes = 1; setsockopt(server_descriptor, SOL_SOCKET, SO_REUSEADDR, (int *)&yes, sizeof(int)); // printf("setsockopt TCP_NODELAY!\n"); // setsockopt(server_descriptor, IPPROTO_TCP, TCP_NODELAY, &yes,sizeof(int)); if (bind(server_descriptor, (struct sockaddr *) & server_socket, sizeof (server_socket)) < 0) { fprintf(stderr, "macedon_tcp_transport: failure binding server port %d", port, strerror(errno)); terminate(32); } num_fds++; if (listen (server_descriptor, 1000) != 0) { fprintf(stderr, "macedon_tcp_transport: failure listening on server port %d\n", port); terminate(33); } keep_connecting = true; pthread_cond_init(&connect_cond, NULL); for(int i=0; i<NUM_CONNECT_THREADS; i++ ) { pthread_t ctid; if (pthread_create(&ctid, NULL, connecter, this) != 0) { fprintf(stderr, "macedon_tcp_transport: pthread_create port %d %s", port, strerror(errno)); printf ("macedon_tcp_transport: Error in creating connect thread. port %d\n", port); abort(); exit(81); } } initialized = 1; if (!output_quiet) { printf("macedon_tcp_transport listening on port %d, fd %d this %x\n", port, server_descriptor, this); }}macedon_tcp_transport::~macedon_tcp_transport(){}// ---------------------------------------------- // terminate// ---------------------------------------------- void macedon_tcp_transport::terminate (int code){ int did_lock = 0; if(lock_held != pthread_self()) { lock(); did_lock = 1;} if (server_descriptor != -1) { num_fds--; ::shutdown( server_descriptor, SHUT_RDWR); ::close(server_descriptor); server_descriptor = -1; } macedon_connection* sample2 = macedon_connections; while (sample2) { while (sample2->send_queue) { macedon_transport_work* work = sample2->pop_front(); // delete work; } sample2 = sample2->next; // delete sample2; } if(did_lock) { unlock(); } if (code) exit(code); else pthread_exit(NULL);}// ---------------------------------------------- // reset_connection// ---------------------------------------------- int macedon_tcp_transport::reset_connection (macedon_connection *neighbor, int tcp_error){ ASSERT(lock_held==pthread_self()); int msg_count = 0; if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnected) return 1;#ifdef TRANSPORT_TRACE_CONNECT printf ("macedon_tcp_transport: %f resetting %.8x port %d fd %d index %d status %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->descriptor, neighbor->index, neighbor->status, num_fds);#endif if(neighbor->descriptor >= 0) { num_fds--; } neighbor->close_descriptor(); neighbor->status = MACEDON_CONNECTION_STATUS_disconnected; if (neighbor->send_queue) { msg_count + 1; //TODO: May need more here. neighbor->send_queue->running_buffer = neighbor->send_queue->stuff; neighbor->send_queue->running_size = neighbor->send_queue->tot_size; } if (neighbor->recv_in_progress) { delete neighbor->recv_in_progress; neighbor->recv_in_progress = 0; } if(tcp_error && neighbor->index == 0) { signal_error(tcp_error, (int)neighbor->get_destination(), port, msg_count); }}// ---------------------------------------------- // abort_connection// ---------------------------------------------- int macedon_tcp_transport::transport_abort (macedon_connection *neighbor){ return reset_connection(neighbor, 0);}// ---------------------------------------------- // transport_disconnect// ---------------------------------------------- int macedon_tcp_transport::transport_disconnect (macedon_connection *neighbor){ ASSERT(lock_held==pthread_self());#if 0 reset_connection (neighbor); printf ("macedon_tcp_transport: %f disconnected %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), neighbor->descriptor, num_fds); return 0;#endif if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnected ) {#ifdef TRANSPORT_TRACE_CONNECT printf ("macedon_tcp_transport: %f %.8x %d index %d disconnecting but already disconnected %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->index, neighbor, neighbor->descriptor, num_fds);#endif return 0; } if (!neighbor->send_queue && !neighbor->recv_in_progress) { // Adolfo: could also delete the neighbor here reset_connection (neighbor, NO_ERROR);#ifdef TRANSPORT_TRACE_CONNECT printf ("macedon_tcp_transport: %f %.8x %d index %d disconnected %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->index, neighbor, neighbor->descriptor, num_fds);#endif } else { // work still needs to be done neighbor->status = MACEDON_CONNECTION_STATUS_disconnecting;#ifdef TRANSPORT_TRACE_CONNECT printf ("macedon_tcp_transport: %f %.8x %d index %d disconnecting %.8x fd %d fds %d\n", wall_clock(), neighbor->get_destination(), port, neighbor->index, neighbor, neighbor->descriptor, num_fds);#endif } return 0;}// ---------------------------------------------- // transport_connect// ---------------------------------------------- int macedon_tcp_transport::transport_connect (macedon_connection *neighbor){ ASSERT(lock_held == pthread_self()); //lock();#ifdef TRANSPORT_TRACE printf("macedon_tcp_transport: transport_connect called for destination %.8x at %f\n", neighbor->get_destination(), wall_clock());#endif neighbor->status = MACEDON_CONNECTION_STATUS_preconnecting; neighbor->time_connect_initiated = wall_clock(); neighbor->virtual_time_initiated = virtual_time++; pthread_cond_signal(&connect_cond); //unlock();}// ---------------------------------------------- // connecter // separate thread to do the connect// ---------------------------------------------- void *connecter (void *input){ macedon_tcp_transport *trans = (macedon_tcp_transport *)input; trans->do_connect();}// ---------------------------------------------- // do_connect// ---------------------------------------------- void macedon_tcp_transport::do_connect(){ lock(); while (keep_connecting) { int check_time = -1; /* double check_time = -1.0; */ macedon_connection* neighbor; while ( (neighbor = find_macedon_connection_by_vtime(check_time)) != NULL) { check_time = neighbor->virtual_time_initiated; double entry=wall_clock(); struct sockaddr_in socket_address; bzero(&socket_address, sizeof(socket_address)); socket_address.sin_family = AF_INET; neighbor->neigh_socket_address.sin_port = htons(port); socket_address.sin_port = htons(port); socket_address.sin_addr.s_addr = neighbor->get_destination(); if (neighbor->descriptor < 0 ) { if ( (neighbor->descriptor = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf (stderr, "failure creating connect socket: %s\n", strerror(errno)); exit(90); } } int descriptor = neighbor->descriptor; neighbor->status = MACEDON_CONNECTION_STATUS_connecting; if (num_fds > MAX_FDS) cleanup(); pthread_cond_signal(&connect_cond); unlock();#ifdef TRANSPORT_TRACE_CONNECT //if (!output_quiet) { printf("macedon_tcp_transport: %f trying to connect to %.8x %d %x %d fds %d\n", wall_clock(), (int )socket_address.sin_addr.s_addr, ntohs((int )socket_address.sin_port), neighbor, descriptor, num_fds); //}#endif int myret = ::connect(descriptor, (struct sockaddr *) &socket_address, sizeof(socket_address)); lock(); if (myret != 0) { if ( neighbor->status != MACEDON_CONNECTION_STATUS_connecting ) {#ifdef TRANSPORT_TRACE_CONNECT //if (!output_quiet) { printf ("connect failed to %.8x:%d descriptor %d %s time %f but neighbor status changed, so ignoring\n", (int )socket_address.sin_addr.s_addr, ntohs((int )socket_address.sin_port), descriptor, strerror(errno), wall_clock()); //}#endif continue; }#ifdef TRANSPORT_TRACE_CONNECT //if (!output_quiet) { printf ("connect failed to %.8x:%d descriptor %d %s time %f\n", (int )socket_address.sin_addr.s_addr, ntohs((int )socket_address.sin_port), descriptor, strerror(errno), wall_clock()); //}#endif neighbor->status = MACEDON_CONNECTION_STATUS_disconnected; //Since connrefused, now discconnected. neighbor->time_connect_initiated = entry; //Set initiated time to now to move to end of queue. neighbor->virtual_time_initiated = virtual_time++; //Clear out the send queue on a connect failed. int msg_count = 0; while (neighbor->send_queue) { msg_count++; macedon_transport_send_work* work = neighbor->send_queue; neighbor->pop_front(); delete work; } signal_error(TCP_CONNECT_FAILED, (int )socket_address.sin_addr.s_addr, port, msg_count); } else { if (descriptor == neighbor->descriptor || neighbor->descriptor == -1) { num_fds++; neighbor->descriptor = descriptor; int yes = 1;// setsockopt(descriptor, IPPROTO_TCP, TCP_NODELAY, // &yes,sizeof(int)); setsockopt(neighbor->descriptor, SOL_SOCKET, SO_REUSEADDR, (int *)&yes, sizeof(int)); int minimum = 2*sizeof(int); setsockopt(neighbor->descriptor, SOL_SOCKET, SO_RCVLOWAT,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -