⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 5 页
字号:
  }  delete ips;  delete ids;  TapDEBUG(5) << "hl exit " << endl;}voidTapestry::insert(Args *args) {  TapDEBUG(2) << "Tapestry Insert" << endl;}template<class BT, class AT, class RT>boolTapestry::retryRPC(IPAddress dst, void (BT::* fn)(AT *, RT *), 		   AT *args, RT *ret, uint type, uint num_args_id, 		   uint num_args_else){  Time starttime = now();  GUID dstid = get_id_from_ip(dst);  Time timeout;  if( dst != ip() && _rt->contains( dstid ) ) {    timeout = _rtt_timeout_factor * _rt->get_time( dstid );  } else {    timeout = MAXTIME;  }  while( now() < starttime + _declare_dead_time ) {    if( dst != ip() ) {      record_stat( type, num_args_id, num_args_else);    }    bool succ = doRPC(dst, fn, args, ret, timeout);    if (succ) {      return true;    }    timeout *= 2;  }  return false;}voidTapestry::check_node_loop(void * args){  ping_args pa;  ping_return pr;  // use ping callinfos   RPCSet ping_rpcset;  HashMap<unsigned, ping_callinfo*> ping_resultmap;  bool waiting = false;  unsigned condvar_token = 1;  TapDEBUG(3) << "Entered check_node_loop" << endl;  while( true ) {    if( !waiting ) {      // initialize the Channel and add it to the condition variable      // add it to the rpc map right away so no one else takes the 1 token      Channel *c = chancreate(sizeof(int*), 0);      assert( !_rpcmap[condvar_token] );      _rpcmap.insert( condvar_token, New RPCHandle(c, New Packet()) );      _check_nodes_waiting->wait_noblock(c);      ping_rpcset.insert(condvar_token);      waiting = true;      // this RPCHandle, channel and packet will get deleted by      // Node::_deleteRPC, called from rcvRPC      TapDEBUG(3) << "Starting loop in check_node_loop" << endl;      // check for any new IDs      for( vector<check_node_args *>::iterator i=_check_nodes->begin();	   i != _check_nodes->end(); i++ ) {		check_node_args *check = *i;	assert( check );	IPAddress checkip = check->ip;	GUID checkid = check->id;	TapDEBUG(2) << "Going to check (" << checkip << "/" 		    << print_guid(checkid) << ") in check_node_loop." << endl;		if( checkid == id() || !_rt->contains(checkid) || 	    _rt->get_timeout( checkid ) ) {	  // we're already checking this person	  TapDEBUG(2) << "We're already checking (" << checkip << "/" 		      << print_guid(checkid) << ") " << _rt->contains(checkid) 		      << " " << _rt->get_timeout( checkid ) 		      << ", so ignoring." << endl;	  delete check;	  continue;	}	Time timeout;	if( checkid != ip() && _rt->contains( checkid ) ) {	  timeout = _rtt_timeout_factor * _rt->get_time( checkid );	} else {	  timeout = MAXTIME;	}		// start an asyncRPC and save the info	if( checkid != ip() ) {	  record_stat( STAT_PING, 0, 0);	}	unsigned rpc = asyncRPC( checkip, &Tapestry::handle_ping, &pa, &pr, 				 timeout );	assert(rpc);	ping_callinfo *pi = New ping_callinfo( checkip, checkid, now() );	pi->last_timeout = timeout;	pi->times_tried = 0;	ping_resultmap.insert(rpc, pi);	ping_rpcset.insert(rpc);		_rt->set_timeout( pi->id, true );		// now try to find a better node to put in your table from the cache	if( _lookup_learn ) {	  int digit = guid_compare( id(), pi->id );	  assert( digit != -1 );	  uint val = get_digit( pi->id, digit );	  NodeInfo *n = _cachebag->read( digit, val );	  if( n != NULL ) {	    TapDEBUG(2) << "Lookup-learn adding (" << n->_addr << "/" 			<< print_guid(n->_id) << ") to replace (" << checkip			<< "/" << print_guid(checkid) << ")" << endl;	    _cachebag->remove( n->_id, false );	    _rt->add( n->_addr, n->_id, n->_distance, false);	    delete n;	  }	}		delete check;	      }      _check_nodes->clear();    }    // now be extremely clever and select over both the asyncRPCs and the    // channel that's waiting for new nodes to check    bool ok;    uint rpc = rcvRPC( &ping_rpcset, ok );        // TODO: what happens if we get an RPC back after dying/re-joining???    // if it's not the condvar one, do some stuff, otherwise go directly    // to the top of the loop and add more nodes    if( rpc != condvar_token ) {      assert(rpc);      ping_callinfo *pi = ping_resultmap[rpc];      pi->times_tried++;      if( ok ) {	TapDEBUG(2) << "check_node_loop found that (" << pi->ip << "/" <<	  print_guid( pi->id ) << ") is alive." << endl;	// this RPC succeeded, so this node wasn't dead after all.  apologize.	if( pi->ip != ip() ) {	  record_stat( STAT_PING, 0, 0);	}	_rt->set_timeout( pi->id, false );	delete pi;      } else if( pi->times_tried >= _declare_dead_num || 		 now() - pi->pingstart >= _declare_dead_time ) {	// this node is officially dead.  make it so.	_rt->remove( pi->id, false );	_rt->remove_backpointer( pi->ip, pi->id );	_recently_dead.push_back(pi->id);    	TapDEBUG(2) << "Declaring (" << pi->ip << "/" << print_guid(pi->id) 		    << ") dead in check_node_loop" << endl;	delete pi;      } else {	// otherwise schedule this person again, doubling the timeout	pi->last_timeout *= 2;	if( pi->ip != ip() ) {	  record_stat( STAT_PING, 0, 0);	}	unsigned newrpc = asyncRPC( pi->ip, &Tapestry::handle_ping, &pa, &pr, 				    pi->last_timeout );	assert(newrpc);	ping_resultmap.insert(newrpc, pi);	ping_rpcset.insert(newrpc);	      }    } else {      TapDEBUG(3) << "Notified via condvar in check_node_loop" << endl;      waiting = false;    }  }}voidTapestry::have_joined(){   joined = true;   _joining = false;   _waiting_for_join->notifyAll();   if( _verbose ) {     TapDEBUG(0) << "Finishing joining." << endl;   }   if( !_stab_scheduled && _stabtimer > 0 ) {     delaycb( random()%_stabtimer, &Tapestry::check_rt, (void *) 0 );     _stab_scheduled = true;   }}// External event that tells a node to contact the well-known node// and try to join.voidTapestry::join(Args *args){  TapDEBUG(5) << "j enter" << endl;  _my_id = get_id_from_ip(ip());  // initialize an array of guid digits  delete [] _my_id_digits;  _my_id_digits = New uint[_digits_per_id];  for( uint i = 0; i < _digits_per_id; i++ ) {    _my_id_digits[i] = get_digit( id(), i );  }  IPAddress wellknown_ip = args->nget<IPAddress>("wellknown");  TapDEBUG(3) << ip() << " Wellknown: " << wellknown_ip << endl;  if( _join_num == 0 && wellknown_ip == ip() ) {    notifyObservers();  } else if( _join_num > 0 ) {    // move these constructions to here since after a crash we won't    // know our new IP/ID pair until the next join.    delete _rt;    _rt = New RoutingTable(this, _redundancy);    if( _lookup_learn ) {      delete _cachebag;      _cachebag = New RoutingTable(this, _redundancy);    }    notifyObservers( (ObserverInfo *) "join" );  }  if( _joining ) {    TapDEBUG(0) << "Tried to join while joining -- ignoring" << endl;    return;  }  uint curr_join = ++_join_num;  // might already be joined if init_state was used  if( joined ) {    TapDEBUG(5) << "j exit" << endl;    return;  }  _joining = true;  if( _verbose ) {    TapDEBUG(0) << "Tapestry join " << curr_join << endl;  }  // if we're the well known node, we're done  if( ip() == wellknown_ip ) {    have_joined();    TapDEBUG(5) << "j exit" << endl;    return;  }  // contact the well known machine, and have it start routing to the surrogate  join_args ja;  ja.ip = ip();  ja.id = id();  join_return jr;  uint attempts = 30;  // keep attempting to join until you can't attempt no mo'  do {    attempts--;    jr.failed = false;    bool succ = retryRPC( wellknown_ip, &Tapestry::handle_join, &ja, &jr, 			  STAT_JOIN, 1, 0);    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      return;    }        if( !succ ) {      TapDEBUG(0) << "Well known ip died!  BAD!" << endl;      return;    } else {      record_stat(STAT_JOIN, 1, 1);    }  } while( jr.failed && attempts >= 0 );  if( jr.failed ) {    TapDEBUG(0) << "All my joins failed!  BAD!" << endl;    return;  }  // now that the multicast is over, it's time for nearest neighbor  // ping everyone on the initlist  int init_level = guid_compare( jr.surr_id, id_digits() );   vector<NodeInfo *> seeds;  TapDEBUG(2) << "init level of " << init_level << endl;  for( int i = init_level; i >= 0; i-- ) {        // go through each member of the init list, add them to your routing     // table, and ask them for their forward and backward pointers     // to do this, set off two sets of asynchRPCs at once, one to ping all    // the nodes and find out their distances, and another to get the nearest    // neighbors.  these can't easily be combined since the nearest neighbor    // call does a ping itself (so measured rtt would be double).    RPCSet ping_rpcset;    HashMap<unsigned, ping_callinfo*> ping_resultmap;    Time before_ping = now();    TapDEBUG(3) << "initing level " << i << " out of " << init_level 		<< " size = " << initlist.size() << endl;    multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &initlist, 			   NULL, true );    RPCSet nn_rpcset;    HashMap<unsigned, nn_callinfo*> nn_resultmap;    unsigned int num_nncalls = 0;    for( uint j = 0; j < initlist.size(); j++ ) {      NodeInfo ni = *(initlist[j]);      // also do an async nearest neighbor call      nn_args *na = New nn_args();      na->ip = ip();      na->id = id();      na->alpha = i;      nn_return *nr = New nn_return();      record_stat(STAT_NN, 1, 1);      Time timeout = MAXTIME;      if( _rt->contains( ni._id ) ) {	timeout = _rtt_timeout_factor*_rt->get_time( ni._id );      }      unsigned rpc = asyncRPC( ni._addr, &Tapestry::handle_nn, na, nr, 			       timeout );      assert(rpc);      nn_resultmap.insert(rpc, New nn_callinfo(ni._addr, na, nr));      nn_rpcset.insert(rpc);      num_nncalls++;    }    multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, 			 false );    for( uint j = 0; j < num_nncalls; j++ ) {      bool ok;      unsigned donerpc = rcvRPC( &nn_rpcset, ok );      nn_callinfo *ncall = nn_resultmap[donerpc];      nn_return nnr = *(ncall->nr);      TapDEBUG(3) << "done with nn with " << ncall->ip << endl;      if( ok ) {	record_stat( STAT_NN, nnr.nodelist.size(), 0 );	for( uint k = 0; k < nnr.nodelist.size(); k++ ) {	  // make sure this one isn't on there yet	  // TODO: make this more efficient.  Maybe use a hash set or something	  NodeInfo *currseed = nnr.nodelist[k];	  	  // don't add ourselves, duh	  if( currseed->_id == id() ) {	    delete currseed;	    continue;	  }	  bool add = true;	  for( uint l = 0; l < seeds.size(); l++ ) {	    if( *currseed == *(seeds[l]) ) {	      add = false;	      break;	    }	  }	  if( add ) {	    seeds.push_back( currseed );	    TapDEBUG(3) << " has a seed of " << currseed->_addr << " and " << 	      print_guid( currseed->_id ) << endl;	  } else {	    delete currseed;	  }	}	TapDEBUG(3) << "done with adding seeds with " << ncall->ip << endl;      }      // TODO: for some reason the compiler gets mad if I try to delete nr      // in the destructor      delete ncall->nr;      ncall->nr = NULL;      delete ncall;    }    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      // need to delete all the seeds      for( uint j = 0; j < seeds.size(); j++ ) {	NodeInfo *currseed = seeds[j];	delete currseed;      }      return;    }    // now that we have all the seeds we want from this one,     // see if they are in the closest k    NodeInfo * closestk[_k];    // initialize to NULL    for( uint k = 0; k < _k; k++ ) {      closestk[k] = NULL;    }    bool closest_full = false;    // add these guys to routing table    map< IPAddress, Time> timing;    TapDEBUG(3) << "About to add seeds to the routing table." << endl;    multi_add_to_rt( &seeds, &timing );    TapDEBUG(3) << "Done with adding seeds to the routing table." << endl;    // make sure we haven't crashed and/or started another join    if( !alive() || _join_num != curr_join ) {      TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl;      // need to delete all the seeds      for( uint j = 0; j < seeds.size(); j++ ) {	NodeInfo *currseed = seeds[j];	delete currseed;      }      return;    }    for( uint j = 0; j < seeds.size(); j++ ) {      NodeInfo *currseed = seeds[j];      // add them all to the routing table (this gets us the ping time for free      TapDEBUG(3) << "about to get distance for " << currseed->_addr << endl;      //add_to_rt( currseed->_addr, currseed->_id );      TapDEBUG(3) << "added to rt for " << currseed->_addr << endl;      //bool ok = false;      currseed->_distance = timing[currseed->_addr];      TapDEBUG(3) << "got distance for " << currseed->_addr << endl;      // is there anyone on the list farther than you?  if so, replace      bool added = false;      for( uint k = 0; k < _k; k++ ) {	NodeInfo * currclose = closestk[k];	if( currclose == NULL || 	    (closest_full && currclose->_distance > currseed->_distance ) ) {	  if( currclose != NULL ) {	    delete currclose;	  }	  closestk[k] = currseed;	  added = true;	  if( k == _k - 1 ) {	    closest_full = true;	  }	  //TapDEBUG(2) << "close is " << currseed->_addr << endl;	  break;	}      }      if( !added ) {	// we're throwing this person away, so delete the memory	delete currseed;      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -