📄 node.c
字号:
/* * Copyright (c) 2003-2005 Thomer M. Gil (thomer@csail.mit.edu), * Robert Morris (rtm@csail.mit.edu). * Massachusetts Institute of Technology * * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: * * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */#include "parse.h"#include "network.h"#include "args.h"#include "protocols/protocolfactory.h"#include "p2psim/threadmanager.h"#include <iostream>using namespace std;string Node::_protocol = ""; Args Node::_args;Time Node::_collect_stat_time = 0;bool Node::_collect_stat = false;bool Node::_replace_on_death = true;// static stat data structs:vector<unsigned long> Node::_bw_stats;vector<uint> Node::_bw_counts;vector<Time> Node::_correct_lookups;vector<Time> Node::_incorrect_lookups;vector<Time> Node::_failed_lookups;vector<double> Node::_correct_stretch;vector<double> Node::_incorrect_stretch;vector<double> Node::_failed_stretch;vector<uint> Node::_correct_hops;vector<uint> Node::_incorrect_hops;vector<uint> Node::_failed_hops;vector<double> Node::_num_timeouts;vector<Time> Node::_time_timeouts;vector<uint> Node::_num_joins;vector<Time> Node::_last_joins;vector<Time> Node::_time_sessions;//vector<double> Node::_per_node_avg;vector<double> Node::_per_node_in;vector<double> Node::_per_node_out;uint Node::totalin = 0;uint Node::totalout = 0;vector< vector<double> > Node::_special_node_in;vector< vector<double> > Node::_special_node_out;double Node::maxinburstrate = 0.0;double Node::maxoutburstrate = 0.0;Node::Node(IPAddress i) : _queue_len(0), _ip(i), _alive(true), _token(1) { _track_conncomp_timer = _args.nget<uint>("track_conncomp_timer",0,10); if (ip()==1) { Node::_special_node_in.resize(3); Node::_special_node_out.resize(3); for (uint i = 0; i < 3; i++) { Node::_special_node_in[i].clear(); Node::_special_node_out[i].clear(); } if (_track_conncomp_timer > 0) delaycb(_track_conncomp_timer, &Node::calculate_conncomp, (void*)NULL); } _num_joins_pos = -1; _prev_ip = 0; _first_ip = _ip; join_time = 0; //node_live_bytes = 0; node_live_inbytes = 0; node_live_outbytes = 0; node_lastburst_live_outbytes = 0; node_lastburst_live_inbytes = 0; node_last_inburstime = node_last_outburstime = join_time;}Node::~Node(){}Node *Node::getpeer(IPAddress a){ return Network::Instance()->getnode(a);}unsignedNode::rcvRPC(RPCSet *hset, bool &ok){ int na = hset->size() + 1; Alt *a = (Alt *) malloc(sizeof(Alt) * na); // might be big, take off stack! Packet *p; unsigned *index2token = (unsigned*) malloc(sizeof(unsigned) * hset->size()); int i = 0; for(RPCSet::const_iterator j = hset->begin(); j != hset->end(); j++) { assert(_rpcmap[*j]); a[i].c = _rpcmap[*j]->channel(); a[i].v = &p; a[i].op = CHANRCV; index2token[i] = *j; i++; } a[i].op = CHANEND; if((i = alt(a)) < 0) { cerr << "interrupted" << endl; assert(false); } assert(i < (int) hset->size()); unsigned token = index2token[i]; assert(token); hset->erase(token); _deleteRPC(token); if( !p ) { // if there's no packet, then this must be a wakeup // from a non-network source (like a condition variable) ok = true; } else { ok = p->ok(); delete p; } free(a); free(index2token); return token;}voidNode::_deleteRPC(unsigned token){ assert(_rpcmap[token]); delete _rpcmap[token]; _rpcmap.remove(token);}voidNode::parse(char *filename){ ifstream in(filename); if(!in) { cerr << "no such file " << filename << endl; taskexitall(0); } string line; hash_map<string, Args> xmap; while(getline(in,line)) { vector<string> words = split(line); // skip empty lines and commented lines if(words.empty() || words[0][0] == '#') continue; // read protocol string _protocol = words[0]; words.erase(words.begin()); // if it has no arguments, you still need to register the prototype // if(!words.size()) // xmap[protocol]; // this is a variable assignment while(words.size()) { vector<string> xargs = split(words[0], "="); words.erase(words.begin()); if (_args.find (xargs[0]) == _args.end()) _args.insert(make_pair(xargs[0], xargs[1])); } break; } in.close();}boolNode::collect_stat(){ if (_collect_stat) return true; if (now() >= _collect_stat_time) { _collect_stat = true; return true; }else return false;}#define BURSTTIME 100000void Node::record_in_bytes(uint b) { node_live_inbytes += b; if ((now()-node_last_inburstime) > BURSTTIME) { double burstrate = (double)(1000*(node_live_inbytes-node_lastburst_live_inbytes))/(double)((now()-node_last_inburstime)); if (burstrate > Node::maxinburstrate) Node::maxinburstrate = burstrate; node_last_inburstime = now(); node_lastburst_live_inbytes = node_live_inbytes; }}void Node::record_out_bytes(uint b) { node_live_outbytes += b; if ((now()-node_last_outburstime) > BURSTTIME) { double burstrate = (double)(1000*(node_live_outbytes-node_lastburst_live_outbytes))/(double)((now()-node_last_outburstime)); if (burstrate > Node::maxoutburstrate) Node::maxoutburstrate = burstrate; node_last_outburstime = now(); node_lastburst_live_outbytes = node_live_outbytes; }}voidNode::record_inout_bw_stat(IPAddress src, IPAddress dst, uint num_ids, uint num_else){ if (src == dst) return; Node *n = Network::Instance()->getnode(src); if (n && n->alive()) n->record_out_bytes(20 + 4*num_ids + num_else); n = Network::Instance()->getnode(dst); if (n && n->alive()) n->record_in_bytes(20 + 4*num_ids + num_else);}void Node::record_bw_stat(stat_type type, uint num_ids, uint num_else){ if( !collect_stat() ) { return; } while( _bw_stats.size() <= type ) { _bw_stats.push_back(0); _bw_counts.push_back(0); } _bw_stats[type] += 20 + 4*num_ids + num_else; _bw_counts[type]++;}void Node::record_lookup_stat(IPAddress src, IPAddress dst, Time interval, bool complete, bool correct, uint num_hops, uint num_timeouts, Time time_timeouts){ if( !collect_stat() ) { return; } // get stretch as well double stretch; if( complete && correct ) { Time rtt = 2*Network::Instance()->gettopology()->latency( src, dst ); if( rtt > 0 && interval > 0 ) { stretch = ((double) interval)/((double) rtt); } else { if( interval == 0 ) { stretch = 1; } else { stretch = interval; // is this reasonable? } } } else { // the stretch should be the interval divided by the median ping time in // the topology. fake for now. ***TODO***!!! stretch = ((double) interval)/100.0; } // Stretch can be < 1 for recursive direct-reply routing // assert( stretch >= 1.0 ); if( complete && correct ) { _correct_lookups.push_back( interval ); _correct_stretch.push_back( stretch ); _correct_hops.push_back( num_hops ); } else if( !complete ) { _failed_lookups.push_back( interval ); _failed_stretch.push_back( stretch ); _failed_hops.push_back( num_hops ); } else { _incorrect_lookups.push_back( interval ); _incorrect_stretch.push_back( stretch ); _incorrect_hops.push_back( num_hops ); } // timeout stuff _num_timeouts.push_back( num_timeouts ); _time_timeouts.push_back( time_timeouts );}voidNode::check_num_joins_pos(){ if( _num_joins_pos == -1 ) { _num_joins_pos = _num_joins.size(); _num_joins.push_back(0); // initialize people starting from the stattime _last_joins.push_back(_collect_stat_time); }}voidNode::record_join(){ // do this first to make sure state is initialized for this node join_time = now(); //node_live_bytes = 0; node_live_inbytes = 0; node_live_outbytes = 0; node_lastburst_live_outbytes = 0; node_lastburst_live_inbytes = 0; node_last_inburstime = node_last_outburstime = join_time; check_num_joins_pos(); if( !collect_stat() ) { return; } assert( _num_joins[_num_joins_pos] == 0 || !_last_joins[_num_joins_pos] ); _num_joins[_num_joins_pos]++; _last_joins[_num_joins_pos] = now();}voidNode::record_crash(){ if( !collect_stat() ) { return; } if (join_time > 0) { Time duration = now() - join_time; //this is a hack, don't screw the distribution with nodes whose lifetime //is too short if (duration >= 600000) { //old value is 180000 //_per_node_avg.push_back((double)1000.0*node_live_bytes/(double)duration); _per_node_out.push_back((double)1000.0*node_live_outbytes/(double)duration); _per_node_in.push_back((double)1000.0*node_live_inbytes/(double)duration); if ((_special) && (_special < 4)){ Node::_special_node_out[_special-1].push_back((double)1000.0*node_live_outbytes/(double)duration); Node::_special_node_in[_special-1].push_back((double)1000.0*node_live_inbytes/(double)duration); ADEBUG(4) << "special crashed IN: " << node_live_inbytes << " OUT: " << node_live_outbytes << " DURATION: " << duration << " AVG_IN: " << ((double)1000.0*node_live_outbytes/(double)duration) << " AVG_OUT: " << ((double)1000.0*node_live_inbytes/(double)duration) << endl; } } } check_num_joins_pos(); assert( _num_joins_pos >= 0 ); Time session = now() - _last_joins[_num_joins_pos]; _last_joins[_num_joins_pos] = 0; _time_sessions.push_back( session );}voidNode::print_dist_stats(vector<double> v){ sort(v.begin(),v.end()); uint sz = v.size(); double allavg = 0.0; for (uint i = 0; i < sz; i++) allavg += v[i]; if (sz > 0) { printf("1p:%.3f 5p:%.3f 10p:%.3f 50p:%.3f 90p:%.3f 95p:%.3f 99p:%.3f 100p:%.3f avg:%.3f\n", v[(uint)(sz*0.01)], v[(uint)(sz*0.05)],v[(uint)(sz*0.1)], v[sz/2], v[(uint)(sz*0.9)], v[(uint)(sz*0.95)], v[(uint)(sz*0.99)], v[sz-1], allavg/sz); }else printf("\n");}voidNode::print_stats()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -