📄 macedon_transport.cc
字号:
int old_type = 0; int old_state = 0; lock_held = 0; pthread_mutex_unlock( &pool_lock );#ifdef TRANSPORT_TRACE_LOCK printf("macedon_transport: %d transport unlock %x for %x held_by %d out\n",pthread_self(),&pool_lock,this,lock_held);#endif //pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, & old_type); //pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, & old_state); //pthread_testcancel();}// ---------------------------------------------- // create_connection_if_needed// ---------------------------------------------- void macedon_transport::create_connection_if_needed (int destination, int index){ lock (); macedon_connection* neighbor= find_macedon_connection (destination, index); if (!neighbor) { neighbor = new_macedon_connection (destination, htons(port), index); transport_connect(neighbor); // CK: Ryan, is that what you wanted? } unlock (); }// ---------------------------------------------- // have_room// ---------------------------------------------- int macedon_transport::have_room (int destination, int index){ int status = 1; lock (); macedon_connection* neighbor= find_macedon_connection (destination, index); if ((queue_size !=0 && neighbor && neighbor->send_queue_items >= queue_size)) { status = 0; } unlock (); return status;}// ---------------------------------------------- // have_room// ---------------------------------------------- int macedon_transport::queued (int destination, int index){ int status = 0; lock (); macedon_connection* neighbor= find_macedon_connection (destination, index); if (neighbor) { status = neighbor->send_queue_items; } unlock (); return status;}// ---------------------------------------------- // new_macedon_connection// ---------------------------------------------- macedon_connection* macedon_transport::new_macedon_connection (int destination, int useport, int index){ macedon_connection* sample; //lock(); ASSERT(lock_held==pthread_self()); try {#ifdef TRANSPORT_TRACE printf("macedon_transport: created new connection for %.8x %d transport_port %d %x index %d\n", destination, ntohs(useport), port, this, index);#endif sample = new macedon_connection (destination, index, useport, this); sample->next = macedon_connections; macedon_connections = sample; } catch (...) { printf("Exception: out of mem allocing new conn\n"); exit(97); } // hash_macedon_connections[destination]=(sample); //unlock(); return sample;}int macedon_transport::find_available_index(int destination){ macedon_connection *one=NULL, *two=NULL, *three=NULL; int last = 0; int index = -1; //lock(); ASSERT(lock_held==pthread_self());#if 1 macedon_connection* sample2 = macedon_connections; while (sample2) { if (sample2-> get_destination() == destination && sample2->index == 1) { one = sample2; } else if (sample2-> get_destination() == destination && sample2->index == 2) { two = sample2; } else if (sample2-> get_destination() == destination && sample2->index == 3) { three = sample2; } if (sample2-> get_destination() == destination && sample2->index > last) { last = sample2->index; } sample2 = sample2->next; if (one == NULL) { index = 1; } else if (two == NULL) { index = 2; } else if (three == NULL) { index = 3; } else if (last < INT_MAX) { index = last+1; } else { printf("macedon_transport: Did not find valid transport index!\n"); terminate(3421); } }#else //was hash stuff for find_connection terminate(3452);#endif //unlock(); return index;}// ---------------------------------------------- // new_macedon_connection_available_index//// Note: Implementation not perfect.// ---------------------------------------------- macedon_connection* macedon_transport::new_macedon_connection_available_index (int destination, int useport, int *index){ ASSERT(lock_held==pthread_self()); //lock(); *index = find_available_index(destination); macedon_connection *sample = new_macedon_connection(destination, useport, *index); //unlock(); return sample;}// ---------------------------------------------- // find_macedon_connection//// Note: index is used to separate the "real" macedon_connection from ones created just to// send an error message. The only current use for this is to handle criss-cross error// conditions properly.// Note: An index of '0' is presently used to indicate the "real" macedon_connection.// Note: In as much as is possible, the index should be hidden from the developer for now.// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection (int destination, int index){ ASSERT(lock_held==pthread_self()); //lock(); macedon_connection* sample = NULL;#if 1 macedon_connection* sample2 = macedon_connections; while (sample2) { if (sample2-> get_destination() == destination && sample2->index == index) { sample = sample2; break; //CK added -- won't this be more efficient? } sample2 = sample2->next; }#else sample = hash_macedon_connections[destination];#endif //unlock(); return sample;}#ifdef NEW_BW_FILTERvoid macedon_transport::start_bw_segment(int destination){ lock(); macedon_connection* conn = find_macedon_connection(destination, 0); printf("start_bw_segment called on %.8x, conn=%.8x\n", destination, conn); ASSERT(conn != NULL); bandwidth_time_filter* res = &conn->bandwidth; res->start_update(); unlock();}void macedon_transport::finish_bw_segment(int destination){ lock(); macedon_connection* conn = find_macedon_connection(destination, 0); ASSERT(conn != NULL); bandwidth_time_filter* res = &conn->bandwidth; res->finish_update(); unlock();}#endif// ---------------------------------------------- // get_bandwidth// ---------------------------------------------- double macedon_transport::get_bandwidth(int destination, int& valid){ valid = 0; double result = 0; lock(); macedon_connection* conn = find_macedon_connection(destination, 0);// printf("%f get_bandwidth called on %.8x, conn=%.8x\n", wall_clock(), destination, conn); if(conn != NULL) { result = conn->bandwidth.get_value(); valid = 1; } unlock(); return result ;}// ---------------------------------------------- // find_macedon_connection_by_fd// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection_by_fd (int descriptor){ ASSERT(lock_held==pthread_self()); //lock(); macedon_connection* sample2 = macedon_connections; while (sample2) { if ((int)sample2-> descriptor == descriptor) { //unlock(); return sample2; } sample2 = sample2->next; } //unlock(); return NULL;}// ---------------------------------------------- // find_macedon_connection_by_time// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection_by_time (double mytime){ double earliest=0.0; macedon_connection *ret_conn=NULL; ASSERT(lock_held==pthread_self()); //lock(); macedon_connection* sample2 = macedon_connections; while (sample2) { if (sample2->status == MACEDON_CONNECTION_STATUS_preconnecting && sample2->time_connect_initiated > mytime && (earliest==0.0 || sample2->time_connect_initiated < earliest)) { earliest = sample2->time_connect_initiated; ret_conn = sample2; } sample2 = sample2->next; } //unlock(); return ret_conn;}// ---------------------------------------------- // find_macedon_connection_by_vtime// ---------------------------------------------- macedon_connection* macedon_transport::find_macedon_connection_by_vtime (int mytime){ int earliest=INT_MAX; macedon_connection *ret_conn=NULL; ASSERT(lock_held==pthread_self()); //lock(); macedon_connection* sample2 = macedon_connections; while (sample2) { if (sample2->status == MACEDON_CONNECTION_STATUS_preconnecting && sample2->virtual_time_initiated > mytime && (sample2->virtual_time_initiated < earliest)) { earliest = sample2->virtual_time_initiated; ret_conn = sample2; } sample2 = sample2->next; } //unlock(); return ret_conn;}// ---------------------------------------------- // worker_wrapper// ---------------------------------------------- void* worker_wrapper(void *temp){ macedon_transport *this_transport = (macedon_transport *)temp; this_transport->worker();}// ---------------------------------------------- // worker// ---------------------------------------------- void* macedon_transport::worker(){ extern int global_exit; thread_id = pthread_self();#ifdef TRANSPORT_TRACE printf("macedon_transport: created macedon transport thread %d\n", pthread_self());#endif while(global_exit == 0) { while (!initialized) usleep(100000); wait_for_work (); if (global_exit == 0) do_work (); } pthread_exit(NULL);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -