⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 P2P模拟器
💻 C
📖 第 1 页 / 共 5 页
字号:
      _rt->remove( pi->id, false );      _rt->remove_backpointer( pi->ip, pi->id );      assert( !_rt->contains( pi->id ) );      TapDEBUG(1) << "removing failed node " << pi->ip << endl;      if( repair ) {	  removed.insert( pi->id );      }    } else {      // make sure it's not the default (we actually pinged this one)      if( pi->rtt != 87654 ) {	_rt->add( pi->ip, pi->id, pi->rtt );	if( _lookup_learn ) {	  _cachebag->remove( pi->id, false );	}      }    }    delete pi;  }  // now, for each that should be repaired, ask the live nodes in its slot  // for a new possibility and ask all live nodes in all  // the levels above the slot.  if( repair ) {      RPCSet repair_rpcset;      HashMap<unsigned, repair_callinfo*> repair_resultmap;      HashMap<GUID, unsigned> repair_numasked;      for( uint j = _digits_per_id-1; j >= 0; j-- ) {	for( uint k = 0; k < _base; k++ ) {	  NodeInfo *ni = _rt->read( j, k );	  if( ni == NULL ) { 	    continue;	  }	  if( ni->_addr != ip() && removed.find(ni->_id) == removed.end() ) {	    	    bool send = false;	    repair_return *rr = NULL;	    repair_args *ra = NULL;	    for(set<GUID>::iterator i=removed.begin();i != removed.end();++i) {	      	      if( repair_numasked[*i] >= _max_repair_num ) {		continue;	      }	      int match = guid_compare( *i, id_digits() );	      assert( match >= 0 ); // shouldn't be me	      if( j < (uint) match ) continue;	      uint digit = get_digit( *i, match );	      repair_numasked.insert( *i, repair_numasked[*i]+1);	      if( ra == NULL ) {		rr = New repair_return();		ra = New repair_args();		ra->bad_ids = New vector<GUID>;		ra->levels = New vector<uint>;		ra->digits = New vector<uint>;		send = true;	      }	      	      ra->bad_ids->push_back(*i);	      ra->levels->push_back( (uint) match);	      ra->digits->push_back(digit);	      	    }	    if( send && ra->bad_ids->size() > 0 ) {	      record_stat(STAT_REPAIR, ra->bad_ids->size(), 			  2*ra->bad_ids->size());	      // just do these once, if we don't repair then oh well I guess...	      unsigned rpc = asyncRPC( ni->_addr, 				       &Tapestry::handle_repair, 				       ra, rr, 				       _rtt_timeout_factor*				       _rt->get_time(ni->_id) );	      assert(rpc);	      repair_resultmap.insert(rpc, New repair_callinfo( ra, rr ));	      repair_rpcset.insert(rpc);	    }	    	  }	  delete ni;	}	// don't wrap the uint around past 0	if( j == 0 ) {	  break;	}      }      uint rsetsize = repair_rpcset.size();      vector<NodeInfo *> toadd;      for( unsigned int i = 0; i < rsetsize; i++ ) {	  bool ok;	  unsigned donerpc = rcvRPC( &repair_rpcset, ok );	  repair_callinfo *rc = repair_resultmap[donerpc];	  repair_return *rr = rc->rr;	  if( ok ) {	      record_stat( STAT_REPAIR, rr->nodelist.size(), 0 );	      for( vector<NodeInfo>::iterator j=rr->nodelist.begin();		   j != rr->nodelist.end(); j++ ) {		  // make sure this isn't already on the toadd list, and		  // that it wasn't removed by us		  // TODO: this is all sorts of inefficient		  bool add = true;		  if( removed.find((*j)._id ) != removed.end() ) {		    add = false;		  }		  if( add ) {		      for( vector<NodeInfo *>::iterator k=toadd.begin(); 			   k != toadd.end(); ++k) {			  if( (**k)._id == (*j)._id ) {			      add = false;			      break;			  }		      }		  }		  if( add ) {		      toadd.push_back( New NodeInfo( j->_addr, j->_id ) );		  }	      }	  }	  delete rr;	  rc->rr = NULL;	  delete rc->ra->bad_ids;	  delete rc->ra->levels;	  delete rc->ra->digits;	  delete rc;      }      multi_add_to_rt( &toadd, NULL );      // delete      for( uint i = 0; i < toadd.size(); i++ ) {	delete toadd[i];	toadd[i] = NULL;      }  }}voidTapestry::place_backpointer( RPCSet *bp_rpcset, 			     HashMap<unsigned, backpointer_args*> *bp_resultmap, 			     IPAddress bpip, int level, 			     bool remove ){  backpointer_args *bpa = New backpointer_args();  bpa->ip = ip();  bpa->id = id();  bpa->level = level;  bpa->remove = remove;  backpointer_return bpr;  TapDEBUG(3) << "sending bp to " << bpip << endl;  record_stat(STAT_BACKPOINTER, 1, 2);  GUID bpid = get_id_from_ip(bpip);  Time timeout = MAXTIME;  if( _rt->contains( bpid ) ) {    timeout = _rtt_timeout_factor*_rt->get_time( bpid );  }  unsigned rpc = asyncRPC( bpip, &Tapestry::handle_backpointer, bpa, &bpr, 			   timeout );  bp_rpcset->insert(rpc);  bp_resultmap->insert(rpc, bpa);  // for now assume no failures}voidTapestry::place_backpointer_end( RPCSet *bp_rpcset,				 HashMap<unsigned,backpointer_args*>*bp_resultmap) {  // wait for each to finish  uint setsize = bp_rpcset->size();  RoutingTable *rt_old = _rt;  for( unsigned int j = 0; j < setsize; j++ ) {    bool ok;    unsigned donerpc = rcvRPC( bp_rpcset, ok );    if( ok ) {      record_stat(STAT_BACKPOINTER, 0, 0);    } else {      // for now we won't worry about what happens if a bp msg is dropped    }    backpointer_args *bpa = (*bp_resultmap)[donerpc];    delete bpa;  }  // must have the lock again before returning  if( _rt != rt_old ) {    // the routing table changed while we were away (i.e. we crashed)    // TODO: in the future, if place_backpointer_end is ever called not at    // the end of an rt function, we might need to do something here    TapDEBUG(3) << "rt changed while away in place_backpointer_end" << endl;  }}void Tapestry::handle_backpointer(backpointer_args *args, backpointer_return *ret){  TapDEBUG(3) << "got a bp msg from " << args->id << endl;  TapDEBUG(5) << "bp enter" << endl;  got_backpointer( args->ip, args->id, args->level, args->remove );  TapDEBUG(5) << "bp exit" << endl;  // maybe this person should be in our table?  //add_to_rt( args->ip, args->id );}voidTapestry::got_backpointer( IPAddress bpip, GUID bpid, uint level, bool remove ){  if( remove ) {    _rt->remove_backpointer( bpip, bpid, level );  } else {    _rt->add_backpointer( bpip, bpid, level );  }}IPAddressTapestry::next_hop( GUID key ){  IPAddress *ip = New IPAddress[1];  ip[0] = 0;  GUID *id = New GUID[1];  id[0] = 0;  next_hop( key, &ip, &id, 1 );  IPAddress rt = ip[0];  delete ip;  return rt;}voidTapestry::next_hop( GUID key, IPAddress** ips, GUID **ids, uint size ){  // first, figure out how many digits we share, and start looking at   // that level  int level = guid_compare( key, id_digits() );  if( level == -1 ) {    // it's us!    if( size > 0 ) {      (*ips)[0] = ip();      (*ids)[0] = id();    }    return;  }  RouteEntry *re = _rt->get_entry( level, get_digit( key, level ) );  // use the first value passed in in ips as a non-nexthop, that is, if   // ips[0] is nonnull, do NOT return that one as a possible next hop  IPAddress dontuse = (*ips)[0];  // if it has an entry, use it  if( re != NULL && re->get_first() != NULL &&       (re->get_first()->_addr != dontuse || re->size() > 1 ) ) {    // this cannot be us, since this should be the entry where we differ    // from the key, but just to be sure . . .     assert( re->get_first()->_addr != id() );     uint used = 0;     for( uint i = 0; i < re->size() && used < size; i++ ) {       if( re->get_at(i)->_addr != dontuse ) {	 (*ips)[used] = re->get_at(i)->_addr;	 (*ids)[used] = re->get_at(i)->_id;	 used++;       }     }     return;  } else {    // if there's no such entry, it's time to surrogate route.  yeah baby.    // keep adding 1 to the digit until you find a match.  If it's us,    // go up to the next level and keep on trying till the end    // (From Fig. 3 in the JSAC paper)    for( uint i = level; i < _digits_per_id; i++ ) {      uint j = get_digit( key, i );      while( re == NULL || re->get_first() == NULL || 	     (re->get_first()->_addr == dontuse && re->size() == 1) ) {	// NOTE: we can't start with looking at the j++ entry, since	// this might be the next level up, and we'd want to start at j	re = _rt->get_entry( i, j );	j++;	if( j == _base ) {	  // think circularly	  j = 0;	}      }      TapDEBUG(4) << "looking for key " << print_guid(key) << ", level " <<	i << ", digit " << j << " is " << print_guid(re->get_first()->_id) << 	endl;      // if it is us, go around another time      // otherwise, we've found the next hop      if( re->get_first()->_addr != ip() ) {	uint used = 0;	for( uint k = 0; k < re->size() && used < size; k++ ) {	  if( re->get_at(k)->_addr != dontuse ) {	    (*ips)[used] = re->get_at(k)->_addr;	    (*ids)[used] = re->get_at(k)->_id;	    used++;	  }	}	return;      }      re = NULL;    }      }  // didn't find a better surrogate, so it must be us  if( size > 0 ) {    (*ips)[0] = ip();    (*ids)[0] = id();  }}intTapestry::guid_compare( GUID key1, GUID key2 ){  // if they're the same, return -1  if( key1 == key2 ) {    return -1;  }  uint i = 0;  // otherwise, figure out where they differ  for( ; i < _digits_per_id; i++ ) {    if( get_digit( key1, i ) != get_digit( key2, i ) ) {      break;    }  }  return (int) i;}intTapestry::guid_compare( GUID key1, uint key2_digits[] ){  uint i = 0;  // otherwise, figure out where they differ  for( ; i < _digits_per_id; i++ ) {    if( get_digit( key1, i ) != key2_digits[i] ) {      break;    }  }  // if they're the same, return -1  if( i == _digits_per_id ) {    return -1;  }  return (int) i;}intTapestry::guid_compare( uint key1_digits[], uint key2_digits[] ){  uint i = 0;  // otherwise, figure out where they differ  for( ; i < _digits_per_id; i++ ) {    if( key1_digits[i] != key2_digits[i] ) {      break;    }  }  // if they're the same, return -1  if( i == _digits_per_id ) {    return -1;  }  return (int) i;}voidTapestry::leave(Args *args){  crash (args);}voidTapestry::crash(Args *args){  if( _verbose ) {    TapDEBUG(0) << "Tapestry crash" << endl;  }    // XXX: Thomer says: not necessary  // crash();  // clear out routing table, and any other state that might be lying around  delete _rt;  if( _lookup_learn ) {    delete _cachebag;  }  joined = false;  // clear join state also  _joining = false;  for( uint l = 0; l < initlist.size(); l++ ) {    delete initlist[l];  }  initlist.clear();  _recently_dead.clear();  // although these should be used since the node is dead, and the real new  // ones aren't made until the join (we don't know our ID until then),  // they are necessary for outstanding RPCs.  _rt = New RoutingTable(this, _redundancy);  if( _lookup_learn ) {    _cachebag = New RoutingTable(this, _redundancy);  }  // TODO: should be killing these waiting RPCs instead of allowing them  // to finish normally.  bah.  _waiting_for_join->notifyAll();  TapDEBUG(2) << "crash exit" << endl;  notifyObservers( (ObserverInfo *) "crash" );}stringTapestry::print_guid( GUID id ){  // NOTE: for now only handle up to base 256 (should be plenty)  uint multiplier = 1;  uint extra = 1;  if( _base > 16 ) {    multiplier++;    extra += _digits_per_id;  }  uint size = _digits_per_id*multiplier+extra;  char buf[size];  //printf( "initial guid: %16qx\n", id );  // print it out, digit by digit  // (in order to get leading zeros)  uint j = 0;  for( uint i = 0; i < size-1; i++ ) {    uint digit = get_digit( id, j );    if( _base > 16 ) {      sprintf( &(buf[i]), "%.2x", digit );      i += 2;    } else {      sprintf( &(buf[i]), "%x", digit );    }    j++;    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -