⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 macedon_tcp_transport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
📖 第 1 页 / 共 3 页
字号:
    else if ((waiting_structure[examine].revents & POLLERR) ||        (waiting_structure[examine].revents & POLLHUP)) {      printf("Exception: TCP main server socket has a problem!\n");      exit(94);    }    }    else {      if ((waiting_structure[examine].revents & POLLERR) ||          (waiting_structure[examine].revents & POLLNVAL) ||          (waiting_structure[examine].revents & POLLHUP)) {#ifdef TRANSPORT_TRACE        printf ("macedon_tcp_transport: error/hup on fd %d waiting_structure[%d].revents: %x\n", waiting_structure[examine].fd, examine, waiting_structure[examine].revents);#endif        lock ();	            macedon_connection*neighbor = find_macedon_connection_by_fd(waiting_structure[examine].fd);        if (neighbor) {#ifdef TRANSPORT_TRACE          printf ("poll error on %.8x fd %d\n", neighbor->get_destination(), neighbor->descriptor);#endif          reset_connection(neighbor, TCP_POLL_ERROR);          //            transport_connect(neighbor);        }        else {          num_fds--;          ::shutdown( waiting_structure[examine].fd, SHUT_RDWR);          ::close( waiting_structure[examine].fd );        }        unlock ();      }       else if (waiting_structure[examine].revents & POLLIN ||          waiting_structure[examine].revents & POLLPRI ||          waiting_structure[examine].revents & POLLOUT) {        //  	printf("%f Poll called: IN: %d PRI: %d OUT: %d fd: %d\n",         //  	       wall_clock(),        //  	       waiting_structure[examine].revents & POLLIN,        //  	       waiting_structure[examine].revents & POLLPRI,        //  	       waiting_structure[examine].revents & POLLOUT,        //  	       waiting_structure[examine].fd);        actual_work(waiting_structure[examine].fd);      }	      else  if (waiting_structure[examine].revents ) {        fprintf (stderr, "fd %d waiting_structure[%d].revents: %x\n", waiting_structure[examine].fd, examine, waiting_structure[examine].revents);      }	    }  }}// ---------------------------------------------- // cleanup// ---------------------------------------------- void macedon_tcp_transport::cleanup(){  lock();  double curtime = wall_clock();  macedon_connection* sample4 = macedon_connections;  macedon_connection* oldest = sample4;  double oldest_time;  if (sample4)    oldest_time = sample4->time_last_used;  while (sample4) {      int nuke=0;    if (sample4->time_last_used < oldest_time) {      oldest = sample4;      oldest_time = sample4->time_last_used;    }    if ( curtime > sample4->time_last_used + TRANSPORT_TIMEOUT ) {      if (sample4->status == MACEDON_CONNECTION_STATUS_connected) {        if (!sample4->send_queue) {          if (!sample4->recv_in_progress) {            // nuke = 1;          }          else {            if (sample4->recv_in_progress->field_bytes ==0) {              // disable nuking for now              //	      nuke = 1;              delete sample4->recv_in_progress;              sample4->recv_in_progress = 0;            }            // 	    else             // 	      printf("Wanted to nuke %.8x but could not cause recv queue %x\n", sample4->get_destination(), sample4->recv_in_progress);          }        }        // 	else        //	  printf("Wanted to nuke %.8x but could not cause send queue %d %x\n", sample4->get_destination(), sample4->send_queue_items, sample4->send_queue);      }    }    if (nuke) {      // #ifdef TRANSPORT_TRACE_CONNECT      printf("macedon_tcp_transport: %f nuking %.8x %d index %d %x %x %f fds %d\n", wall_clock(), sample4->get_destination(), port, sample4->index, sample4->send_queue, sample4->recv_in_progress, sample4->time_last_used, num_fds);      // #endif      reset_connection(sample4, NO_ERROR);    }    sample4 = sample4->next;  }  if (num_fds > MAX_FDS) {    printf("macedon_tcp_transport: %f nuking oldest %.8x %d index %d %f %x fds %d\n",  wall_clock(), oldest->get_destination(), port, index, oldest_time, oldest, num_fds);    reset_connection (oldest, TCP_CLEANUP_KILLING_OLDEST);  }  macedon_connection* sample2 = macedon_connections;  while (sample2) {      if (sample2->status == MACEDON_CONNECTION_STATUS_connected         || sample2->status == MACEDON_CONNECTION_STATUS_disconnecting) {      send_more(sample2);      if (sample2->status == MACEDON_CONNECTION_STATUS_connected           || sample2->status == MACEDON_CONNECTION_STATUS_disconnecting)         recv_more(sample2);    }    sample2 = sample2->next;  }  unlock();}// ---------------------------------------------- // actual_work// ---------------------------------------------- void macedon_tcp_transport::actual_work (int desc){  lock ();  macedon_connection* receiver = find_macedon_connection_by_fd(desc);  if (! receiver ) {    printf ("Exception: macedon_tcp_transport: %f null peer in select %d\n", wall_clock(), desc);    num_fds--;    ::shutdown( desc, SHUT_RDWR);    ::close( desc );    unlock();    return;  }	  send_more (receiver);  if (receiver->status == MACEDON_CONNECTION_STATUS_connected       || receiver->status == MACEDON_CONNECTION_STATUS_disconnecting)     recv_more (receiver);  if (!receiver->send_queue &&      !receiver->recv_in_progress &&      receiver->status == MACEDON_CONNECTION_STATUS_disconnecting)    reset_connection(receiver, NO_ERROR);  unlock ();  return;}// ---------------------------------------------- // recv_more// ---------------------------------------------- int macedon_tcp_transport::recv_more (macedon_connection *neighbor){  ASSERT(lock_held == pthread_self());  static int inst_count=0;  inst_count ++;  macedon_transport_recv_work *recv_work;  int descriptor = neighbor->descriptor;  int flags;  if (-1 == (flags = fcntl (descriptor, F_GETFL, 0)))    flags = 0;  fcntl (descriptor, F_SETFL, flags | O_NONBLOCK);  int keep_receiving=1;  while (keep_receiving) {    if (neighbor->recv_in_progress) {      recv_work = neighbor->recv_in_progress;      //      printf("recv was in progress %x %d %x %.8x\n", recv_work, inst_count, neighbor, neighbor->get_destination());      neighbor->recv_in_progress = 0;    }    else {      try {        recv_work = new macedon_transport_recv_work();        //	printf("alloced recv work element %x\n", recv_work);      }      catch (...) {        printf("Exception: out of mem allocing new recv work\n");        exit(97);      }    }    int sizesHeader = 2 * sizeof(int);    if (recv_work->field_bytes < sizesHeader) { // sizes not read completely      errno = 0;      //printf("Trying to read %d bytes from %.8x on fd %d\n",2*sizeof(int)-recv_work->field_bytes, neighbor->neigh_socket_address.sin_addr.s_addr, descriptor);      errno = 0;      int bread = (int)::read(descriptor, ((char*)&recv_work->header_size)+recv_work->field_bytes, sizesHeader-recv_work->field_bytes);      //printf("Read %d bytes\n", bread);      if ((bread < 0 && errno != EWOULDBLOCK && errno != EAGAIN)          || bread == 0) {  // read error        printf("macedon_tcp_transport: receive error on fd %d %.8x port %d %s bread=%d errno=%d index=%d\n", neighbor->descriptor, neighbor->get_destination(), port, strerror(errno), bread, errno, neighbor->index);        reset_connection(neighbor, TCP_RECV_ERROR);        if(neighbor->index == 0) {          //            transport_connect(neighbor);        }        delete recv_work;        return 1;      }      else if (bread < 0) {  // must be EWOULDBLOCK , try later         //	printf("queueing recv 2 %x %d %x %.8x\n", recv_work, inst_count, neighbor, neighbor->get_destination());         keep_receiving = 0;        neighbor->recv_in_progress = recv_work;      }      else {    // read something         neighbor->bandwidth.update(bread);        // Try to update the bytes on the zero index connection also        if(neighbor->index != 0) {          macedon_connection* index_zero = find_macedon_connection(neighbor->get_destination(), 0);          if(index_zero != NULL)            index_zero->bandwidth.update(bread);        }        if (bread > sizesHeader-recv_work->field_bytes) {          printf ("Read in too much data to be headers %d\n", bread);          exit(92);        }        recv_work->field_bytes+= bread;        if (recv_work->field_bytes < sizesHeader) {          // still not done          //	  printf("queueing recv 3 %x %d \n", recv_work, inst_count, neighbor, neighbor->get_destination());           neighbor->recv_in_progress = recv_work;          continue;        }        else {  // read in the sizes completely          neighbor->bytes_received +=sizesHeader;          recv_work->tot_size = recv_work->header_size + recv_work->data_size;          if (recv_work->tot_size != 0) {            int trytoread = recv_work->tot_size-recv_work->running_size;            //	    if (trytoread < 4000)             {              recv_work->stuff = (unsigned char *)                malloc(recv_work->tot_size+100);                if (recv_work->stuff == 0) {                printf("Malloc exception 2 for size %x %d!\n", recv_work->stuff, recv_work->tot_size);                reset_connection(neighbor, TCP_RECV_ERROR);                /*                 abort(); */                /*                 exit(72); */              }              recv_work->running_buffer = recv_work->stuff;            }#if 0            else {              printf("Exception: trying to read too much data %d fd %d %.8x %s\n", trytoread, neighbor->descriptor, neighbor->get_destination(), strerror(errno));              reset_connection(neighbor);              transport_connect(neighbor);              delete recv_work;              return 1;            }#endif          } else {            if(neighbor->index == 0) {              printf ("macedon_tcp_transport: %f peer socket to %.8x %d closed, expecting incoming connection.  entry %x fds %d\n", wall_clock(), neighbor->get_destination(), port, this, neighbor, num_fds);              int index = find_available_index(neighbor->get_destination());              signal_error(TCP_PEER_ALREADY_EXISTS, (int)neighbor->get_destination(), port, 0);              neighbor->index = index;              transport_disconnect(neighbor);              new_macedon_connection(neighbor->get_destination(), htons(port), 0);              //reset_connection(neighbor,TCP_PEER_ALREADY_EXISTS);             } else {              transport_disconnect(neighbor);            }            keep_receiving = 0;          }        }      }    }    if (keep_receiving) {    // try to read the rest of the data      int trytoread = recv_work->tot_size-recv_work->running_size;      //       if (trytoread > 4000) {      // 	if (recv_work->running_size)      // 	  dump_hex((void *)recv_work->stuff, recv_work->running_size);      // 	printf("Exception: macedon_tcp_transport: %f trying to read way too much %d, sizes are %d %d %d %d %.8x\n", wall_clock(), trytoread, recv_work->header_size, recv_work->data_size, neighbor->descriptor, descriptor, neighbor->get_destination());      // 	fflush(stdout);      // 	terminate (110);      //       }      int bread = 0;      if (trytoread) {        //printf("Trying to read %d bytes from %.8x on fd %d\n", trytoread, neighbor->neigh_socket_address.sin_addr.s_addr, descriptor);        errno = 0;        bread =(int)::read(descriptor, recv_work->running_buffer, trytoread);         //printf("Read %d bytes\n", bread);      }      //    dump_hex((void *)recv_work->running_buffer, bread);      if (bread > 0) {        recv_work->running_buffer += bread;        recv_work->running_size += bread;        neighbor->bytes_received += (int)bread;        neighbor->bandwidth.update(bread);        // Also update the zero index neighbor also        if(neighbor->index != 0) {          macedon_connection* index_zero = find_macedon_connection(neighbor->get_destination(), 0);          if(index_zero != NULL)            index_zero->bandwidth.update(bread);        }      }      else {  // read error        if (bread < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) {  // try later          keep_receiving = 0;          bread = 0;        }        else {   // real error          printf("macedon_tcp_transport: receive error on fd %d %.8x port %d %s bread=%d trytoread=%d errno=%d index=%d\n", neighbor->descriptor, neighbor->get_destination(), port, strerror(errno), bread, trytoread, errno, neighbor->index);          // 	  printf("data received error on fd %d %.8x %s\n", neighbor->descriptor, neighbor->get_destination(), strerror(errno));          reset_connection(neighbor, TCP_RECV_ERROR);          //            transport_connect(neighbor);          delete recv_work;          return 1;        }      }      if (recv_work->running_size != recv_work->tot_size) {        keep_receiving = 0;#ifdef TRANSPORT_TRACE        printf ("macedon_tcp_transport: did not read all the data, %d bytes so far (%d this time), %d were promised, queueing work %x %.8x:%d index %d fd %d\n", recv_work->running_size, bread, recv_work->tot_size, recv_work, neighbor->get_destination(), port, neighbor->index, descriptor);        fflush(stdout);#endif        neighbor->recv_in_progress = recv_work;      }	      else {        int trace=0;        // 	if (recv_work->tot_size >= 18*sizeof(int)) {        // 	  int * dude = (int *) (recv_work->stuff+17*sizeof(int));        // 	  if (*dude == 900) // trace scribe anycast msgs        // 	    dump_hex((void *)recv_work->stuff, recv_work->tot_size);        // 	}#ifdef TRANSPORT_TRACE        printf ("macedon_tcp_transport: read all the data, %d bytes with %d header (%d this time), buff is %x %x %.8x:%d index %d fd %d\n", recv_work->running_size, recv_work->header_size, bread, recv_work->stuff, recv_work->stuff+recv_work->header_size, neighbor->get_destination(), port, neighbor->index, descriptor);        fflush(stdout);#endif#ifdef TRANSPORT_MASSIVE_TRACE        {          int destination = neighbor->get_destination();          int header_size = recv_work->header_size;          int data_size = recv_work->running_size - recv_work->header_size;          int offset = 0;          printf("macedon_transport: massive trace dump, recv from %.8x:%d hsize %d size %d\n", destination, port, header_size, data_size);          printf("macedon_tcp_transport: header bytes %.8x:%d ", destination, port);          dump_hex(recv_work->stuff+recv_work->psize, recv_work->header_size);          printf("macedon_tcp_transport: data bytes %.8x:%d ", destination, port);          dump_hex(recv_work->stuff +offset+ recv_work->header_size, recv_work->running_size - recv_work->header_size - offset);        }#endif        neighbor->items_received++;        int mydest = neighbor->get_destination();        neighbor->time_last_used = wall_clock();        if (receiver && recv_work->tot_size) {          int offset = 0;          if(recv_work->tot_size - offset <= 0) {            if(neighbor->index == 0) {              printf ("macedon_tcp_transport: %f peer socket to %.8x %d closed, expecting incoming connection.  entry %x fds %d\n", wall_clock(), neighbor->get_destination(), port, this, neighbor, num_fds);              int index = find_available_index(neighbor->get_destination());              signal_error(TCP_PEER_ALREADY_EXISTS, (int)neighbor->get_destination(), port, 0);              neighbor->index = index;              transport_disconnect(neighbor);              new_macedon_connection(neighbor->get_destination(), htons(port), 0);              //reset_connection(neighbor,TCP_PEER_ALREADY_EXISTS);             } else {              transport_disconnect(neighbor);            }          } else {            unlock();            ( receiver->*receive_handler)              (mydest,               recv_work->stuff+offset, recv_work->header_size,               recv_work->stuff+offset+recv_work->header_size, recv_work->tot_size-recv_work->header_size-offset);            lock();          }        }        //printf ("macedon_tcp_transport: timings (base->lock: %f, base->recv: %f, tcp->lock: %f)\n",(b-a),(c-b),(d-c));        delete recv_work;      }    }  }  return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -