📄 tapestry.c
字号:
_rt->remove( pi->id, false ); _rt->remove_backpointer( pi->ip, pi->id ); assert( !_rt->contains( pi->id ) ); TapDEBUG(1) << "removing failed node " << pi->ip << endl; if( repair ) { removed.insert( pi->id ); } } else { // make sure it's not the default (we actually pinged this one) if( pi->rtt != 87654 ) { _rt->add( pi->ip, pi->id, pi->rtt ); if( _lookup_learn ) { _cachebag->remove( pi->id, false ); } } } delete pi; } // now, for each that should be repaired, ask the live nodes in its slot // for a new possibility and ask all live nodes in all // the levels above the slot. if( repair ) { RPCSet repair_rpcset; HashMap<unsigned, repair_callinfo*> repair_resultmap; HashMap<GUID, unsigned> repair_numasked; for( uint j = _digits_per_id-1; j >= 0; j-- ) { for( uint k = 0; k < _base; k++ ) { NodeInfo *ni = _rt->read( j, k ); if( ni == NULL ) { continue; } if( ni->_addr != ip() && removed.find(ni->_id) == removed.end() ) { bool send = false; repair_return *rr = NULL; repair_args *ra = NULL; for(set<GUID>::iterator i=removed.begin();i != removed.end();++i) { if( repair_numasked[*i] >= _max_repair_num ) { continue; } int match = guid_compare( *i, id_digits() ); assert( match >= 0 ); // shouldn't be me if( j < (uint) match ) continue; uint digit = get_digit( *i, match ); repair_numasked.insert( *i, repair_numasked[*i]+1); if( ra == NULL ) { rr = New repair_return(); ra = New repair_args(); ra->bad_ids = New vector<GUID>; ra->levels = New vector<uint>; ra->digits = New vector<uint>; send = true; } ra->bad_ids->push_back(*i); ra->levels->push_back( (uint) match); ra->digits->push_back(digit); } if( send && ra->bad_ids->size() > 0 ) { record_stat(STAT_REPAIR, ra->bad_ids->size(), 2*ra->bad_ids->size()); // just do these once, if we don't repair then oh well I guess... unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_repair, ra, rr, _rtt_timeout_factor* _rt->get_time(ni->_id) ); assert(rpc); repair_resultmap.insert(rpc, New repair_callinfo( ra, rr )); repair_rpcset.insert(rpc); } } delete ni; } // don't wrap the uint around past 0 if( j == 0 ) { break; } } uint rsetsize = repair_rpcset.size(); vector<NodeInfo *> toadd; for( unsigned int i = 0; i < rsetsize; i++ ) { bool ok; unsigned donerpc = rcvRPC( &repair_rpcset, ok ); repair_callinfo *rc = repair_resultmap[donerpc]; repair_return *rr = rc->rr; if( ok ) { record_stat( STAT_REPAIR, rr->nodelist.size(), 0 ); for( vector<NodeInfo>::iterator j=rr->nodelist.begin(); j != rr->nodelist.end(); j++ ) { // make sure this isn't already on the toadd list, and // that it wasn't removed by us // TODO: this is all sorts of inefficient bool add = true; if( removed.find((*j)._id ) != removed.end() ) { add = false; } if( add ) { for( vector<NodeInfo *>::iterator k=toadd.begin(); k != toadd.end(); ++k) { if( (**k)._id == (*j)._id ) { add = false; break; } } } if( add ) { toadd.push_back( New NodeInfo( j->_addr, j->_id ) ); } } } delete rr; rc->rr = NULL; delete rc->ra->bad_ids; delete rc->ra->levels; delete rc->ra->digits; delete rc; } multi_add_to_rt( &toadd, NULL ); // delete for( uint i = 0; i < toadd.size(); i++ ) { delete toadd[i]; toadd[i] = NULL; } }}voidTapestry::place_backpointer( RPCSet *bp_rpcset, HashMap<unsigned, backpointer_args*> *bp_resultmap, IPAddress bpip, int level, bool remove ){ backpointer_args *bpa = New backpointer_args(); bpa->ip = ip(); bpa->id = id(); bpa->level = level; bpa->remove = remove; backpointer_return bpr; TapDEBUG(3) << "sending bp to " << bpip << endl; record_stat(STAT_BACKPOINTER, 1, 2); GUID bpid = get_id_from_ip(bpip); Time timeout = MAXTIME; if( _rt->contains( bpid ) ) { timeout = _rtt_timeout_factor*_rt->get_time( bpid ); } unsigned rpc = asyncRPC( bpip, &Tapestry::handle_backpointer, bpa, &bpr, timeout ); bp_rpcset->insert(rpc); bp_resultmap->insert(rpc, bpa); // for now assume no failures}voidTapestry::place_backpointer_end( RPCSet *bp_rpcset, HashMap<unsigned,backpointer_args*>*bp_resultmap) { // wait for each to finish uint setsize = bp_rpcset->size(); RoutingTable *rt_old = _rt; for( unsigned int j = 0; j < setsize; j++ ) { bool ok; unsigned donerpc = rcvRPC( bp_rpcset, ok ); if( ok ) { record_stat(STAT_BACKPOINTER, 0, 0); } else { // for now we won't worry about what happens if a bp msg is dropped } backpointer_args *bpa = (*bp_resultmap)[donerpc]; delete bpa; } // must have the lock again before returning if( _rt != rt_old ) { // the routing table changed while we were away (i.e. we crashed) // TODO: in the future, if place_backpointer_end is ever called not at // the end of an rt function, we might need to do something here TapDEBUG(3) << "rt changed while away in place_backpointer_end" << endl; }}void Tapestry::handle_backpointer(backpointer_args *args, backpointer_return *ret){ TapDEBUG(3) << "got a bp msg from " << args->id << endl; TapDEBUG(5) << "bp enter" << endl; got_backpointer( args->ip, args->id, args->level, args->remove ); TapDEBUG(5) << "bp exit" << endl; // maybe this person should be in our table? //add_to_rt( args->ip, args->id );}voidTapestry::got_backpointer( IPAddress bpip, GUID bpid, uint level, bool remove ){ if( remove ) { _rt->remove_backpointer( bpip, bpid, level ); } else { _rt->add_backpointer( bpip, bpid, level ); }}IPAddressTapestry::next_hop( GUID key ){ IPAddress *ip = New IPAddress[1]; ip[0] = 0; GUID *id = New GUID[1]; id[0] = 0; next_hop( key, &ip, &id, 1 ); IPAddress rt = ip[0]; delete ip; return rt;}voidTapestry::next_hop( GUID key, IPAddress** ips, GUID **ids, uint size ){ // first, figure out how many digits we share, and start looking at // that level int level = guid_compare( key, id_digits() ); if( level == -1 ) { // it's us! if( size > 0 ) { (*ips)[0] = ip(); (*ids)[0] = id(); } return; } RouteEntry *re = _rt->get_entry( level, get_digit( key, level ) ); // use the first value passed in in ips as a non-nexthop, that is, if // ips[0] is nonnull, do NOT return that one as a possible next hop IPAddress dontuse = (*ips)[0]; // if it has an entry, use it if( re != NULL && re->get_first() != NULL && (re->get_first()->_addr != dontuse || re->size() > 1 ) ) { // this cannot be us, since this should be the entry where we differ // from the key, but just to be sure . . . assert( re->get_first()->_addr != id() ); uint used = 0; for( uint i = 0; i < re->size() && used < size; i++ ) { if( re->get_at(i)->_addr != dontuse ) { (*ips)[used] = re->get_at(i)->_addr; (*ids)[used] = re->get_at(i)->_id; used++; } } return; } else { // if there's no such entry, it's time to surrogate route. yeah baby. // keep adding 1 to the digit until you find a match. If it's us, // go up to the next level and keep on trying till the end // (From Fig. 3 in the JSAC paper) for( uint i = level; i < _digits_per_id; i++ ) { uint j = get_digit( key, i ); while( re == NULL || re->get_first() == NULL || (re->get_first()->_addr == dontuse && re->size() == 1) ) { // NOTE: we can't start with looking at the j++ entry, since // this might be the next level up, and we'd want to start at j re = _rt->get_entry( i, j ); j++; if( j == _base ) { // think circularly j = 0; } } TapDEBUG(4) << "looking for key " << print_guid(key) << ", level " << i << ", digit " << j << " is " << print_guid(re->get_first()->_id) << endl; // if it is us, go around another time // otherwise, we've found the next hop if( re->get_first()->_addr != ip() ) { uint used = 0; for( uint k = 0; k < re->size() && used < size; k++ ) { if( re->get_at(k)->_addr != dontuse ) { (*ips)[used] = re->get_at(k)->_addr; (*ids)[used] = re->get_at(k)->_id; used++; } } return; } re = NULL; } } // didn't find a better surrogate, so it must be us if( size > 0 ) { (*ips)[0] = ip(); (*ids)[0] = id(); }}intTapestry::guid_compare( GUID key1, GUID key2 ){ // if they're the same, return -1 if( key1 == key2 ) { return -1; } uint i = 0; // otherwise, figure out where they differ for( ; i < _digits_per_id; i++ ) { if( get_digit( key1, i ) != get_digit( key2, i ) ) { break; } } return (int) i;}intTapestry::guid_compare( GUID key1, uint key2_digits[] ){ uint i = 0; // otherwise, figure out where they differ for( ; i < _digits_per_id; i++ ) { if( get_digit( key1, i ) != key2_digits[i] ) { break; } } // if they're the same, return -1 if( i == _digits_per_id ) { return -1; } return (int) i;}intTapestry::guid_compare( uint key1_digits[], uint key2_digits[] ){ uint i = 0; // otherwise, figure out where they differ for( ; i < _digits_per_id; i++ ) { if( key1_digits[i] != key2_digits[i] ) { break; } } // if they're the same, return -1 if( i == _digits_per_id ) { return -1; } return (int) i;}voidTapestry::leave(Args *args){ crash (args);}voidTapestry::crash(Args *args){ if( _verbose ) { TapDEBUG(0) << "Tapestry crash" << endl; } // XXX: Thomer says: not necessary // crash(); // clear out routing table, and any other state that might be lying around delete _rt; if( _lookup_learn ) { delete _cachebag; } joined = false; // clear join state also _joining = false; for( uint l = 0; l < initlist.size(); l++ ) { delete initlist[l]; } initlist.clear(); _recently_dead.clear(); // although these should be used since the node is dead, and the real new // ones aren't made until the join (we don't know our ID until then), // they are necessary for outstanding RPCs. _rt = New RoutingTable(this, _redundancy); if( _lookup_learn ) { _cachebag = New RoutingTable(this, _redundancy); } // TODO: should be killing these waiting RPCs instead of allowing them // to finish normally. bah. _waiting_for_join->notifyAll(); TapDEBUG(2) << "crash exit" << endl; notifyObservers( (ObserverInfo *) "crash" );}stringTapestry::print_guid( GUID id ){ // NOTE: for now only handle up to base 256 (should be plenty) uint multiplier = 1; uint extra = 1; if( _base > 16 ) { multiplier++; extra += _digits_per_id; } uint size = _digits_per_id*multiplier+extra; char buf[size]; //printf( "initial guid: %16qx\n", id ); // print it out, digit by digit // (in order to get leading zeros) uint j = 0; for( uint i = 0; i < size-1; i++ ) { uint digit = get_digit( id, j ); if( _base > 16 ) { sprintf( &(buf[i]), "%.2x", digit ); i += 2; } else { sprintf( &(buf[i]), "%x", digit ); } j++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -