📄 node.c
字号:
{ if( !collect_stat() ) { cout << "No stats were collected by time " << now() << "; collect_stat_time=" << _collect_stat_time << endl; return; } // add up the time everyone has been alive (including the last session) Time live_time = 0; for( uint i = 0; i < _time_sessions.size(); i++ ) { live_time += _time_sessions[i]; } for( uint i = 0; i < _last_joins.size(); i++ ) { // if this person joined at 0 and never failed, or was otherwise // alive at the end // if( _num_joins[i] == 0 && _last_joins[i] == 0 ) { // that means this one never died // live_time += now() - _collect_stat_time; // printf( "yup\n" ); // } else if( _last_joins[i] != 0 ) { live_time += now() - _last_joins[i]; } } cout << "\n<-----STATS----->" << endl; // first print out bw stats unsigned long total = 0; cout << "BW_PER_TYPE:: "; for( uint i = 0; i < _bw_stats.size(); i++ ) { cout << i << ":" << _bw_stats[i] << " "; total += _bw_stats[i]; } cout << endl; double total_time = ((double) (now() - _collect_stat_time))/1000.0; // already accounts for nodes in live_time . . . double live_time_s = ((double) live_time)/1000.0; uint num_nodes = Network::Instance()->size(); double overall_bw = ((double) total)/(total_time*((double) num_nodes)); double live_bw = ((double) total)/live_time_s; printf( "BW_TOTALS:: time(s):%.3f live_time(s/node):%.3f nodes:%d overall_bw(bytes/node/s):%.3f live_bw(bytes/node/s):%.3f\n", total_time, live_time_s/((double) num_nodes), num_nodes, overall_bw, live_bw ); /*print out b/w distribution sort(_per_node_avg.begin(),_per_node_avg.end()); uint sz = _per_node_avg.size(); double allavg = 0; for (uint i = 0; i < sz; i++) allavg += _per_node_avg[i]; if (sz > 0) { printf("BW_PERNODE:: 50p:%.3f 90p:%.3f 95p:%.3f 99p:%.3f 100p:%.3f avg:%.3f\n", _per_node_avg[sz/2], _per_node_avg[(uint)(sz*0.9)], _per_node_avg[(uint)(sz*0.95)], _per_node_avg[(uint)(sz*0.99)], _per_node_avg[sz-1], allavg/sz); } */ //print out b/w distribution of out b/w cout << "BW_PERNODE_IN:: "; if (_per_node_in.size()) print_dist_stats(_per_node_in); cout << "BW_PERNODE:: "; if (_per_node_out.size()) print_dist_stats(_per_node_out); for (uint i = 0; i < 3; i++) { printf("BW_SPE%uNODE_IN:: ",i+1); print_dist_stats(_special_node_in[i]); } for (uint i = 0; i < 3; i++) { printf("BW_SPE%uNODE:: ",i+1); print_dist_stats(_special_node_out[i]); } // then do lookup stats double total_lookups = _correct_lookups.size() + _incorrect_lookups.size() + _failed_lookups.size(); printf( "LOOKUP_RATES:: success:%.3f incorrect:%.3f failed:%.3f\n", ((double)_correct_lookups.size())/total_lookups, ((double)_incorrect_lookups.size())/total_lookups, ((double)_failed_lookups.size())/total_lookups ); cout << "CORRECT_LOOKUPS:: "; print_lookup_stat_helper( _correct_lookups, _correct_stretch, _correct_hops ); cout << "INCORRECT_LOOKUPS:: "; print_lookup_stat_helper( _incorrect_lookups, _incorrect_stretch, _incorrect_hops ); cout << "FAILED_LOOKUPS:: "; print_lookup_stat_helper( _failed_lookups, _failed_stretch, _failed_hops ); // now overall stats (put them all in one container) for( uint i = 0; i < _incorrect_lookups.size(); i++ ) { _correct_lookups.push_back( _incorrect_lookups[i] ); _correct_stretch.push_back( _incorrect_stretch[i] ); _correct_hops.push_back( _incorrect_hops[i] ); } for( uint i = 0; i < _failed_lookups.size(); i++ ) { _correct_lookups.push_back( _failed_lookups[i] ); _correct_stretch.push_back( _failed_stretch[i] ); _correct_hops.push_back( _failed_hops[i] ); } cout << "OVERALL_LOOKUPS:: "; print_lookup_stat_helper( _correct_lookups, _correct_stretch, _correct_hops ); cout << "TIMEOUTS_PER_LOOKUP:: "; print_lookup_stat_helper( _time_timeouts, _num_timeouts, _correct_hops /* this isn't used */, true ); cout << "WORST_BURST:: in:" << maxinburstrate << " out:" << maxoutburstrate << endl; cout << "<-----ENDSTATS----->\n" << endl;}void Node::print_lookup_stat_helper( vector<Time> times, vector<double> stretch, vector<uint> hops, bool timeouts ){ assert( times.size() == stretch.size() ); // sort first, ask questions later sort( times.begin(), times.end() ); sort( stretch.begin(), stretch.end() ); sort( hops.begin(), hops.end() ); Time time_med, time_10, time_90; double stretch_med, stretch_10, stretch_90; uint hops_med, hops_10, hops_90; if( times.size() == 0 ) { time_med = 0; time_10 = 0; time_90 = 0; stretch_med = 0; stretch_10 = 0; stretch_90 = 0; hops_med = 0; hops_10 = 0; hops_90 = 0; } else { if( times.size() % 2 == 0 ) { time_med = (times[times.size()/2] + times[times.size()/2-1])/2; stretch_med = (stretch[times.size()/2] + stretch[times.size()/2-1])/2; hops_med = (hops[times.size()/2] + hops[times.size()/2-1])/2; } else { time_med = times[(times.size()-1)/2]; stretch_med = stretch[(times.size()-1)/2]; hops_med = hops[(times.size()-1)/2]; } time_10 = times[(uint) (times.size()*.1)]; stretch_10 = stretch[(uint) (times.size()*.1)]; hops_10 = hops[(uint) (times.size()*.1)]; time_90 = times[(uint) (times.size()*.9)]; stretch_90 = stretch[(uint) (times.size()*.9)]; hops_90 = hops[(uint) (times.size()*.9)]; } // also need the means Time time_total = 0; double stretch_total = 0; uint hops_total = 0; for( uint i = 0; i < times.size(); i++ ) { time_total += times[i]; stretch_total += stretch[i]; hops_total += hops[i]; } double time_mean, stretch_mean, hops_mean; if( times.size() == 0 ) { time_mean = 0; stretch_mean = 0; hops_mean = 0; } else { time_mean = ((double) time_total)/((double) times.size()); stretch_mean = ((double) stretch_total)/((double) times.size()); hops_mean = ((double) hops_total)/((double) times.size()); } if( timeouts ) { printf( "time_timeout_10th:%llu time_timeout_mean:%.3f time_timeout_median:%llu time_timeout_90th:%llu ", time_10, time_mean, time_med, time_90 ); printf( "num_timeout_10th:%.3f num_timeout_mean:%.3f num_timeout_median:%.3f num_timeout_90th:%.3f\n", stretch_10, stretch_mean, stretch_med, stretch_90 ); } else { printf( "lookup_10th:%llu lookup_mean:%.3f lookup_median:%llu lookup_90th:%llu ", time_10, time_mean, time_med, time_90 ); printf( "stretch_10th:%.3f stretch_mean:%.3f stretch_median:%.3f stretch_90th:%.3f ", stretch_10, stretch_mean, stretch_med, stretch_90 ); printf( "hops_10th:%u hops_mean:%.3f hops_median:%u hops_90th:%u ", hops_10, hops_mean, hops_med, hops_90 ); cout << " numlookups:" << times.size() << endl; }}//void functionvoidNode::add_edge(int *matrix, int sz){ return;}//network health monitorvoidNode::calculate_conncomp(void *){ if (Node::collect_stat()) { const set<Node*> *l = Network::Instance()->getallnodes(); uint sz = l->size(); int *curr, *old, *tmp; curr = (int *)malloc(sizeof(int) * sz * sz); old = (int *)malloc(sizeof(int) * sz * sz); assert(old && curr); for (uint i = 0; i < sz * sz; i++) { curr[i] = 99999; old[i] = 99999; } int alive = 0; for (set<Node*>::iterator i = l->begin(); i != l->end(); ++l) { if(!(*i)->alive()) continue; old[((*i)->first_ip()-1)*sz + (*i)->first_ip()-1] = 0; (*i)->add_edge(old,sz); alive++; } for (uint k = 0; k < sz; k++) { for (uint i = 0; i < sz; i++) { for (uint j = 0; j < sz; j++) { if (old[i * sz + j] <= old[i * sz + k] + old[k * sz + j]) curr[i * sz + j] = old[i * sz + j]; else curr[i * sz + j] = old[i * sz + k] + old[k * sz + j]; } } tmp = old; old = curr; curr = tmp; } vector<uint> *path = new vector<uint>; u_int allp = 0; u_int failed = 0; path->clear(); for (uint i = 0; i < sz; i++) { for (uint j = 0; j < sz; j++) { if (old[i*sz + i] == 0 && old[j*sz + j] == 0) { if (old[i*sz + j] < 99999) { path->push_back(old[i*sz + j]); allp += old[i*sz + j]; }else{ failed++; } } } } sort(path->begin(),path->end()); assert(path->size()>0); printf("%llu alive %d avg %.2f 10-p %u 50-p %u 90-p %u longest %u failed %.3f\n", now(), alive, (double)allp/(double)path->size(),(*path)[(u_int)(0.1*path->size())], (*path)[(u_int)(0.5*path->size())], (*path)[(u_int)(0.9*path->size())], (*path)[path->size()-1], (float) (failed/(failed+path->size()))); delete path; free(old); free(curr); } delaycb(_track_conncomp_timer, &Node::calculate_conncomp, (void*)NULL);}// Called by NetEvent::execute() to deliver a packet to a Node,// after Network processing (i.e. delays and failures).voidNode::packet_handler(Packet *p){ if(p->reply()){ // RPC reply, give to waiting thread. send(p->channel(), &p); } else { // RPC request, start a handler thread. ThreadManager::Instance()->create(Node::Receive, p); }}//// Send off a request packet asking Node::Receive to// call fn(args), wait for reply.// Return value indicates whether we received a reply,// i.e. absence of time-out.//boolNode::_doRPC(IPAddress dst, void (*fn)(void *), void *args, Time timeout){ return _doRPC_receive(_doRPC_send(dst, fn, 0, args, timeout));}RPCHandle*Node::_doRPC_send(IPAddress dst, void (*fn)(void *), void (*killme)(void *), void *args, Time timeout){ Packet *p = New Packet; p->_fn = fn; p->_killme = killme; p->_args = args; p->_src = ip(); p->_dst = dst; p->_timeout = timeout; Node *n = getpeer (ip()); p->_queue_delay = n->queue_delay (); // where to send the reply, buffered for single reply Channel *c = p->_c = chancreate(sizeof(Packet*), 1); Network::Instance()->send(p); return New RPCHandle(c, p);}boolNode::_doRPC_receive(RPCHandle *rpch){ Packet *reply = (Packet *) recvp(rpch->channel()); bool ok = reply->_ok; delete reply; delete rpch; return ok;}//// Node::got_packet() invokes Receive() when an RPC request arrives.// The reply goes back directly to the appropriate channel.//voidNode::Receive(void *px){ Packet *p = (Packet *) px; assert(Network::Instance()->getnode(p->dst())); // make reply Packet *reply = New Packet; reply->_c = p->_c; reply->_src = p->_dst; reply->_dst = p->_src; reply->_timeout = p->_timeout; Node *s = Network::Instance()->getnode(reply->src()); reply->_queue_delay = s->queue_delay (); if (Network::Instance()->alive(p->dst())) { // && Network::Instance()->gettopology()->latency(p->_src, p->_dst, p->reply()) != 100000 ) { (p->_fn)(p->_args); reply->_ok = true; } else { reply->_ok = false; // XXX delete reply for timeout? } // send it back, potentially with a latency punishment for when this node was // dead. Network::Instance()->send(reply); // ...and we're done taskexit(0);}stringNode::header(){ char buf[128]; sprintf(buf,"%llu %s(%u,%u,%qx) ", now(), proto_name().c_str(), _first_ip, _ip,_id); return string(buf);}IPAddressNode::set_alive(bool a){ if(!a && _replace_on_death) { _prev_ip = _ip; } else if(a && _replace_on_death && _prev_ip) { _ip = Network::Instance()->unused_ip(); Network::Instance()->map_ip(_first_ip, _ip); assert(!Network::Instance()->getnode(_first_ip)->alive()); } _alive = a; return _ip;}#include "bighashmap.cc"
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -