⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 macedon_transport.cc

📁 这是一个著名的应用层组播中间件的源码
💻 CC
📖 第 1 页 / 共 2 页
字号:
  int old_type = 0;  int old_state = 0;  lock_held = 0;  pthread_mutex_unlock( &pool_lock );#ifdef TRANSPORT_TRACE_LOCK  printf("macedon_transport: %d transport unlock %x for %x held_by %d out\n",pthread_self(),&pool_lock,this,lock_held);#endif  //pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, & old_type);  //pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, & old_state);  //pthread_testcancel();}// ---------------------------------------------- // create_connection_if_needed// ---------------------------------------------- void macedon_transport::create_connection_if_needed (int destination, int index){  lock ();    macedon_connection* neighbor= find_macedon_connection (destination, index);  if (!neighbor) {    neighbor = new_macedon_connection (destination, htons(port), index);    transport_connect(neighbor); // CK: Ryan, is that what you wanted?  }  unlock ();  }// ---------------------------------------------- // have_room// ---------------------------------------------- int macedon_transport::have_room (int destination, int index){  int status = 1;  lock ();    macedon_connection* neighbor= find_macedon_connection (destination, index);  if ((queue_size !=0 &&       neighbor && neighbor->send_queue_items >= queue_size))    {      status = 0;    }  unlock ();    return status;}// ---------------------------------------------- // have_room// ---------------------------------------------- int macedon_transport::queued (int destination, int index){  int status = 0;  lock ();    macedon_connection* neighbor= find_macedon_connection (destination, index);  if (neighbor)    {      status = neighbor->send_queue_items;    }  unlock ();    return status;}// ---------------------------------------------- // new_macedon_connection// ---------------------------------------------- macedon_connection*  macedon_transport::new_macedon_connection (int destination, int useport, int index){  macedon_connection* sample;  //lock();  ASSERT(lock_held==pthread_self());  try {#ifdef TRANSPORT_TRACE    printf("macedon_transport: created new connection for %.8x %d transport_port %d %x index %d\n", destination, ntohs(useport), port, this, index);#endif    sample = new macedon_connection  (destination, index, useport, this);    sample->next = macedon_connections;    macedon_connections = sample;  }  catch (...) {    printf("Exception: out of mem allocing new conn\n");    exit(97);  }  //  hash_macedon_connections[destination]=(sample);  //unlock();  return sample;}int macedon_transport::find_available_index(int destination){  macedon_connection *one=NULL, *two=NULL, *three=NULL;  int last = 0;  int index = -1;  //lock();  ASSERT(lock_held==pthread_self());#if 1  macedon_connection* sample2 = macedon_connections;  while (sample2) {    if (sample2-> get_destination() == destination && sample2->index == 1) {      one = sample2;    }    else if (sample2-> get_destination() == destination && sample2->index == 2) {      two = sample2;    }    else if (sample2-> get_destination() == destination && sample2->index == 3) {      three = sample2;    }    if (sample2-> get_destination() == destination && sample2->index > last) {      last = sample2->index;    }    sample2 = sample2->next;    if (one == NULL) { index = 1; }    else if (two == NULL) { index = 2; }    else if (three == NULL) { index = 3; }    else if (last < INT_MAX) { index = last+1; }    else {      printf("macedon_transport: Did not find valid transport index!\n");      terminate(3421);    }  }#else  //was hash stuff for find_connection  terminate(3452);#endif  //unlock();  return index;}// ---------------------------------------------- // new_macedon_connection_available_index//// Note: Implementation not perfect.// ---------------------------------------------- macedon_connection*  macedon_transport::new_macedon_connection_available_index (int destination, int useport, int *index){  ASSERT(lock_held==pthread_self());  //lock();  *index = find_available_index(destination);  macedon_connection *sample = new_macedon_connection(destination, useport, *index);  //unlock();  return sample;}// ---------------------------------------------- // find_macedon_connection//// Note: index is used to separate the "real" macedon_connection from ones created just to// send an error message.  The only current use for this is to handle criss-cross error// conditions properly.// Note: An index of '0' is presently used to indicate the "real" macedon_connection.// Note: In as much as is possible, the index should be hidden from the developer for now.// ---------------------------------------------- macedon_connection*  macedon_transport::find_macedon_connection (int destination, int index){  ASSERT(lock_held==pthread_self());  //lock();  macedon_connection* sample = NULL;#if 1  macedon_connection* sample2 = macedon_connections;  while (sample2) {    if (sample2-> get_destination() == destination && sample2->index == index) {      sample = sample2;      break; //CK added -- won't this be more efficient?    }    sample2 = sample2->next;  }#else  sample = hash_macedon_connections[destination];#endif  //unlock();  return sample;}#ifdef NEW_BW_FILTERvoid macedon_transport::start_bw_segment(int destination){  lock();  macedon_connection* conn = find_macedon_connection(destination, 0);  printf("start_bw_segment called on %.8x, conn=%.8x\n", destination, conn);  ASSERT(conn != NULL);  bandwidth_time_filter* res =  &conn->bandwidth;  res->start_update();  unlock();}void macedon_transport::finish_bw_segment(int destination){  lock();  macedon_connection* conn = find_macedon_connection(destination, 0);  ASSERT(conn != NULL);  bandwidth_time_filter* res =  &conn->bandwidth;  res->finish_update();  unlock();}#endif// ---------------------------------------------- // get_bandwidth// ---------------------------------------------- double macedon_transport::get_bandwidth(int destination, int& valid){  valid = 0;  double result = 0;  lock();  macedon_connection* conn = find_macedon_connection(destination, 0);//    printf("%f get_bandwidth called on %.8x, conn=%.8x\n", wall_clock(), destination, conn);  if(conn != NULL) {    result = conn->bandwidth.get_value();    valid = 1;  }  unlock();  return result  ;}// ---------------------------------------------- // find_macedon_connection_by_fd// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection_by_fd (int descriptor){  ASSERT(lock_held==pthread_self());  //lock();  macedon_connection* sample2 = macedon_connections;  while (sample2) {    if ((int)sample2-> descriptor ==  descriptor) {      //unlock();      return sample2;    }    sample2 = sample2->next;  }  //unlock();  return NULL;}// ---------------------------------------------- // find_macedon_connection_by_time// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection_by_time (double mytime){  double earliest=0.0;  macedon_connection *ret_conn=NULL;  ASSERT(lock_held==pthread_self());  //lock();  macedon_connection* sample2 = macedon_connections;  while (sample2) {    if (sample2->status == MACEDON_CONNECTION_STATUS_preconnecting &&         sample2->time_connect_initiated > mytime &&         (earliest==0.0 || sample2->time_connect_initiated < earliest)) {      earliest = sample2->time_connect_initiated;      ret_conn = sample2;    }    sample2 = sample2->next;  }  //unlock();  return ret_conn;}// ---------------------------------------------- // find_macedon_connection_by_vtime// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection_by_vtime (int mytime){  int earliest=INT_MAX;  macedon_connection *ret_conn=NULL;  ASSERT(lock_held==pthread_self());  //lock();  macedon_connection* sample2 = macedon_connections;  while (sample2) {    if (sample2->status == MACEDON_CONNECTION_STATUS_preconnecting &&         sample2->virtual_time_initiated > mytime &&         (sample2->virtual_time_initiated < earliest)) {      earliest = sample2->virtual_time_initiated;      ret_conn = sample2;    }    sample2 = sample2->next;  }  //unlock();  return ret_conn;}// ---------------------------------------------- // worker_wrapper// ---------------------------------------------- void* worker_wrapper(void *temp){  macedon_transport *this_transport = (macedon_transport *)temp;  this_transport->worker();}// ---------------------------------------------- // worker// ---------------------------------------------- void* macedon_transport::worker(){  extern int global_exit;  thread_id = pthread_self();#ifdef TRANSPORT_TRACE  printf("macedon_transport: created macedon transport thread %d\n", pthread_self());#endif  while(global_exit == 0)    {      while (!initialized)	usleep(100000);      wait_for_work ();      if (global_exit == 0)	do_work ();    }  pthread_exit(NULL);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -