⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 5 页
字号:
/* * Copyright (c) 2003-2005 Jeremy Stribling *                    Massachusetts Institute of Technology *  * Permission is hereby granted, free of charge, to any person obtaining * a copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to * permit persons to whom the Software is furnished to do so, subject to * the following conditions: *  * The above copyright notice and this permission notice shall be * included in all copies or substantial portions of the Software. *  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. *//* $Id: tapestry.C,v 1.56 2005/04/15 20:51:30 thomer Exp $ */#include "tapestry.h"#include "p2psim/network.h"#include <stdio.h>#include <math.h>#include <iostream>#include <map>#include "p2psim/bighashmap.hh"using namespace std;unsigned long long Tapestry::_num_lookups = 0;unsigned long long Tapestry::_num_succ_lookups = 0;unsigned long long Tapestry::_num_inc_lookups = 0;unsigned long long Tapestry::_num_fail_lookups = 0;unsigned long long Tapestry::_num_hops = 0;unsigned long long Tapestry::_total_latency = 0;Tapestry::Tapestry(IPAddress i, Args a) : P2Protocol(i),    _base(a.nget<uint>("base", 16, 10)),    _bits_per_digit((uint) (log10(((double) _base))/log10((double) 2))),    _digits_per_id((uint) 8*sizeof(GUID)/_bits_per_digit),    _redundant_lookup_num(a.nget<uint>("redundant_lookup_num", 3, 10)),    _redundancy(a.nget<uint>("redundancy", 3, 10)){  assert( _base <= 256 ); // for printing reasons  joined = false;  _joining = false;  _stab_scheduled = false;  _my_id = get_id_from_ip(ip());  TapDEBUG(2) << "Constructing" << endl;  _rt = New RoutingTable(this, _redundancy);  _waiting_for_join = New ConditionVar();  _check_nodes_waiting = New ConditionVar();  //stabilization timer  _stabtimer = a.nget<uint>("stabtimer", 10000, 10);  _join_num = 0;  _repair_backups = a.nget<uint>("repair_backups", 0, 10);  _verbose = a.nget<bool>("verbose", 0, 10);  _lookup_learn = a.nget<bool>("lookuplearn", 1, 10);  _direct_reply = a.nget<bool>("direct_reply", 1, 10);  _lookup_cheat = a.nget<bool>("lookupcheat", 1, 10);  _check_backpointers = a.nget<bool>("checkbp", 0, 10);  _nn_random = a.nget<bool>("nnrandom", 0, 10);  if( _lookup_learn ) {    _cachebag = New RoutingTable(this, _redundancy);  }  _max_lookup_time = a.nget<Time>("maxlookuptime", 4000, 10);  _declare_dead_time = a.nget<Time>("declare_dead", 30000, 10);  _declare_dead_num = a.nget<uint>("declare_dead_num", 4, 10);  _rtt_timeout_factor = a.nget<Time>("timeout_factor", 3, 10);  _max_repair_num = a.nget<uint>("max_repair_num", 5, 10);  _check_nodes = New vector<check_node_args *>;  // init stats  while (stat.size() < (uint) STAT_SIZE) {    stat.push_back(0);    num_msgs.push_back(0);  }  // initialize an array of guid digits  _my_id_digits = New uint[_digits_per_id];  for( uint i = 0; i < _digits_per_id; i++ ) {    _my_id_digits[i] = get_digit( id(), i );  }  _recently_dead.clear();  // start the checking thread  delaycb( 0, &Tapestry::check_node_loop, (void *) 0 );}Tapestry::~Tapestry(){  delete _rt;  if( _lookup_learn ) {    delete _cachebag;  }  delete _waiting_for_join;  delete _check_nodes;  delete _check_nodes_waiting;  delete [] _my_id_digits;  TapDEBUG(2) << "Destructing" << endl;  // print out statistics  print_stats();}voidTapestry::record_stat(stat_type type, uint num_ids, uint num_else ){  TapDEBUG(5) << "record stat " << type << endl;  assert(stat.size() > (uint) type);  Node::record_bw_stat( type, num_ids, num_else );  stat[type] += 20 + 4*num_ids + num_else;  num_msgs[type]++;}voidTapestry::print_stats(){  TapDEBUG(1) << "STATS: " <<    "join " << stat[STAT_JOIN] << " " << num_msgs[STAT_JOIN] <<    " lookup " << stat[STAT_LOOKUP] << " " << num_msgs[STAT_LOOKUP] <<    " nodelist " << stat[STAT_NODELIST] << " " << num_msgs[STAT_NODELIST] <<    " mc " << stat[STAT_MC] << " " << num_msgs[STAT_MC] <<    " ping " << stat[STAT_PING] << " " << num_msgs[STAT_PING] <<    " backpointer " << stat[STAT_BACKPOINTER] << " " 	      << num_msgs[STAT_BACKPOINTER] <<    " mcnotify " << stat[STAT_MCNOTIFY] << " " << num_msgs[STAT_MCNOTIFY] <<    " nn " << stat[STAT_NN] << " " << num_msgs[STAT_NN] <<    " repair " << stat[STAT_REPAIR] << " " << num_msgs[STAT_REPAIR] <<    endl;  // let's print out global lookup stats  if( _num_lookups > 0 ) {    cout << "PARAMS: base " << _base << " stabtimer " << _stabtimer <<      " redundant_lookup_num " << _redundant_lookup_num << endl;    cout << "total lookups: " << _num_lookups << endl;    cout << "average lookup latency: " 	 << (((double)_total_latency)/((double) _num_lookups)) << endl;    cout << "average hops: " 	 << (((double)_num_hops)/((double) _num_lookups)) << endl;    cout << "success rate: " 	 << (((double)_num_succ_lookups)/((double) _num_lookups)) << endl;    cout << "incorrect rate: " 	 << (((double)_num_inc_lookups)/((double) _num_lookups)) << endl;    cout << "fail rate: " 	 << (((double)_num_fail_lookups)/((double) _num_lookups)) << endl;    _num_lookups = 0;    Node::print_stats();  }}voidTapestry::lookup(Args *args) {  if( !joined ) {    return;  }  GUID key = args->nget<GUID>("key");  if( _verbose ) {    TapDEBUG(0) << "Tapestry Lookup for key " << print_guid(key) << endl;  }  wrap_lookup_args *wla = New wrap_lookup_args();  wla->key = key;  wla->starttime = now();  wla->num_tries = 1;  wla->hopcount = 0;  wla->num_timeouts = 0;  wla->time_timeouts = 0;  lookup_wrapper( wla );}voidTapestry::lookup_wrapper(wrap_lookup_args *args){  lookup_args la;  la.key = args->key;  la.looker = ip();  la.starttime = args->starttime;  la.lasthop = ip();  la.lastrtt = 0;  lookup_return lr;      lr.hopcount = 0;  lr.failed = false;  lr.time_done = 0;  lr.num_timeouts = 0;  lr.time_timeouts = 0;  uint curr_join = _join_num;  handle_lookup( &la, &lr );  if( !alive() || _join_num != curr_join ) {    delete args;    TapDEBUG(2) << "Lookup aborting in wrapper, dead or rejoined" << endl;    return;  }  args->hopcount += lr.hopcount + lr.num_timeouts; // args.hopcount is TOTAL  args->num_timeouts += lr.num_timeouts;  args->time_timeouts += lr.time_timeouts;  if( !lr.failed && (!_lookup_cheat || (lr.owner_id == lr.real_owner_id)) &&      now() - args->starttime < _max_lookup_time ) {    if( _direct_reply && lr.time_done == 0 ) {      // I was dead when this recursive query got to me.  Ignore;      return;    }    if( _verbose ) {      TapDEBUG(0) << "Lookup complete for key " << print_guid(args->key) 		  << ": ip " << lr.owner_ip << ", id " 		  << print_guid(lr.owner_id) << ", hops " << args->hopcount		  << ", numtries " << args->num_tries << endl;    }    _num_lookups++;    _num_succ_lookups++;    _num_hops += args->hopcount;    if( _direct_reply ) {      _total_latency += ( lr.time_done - args->starttime );      record_lookup_stat( ip(), lr.owner_ip, lr.time_done - args->starttime,			  true, true, args->hopcount, args->num_timeouts, 			  args->time_timeouts);    } else {      _total_latency += ( now() - args->starttime );      record_lookup_stat( ip(), lr.owner_ip, now() - args->starttime,			  true, true, args->hopcount, args->num_timeouts, 			  args->time_timeouts);    }    delete args;  } else {    if( now() - args->starttime < _max_lookup_time ) {      args->num_tries = args->num_tries+1;      if( _verbose ) {	TapDEBUG(1) << "retrying failed or incorrect lookup for key " 		    << print_guid(args->key) << ", numtries " 		    << args->num_tries << endl;      }      delaycb( 100, &Tapestry::lookup_wrapper, args );    } else {      // failed or timed out      if( lr.failed || !_lookup_cheat || lr.owner_id == lr.real_owner_id ) {	if( _verbose ) {	  TapDEBUG(0) << "Lookup failed for key " << print_guid(args->key) 		      << endl;	}	record_lookup_stat( ip(), ip(), _max_lookup_time,			    false, false, args->hopcount, args->num_timeouts, 			    args->time_timeouts );	_num_fail_lookups++;      } else if( _lookup_cheat && lr.owner_id != lr.real_owner_id ) {	if( _direct_reply && lr.time_done == 0 ) {	  // I was dead when this recursive query got to me.  Ignore;	  return;	}	if( _verbose ) {	  TapDEBUG(0) << "Lookup incorrect for key " << print_guid(args->key) 		      << ": ip " << lr.owner_ip << ", id " 		      << print_guid(lr.owner_id) << ", real root " 		      << print_guid(lr.real_owner_id) << " hops " 		      << args->hopcount << ", numtries " << args->num_tries 		      << endl;	}	record_lookup_stat( ip(), ip(), _max_lookup_time,			    true, false, args->hopcount, args->num_timeouts, 			    args->time_timeouts );	_num_inc_lookups++;      } else {	if( _verbose ) {	  TapDEBUG(0) << "failed: " << lr.failed << ", lookupcheat " 		      << _lookup_cheat << ", owner " << lr.owner_id 		      << ", real owner " << lr.real_owner_id << ", start " 		      << args->starttime << ", key " << print_guid(args->key) 		      <<  endl;	}	assert(0); // this can't be!      }      _num_lookups++;      _num_hops += args->hopcount;      // all failures get the max time      _total_latency += _max_lookup_time;      delete args;          }  }}void Tapestry::handle_lookup_done(lookup_args *args, lookup_return *ret){  ret->time_done = now();}void Tapestry::handle_lookup(lookup_args *args, lookup_return *ret){  TapDEBUG(2) << "Looking up key " <<     print_guid(args->key) << " for node " << args->looker << endl;  TapDEBUG(5) << "hl enter" << endl;  // if we're learning from lookups, learn about the last hop as well as  // the source of the query  if( _lookup_learn && args->lasthop != ip() ) {    GUID id1 = get_id_from_ip(args->looker);    GUID id2 = get_id_from_ip(args->lasthop);    if( !_rt->contains( id1 ) ) {      _cachebag->add( args->looker, id1, MAXTIME, false );    }    if( !_rt->contains( id2 ) ) {      _cachebag->add( args->lasthop, id2, args->lastrtt, false );    }  }  // find the next hop for the key.  if it's me, i'm done  IPAddress *ips = New IPAddress[_redundant_lookup_num];  for( uint i = 0; i < _redundant_lookup_num; i++ ) {    ips[i] = 0;  }  GUID *ids = New GUID[_redundant_lookup_num];  for( uint i = 0; i < _redundant_lookup_num; i++ ) {    ids[i] = 0;  }  next_hop( args->key, &ips, &ids, _redundant_lookup_num );  uint i = 0;  uint curr_join = _join_num;  for( ; i < _redundant_lookup_num; i++ ) {    IPAddress next = ips[i];    TapDEBUG(3) << "Trying " << next << endl;    if( next == 0 ) {      continue;    }    if( next == ip() ) {      ret->owner_ip = ip();      ret->owner_id = id();      if( _lookup_cheat ) {	ret->real_owner_id = lookup_cheat( args->key );      }      ret->failed = false;      if( _direct_reply ) {	bool succ = retryRPC( args->looker, 			      &Tapestry::handle_lookup_done, args, ret,			      STAT_LOOKUP, 1, 0);	// no need to count return bytes, since it's recursive	if( !succ ) {	  // the originator is no longer alive, so ignore	  ret->time_done = 0;	}      }      break;    } else {      // it's not me, so forward the query      record_stat(STAT_LOOKUP, 1, 0);      Time before = now();      GUID nextid = ids[i];      // don't want to route to nodes who have recently timed out      if( _rt->get_timeout( nextid ) ) {	continue;      }      args->lasthop = ip();      args->lastrtt = _rt->get_time( nextid );      bool succ = doRPC( next, &Tapestry::handle_lookup, args, ret, 			 _rtt_timeout_factor*_rt->get_time(nextid) );      if( succ ) {	_last_heard_map[next] = now();	// every successful RPC counts as a "hop"	ret->hopcount++;	if( !_direct_reply || (ret->failed && args->looker == ip()) ) {	  // only record in the non-direct reply case OR	  // if it's a direct reply and a failure, we should include 	  // those bytes exactly once at the source (to simulate a direct 	  // reply)	  record_stat( STAT_LOOKUP, 1, 2 );	}      } else {	// remove it from our routing table	check_node_args *cna = New check_node_args();	cna->ip = next;	cna->id = nextid;	_check_nodes->push_back( cna );	_check_nodes_waiting->notifyAll();	TapDEBUG(3) << "Forking off check (due to lookup) of (" << cna->ip 		    << "/" << print_guid(nextid) << ")" << endl;	// record the timeout stats	// every unsuccessful RPC counts as a "timeout"	ret->num_timeouts++;	ret->time_timeouts += (now() - before);      }      if( succ && !ret->failed ) {	break;      } else { 	// since we're using recursive routing, we only do this check in the	// case of non-success.	// make sure we haven't crashed and/or started another join	if( !alive() || _join_num != curr_join ) {	  ret->failed = true;	  TapDEBUG(2) << "Lookup aborting, dead or rejoined" << endl;	  delete ips;	  delete ids;	  return;	}	// print out that a failure happened	if( _verbose ) {	  TapDEBUG(1) << "Failure happened during the lookup of key " << 	    print_guid(args->key) << ", trying to reach node " << next << 	    " for node " << args->looker << endl;	}	ret->failed = false;      }    }    // we're retrying if we've gotten down here, so if we've    // exceeded the max time we should quit    if( now() - args->starttime > _max_lookup_time ) {      i = _redundant_lookup_num;      TapDEBUG(1) << "Timed out looking for " << print_guid(args->key) << endl;      break;    }  }  // if we were never successful, set the failed flag  if( i == _redundant_lookup_num ) {    TapDEBUG(1) << "setting failed to true for key " << print_guid(args->key)		<< endl;    ret->failed = true;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -