📄 tapestry.c
字号:
} } return true;}voidTapestry::oracle_node_died( IPAddress deadip, GUID deadid, const set<Node *> *lid ){ TapDEBUG(2) << "Oracle says node died: " << deadip << "/" << print_guid( deadid ) << endl; _rt->remove( deadid, false ); _rt->remove_backpointer( deadip, deadid ); int match = guid_compare( deadid, id_digits() ); uint digit = get_digit( deadid, match ); // now find a replacement vector<NodeInfo *> nodes; Time bestrtt = 1000000; Tapestry *bestnode = NULL; for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Tapestry *currnode = (Tapestry*) *i; if( currnode->ip() != ip() && currnode->alive() ) { if( guid_compare( currnode->id_digits(), id_digits() ) == match && get_digit( currnode->id(), match ) == digit ) { Time rtt = 2*Network::Instance()->gettopology()->latency( ip(), currnode->ip() ); if( rtt < bestrtt ) { bestrtt = rtt; bestnode = currnode; } } } } if( bestnode != NULL ) { if( _rt->add( bestnode->ip(), bestnode->id(), bestrtt, false ) ) { bestnode->got_backpointer( ip(), id(), match, false ); if( _lookup_learn ) { _cachebag->remove( bestnode->id(), false ); } } }}voidTapestry::oracle_node_joined( Tapestry *t ){ TapDEBUG(2) << "Oracle says node joined: " << t->ip() << "/" << print_guid( t->id() ) << endl; Time rtt = 2*Network::Instance()->gettopology()->latency( ip(), t->ip() ); if( _rt->add( t->ip(), t->id(), rtt, false ) ) { t->got_backpointer( ip(), id(), guid_compare( t->id_digits(), id_digits() ), false ); // for now don't worry about removing backpointers for other people, // they don't really matter }}voidTapestry::check_rt(void *x){ TapDEBUG(2) << "Checking the routing table" << endl; Time t = now(); // do nothing if we should be dead or not fully alive if( !joined ) { _stab_scheduled = false; return; } // ping everyone in the routing table to // - update latencies // - ensure they're still alive // the easy way to do this is just to make a vector of all the nodes // in the routing table and simply try to add them all to the routing table. vector<NodeInfo *> nodes; for( uint i = 0; i < _digits_per_id; i++ ) { for( uint j = 0; j < _base; j++ ) { RouteEntry *re = _rt->get_entry( i, j ); if( re == NULL || (re->get_first() != NULL && re->get_first()->_addr == ip() ) ) { // if this an entry with our own ip, don't bother delving further // the backups of yourself will appear in later levels // there should be no other duplicates though, so vector is safe continue; } for( uint k = 0; k < re->size() && k <= _repair_backups; k++ ) { nodes.push_back( re->get_at(k) ); } } } RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; Time before_ping = now(); multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &nodes, NULL, false ); multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, true); TapDEBUG(2) << "finished checking routing table " << now() << " " << t << endl; // reschedule if ( t + _stabtimer < now()) { //stabilizating has run past _stabtime seconds // schedule it in one ms, to avoid recursion TapDEBUG(2) << "rescheduling check_rt in 1" << endl; delaycb( 1, &Tapestry::check_rt, (void *) 0 ); } else { Time later = _stabtimer - (now() - t); TapDEBUG(2) << "rescheduling check_rt in " << later << endl; delaycb( later, &Tapestry::check_rt, (void *) 0 ); }}voidTapestry::initstate(){ const set<Node *> *lid = Network::Instance()->getallnodes(); // TODO: we shouldn't need locking in here, right? if( !alive() ) { _join_num++; TapDEBUG(3) << "My alive is false" << endl; return; } TapDEBUG(1) << "initstate: about to add everyone" << endl; // for every node but this own, add them all to your routing table vector<NodeInfo *> nodes; for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Tapestry *currnode = (Tapestry*) *i; if( currnode->ip() == ip() || !currnode->alive() ) { continue; } // cheat and get the latency straight from the topology Time rtt = Network::Instance()->gettopology()->latency( ip(), currnode->ip() ) + Network::Instance()->gettopology()->latency( currnode->ip(), ip() ); if( rtt >= 50000 ) { continue; } _rt->add( currnode->ip(), currnode->id(), rtt, false ); } // now that everyone's been added, place backpointers on everyone // who is still in the table uint known_nodes = 0; for(set<Node*>::const_iterator i = lid->begin(); i != lid->end(); ++i) { Tapestry *currnode = (Tapestry*) *i; if( currnode->ip() == ip() || !currnode->alive() ) { continue; } // TODO: should I send backpointers for levels lower than the // maximum match? That would be a pain. if( _rt->contains( currnode->id() ) ) { currnode->got_backpointer( ip(), id(), guid_compare( currnode->id_digits(), id_digits() ), false ); known_nodes++; } } TapDEBUG(2) << "INITSTATE: " << known_nodes << endl; have_joined(); TapDEBUG(2) << "init_state: finished adding everyone" << endl;}TimeTapestry::ping( IPAddress other_node, GUID other_id, bool &ok ){ // if it's already in the table, don't worry about it if( _rt->contains( other_id ) ) { return _rt->get_time( other_id ); } Time before = now(); ping_args pa; ping_return pr; TapDEBUG(4) << "about to ping " << other_node << endl; record_stat(STAT_PING, 0, 0); ok = doRPC( other_node, &Tapestry::handle_ping, &pa, &pr, MAXTIME ); if( ok ) { record_stat(STAT_PING, 0, 0); } TapDEBUG(4) << "done with ping " << other_node << endl; return now() - before;}voidTapestry::multi_add_to_rt( vector<NodeInfo *> *nodes, map<IPAddress, Time> *timing ){ RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; Time before_ping = now(); multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, nodes, timing, true ); multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, timing, false );}voidTapestry::multi_add_to_rt_start( RPCSet *ping_rpcset, HashMap<unsigned, ping_callinfo*> *ping_resultmap, vector<NodeInfo *> *nodes, map<IPAddress, Time> *timing, bool check_exist ){ ping_args pa; ping_return pr; for( uint j = 0; j < nodes->size(); j++ ) { NodeInfo *ni = (*nodes)[j]; // do an asynchronous RPC to each one, collecting the ping times in the // process // however, no need to ping if we already know the ping time, right? if( (!check_exist) // && now() - _last_heard_map[ni->_addr] >= _stabtimer) || !_rt->contains( ni->_id ) ) { record_stat(STAT_PING, 0, 0); ping_callinfo *pi = New ping_callinfo(ni->_addr, ni->_id, now()); if( _rt->contains( ni->_id ) ) { pi->last_timeout = _rtt_timeout_factor*_rt->get_time( ni->_id ); } else { pi->last_timeout = MAXTIME; } TapDEBUG(3) << "Starting multi-add for " << ni->_addr << ", timeout: " << pi->last_timeout << endl; unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_ping, &pa, &pr, pi->last_timeout ); assert(rpc); ping_resultmap->insert(rpc, pi); ping_rpcset->insert(rpc); if( timing != NULL ) { (*timing)[ni->_addr] = 1000000; } } else if( check_exist && _rt->contains( ni->_id ) && timing != NULL ) { (*timing)[ni->_addr] = _rt->get_time( ni->_id ); } }}voidTapestry::multi_add_to_rt_end( RPCSet *ping_rpcset, HashMap<unsigned, ping_callinfo*> *ping_resultmap, Time before_ping, map<IPAddress, Time> *timing, bool repair ){ // check for done pings assert( ping_rpcset->size() == (uint) ping_resultmap->size() ); while( ping_rpcset->size() > 0 ) { bool ok; unsigned donerpc = rcvRPC( ping_rpcset, ok ); ping_callinfo *pi = (*ping_resultmap)[donerpc]; assert( pi ); Time ping_time = now() - pi->pingstart; if( !ok ) { // this ping failed. remove if you've reached the max time limit, // otherwise try the ping again. if( now() - before_ping >= _declare_dead_time ) { pi->failed = true; } else { // put another shrimp on the barbie . . . TapDEBUG(2) << "Forking off check of (" << pi->ip << "/" << print_guid(pi->id) << ")" << endl; check_node_args *cna = New check_node_args(); cna->ip = pi->ip; _check_nodes->push_back( cna ); _check_nodes_waiting->notifyAll(); } } else { TapDEBUG(3) << "Finished multi-add for " << pi->ip << endl; record_stat( STAT_PING, 0, 0 ); // TODO: ok, but now how do I call rt->add without it placing // backpointers synchronously? pi->rtt = ping_time; if( timing != NULL ) { (*timing)[pi->ip] = ping_time; //_rt->get_time( pi->id ); } _rt->set_timeout( pi->id, false ); } TapDEBUG(3) << "multidone ip: " << pi->ip << " total left " << ping_rpcset->size() << endl; } // now that all the pings are done, we can add them to our routing table // (possibly sending synchronous backpointers around) without messing // up the measurements set<GUID> removed; // put recently dead nodes on there too for( uint i = 0; i < _recently_dead.size(); i++ ) { if( !_rt->contains(_recently_dead[i]) ) { removed.insert( _recently_dead[i] ); } } _recently_dead.clear(); for( HashMap<unsigned, ping_callinfo*>::iterator j=ping_resultmap->begin(); j != ping_resultmap->end(); ++j ) { ping_callinfo *pi = j.value(); //assert( pi->rtt == ping( pi->ip, pi->id ) ); TapDEBUG(4) << "ip: " << pi->ip << endl; if( pi->failed ) { // failed! remove! no need to send a backpointer remove message _rt->remove( pi->id, false ); _rt->remove_backpointer( pi->ip, pi->id ); assert( !_rt->contains( pi->id ) ); TapDEBUG(1) << "removing failed node " << pi->ip << endl; if( repair ) { removed.insert( pi->id ); } } else { // make sure it's not the default (we actually pinged this one) if( pi->rtt != 87654 ) { _rt->add( pi->ip, pi->id, pi->rtt ); if( _lookup_learn ) { _cachebag->remove( pi->id, false ); } } } delete pi; } // now, for each that should be repaired, ask the live nodes in its slot // for a new possibility and ask all live nodes in all // the levels above the slot. if( repair ) { RPCSet repair_rpcset; HashMap<unsigned, repair_callinfo*> repair_resultmap; HashMap<GUID, unsigned> repair_numasked; for( uint j = _digits_per_id-1; j >= 0; j-- ) { for( uint k = 0; k < _base; k++ ) { NodeInfo *ni = _rt->read( j, k ); if( ni == NULL ) { continue; } if( ni->_addr != ip() && removed.find(ni->_id) == removed.end() ) { bool send = false; repair_return *rr = NULL; repair_args *ra = NULL; for(set<GUID>::iterator i=removed.begin();i != removed.end();++i) { if( repair_numasked[*i] >= _max_repair_num ) { continue; } int match = guid_compare( *i, id_digits() ); assert( match >= 0 ); // shouldn't be me if( j < (uint) match ) continue; uint digit = get_digit( *i, match ); repair_numasked.insert( *i, repair_numasked[*i]+1); if( ra == NULL ) { rr = New repair_return(); ra = New repair_args(); ra->bad_ids = New vector<GUID>; ra->levels = New vector<uint>; ra->digits = New vector<uint>; send = true; } ra->bad_ids->push_back(*i); ra->levels->push_back( (uint) match); ra->digits->push_back(digit); } if( send && ra->bad_ids->size() > 0 ) { record_stat(STAT_REPAIR, ra->bad_ids->size(), 2*ra->bad_ids->size()); // just do these once, if we don't repair then oh well I guess... unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_repair, ra, rr, _rtt_timeout_factor* _rt->get_time(ni->_id) ); assert(rpc); repair_resultmap.insert(rpc, New repair_callinfo( ra, rr )); repair_rpcset.insert(rpc); } } delete ni; } // don't wrap the uint around past 0 if( j == 0 ) { break; } } uint rsetsize = repair_rpcset.size(); vector<NodeInfo *> toadd; for( unsigned int i = 0; i < rsetsize; i++ ) { bool ok; unsigned donerpc = rcvRPC( &repair_rpcset, ok ); repair_callinfo *rc = repair_resultmap[donerpc]; repair_return *rr = rc->rr; if( ok ) { record_stat( STAT_REPAIR, rr->nodelist.size(), 0 ); for( vector<NodeInfo>::iterator j=rr->nodelist.begin(); j != rr->nodelist.end(); j++ ) { // make sure this isn't already on the toadd list, and // that it wasn't removed by us // TODO: this is all sorts of inefficient bool add = true; if( removed.find((*j)._id ) != removed.end() ) { add = false; } if( add ) { for( vector<NodeInfo *>::iterator k=toadd.begin(); k != toadd.end(); ++k) { if( (**k)._id == (*j)._id ) { add = false; break; } } } if( add ) { toadd.push_back( New NodeInfo( j->_addr, j->_id ) ); } } } delete rr; rc->rr = NULL; delete rc->ra->bad_ids; delete rc->ra->levels; delete rc->ra->digits; delete rc; } multi_add_to_rt( &toadd, NULL ); // delete for( uint i = 0; i < toadd.size(); i++ ) { delete toadd[i]; toadd[i] = NULL; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -