⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tapestry.c

📁 基于chord算法的p2p文件系统。A p2p file system based on chord.
💻 C
📖 第 1 页 / 共 5 页
字号:
  // finally, add this guy to our table  add_to_rt( args->ip, args->id );  TapDEBUG(5) << "nn exit" << endl;}void Tapestry::handle_repair(repair_args *args, repair_return *ret){  TapDEBUG(5) << "rep enter" << endl;  // send back all backward pointers  vector<NodeInfo> nns;  for( uint j = 0; j < args->bad_ids->size(); j++ ) {    GUID bad = (*(args->bad_ids))[j];    uint level = (*(args->levels))[j];    uint digit = (*(args->digits))[j];    assert( level < _digits_per_id && digit < _base );        // TODO: limit the number?    RouteEntry *re = _rt->get_entry( level, digit );    for( uint i = 0; re != NULL && i < re->size(); i++ ) {      NodeInfo *next = re->get_at(i);      if( next != NULL && next->_addr != ip() && next->_id != bad ) {	nns.push_back( *next );      }    }      }  ret->nodelist = nns;  TapDEBUG(5) << "rep exit" << endl;}void Tapestry::handle_ping(ping_args *args, ping_return *ret){  TapDEBUG(4) << "pinged." << endl;  // do nothing}void Tapestry::handle_mcnotify(mcnotify_args *args, mcnotify_return *ret){  TapDEBUG(5) << "mcn enter" << endl;  TapDEBUG(3) << "got mcnotify from " << args->ip << endl;  NodeInfo *mc_node = New NodeInfo( args->ip, args->id );  initlist.push_back( mc_node );  // add all the nodes on the nodelist as well  nodelist_args na;  na.nodelist = args->nodelist;  nodelist_return nr;  handle_nodelist( &na, &nr );  TapDEBUG(5) << "mcn exit" << endl;}void Tapestry::add_to_rt( IPAddress new_ip, GUID new_id ){  // first get the distance to this node  // TODO: asynchronous pings would be nice  // TODO: also, maybe a ping cache of some kind  bool ok = true;  Time distance = ping( new_ip, new_id, ok );  if( ok ) {    // the RoutingTable takes care of placing (and removing) backpointers     // for us    _rt->add( new_ip, new_id, distance );    if( _lookup_learn ) {      _cachebag->remove( new_id, false );    }  }  return;}boolTapestry::stabilized(vector<GUID> lid){    // if we don't think we've joined, we can't possibly be stable  if( !joined ) {    TapDEBUG(1) << "Haven't even joined yet." << endl;    return false;  }  // for every GUID, make sure it either exists in your routing table,  // or someone else does  // TODO: do we need locking in this function?  for( uint i = 0; i < lid.size(); i++ ) {    GUID currguid = lid[i];    if( !_rt->contains(currguid) ) {      int match = guid_compare( currguid, id_digits() );      if( match == -1 ) {	TapDEBUG(1) << "doesn't have itself in the routing table!" << endl;	return false;      } else {	NodeInfo * ni = _rt->read( match, get_digit( currguid, match ) );	if( ni == NULL ) {	  TapDEBUG(1) << "has a hole in the routing table at (" << match <<	    "," << get_digit( currguid, match ) << ") where " << 	    print_guid( currguid ) << " would fit." << endl;	    return false;	} else {	  delete ni;	}      }    }  }    return true;}voidTapestry::oracle_node_died( IPAddress deadip, GUID deadid, 			    const set<Node *> *lid ){  TapDEBUG(2) << "Oracle says node died: " << deadip << "/" 	      << print_guid( deadid ) << endl;  _rt->remove( deadid, false );  _rt->remove_backpointer( deadip, deadid );  int match = guid_compare( deadid, id_digits() );  uint digit = get_digit( deadid, match );  // now find a replacement  vector<NodeInfo *> nodes;  Time bestrtt = 1000000;  Tapestry *bestnode = NULL;  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Tapestry *currnode = (Tapestry*) *i;    if( currnode->ip() != ip() && currnode->alive() ) {      if( guid_compare( currnode->id_digits(), id_digits() ) == match &&	  get_digit( currnode->id(), match ) == digit ) {	Time rtt = 	  2*Network::Instance()->gettopology()->latency( ip(),							 currnode->ip() );	if( rtt < bestrtt ) {	  bestrtt = rtt;	  bestnode = currnode;	}      }    }  }    if( bestnode != NULL ) {    if( _rt->add( bestnode->ip(), bestnode->id(), bestrtt, false ) ) {      bestnode->got_backpointer( ip(), id(), 				 match, 				 false );      if( _lookup_learn ) {	_cachebag->remove( bestnode->id(), false );      }    }  }}voidTapestry::oracle_node_joined( Tapestry *t ){  TapDEBUG(2) << "Oracle says node joined: " << t->ip() << "/" 	      << print_guid( t->id() ) << endl;  Time rtt = 2*Network::Instance()->gettopology()->latency( ip(), t->ip() );  if( rtt >= 50000 ) {    return;  }  if( _rt->add( t->ip(), t->id(), rtt, false ) ) {      t->got_backpointer( ip(), id(), 			  guid_compare( t->id_digits(), id_digits() ), 			  false );      // for now don't worry about removing backpointers for other people,      // they don't really matter  }}voidTapestry::check_rt(void *x){  TapDEBUG(2) << "Checking the routing table" << endl;  Time t = now();  // do nothing if we should be dead or not fully alive  if( !joined ) {    _stab_scheduled = false;    return;  }  // ping everyone in the routing table to  //  - update latencies  //  - ensure they're still alive    // the easy way to do this is just to make a vector of all the nodes  // in the routing table and simply try to add them all to the routing table.  vector<NodeInfo *> nodes;  for( uint i = 0; i < _digits_per_id; i++ ) {    for( uint j = 0; j < _base; j++ ) {      RouteEntry *re = _rt->get_entry( i, j );      if( re == NULL || (re->get_first() != NULL && 			 re->get_first()->_addr == ip() ) ) {	// if this an entry with our own ip, don't bother delving further	// the backups of yourself will appear in later levels	// there should be no other duplicates though, so vector is safe	continue;      }      for( uint k = 0; k < re->size() && k <= _repair_backups; k++ ) {	nodes.push_back( re->get_at(k) );      }    }  }  // maybe we want to check backpointers too  if( _check_backpointers ) {    for( uint i = 0; i < _digits_per_id; i++ ) {      vector<NodeInfo *> *bps = _rt->get_backpointers( i );      for( uint j = 0; j < bps->size(); j++ ) {	if( !_rt->contains( ((*bps)[j])->_id ) ) {	  nodes.push_back( (*bps)[j] );	}      }    }  }  RPCSet ping_rpcset;  HashMap<unsigned, ping_callinfo*> ping_resultmap;  Time before_ping = now();  multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &nodes, NULL, false );  multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, true);  TapDEBUG(2) << "finished checking routing table " << now() << " " 	      << t << endl;  // reschedule  if ( t + _stabtimer < now()) {     //stabilizating has run past _stabtime seconds    // schedule it in one ms, to avoid recursion    TapDEBUG(3) << "rescheduling check_rt in 1" << endl;    delaycb( 1, &Tapestry::check_rt, (void *) 0 );  } else {    Time later = _stabtimer - (now() - t);    TapDEBUG(3) << "rescheduling check_rt in " << later << endl;    delaycb( later, &Tapestry::check_rt, (void *) 0 );  }}voidTapestry::initstate(){  const set<Node *> *lid = Network::Instance()->getallnodes();  // TODO: we shouldn't need locking in here, right?  if( !alive() ) {    _join_num++;    TapDEBUG(3) << "My alive is false" << endl;    return;  }  TapDEBUG(1) << "initstate: about to add everyone" << endl;  // for every node but this own, add them all to your routing table  vector<NodeInfo *> nodes;  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Tapestry *currnode = (Tapestry*) *i;    if( currnode->ip() == ip() || !currnode->alive() ) {      continue;    }        // cheat and get the latency straight from the topology    Time rtt = Network::Instance()->gettopology()->latency( ip(), 							    currnode->ip() ) +      Network::Instance()->gettopology()->latency( currnode->ip(), ip() );          if( rtt >= 50000 ) {      continue;    }    _rt->add( currnode->ip(), currnode->id(), rtt, false );  }  // now that everyone's been added, place backpointers on everyone   // who is still in the table  uint known_nodes = 0;  for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) {    Tapestry *currnode = (Tapestry*) *i;    if( currnode->ip() == ip() || !currnode->alive() ) {      continue;    }    // TODO: should I send backpointers for levels lower than the    // maximum match?  That would be a pain.    if( _rt->contains( currnode->id() ) ) {      currnode->got_backpointer( ip(), id(), 				 guid_compare( currnode->id_digits(), 					       id_digits() ), 				 false );      known_nodes++;    }  }    TapDEBUG(3) << "INITSTATE: " << known_nodes << endl;  have_joined();  TapDEBUG(2) << "init_state: finished adding everyone" << endl;}TimeTapestry::ping( IPAddress other_node, GUID other_id, bool &ok ){  // if it's already in the table, don't worry about it  if( _rt->contains( other_id ) ) {    return _rt->get_time( other_id );  }  Time before = now();  ping_args pa;  ping_return pr;  TapDEBUG(4) << "about to ping " << other_node << endl;  record_stat(STAT_PING, 0, 0);  ok = doRPC( other_node, &Tapestry::handle_ping, &pa, &pr, MAXTIME );  if( ok ) {    record_stat(STAT_PING, 0, 0);  }  TapDEBUG(4) << "done with ping " << other_node << endl;  return now() - before;}voidTapestry::multi_add_to_rt( vector<NodeInfo *> *nodes, 			   map<IPAddress, Time> *timing ){  RPCSet ping_rpcset;  HashMap<unsigned, ping_callinfo*> ping_resultmap;  Time before_ping = now();  multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, nodes, timing, true );  multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, timing, 		       false );}voidTapestry::multi_add_to_rt_start( RPCSet *ping_rpcset, 				 HashMap<unsigned, ping_callinfo*> *ping_resultmap,				 vector<NodeInfo *> *nodes, 				 map<IPAddress, Time> *timing, 				 bool check_exist ){  ping_args pa;  ping_return pr;  for( uint j = 0; j < nodes->size(); j++ ) {    NodeInfo *ni = (*nodes)[j];    // do an asynchronous RPC to each one, collecting the ping times in the    // process    // however, no need to ping if we already know the ping time, right?    if( (!check_exist) // && now() - _last_heard_map[ni->_addr] >= _stabtimer) 	|| !_rt->contains( ni->_id ) ) {      record_stat(STAT_PING, 0, 0);      ping_callinfo *pi = New ping_callinfo(ni->_addr, ni->_id, now());      if( _rt->contains( ni->_id ) ) {	TapDEBUG(4) << "GetTime()-ing for " << ni->_addr << "/" 		    << print_guid(ni->_id) << endl;	pi->last_timeout = _rtt_timeout_factor*_rt->get_time( ni->_id );      } else {	pi->last_timeout = MAXTIME;      }      TapDEBUG(3) << "Starting multi-add for " << ni->_addr << ", timeout: " 		  << pi->last_timeout << endl;      unsigned rpc = asyncRPC( ni->_addr, 			       &Tapestry::handle_ping, &pa, &pr, 			       pi->last_timeout );      assert(rpc);      ping_resultmap->insert(rpc, pi);      ping_rpcset->insert(rpc);      if( timing != NULL ) {	(*timing)[ni->_addr] = 1000000;      }    } else if( check_exist && _rt->contains( ni->_id ) && timing != NULL ) {      (*timing)[ni->_addr] = _rt->get_time( ni->_id );    }  }}voidTapestry::multi_add_to_rt_end( RPCSet *ping_rpcset, 			       HashMap<unsigned, ping_callinfo*> *ping_resultmap,			       Time before_ping, map<IPAddress, Time> *timing,			       bool repair ){  // check for done pings  assert( ping_rpcset->size() == (uint) ping_resultmap->size() );  while( ping_rpcset->size() > 0 ) {    bool ok;    unsigned donerpc = rcvRPC( ping_rpcset, ok );    ping_callinfo *pi = (*ping_resultmap)[donerpc];    assert( pi );    Time ping_time = now() - pi->pingstart;    if( !ok ) {            // this ping failed.  remove if you've reached the max time limit,      // otherwise try the ping again.      if( now() - before_ping >= _declare_dead_time ) {	pi->failed = true;      } else {	// put another shrimp on the barbie . . .	TapDEBUG(3) << "Forking off check of (" << pi->ip << "/" 		    << print_guid(pi->id) << ")" << endl;	check_node_args *cna = New check_node_args();	cna->ip = pi->ip;	cna->id = pi->id;	_check_nodes->push_back( cna );	_check_nodes_waiting->notifyAll();      }          } else {      TapDEBUG(3) << "Finished multi-add for " << pi->ip << endl;      record_stat( STAT_PING, 0, 0 );      // TODO: ok, but now how do I call rt->add without it placing       // backpointers synchronously?      pi->rtt = ping_time;      if( timing != NULL ) {	(*timing)[pi->ip] = ping_time; //_rt->get_time( pi->id );      }      _rt->set_timeout( pi->id, false );    }    TapDEBUG(3) << "multidone ip: " << pi->ip <<       " total left " << ping_rpcset->size() << endl;  }  // now that all the pings are done, we can add them to our routing table  // (possibly sending synchronous backpointers around) without messing  // up the measurements  set<GUID> removed;  // put recently dead nodes on there too  for( uint i = 0; i < _recently_dead.size(); i++ ) {    if( !_rt->contains(_recently_dead[i]) ) {      removed.insert( _recently_dead[i] );    }  }  _recently_dead.clear();  for( HashMap<unsigned, ping_callinfo*>::iterator j=ping_resultmap->begin();       j != ping_resultmap->end(); ++j ) {    ping_callinfo *pi = j.value();    //assert( pi->rtt == ping( pi->ip, pi->id ) );    TapDEBUG(4) << "ip: " << pi->ip << endl;    if( pi->failed ) {      // failed! remove! no need to send a backpointer remove message

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -