⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 该代码是采用后缀匹配的经典结构化p2p模型
💻 C
📖 第 1 页 / 共 5 页
字号:
    }  }    return true;}voidTapestry::oracle_node_died( IPAddress deadip, GUID deadid, 			    const set<Node *> *lid ){  TapDEBUG(2) << "Oracle says node died: " << deadip << "/" 	      << print_guid( deadid ) << endl;  _rt->remove( deadid, false );  _rt->remove_backpointer( deadip, deadid );  int match = guid_compare( deadid, id_digits() );  uint digit = get_digit( deadid, match );  // now find a replacement  vector<NodeInfo *> nodes;  Time bestrtt = 1000000;  Tapestry *bestnode = NULL;  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Tapestry *currnode = (Tapestry*) *i;    if( currnode->ip() != ip() && currnode->alive() ) {      if( guid_compare( currnode->id_digits(), id_digits() ) == match &&	  get_digit( currnode->id(), match ) == digit ) {	Time rtt = 	  2*Network::Instance()->gettopology()->latency( ip(),							 currnode->ip() );	if( rtt < bestrtt ) {	  bestrtt = rtt;	  bestnode = currnode;	}      }    }  }    if( bestnode != NULL ) {    if( _rt->add( bestnode->ip(), bestnode->id(), bestrtt, false ) ) {      bestnode->got_backpointer( ip(), id(), 				 match, 				 false );      if( _lookup_learn ) {	_cachebag->remove( bestnode->id(), false );      }    }  }}voidTapestry::oracle_node_joined( Tapestry *t ){  TapDEBUG(2) << "Oracle says node joined: " << t->ip() << "/" 	      << print_guid( t->id() ) << endl;  Time rtt = 2*Network::Instance()->gettopology()->latency( ip(), t->ip() );  if( _rt->add( t->ip(), t->id(), rtt, false ) ) {      t->got_backpointer( ip(), id(), 			  guid_compare( t->id_digits(), id_digits() ), 			  false );      // for now don't worry about removing backpointers for other people,      // they don't really matter  }}voidTapestry::check_rt(void *x){  TapDEBUG(2) << "Checking the routing table" << endl;  Time t = now();  // do nothing if we should be dead or not fully alive  if( !joined ) {    _stab_scheduled = false;    return;  }  // ping everyone in the routing table to  //  - update latencies  //  - ensure they're still alive    // the easy way to do this is just to make a vector of all the nodes  // in the routing table and simply try to add them all to the routing table.  vector<NodeInfo *> nodes;  for( uint i = 0; i < _digits_per_id; i++ ) {    for( uint j = 0; j < _base; j++ ) {      RouteEntry *re = _rt->get_entry( i, j );      if( re == NULL || (re->get_first() != NULL && 			 re->get_first()->_addr == ip() ) ) {	// if this an entry with our own ip, don't bother delving further	// the backups of yourself will appear in later levels	// there should be no other duplicates though, so vector is safe	continue;      }      for( uint k = 0; k < re->size() && k <= _repair_backups; k++ ) {	nodes.push_back( re->get_at(k) );      }    }  }  RPCSet ping_rpcset;  HashMap<unsigned, ping_callinfo*> ping_resultmap;  Time before_ping = now();  multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &nodes, NULL, false );  multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, true);  TapDEBUG(2) << "finished checking routing table " << now() << " " << t << endl;  // reschedule  if ( t + _stabtimer < now()) {     //stabilizating has run past _stabtime seconds    // schedule it in one ms, to avoid recursion    TapDEBUG(2) << "rescheduling check_rt in 1" << endl;    delaycb( 1, &Tapestry::check_rt, (void *) 0 );  } else {    Time later = _stabtimer - (now() - t);    TapDEBUG(2) << "rescheduling check_rt in " << later << endl;    delaycb( later, &Tapestry::check_rt, (void *) 0 );  }}voidTapestry::initstate(){  const set<Node *> *lid = Network::Instance()->getallnodes();  // TODO: we shouldn't need locking in here, right?  if( !alive() ) {    _join_num++;    TapDEBUG(3) << "My alive is false" << endl;    return;  }  TapDEBUG(1) << "initstate: about to add everyone" << endl;  // for every node but this own, add them all to your routing table  vector<NodeInfo *> nodes;  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Tapestry *currnode = (Tapestry*) *i;    if( currnode->ip() == ip() || !currnode->alive() ) {      continue;    }        // cheat and get the latency straight from the topology    Time rtt = Network::Instance()->gettopology()->latency( ip(), 							    currnode->ip() ) +      Network::Instance()->gettopology()->latency( currnode->ip(), ip() );          if( rtt >= 50000 ) {      continue;    }    _rt->add( currnode->ip(), currnode->id(), rtt, false );  }  // now that everyone's been added, place backpointers on everyone   // who is still in the table  uint known_nodes = 0;  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Tapestry *currnode = (Tapestry*) *i;    if( currnode->ip() == ip() || !currnode->alive() ) {      continue;    }    // TODO: should I send backpointers for levels lower than the    // maximum match?  That would be a pain.    if( _rt->contains( currnode->id() ) ) {      currnode->got_backpointer( ip(), id(), 				 guid_compare( currnode->id_digits(), 					       id_digits() ), 				 false );      known_nodes++;    }  }    TapDEBUG(2) << "INITSTATE: " << known_nodes << endl;  have_joined();  TapDEBUG(2) << "init_state: finished adding everyone" << endl;}TimeTapestry::ping( IPAddress other_node, GUID other_id, bool &ok ){  // if it's already in the table, don't worry about it  if( _rt->contains( other_id ) ) {    return _rt->get_time( other_id );  }  Time before = now();  ping_args pa;  ping_return pr;  TapDEBUG(4) << "about to ping " << other_node << endl;  record_stat(STAT_PING, 0, 0);  ok = doRPC( other_node, &Tapestry::handle_ping, &pa, &pr, MAXTIME );  if( ok ) {    record_stat(STAT_PING, 0, 0);  }  TapDEBUG(4) << "done with ping " << other_node << endl;  return now() - before;}voidTapestry::multi_add_to_rt( vector<NodeInfo *> *nodes, 			   map<IPAddress, Time> *timing ){  RPCSet ping_rpcset;  HashMap<unsigned, ping_callinfo*> ping_resultmap;  Time before_ping = now();  multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, nodes, timing, true );  multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, timing, 		       false );}voidTapestry::multi_add_to_rt_start( RPCSet *ping_rpcset, 				 HashMap<unsigned, ping_callinfo*> *ping_resultmap,				 vector<NodeInfo *> *nodes, 				 map<IPAddress, Time> *timing, 				 bool check_exist ){  ping_args pa;  ping_return pr;  for( uint j = 0; j < nodes->size(); j++ ) {    NodeInfo *ni = (*nodes)[j];    // do an asynchronous RPC to each one, collecting the ping times in the    // process    // however, no need to ping if we already know the ping time, right?    if( (!check_exist) // && now() - _last_heard_map[ni->_addr] >= _stabtimer) 	|| !_rt->contains( ni->_id ) ) {      record_stat(STAT_PING, 0, 0);      ping_callinfo *pi = New ping_callinfo(ni->_addr, ni->_id, now());      if( _rt->contains( ni->_id ) ) {	pi->last_timeout = _rtt_timeout_factor*_rt->get_time( ni->_id );      } else {	pi->last_timeout = MAXTIME;      }      TapDEBUG(3) << "Starting multi-add for " << ni->_addr << ", timeout: " 		  << pi->last_timeout << endl;      unsigned rpc = asyncRPC( ni->_addr, 			       &Tapestry::handle_ping, &pa, &pr, 			       pi->last_timeout );      assert(rpc);      ping_resultmap->insert(rpc, pi);      ping_rpcset->insert(rpc);      if( timing != NULL ) {	(*timing)[ni->_addr] = 1000000;      }    } else if( check_exist && _rt->contains( ni->_id ) && timing != NULL ) {      (*timing)[ni->_addr] = _rt->get_time( ni->_id );    }  }}voidTapestry::multi_add_to_rt_end( RPCSet *ping_rpcset, 			       HashMap<unsigned, ping_callinfo*> *ping_resultmap,			       Time before_ping, map<IPAddress, Time> *timing,			       bool repair ){  // check for done pings  assert( ping_rpcset->size() == (uint) ping_resultmap->size() );  while( ping_rpcset->size() > 0 ) {    bool ok;    unsigned donerpc = rcvRPC( ping_rpcset, ok );    ping_callinfo *pi = (*ping_resultmap)[donerpc];    assert( pi );    Time ping_time = now() - pi->pingstart;    if( !ok ) {            // this ping failed.  remove if you've reached the max time limit,      // otherwise try the ping again.      if( now() - before_ping >= _declare_dead_time ) {	pi->failed = true;      } else {	// put another shrimp on the barbie . . .	TapDEBUG(2) << "Forking off check of (" << pi->ip << "/" 		    << print_guid(pi->id) << ")" << endl;	check_node_args *cna = New check_node_args();	cna->ip = pi->ip;	_check_nodes->push_back( cna );	_check_nodes_waiting->notifyAll();      }          } else {      TapDEBUG(3) << "Finished multi-add for " << pi->ip << endl;      record_stat( STAT_PING, 0, 0 );      // TODO: ok, but now how do I call rt->add without it placing       // backpointers synchronously?      pi->rtt = ping_time;      if( timing != NULL ) {	(*timing)[pi->ip] = ping_time; //_rt->get_time( pi->id );      }      _rt->set_timeout( pi->id, false );    }    TapDEBUG(3) << "multidone ip: " << pi->ip <<       " total left " << ping_rpcset->size() << endl;  }  // now that all the pings are done, we can add them to our routing table  // (possibly sending synchronous backpointers around) without messing  // up the measurements  set<GUID> removed;  // put recently dead nodes on there too  for( uint i = 0; i < _recently_dead.size(); i++ ) {    if( !_rt->contains(_recently_dead[i]) ) {      removed.insert( _recently_dead[i] );    }  }  _recently_dead.clear();  for( HashMap<unsigned, ping_callinfo*>::iterator j=ping_resultmap->begin();       j != ping_resultmap->end(); ++j ) {    ping_callinfo *pi = j.value();    //assert( pi->rtt == ping( pi->ip, pi->id ) );    TapDEBUG(4) << "ip: " << pi->ip << endl;    if( pi->failed ) {      // failed! remove! no need to send a backpointer remove message      _rt->remove( pi->id, false );      _rt->remove_backpointer( pi->ip, pi->id );      assert( !_rt->contains( pi->id ) );      TapDEBUG(1) << "removing failed node " << pi->ip << endl;      if( repair ) {	  removed.insert( pi->id );      }    } else {      // make sure it's not the default (we actually pinged this one)      if( pi->rtt != 87654 ) {	_rt->add( pi->ip, pi->id, pi->rtt );	if( _lookup_learn ) {	  _cachebag->remove( pi->id, false );	}      }    }    delete pi;  }  // now, for each that should be repaired, ask the live nodes in its slot  // for a new possibility and ask all live nodes in all  // the levels above the slot.  if( repair ) {      RPCSet repair_rpcset;      HashMap<unsigned, repair_callinfo*> repair_resultmap;      HashMap<GUID, unsigned> repair_numasked;      for( uint j = _digits_per_id-1; j >= 0; j-- ) {	for( uint k = 0; k < _base; k++ ) {	  NodeInfo *ni = _rt->read( j, k );	  if( ni == NULL ) { 	    continue;	  }	  if( ni->_addr != ip() && removed.find(ni->_id) == removed.end() ) {	    	    bool send = false;	    repair_return *rr = NULL;	    repair_args *ra = NULL;	    for(set<GUID>::iterator i=removed.begin();i != removed.end();++i) {	      	      if( repair_numasked[*i] >= _max_repair_num ) {		continue;	      }	      int match = guid_compare( *i, id_digits() );	      assert( match >= 0 ); // shouldn't be me	      if( j < (uint) match ) continue;	      uint digit = get_digit( *i, match );	      repair_numasked.insert( *i, repair_numasked[*i]+1);	      if( ra == NULL ) {		rr = New repair_return();		ra = New repair_args();		ra->bad_ids = New vector<GUID>;		ra->levels = New vector<uint>;		ra->digits = New vector<uint>;		send = true;	      }	      	      ra->bad_ids->push_back(*i);	      ra->levels->push_back( (uint) match);	      ra->digits->push_back(digit);	      	    }	    if( send && ra->bad_ids->size() > 0 ) {	      record_stat(STAT_REPAIR, ra->bad_ids->size(), 			  2*ra->bad_ids->size());	      // just do these once, if we don't repair then oh well I guess...	      unsigned rpc = asyncRPC( ni->_addr, 				       &Tapestry::handle_repair, 				       ra, rr, 				       _rtt_timeout_factor*				       _rt->get_time(ni->_id) );	      assert(rpc);	      repair_resultmap.insert(rpc, New repair_callinfo( ra, rr ));	      repair_rpcset.insert(rpc);	    }	    	  }	  delete ni;	}	// don't wrap the uint around past 0	if( j == 0 ) {	  break;	}      }      uint rsetsize = repair_rpcset.size();      vector<NodeInfo *> toadd;      for( unsigned int i = 0; i < rsetsize; i++ ) {	  bool ok;	  unsigned donerpc = rcvRPC( &repair_rpcset, ok );	  repair_callinfo *rc = repair_resultmap[donerpc];	  repair_return *rr = rc->rr;	  if( ok ) {	      record_stat( STAT_REPAIR, rr->nodelist.size(), 0 );	      for( vector<NodeInfo>::iterator j=rr->nodelist.begin();		   j != rr->nodelist.end(); j++ ) {		  // make sure this isn't already on the toadd list, and		  // that it wasn't removed by us		  // TODO: this is all sorts of inefficient		  bool add = true;		  if( removed.find((*j)._id ) != removed.end() ) {		    add = false;		  }		  if( add ) {		      for( vector<NodeInfo *>::iterator k=toadd.begin(); 			   k != toadd.end(); ++k) {			  if( (**k)._id == (*j)._id ) {			      add = false;			      break;			  }		      }		  }		  if( add ) {		      toadd.push_back( New NodeInfo( j->_addr, j->_id ) );		  }	      }	  }	  delete rr;	  rc->rr = NULL;	  delete rc->ra->bad_ids;	  delete rc->ra->levels;	  delete rc->ra->digits;	  delete rc;      }      multi_add_to_rt( &toadd, NULL );      // delete      for( uint i = 0; i < toadd.size(); i++ ) {	delete toadd[i];	toadd[i] = NULL;      }  }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -