📄 macedon_tcp_transport.cc
字号:
else if ((waiting_structure[examine].revents & POLLERR) || (waiting_structure[examine].revents & POLLHUP)) { printf("Exception: TCP main server socket has a problem!\n"); exit(94); } } else { if ((waiting_structure[examine].revents & POLLERR) || (waiting_structure[examine].revents & POLLNVAL) || (waiting_structure[examine].revents & POLLHUP)) {#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: error/hup on fd %d waiting_structure[%d].revents: %x\n", waiting_structure[examine].fd, examine, waiting_structure[examine].revents);#endif lock (); macedon_connection*neighbor = find_macedon_connection_by_fd(waiting_structure[examine].fd); if (neighbor) {#ifdef TRANSPORT_TRACE printf ("poll error on %.8x fd %d\n", neighbor->get_destination(), neighbor->descriptor);#endif reset_connection(neighbor, TCP_POLL_ERROR); // transport_connect(neighbor); } else { num_fds--; ::shutdown( waiting_structure[examine].fd, SHUT_RDWR); ::close( waiting_structure[examine].fd ); } unlock (); } else if (waiting_structure[examine].revents & POLLIN || waiting_structure[examine].revents & POLLPRI || waiting_structure[examine].revents & POLLOUT) { // printf("%f Poll called: IN: %d PRI: %d OUT: %d fd: %d\n", // wall_clock(), // waiting_structure[examine].revents & POLLIN, // waiting_structure[examine].revents & POLLPRI, // waiting_structure[examine].revents & POLLOUT, // waiting_structure[examine].fd); actual_work(waiting_structure[examine].fd); } else if (waiting_structure[examine].revents ) { fprintf (stderr, "fd %d waiting_structure[%d].revents: %x\n", waiting_structure[examine].fd, examine, waiting_structure[examine].revents); } } }}// ---------------------------------------------- // cleanup// ---------------------------------------------- void macedon_tcp_transport::cleanup(){ lock(); double curtime = wall_clock(); macedon_connection* sample4 = macedon_connections; macedon_connection* oldest = sample4; double oldest_time; if (sample4) oldest_time = sample4->time_last_used; while (sample4) { int nuke=0; if (sample4->time_last_used < oldest_time) { oldest = sample4; oldest_time = sample4->time_last_used; } if ( curtime > sample4->time_last_used + TRANSPORT_TIMEOUT ) { if (sample4->status == MACEDON_CONNECTION_STATUS_connected) { if (!sample4->send_queue) { if (!sample4->recv_in_progress) { // nuke = 1; } else { if (sample4->recv_in_progress->field_bytes ==0) { // disable nuking for now // nuke = 1; delete sample4->recv_in_progress; sample4->recv_in_progress = 0; } // else // printf("Wanted to nuke %.8x but could not cause recv queue %x\n", sample4->get_destination(), sample4->recv_in_progress); } } // else // printf("Wanted to nuke %.8x but could not cause send queue %d %x\n", sample4->get_destination(), sample4->send_queue_items, sample4->send_queue); } } if (nuke) { // #ifdef TRANSPORT_TRACE_CONNECT printf("macedon_tcp_transport: %f nuking %.8x %d index %d %x %x %f fds %d\n", wall_clock(), sample4->get_destination(), port, sample4->index, sample4->send_queue, sample4->recv_in_progress, sample4->time_last_used, num_fds); // #endif reset_connection(sample4, NO_ERROR); } sample4 = sample4->next; } if (num_fds > MAX_FDS) { printf("macedon_tcp_transport: %f nuking oldest %.8x %d index %d %f %x fds %d\n", wall_clock(), oldest->get_destination(), port, index, oldest_time, oldest, num_fds); reset_connection (oldest, TCP_CLEANUP_KILLING_OLDEST); } macedon_connection* sample2 = macedon_connections; while (sample2) { if (sample2->status == MACEDON_CONNECTION_STATUS_connected || sample2->status == MACEDON_CONNECTION_STATUS_disconnecting) { send_more(sample2); if (sample2->status == MACEDON_CONNECTION_STATUS_connected || sample2->status == MACEDON_CONNECTION_STATUS_disconnecting) recv_more(sample2); } sample2 = sample2->next; } unlock();}// ---------------------------------------------- // actual_work// ---------------------------------------------- void macedon_tcp_transport::actual_work (int desc){ lock (); macedon_connection* receiver = find_macedon_connection_by_fd(desc); if (! receiver ) { printf ("Exception: macedon_tcp_transport: %f null peer in select %d\n", wall_clock(), desc); num_fds--; ::shutdown( desc, SHUT_RDWR); ::close( desc ); unlock(); return; } send_more (receiver); if (receiver->status == MACEDON_CONNECTION_STATUS_connected || receiver->status == MACEDON_CONNECTION_STATUS_disconnecting) recv_more (receiver); if (!receiver->send_queue && !receiver->recv_in_progress && receiver->status == MACEDON_CONNECTION_STATUS_disconnecting) reset_connection(receiver, NO_ERROR); unlock (); return;}// ---------------------------------------------- // recv_more// ---------------------------------------------- int macedon_tcp_transport::recv_more (macedon_connection *neighbor){ ASSERT(lock_held == pthread_self()); static int inst_count=0; inst_count ++; macedon_transport_recv_work *recv_work; int descriptor = neighbor->descriptor; int flags; if (-1 == (flags = fcntl (descriptor, F_GETFL, 0))) flags = 0; fcntl (descriptor, F_SETFL, flags | O_NONBLOCK); int keep_receiving=1; while (keep_receiving) { if (neighbor->recv_in_progress) { recv_work = neighbor->recv_in_progress; // printf("recv was in progress %x %d %x %.8x\n", recv_work, inst_count, neighbor, neighbor->get_destination()); neighbor->recv_in_progress = 0; } else { try { recv_work = new macedon_transport_recv_work(); // printf("alloced recv work element %x\n", recv_work); } catch (...) { printf("Exception: out of mem allocing new recv work\n"); exit(97); } } int sizesHeader = 2 * sizeof(int); if (recv_work->field_bytes < sizesHeader) { // sizes not read completely errno = 0; //printf("Trying to read %d bytes from %.8x on fd %d\n",2*sizeof(int)-recv_work->field_bytes, neighbor->neigh_socket_address.sin_addr.s_addr, descriptor); errno = 0; int bread = (int)::read(descriptor, ((char*)&recv_work->header_size)+recv_work->field_bytes, sizesHeader-recv_work->field_bytes); //printf("Read %d bytes\n", bread); if ((bread < 0 && errno != EWOULDBLOCK && errno != EAGAIN) || bread == 0) { // read error printf("macedon_tcp_transport: receive error on fd %d %.8x port %d %s bread=%d errno=%d index=%d\n", neighbor->descriptor, neighbor->get_destination(), port, strerror(errno), bread, errno, neighbor->index); reset_connection(neighbor, TCP_RECV_ERROR); if(neighbor->index == 0) { // transport_connect(neighbor); } delete recv_work; return 1; } else if (bread < 0) { // must be EWOULDBLOCK , try later // printf("queueing recv 2 %x %d %x %.8x\n", recv_work, inst_count, neighbor, neighbor->get_destination()); keep_receiving = 0; neighbor->recv_in_progress = recv_work; } else { // read something neighbor->bandwidth.update(bread); // Try to update the bytes on the zero index connection also if(neighbor->index != 0) { macedon_connection* index_zero = find_macedon_connection(neighbor->get_destination(), 0); if(index_zero != NULL) index_zero->bandwidth.update(bread); } if (bread > sizesHeader-recv_work->field_bytes) { printf ("Read in too much data to be headers %d\n", bread); exit(92); } recv_work->field_bytes+= bread; if (recv_work->field_bytes < sizesHeader) { // still not done // printf("queueing recv 3 %x %d \n", recv_work, inst_count, neighbor, neighbor->get_destination()); neighbor->recv_in_progress = recv_work; continue; } else { // read in the sizes completely neighbor->bytes_received +=sizesHeader; recv_work->tot_size = recv_work->header_size + recv_work->data_size; if (recv_work->tot_size != 0) { int trytoread = recv_work->tot_size-recv_work->running_size; // if (trytoread < 4000) { recv_work->stuff = (unsigned char *) malloc(recv_work->tot_size+100); if (recv_work->stuff == 0) { printf("Malloc exception 2 for size %x %d!\n", recv_work->stuff, recv_work->tot_size); reset_connection(neighbor, TCP_RECV_ERROR); /* abort(); */ /* exit(72); */ } recv_work->running_buffer = recv_work->stuff; }#if 0 else { printf("Exception: trying to read too much data %d fd %d %.8x %s\n", trytoread, neighbor->descriptor, neighbor->get_destination(), strerror(errno)); reset_connection(neighbor); transport_connect(neighbor); delete recv_work; return 1; }#endif } else { if(neighbor->index == 0) { printf ("macedon_tcp_transport: %f peer socket to %.8x %d closed, expecting incoming connection. entry %x fds %d\n", wall_clock(), neighbor->get_destination(), port, this, neighbor, num_fds); int index = find_available_index(neighbor->get_destination()); signal_error(TCP_PEER_ALREADY_EXISTS, (int)neighbor->get_destination(), port, 0); neighbor->index = index; transport_disconnect(neighbor); new_macedon_connection(neighbor->get_destination(), htons(port), 0); //reset_connection(neighbor,TCP_PEER_ALREADY_EXISTS); } else { transport_disconnect(neighbor); } keep_receiving = 0; } } } } if (keep_receiving) { // try to read the rest of the data int trytoread = recv_work->tot_size-recv_work->running_size; // if (trytoread > 4000) { // if (recv_work->running_size) // dump_hex((void *)recv_work->stuff, recv_work->running_size); // printf("Exception: macedon_tcp_transport: %f trying to read way too much %d, sizes are %d %d %d %d %.8x\n", wall_clock(), trytoread, recv_work->header_size, recv_work->data_size, neighbor->descriptor, descriptor, neighbor->get_destination()); // fflush(stdout); // terminate (110); // } int bread = 0; if (trytoread) { //printf("Trying to read %d bytes from %.8x on fd %d\n", trytoread, neighbor->neigh_socket_address.sin_addr.s_addr, descriptor); errno = 0; bread =(int)::read(descriptor, recv_work->running_buffer, trytoread); //printf("Read %d bytes\n", bread); } // dump_hex((void *)recv_work->running_buffer, bread); if (bread > 0) { recv_work->running_buffer += bread; recv_work->running_size += bread; neighbor->bytes_received += (int)bread; neighbor->bandwidth.update(bread); // Also update the zero index neighbor also if(neighbor->index != 0) { macedon_connection* index_zero = find_macedon_connection(neighbor->get_destination(), 0); if(index_zero != NULL) index_zero->bandwidth.update(bread); } } else { // read error if (bread < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) { // try later keep_receiving = 0; bread = 0; } else { // real error printf("macedon_tcp_transport: receive error on fd %d %.8x port %d %s bread=%d trytoread=%d errno=%d index=%d\n", neighbor->descriptor, neighbor->get_destination(), port, strerror(errno), bread, trytoread, errno, neighbor->index); // printf("data received error on fd %d %.8x %s\n", neighbor->descriptor, neighbor->get_destination(), strerror(errno)); reset_connection(neighbor, TCP_RECV_ERROR); // transport_connect(neighbor); delete recv_work; return 1; } } if (recv_work->running_size != recv_work->tot_size) { keep_receiving = 0;#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: did not read all the data, %d bytes so far (%d this time), %d were promised, queueing work %x %.8x:%d index %d fd %d\n", recv_work->running_size, bread, recv_work->tot_size, recv_work, neighbor->get_destination(), port, neighbor->index, descriptor); fflush(stdout);#endif neighbor->recv_in_progress = recv_work; } else { int trace=0; // if (recv_work->tot_size >= 18*sizeof(int)) { // int * dude = (int *) (recv_work->stuff+17*sizeof(int)); // if (*dude == 900) // trace scribe anycast msgs // dump_hex((void *)recv_work->stuff, recv_work->tot_size); // }#ifdef TRANSPORT_TRACE printf ("macedon_tcp_transport: read all the data, %d bytes with %d header (%d this time), buff is %x %x %.8x:%d index %d fd %d\n", recv_work->running_size, recv_work->header_size, bread, recv_work->stuff, recv_work->stuff+recv_work->header_size, neighbor->get_destination(), port, neighbor->index, descriptor); fflush(stdout);#endif#ifdef TRANSPORT_MASSIVE_TRACE { int destination = neighbor->get_destination(); int header_size = recv_work->header_size; int data_size = recv_work->running_size - recv_work->header_size; int offset = 0; printf("macedon_transport: massive trace dump, recv from %.8x:%d hsize %d size %d\n", destination, port, header_size, data_size); printf("macedon_tcp_transport: header bytes %.8x:%d ", destination, port); dump_hex(recv_work->stuff+recv_work->psize, recv_work->header_size); printf("macedon_tcp_transport: data bytes %.8x:%d ", destination, port); dump_hex(recv_work->stuff +offset+ recv_work->header_size, recv_work->running_size - recv_work->header_size - offset); }#endif neighbor->items_received++; int mydest = neighbor->get_destination(); neighbor->time_last_used = wall_clock(); if (receiver && recv_work->tot_size) { int offset = 0; if(recv_work->tot_size - offset <= 0) { if(neighbor->index == 0) { printf ("macedon_tcp_transport: %f peer socket to %.8x %d closed, expecting incoming connection. entry %x fds %d\n", wall_clock(), neighbor->get_destination(), port, this, neighbor, num_fds); int index = find_available_index(neighbor->get_destination()); signal_error(TCP_PEER_ALREADY_EXISTS, (int)neighbor->get_destination(), port, 0); neighbor->index = index; transport_disconnect(neighbor); new_macedon_connection(neighbor->get_destination(), htons(port), 0); //reset_connection(neighbor,TCP_PEER_ALREADY_EXISTS); } else { transport_disconnect(neighbor); } } else { unlock(); ( receiver->*receive_handler) (mydest, recv_work->stuff+offset, recv_work->header_size, recv_work->stuff+offset+recv_work->header_size, recv_work->tot_size-recv_work->header_size-offset); lock(); } } //printf ("macedon_tcp_transport: timings (base->lock: %f, base->recv: %f, tcp->lock: %f)\n",(b-a),(c-b),(d-c)); delete recv_work; } } } return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -