📄 macedon_tcp_transport.cc
字号:
(int *)&minimum, sizeof(int)); // int bufsize = 2621440; // setsockopt(neighbor->descriptor, SOL_SOCKET, SO_RCVBUF,// (int *)&bufsize, sizeof(int)); // setsockopt(neighbor->descriptor, SOL_SOCKET, SO_SNDBUF,// (int *)&bufsize, sizeof(int)); // int bs = 0; // setsockopt(neighbor->descriptor, SOL_SOCKET, SO_LINGER, // (int *)&bs, sizeof(bs)); // int bs = 16384; // setsockopt(neighbor->descriptor, SOL_SOCKET, SO_SNDBUF, // (int *)&bs, sizeof(bs)); // int new_len; // int arglen = sizeof(new_len); // if (-1 == getsockopt(neighbor->descriptor, SOL_SOCKET, SO_SNDBUF, &new_len,(socklen_t*)&arglen)) // perror("getsockopt"); // printf("setsb %d\n", new_len); // setsockopt(neighbor->descriptor, IPPROTO_TCP, TCP_NODELAY, &yes,sizeof(int)); #ifdef TRANSPORT_TRACE_CONNECT //if (!output_quiet) { printf ("macedon_tcp_transport: %f connected to %.8x:%d descriptor %d transport port %d time %f fds %d overall %f\n", wall_clock(), neighbor->get_destination(), ntohs((int)socket_address.sin_port), neighbor->descriptor, port, wall_clock()-entry, num_fds, wall_clock() - neighbor->time_connect_initiated); //}#endif // shouldn't need to, but clean queues if (neighbor->send_queue) { neighbor->send_queue->running_buffer = neighbor->send_queue->stuff; neighbor->send_queue->running_size = neighbor->send_queue->tot_size; } if (neighbor->recv_in_progress) { delete neighbor->recv_in_progress; neighbor->recv_in_progress = 0; } neighbor->status = MACEDON_CONNECTION_STATUS_connected; } else { // must have been accepted in the meantime ::shutdown(descriptor, SHUT_RDWR); ::close( descriptor); } } }#ifdef TRANSPORT_TRACE_LOCK printf("macedon_tcp_transport: %x cond_wait (release lock) on %x by transport %x held_by %d\n",pthread_self(),&pool_lock,this,lock_held);#endif ASSERT(lock_held == pthread_self()); lock_held = 0; pthread_cond_wait(&connect_cond, &pool_lock);#ifdef TRANSPORT_TRACE_LOCK printf("macedon_tcp_transport: %x cond_wait done (acquire lock) on %x by transport %x held_by %d\n",pthread_self(),&pool_lock,this,lock_held);#endif lock_held = pthread_self(); } unlock(); return;}// ---------------------------------------------- // send_more// ---------------------------------------------- int macedon_tcp_transport::send_more (macedon_connection *neighbor){ ASSERT(lock_held == pthread_self());#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: send destination %.8x status %d port %d index %d\n", neighbor->get_destination(), neighbor->status, port, neighbor->index); fflush(stdout);#endif if (!neighbor->send_queue) { return 1; } if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnecting) neighbor->status = MACEDON_CONNECTION_STATUS_connected; else if (neighbor->status == MACEDON_CONNECTION_STATUS_disconnected ) { transport_connect (neighbor); return 1; } else if (neighbor->status == MACEDON_CONNECTION_STATUS_connecting || neighbor->status == MACEDON_CONNECTION_STATUS_preconnecting) { return 1; } //if(from_agent) return 0; int sent = 0; int descriptor = neighbor->descriptor; int keep_sending=1; while (keep_sending && neighbor->send_queue) { macedon_transport_send_work* work = neighbor->send_queue;#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: send going for %d bytes to %.8x port %d index %d\n", work-> running_size, neighbor->get_destination(), port, neighbor->index); // dump_hex(work->running_buffer, work->running_size);#endif int flags = 0; if (-1 == (flags = fcntl (descriptor, F_GETFL, 0))) flags = 0; fcntl (descriptor, F_SETFL, flags | O_NONBLOCK); int * dude = (int *) (work->stuff); // if (*dude > 4000) { // printf("Problem with header size when sending %d\n", dude); // exit(86); // } sent = write (descriptor, work->running_buffer, work->running_size); neighbor->time_last_used = wall_clock(); if (sent < 0 && errno!=EWOULDBLOCK) { printf ("macedon_tcp_transport: send error on socket to %.8x port %d index %d fd %d: %s\n", neighbor->get_destination(), port, neighbor->index, neighbor->descriptor, strerror(errno)); reset_connection (neighbor, TCP_ERROR_SENDING); //transport_connect(neighbor); return 1; } else if (sent < 0 && errno==EWOULDBLOCK) keep_sending = 0;#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: send sent %d bytes to %.8x %d index %d\n", sent, neighbor->get_destination(), port, index);#endif if (sent > 0) { neighbor->bytes_sent += sent; work->running_buffer += sent; work->running_size -= sent; if (work->running_size == 0) { // done sending this work neighbor->items_sent ++; neighbor->pop_front(); delete work; } } }#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: send done destination %.8x port %d index %d items left %d\n", neighbor->get_destination(), port, neighbor->index, neighbor->send_queue_items);#endif return 0;}// ---------------------------------------------- // wait_for_work // ---------------------------------------------- void macedon_tcp_transport::wait_for_work (){#ifdef TRANSPORT_TRACE // printf("macedon_tcp_transport: waiting for work\n");#endif // fflush(stdout); lock (); //we reserve slot zero for the listening socket waiting_counter = 0; waiting_structure[waiting_counter].fd = server_descriptor; waiting_structure[waiting_counter].events = POLLIN;#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: waiting_counter %d server_descriptor %d\n", waiting_counter,server_descriptor);#endif macedon_connection* sample2 = macedon_connections; while (sample2) { if (sample2->status == MACEDON_CONNECTION_STATUS_connected || sample2->status == MACEDON_CONNECTION_STATUS_disconnecting) { waiting_counter ++; if (waiting_counter > sizeof( waiting_structure)/sizeof(struct pollfd )) { fprintf (stderr, "counter exceeding maximum number of file descriptors: %d\n", waiting_counter); terminate(100); }#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: counter %d sample2->descriptor %d destination %.8x\n", waiting_counter,sample2->descriptor, sample2->neigh_socket_address.sin_addr.s_addr);#endif waiting_structure[waiting_counter].fd = sample2->descriptor; if (sample2->send_queue) // I have stuff to send for him waiting_structure[waiting_counter].events = POLLIN | POLLOUT | POLLPRI; else waiting_structure[waiting_counter].events = POLLIN | POLLPRI; if (waiting_structure[waiting_counter].fd >= MAXIMUM_DESCRIPTORS) { printf("Exception: too big a file descriptor %d\n", waiting_structure[waiting_counter].fd); exit(91); } } else {#ifdef TRANSPORT_TRACE printf("skip %.8x %d %d\n", sample2->get_destination(), sample2->status, sample2->descriptor);#endif } sample2 = sample2->next; } unlock ();#ifdef TRANSPORT_TRACE // printf("macedon_tcp_transport: polling on %d descriptors\n", waiting_counter+1);#endif // fflush(stdout); int status = poll (waiting_structure, waiting_counter+1, 500); if (status < 0 && errno != EINTR ) { perror("Polling error: \n"); terminate(101); }}// ---------------------------------------------- // accept_connections// ---------------------------------------------- void macedon_tcp_transport::accept_connections (){ int keep_accepting = 1; struct sockaddr_in client; socklen_t client_length = sizeof(sockaddr_in); int flags = 0; if (-1 == (flags = fcntl (server_descriptor, F_GETFL, 0))) flags = 0; fcntl (server_descriptor, F_SETFL, flags | O_NONBLOCK); while (keep_accepting) { if (num_fds > MAX_FDS) cleanup(); int descriptor = ::accept( server_descriptor, (sockaddr *) & client, &client_length); if (descriptor < 0) { if (errno != EWOULDBLOCK) { // error printf ("Failure accepting a macedon_connection: %s\n", strerror(errno)); exit(93); } else // no harm no foul keep_accepting = 0; } else { int yes = 1; // setsockopt(descriptor, IPPROTO_TCP, TCP_NODELAY, // &yes,sizeof(int)); int minimum = 4; setsockopt(descriptor, SOL_SOCKET, SO_RCVLOWAT, (int *)&minimum, sizeof(int)); // int bufsize = 2621440; // setsockopt(descriptor, SOL_SOCKET, SO_RCVBUF,// (int *)&bufsize, sizeof(int)); // setsockopt(descriptor, SOL_SOCKET, SO_SNDBUF,// (int *)&bufsize, sizeof(int)); // int bs = 0; // setsockopt(descriptor, SOL_SOCKET, SO_LINGER, // (int *)&bs, sizeof(bs)); // int bs = 16384; // setsockopt(descriptor, SOL_SOCKET, SO_SNDBUF, // (int *)&bs, sizeof(bs)); int side = (int)client.sin_addr.s_addr; if ((side & htonl(0xff000000)) == htonl(0x0a000000)) { //this is the Modelnet hack side &= ~(htonl(0x00800000)); } lock (); macedon_connection* neighbor = find_macedon_connection (side, 0); if (neighbor && neighbor->status != MACEDON_CONNECTION_STATUS_disconnected) { if (neighbor->status == MACEDON_CONNECTION_STATUS_preconnecting || here_addr_ < side) { // HACK this is probably a bug, what happens if the connection is in connecting state, this is a potential race condition // deterministically pick which connection to keep so we don't bounce back and forth#ifdef TRANSPORT_TRACE_CONNECT printf ("macedon_tcp_transport: %f peer already exists to %.8x %d, state %d, reusing connection %x, entry %x fds %d\n", wall_clock(), side, port, neighbor->status, this, neighbor, num_fds);#endif //reset_connection(neighbor, TCP_PEER_ALREADY_EXISTS); signal_error(TCP_PEER_ALREADY_EXISTS, (int)neighbor->get_destination(), port, 0); if(neighbor->status == MACEDON_CONNECTION_STATUS_preconnecting) { //nothing special to do.#ifdef TRANSPORT_TRACE printf("macedon_tcp_transport: peer %.8x %d was in preconnecting state, just assign new socket to it.\n", side, port);#endif } else if(neighbor->status == MACEDON_CONNECTION_STATUS_connecting) { //If it is connecting, we need to close the descriptor neighbor->close_descriptor();#ifdef TRANSPORT_TRACE printf("macedon_tcp_transport: peer %.8x %d was in connecting state, close connecting socket.\n", side, port);#endif } else { //In this case -- shift this neighbor to a new index to finish delivering data. int index = find_available_index(neighbor->get_destination()); neighbor->index = index;#ifdef TRANSPORT_TRACE printf("macedon_tcp_transport: peer %.8x %d was in connected state, assign new index %d and disconnect transport.\n", side, port, index);#endif transport_disconnect(neighbor);#ifdef NEW_BW_FILTER bandwidth_time_filter& bw = neighbor->get_filter();#else bandwidth_filter& bw = neighbor->get_filter();#endif neighbor = new_macedon_connection (side, (int)client.sin_port, 0); neighbor->set_filter(bw); } } else if (neighbor->descriptor != -1 && neighbor->status != MACEDON_CONNECTION_STATUS_disconnected) {#ifdef TRANSPORT_TRACE_CONNECT printf ("macedon_tcp_transport: %f peer already exists to %.8x %d, keeping the old one %x entry %x fds %d\n", wall_clock(), side, port, this, neighbor, num_fds);#endif int index; macedon_connection* error_connection = new_macedon_connection_available_index(side, (int)client.sin_port, &index); num_fds++; printf("macedon_tcp_transport: creating error connection to %.8x, index=%d fd=%d\n", neighbor->neigh_socket_address.sin_addr.s_addr, index, descriptor); error_connection->status = MACEDON_CONNECTION_STATUS_connected; error_connection->descriptor = descriptor; signal_error(TCP_PEER_ALREADY_EXISTS_OTHER, (int)neighbor->get_destination(), port, 0); //we have no idea how much data could have been lost //::shutdown(descriptor, SHUT_RDWR); //::close( descriptor); unlock(); //NOTE: Unlock before send, because send will re-lock send(side, index, NULL, 0, NULL, 0); continue; } } else if (!neighbor) { neighbor = new_macedon_connection (side, (int)client.sin_port, 0); } num_fds++; neighbor->status = MACEDON_CONNECTION_STATUS_connected; neighbor->descriptor = descriptor; // shouldn't need to, but clean queues if (neighbor->send_queue) { neighbor->send_queue->running_buffer = neighbor->send_queue->stuff; neighbor->send_queue->running_size = neighbor->send_queue->tot_size; } if (neighbor->recv_in_progress) { delete neighbor->recv_in_progress; neighbor->recv_in_progress = 0; } unlock ();#ifdef TRANSPORT_TRACE_CONNECT //if (!output_quiet) { printf ("macedon_tcp_transport: %f accepted a macedon_connection from %.8x:%d fd %d my port %d this %x entry %x fds %d\n", wall_clock(), side, client.sin_port, descriptor, port, this, neighbor, num_fds); //}#endif fflush(stdout); } }}// ---------------------------------------------- // do_work // ---------------------------------------------- void macedon_tcp_transport::do_work (){ while(first_error != NULL) { ASSERT(from_agent == 0); signal_error(first_error->tcp_error, first_error->ipaddr, first_error->port, first_error->msg_count); delete first_error; first_error = first_error->next_error; if(first_error == NULL) { last_error == NULL; } } //REB: what's up with this?? if (wall_clock() - time_last_cleanup > CLEANUP_TIME) { cleanup(); time_last_cleanup = wall_clock(); return; } for (int examine = 0; examine <= waiting_counter ; examine++ ) { if (examine == 0) { //did we get any new macedon_connections? if (waiting_structure[examine].revents & POLLIN) { accept_connections(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -