📄 tapestry.c
字号:
// finally, add this guy to our table add_to_rt( args->ip, args->id ); TapDEBUG(5) << "nn exit" << endl;}void Tapestry::handle_repair(repair_args *args, repair_return *ret){ TapDEBUG(5) << "rep enter" << endl; // send back all backward pointers vector<NodeInfo> nns; for( uint j = 0; j < args->bad_ids->size(); j++ ) { GUID bad = (*(args->bad_ids))[j]; uint level = (*(args->levels))[j]; uint digit = (*(args->digits))[j]; assert( level < _digits_per_id && digit < _base ); // TODO: limit the number? RouteEntry *re = _rt->get_entry( level, digit ); for( uint i = 0; re != NULL && i < re->size(); i++ ) { NodeInfo *next = re->get_at(i); if( next != NULL && next->_addr != ip() && next->_id != bad ) { nns.push_back( *next ); } } } ret->nodelist = nns; TapDEBUG(5) << "rep exit" << endl;}void Tapestry::handle_ping(ping_args *args, ping_return *ret){ TapDEBUG(4) << "pinged." << endl; // do nothing}void Tapestry::handle_mcnotify(mcnotify_args *args, mcnotify_return *ret){ TapDEBUG(5) << "mcn enter" << endl; TapDEBUG(3) << "got mcnotify from " << args->ip << endl; NodeInfo *mc_node = New NodeInfo( args->ip, args->id ); initlist.push_back( mc_node ); // add all the nodes on the nodelist as well nodelist_args na; na.nodelist = args->nodelist; nodelist_return nr; handle_nodelist( &na, &nr ); TapDEBUG(5) << "mcn exit" << endl;}void Tapestry::add_to_rt( IPAddress new_ip, GUID new_id ){ // first get the distance to this node // TODO: asynchronous pings would be nice // TODO: also, maybe a ping cache of some kind bool ok = true; Time distance = ping( new_ip, new_id, ok ); if( ok ) { // the RoutingTable takes care of placing (and removing) backpointers // for us _rt->add( new_ip, new_id, distance ); if( _lookup_learn ) { _cachebag->remove( new_id, false ); } } return;}boolTapestry::stabilized(vector<GUID> lid){ // if we don't think we've joined, we can't possibly be stable if( !joined ) { TapDEBUG(1) << "Haven't even joined yet." << endl; return false; } // for every GUID, make sure it either exists in your routing table, // or someone else does // TODO: do we need locking in this function? for( uint i = 0; i < lid.size(); i++ ) { GUID currguid = lid[i]; if( !_rt->contains(currguid) ) { int match = guid_compare( currguid, id_digits() ); if( match == -1 ) { TapDEBUG(1) << "doesn't have itself in the routing table!" << endl; return false; } else { NodeInfo * ni = _rt->read( match, get_digit( currguid, match ) ); if( ni == NULL ) { TapDEBUG(1) << "has a hole in the routing table at (" << match << "," << get_digit( currguid, match ) << ") where " << print_guid( currguid ) << " would fit." << endl; return false; } else { delete ni; } } } } return true;}voidTapestry::oracle_node_died( IPAddress deadip, GUID deadid, const set<Node *> *lid ){ TapDEBUG(2) << "Oracle says node died: " << deadip << "/" << print_guid( deadid ) << endl; _rt->remove( deadid, false ); _rt->remove_backpointer( deadip, deadid ); int match = guid_compare( deadid, id_digits() ); uint digit = get_digit( deadid, match ); // now find a replacement vector<NodeInfo *> nodes; Time bestrtt = 1000000; Tapestry *bestnode = NULL; for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Tapestry *currnode = (Tapestry*) *i; if( currnode->ip() != ip() && currnode->alive() ) { if( guid_compare( currnode->id_digits(), id_digits() ) == match && get_digit( currnode->id(), match ) == digit ) { Time rtt = 2*Network::Instance()->gettopology()->latency( ip(), currnode->ip() ); if( rtt < bestrtt ) { bestrtt = rtt; bestnode = currnode; } } } } if( bestnode != NULL ) { if( _rt->add( bestnode->ip(), bestnode->id(), bestrtt, false ) ) { bestnode->got_backpointer( ip(), id(), match, false ); if( _lookup_learn ) { _cachebag->remove( bestnode->id(), false ); } } }}voidTapestry::oracle_node_joined( Tapestry *t ){ TapDEBUG(2) << "Oracle says node joined: " << t->ip() << "/" << print_guid( t->id() ) << endl; Time rtt = 2*Network::Instance()->gettopology()->latency( ip(), t->ip() ); if( rtt >= 50000 ) { return; } if( _rt->add( t->ip(), t->id(), rtt, false ) ) { t->got_backpointer( ip(), id(), guid_compare( t->id_digits(), id_digits() ), false ); // for now don't worry about removing backpointers for other people, // they don't really matter }}voidTapestry::check_rt(void *x){ TapDEBUG(2) << "Checking the routing table" << endl; Time t = now(); // do nothing if we should be dead or not fully alive if( !joined ) { _stab_scheduled = false; return; } // ping everyone in the routing table to // - update latencies // - ensure they're still alive // the easy way to do this is just to make a vector of all the nodes // in the routing table and simply try to add them all to the routing table. vector<NodeInfo *> nodes; for( uint i = 0; i < _digits_per_id; i++ ) { for( uint j = 0; j < _base; j++ ) { RouteEntry *re = _rt->get_entry( i, j ); if( re == NULL || (re->get_first() != NULL && re->get_first()->_addr == ip() ) ) { // if this an entry with our own ip, don't bother delving further // the backups of yourself will appear in later levels // there should be no other duplicates though, so vector is safe continue; } for( uint k = 0; k < re->size() && k <= _repair_backups; k++ ) { nodes.push_back( re->get_at(k) ); } } } // maybe we want to check backpointers too if( _check_backpointers ) { for( uint i = 0; i < _digits_per_id; i++ ) { vector<NodeInfo *> *bps = _rt->get_backpointers( i ); for( uint j = 0; j < bps->size(); j++ ) { if( !_rt->contains( ((*bps)[j])->_id ) ) { nodes.push_back( (*bps)[j] ); } } } } RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; Time before_ping = now(); multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &nodes, NULL, false ); multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, true); TapDEBUG(2) << "finished checking routing table " << now() << " " << t << endl; // reschedule if ( t + _stabtimer < now()) { //stabilizating has run past _stabtime seconds // schedule it in one ms, to avoid recursion TapDEBUG(3) << "rescheduling check_rt in 1" << endl; delaycb( 1, &Tapestry::check_rt, (void *) 0 ); } else { Time later = _stabtimer - (now() - t); TapDEBUG(3) << "rescheduling check_rt in " << later << endl; delaycb( later, &Tapestry::check_rt, (void *) 0 ); }}voidTapestry::initstate(){ const set<Node *> *lid = Network::Instance()->getallnodes(); // TODO: we shouldn't need locking in here, right? if( !alive() ) { _join_num++; TapDEBUG(3) << "My alive is false" << endl; return; } TapDEBUG(1) << "initstate: about to add everyone" << endl; // for every node but this own, add them all to your routing table vector<NodeInfo *> nodes; for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Tapestry *currnode = (Tapestry*) *i; if( currnode->ip() == ip() || !currnode->alive() ) { continue; } // cheat and get the latency straight from the topology Time rtt = Network::Instance()->gettopology()->latency( ip(), currnode->ip() ) + Network::Instance()->gettopology()->latency( currnode->ip(), ip() ); if( rtt >= 50000 ) { continue; } _rt->add( currnode->ip(), currnode->id(), rtt, false ); } // now that everyone's been added, place backpointers on everyone // who is still in the table uint known_nodes = 0; for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Tapestry *currnode = (Tapestry*) *i; if( currnode->ip() == ip() || !currnode->alive() ) { continue; } // TODO: should I send backpointers for levels lower than the // maximum match? That would be a pain. if( _rt->contains( currnode->id() ) ) { currnode->got_backpointer( ip(), id(), guid_compare( currnode->id_digits(), id_digits() ), false ); known_nodes++; } } TapDEBUG(3) << "INITSTATE: " << known_nodes << endl; have_joined(); TapDEBUG(2) << "init_state: finished adding everyone" << endl;}TimeTapestry::ping( IPAddress other_node, GUID other_id, bool &ok ){ // if it's already in the table, don't worry about it if( _rt->contains( other_id ) ) { return _rt->get_time( other_id ); } Time before = now(); ping_args pa; ping_return pr; TapDEBUG(4) << "about to ping " << other_node << endl; record_stat(STAT_PING, 0, 0); ok = doRPC( other_node, &Tapestry::handle_ping, &pa, &pr, MAXTIME ); if( ok ) { record_stat(STAT_PING, 0, 0); } TapDEBUG(4) << "done with ping " << other_node << endl; return now() - before;}voidTapestry::multi_add_to_rt( vector<NodeInfo *> *nodes, map<IPAddress, Time> *timing ){ RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; Time before_ping = now(); multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, nodes, timing, true ); multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, timing, false );}voidTapestry::multi_add_to_rt_start( RPCSet *ping_rpcset, HashMap<unsigned, ping_callinfo*> *ping_resultmap, vector<NodeInfo *> *nodes, map<IPAddress, Time> *timing, bool check_exist ){ ping_args pa; ping_return pr; for( uint j = 0; j < nodes->size(); j++ ) { NodeInfo *ni = (*nodes)[j]; // do an asynchronous RPC to each one, collecting the ping times in the // process // however, no need to ping if we already know the ping time, right? if( (!check_exist) // && now() - _last_heard_map[ni->_addr] >= _stabtimer) || !_rt->contains( ni->_id ) ) { record_stat(STAT_PING, 0, 0); ping_callinfo *pi = New ping_callinfo(ni->_addr, ni->_id, now()); if( _rt->contains( ni->_id ) ) { TapDEBUG(4) << "GetTime()-ing for " << ni->_addr << "/" << print_guid(ni->_id) << endl; pi->last_timeout = _rtt_timeout_factor*_rt->get_time( ni->_id ); } else { pi->last_timeout = MAXTIME; } TapDEBUG(3) << "Starting multi-add for " << ni->_addr << ", timeout: " << pi->last_timeout << endl; unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_ping, &pa, &pr, pi->last_timeout ); assert(rpc); ping_resultmap->insert(rpc, pi); ping_rpcset->insert(rpc); if( timing != NULL ) { (*timing)[ni->_addr] = 1000000; } } else if( check_exist && _rt->contains( ni->_id ) && timing != NULL ) { (*timing)[ni->_addr] = _rt->get_time( ni->_id ); } }}voidTapestry::multi_add_to_rt_end( RPCSet *ping_rpcset, HashMap<unsigned, ping_callinfo*> *ping_resultmap, Time before_ping, map<IPAddress, Time> *timing, bool repair ){ // check for done pings assert( ping_rpcset->size() == (uint) ping_resultmap->size() ); while( ping_rpcset->size() > 0 ) { bool ok; unsigned donerpc = rcvRPC( ping_rpcset, ok ); ping_callinfo *pi = (*ping_resultmap)[donerpc]; assert( pi ); Time ping_time = now() - pi->pingstart; if( !ok ) { // this ping failed. remove if you've reached the max time limit, // otherwise try the ping again. if( now() - before_ping >= _declare_dead_time ) { pi->failed = true; } else { // put another shrimp on the barbie . . . TapDEBUG(3) << "Forking off check of (" << pi->ip << "/" << print_guid(pi->id) << ")" << endl; check_node_args *cna = New check_node_args(); cna->ip = pi->ip; cna->id = pi->id; _check_nodes->push_back( cna ); _check_nodes_waiting->notifyAll(); } } else { TapDEBUG(3) << "Finished multi-add for " << pi->ip << endl; record_stat( STAT_PING, 0, 0 ); // TODO: ok, but now how do I call rt->add without it placing // backpointers synchronously? pi->rtt = ping_time; if( timing != NULL ) { (*timing)[pi->ip] = ping_time; //_rt->get_time( pi->id ); } _rt->set_timeout( pi->id, false ); } TapDEBUG(3) << "multidone ip: " << pi->ip << " total left " << ping_rpcset->size() << endl; } // now that all the pings are done, we can add them to our routing table // (possibly sending synchronous backpointers around) without messing // up the measurements set<GUID> removed; // put recently dead nodes on there too for( uint i = 0; i < _recently_dead.size(); i++ ) { if( !_rt->contains(_recently_dead[i]) ) { removed.insert( _recently_dead[i] ); } } _recently_dead.clear(); for( HashMap<unsigned, ping_callinfo*>::iterator j=ping_resultmap->begin(); j != ping_resultmap->end(); ++j ) { ping_callinfo *pi = j.value(); //assert( pi->rtt == ping( pi->ip, pi->id ) ); TapDEBUG(4) << "ip: " << pi->ip << endl; if( pi->failed ) { // failed! remove! no need to send a backpointer remove message
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -