📄 tapestry.c
字号:
} delete ips; delete ids; TapDEBUG(5) << "hl exit " << endl;}voidTapestry::insert(Args *args) { TapDEBUG(2) << "Tapestry Insert" << endl;}template<class BT, class AT, class RT>boolTapestry::retryRPC(IPAddress dst, void (BT::* fn)(AT *, RT *), AT *args, RT *ret, uint type, uint num_args_id, uint num_args_else){ Time starttime = now(); GUID dstid = get_id_from_ip(dst); Time timeout; if( dst != ip() && _rt->contains( dstid ) ) { timeout = _rtt_timeout_factor * _rt->get_time( dstid ); } else { timeout = MAXTIME; } while( now() < starttime + _declare_dead_time ) { if( dst != ip() ) { record_stat( type, num_args_id, num_args_else); } bool succ = doRPC(dst, fn, args, ret, timeout); if (succ) { return true; } timeout *= 2; } return false;}voidTapestry::check_node_loop(void * args){ ping_args pa; ping_return pr; // use ping callinfos RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; bool waiting = false; unsigned condvar_token = 1; TapDEBUG(3) << "Entered check_node_loop" << endl; while( true ) { if( !waiting ) { // initialize the Channel and add it to the condition variable // add it to the rpc map right away so no one else takes the 1 token Channel *c = chancreate(sizeof(int*), 0); assert( !_rpcmap[condvar_token] ); _rpcmap.insert( condvar_token, New RPCHandle(c, New Packet()) ); _check_nodes_waiting->wait_noblock(c); ping_rpcset.insert(condvar_token); waiting = true; // this RPCHandle, channel and packet will get deleted by // Node::_deleteRPC, called from rcvRPC TapDEBUG(3) << "Starting loop in check_node_loop" << endl; // check for any new IDs for( vector<check_node_args *>::iterator i=_check_nodes->begin(); i != _check_nodes->end(); i++ ) { check_node_args *check = *i; assert( check ); IPAddress checkip = check->ip; GUID checkid = check->id; TapDEBUG(2) << "Going to check (" << checkip << "/" << print_guid(checkid) << ") in check_node_loop." << endl; if( checkid == id() || !_rt->contains(checkid) || _rt->get_timeout( checkid ) ) { // we're already checking this person TapDEBUG(2) << "We're already checking (" << checkip << "/" << print_guid(checkid) << ") " << _rt->contains(checkid) << " " << _rt->get_timeout( checkid ) << ", so ignoring." << endl; delete check; continue; } Time timeout; if( checkid != ip() && _rt->contains( checkid ) ) { timeout = _rtt_timeout_factor * _rt->get_time( checkid ); } else { timeout = MAXTIME; } // start an asyncRPC and save the info if( checkid != ip() ) { record_stat( STAT_PING, 0, 0); } unsigned rpc = asyncRPC( checkip, &Tapestry::handle_ping, &pa, &pr, timeout ); assert(rpc); ping_callinfo *pi = New ping_callinfo( checkip, checkid, now() ); pi->last_timeout = timeout; pi->times_tried = 0; ping_resultmap.insert(rpc, pi); ping_rpcset.insert(rpc); _rt->set_timeout( pi->id, true ); // now try to find a better node to put in your table from the cache if( _lookup_learn ) { int digit = guid_compare( id(), pi->id ); assert( digit != -1 ); uint val = get_digit( pi->id, digit ); NodeInfo *n = _cachebag->read( digit, val ); if( n != NULL ) { TapDEBUG(2) << "Lookup-learn adding (" << n->_addr << "/" << print_guid(n->_id) << ") to replace (" << checkip << "/" << print_guid(checkid) << ")" << endl; _cachebag->remove( n->_id, false ); _rt->add( n->_addr, n->_id, n->_distance, false); delete n; } } delete check; } _check_nodes->clear(); } // now be extremely clever and select over both the asyncRPCs and the // channel that's waiting for new nodes to check bool ok; uint rpc = rcvRPC( &ping_rpcset, ok ); // TODO: what happens if we get an RPC back after dying/re-joining??? // if it's not the condvar one, do some stuff, otherwise go directly // to the top of the loop and add more nodes if( rpc != condvar_token ) { assert(rpc); ping_callinfo *pi = ping_resultmap[rpc]; pi->times_tried++; if( ok ) { TapDEBUG(2) << "check_node_loop found that (" << pi->ip << "/" << print_guid( pi->id ) << ") is alive." << endl; // this RPC succeeded, so this node wasn't dead after all. apologize. if( pi->ip != ip() ) { record_stat( STAT_PING, 0, 0); } _rt->set_timeout( pi->id, false ); delete pi; } else if( pi->times_tried >= _declare_dead_num || now() - pi->pingstart >= _declare_dead_time ) { // this node is officially dead. make it so. _rt->remove( pi->id, false ); _rt->remove_backpointer( pi->ip, pi->id ); _recently_dead.push_back(pi->id); TapDEBUG(2) << "Declaring (" << pi->ip << "/" << print_guid(pi->id) << ") dead in check_node_loop" << endl; delete pi; } else { // otherwise schedule this person again, doubling the timeout pi->last_timeout *= 2; if( pi->ip != ip() ) { record_stat( STAT_PING, 0, 0); } unsigned newrpc = asyncRPC( pi->ip, &Tapestry::handle_ping, &pa, &pr, pi->last_timeout ); assert(newrpc); ping_resultmap.insert(newrpc, pi); ping_rpcset.insert(newrpc); } } else { TapDEBUG(3) << "Notified via condvar in check_node_loop" << endl; waiting = false; } }}voidTapestry::have_joined(){ joined = true; _joining = false; _waiting_for_join->notifyAll(); if( _verbose ) { TapDEBUG(0) << "Finishing joining." << endl; } if( !_stab_scheduled && _stabtimer > 0 ) { delaycb( random()%_stabtimer, &Tapestry::check_rt, (void *) 0 ); _stab_scheduled = true; }}// External event that tells a node to contact the well-known node// and try to join.voidTapestry::join(Args *args){ TapDEBUG(5) << "j enter" << endl; _my_id = get_id_from_ip(ip()); // initialize an array of guid digits delete [] _my_id_digits; _my_id_digits = New uint[_digits_per_id]; for( uint i = 0; i < _digits_per_id; i++ ) { _my_id_digits[i] = get_digit( id(), i ); } IPAddress wellknown_ip = args->nget<IPAddress>("wellknown"); TapDEBUG(3) << ip() << " Wellknown: " << wellknown_ip << endl; if( _join_num == 0 && wellknown_ip == ip() ) { notifyObservers(); } else if( _join_num > 0 ) { // move these constructions to here since after a crash we won't // know our new IP/ID pair until the next join. delete _rt; _rt = New RoutingTable(this, _redundancy); if( _lookup_learn ) { delete _cachebag; _cachebag = New RoutingTable(this, _redundancy); } notifyObservers( (ObserverInfo *) "join" ); } if( _joining ) { TapDEBUG(0) << "Tried to join while joining -- ignoring" << endl; return; } uint curr_join = ++_join_num; // might already be joined if init_state was used if( joined ) { TapDEBUG(5) << "j exit" << endl; return; } _joining = true; if( _verbose ) { TapDEBUG(0) << "Tapestry join " << curr_join << endl; } // if we're the well known node, we're done if( ip() == wellknown_ip ) { have_joined(); TapDEBUG(5) << "j exit" << endl; return; } // contact the well known machine, and have it start routing to the surrogate join_args ja; ja.ip = ip(); ja.id = id(); join_return jr; uint attempts = 30; // keep attempting to join until you can't attempt no mo' do { attempts--; jr.failed = false; bool succ = retryRPC( wellknown_ip, &Tapestry::handle_join, &ja, &jr, STAT_JOIN, 1, 0); // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; return; } if( !succ ) { TapDEBUG(0) << "Well known ip died! BAD!" << endl; return; } else { record_stat(STAT_JOIN, 1, 1); } } while( jr.failed && attempts >= 0 ); if( jr.failed ) { TapDEBUG(0) << "All my joins failed! BAD!" << endl; return; } // now that the multicast is over, it's time for nearest neighbor // ping everyone on the initlist int init_level = guid_compare( jr.surr_id, id_digits() ); vector<NodeInfo *> seeds; TapDEBUG(2) << "init level of " << init_level << endl; for( int i = init_level; i >= 0; i-- ) { // go through each member of the init list, add them to your routing // table, and ask them for their forward and backward pointers // to do this, set off two sets of asynchRPCs at once, one to ping all // the nodes and find out their distances, and another to get the nearest // neighbors. these can't easily be combined since the nearest neighbor // call does a ping itself (so measured rtt would be double). RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; Time before_ping = now(); TapDEBUG(3) << "initing level " << i << " out of " << init_level << " size = " << initlist.size() << endl; multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &initlist, NULL, true ); RPCSet nn_rpcset; HashMap<unsigned, nn_callinfo*> nn_resultmap; unsigned int num_nncalls = 0; for( uint j = 0; j < initlist.size(); j++ ) { NodeInfo ni = *(initlist[j]); // also do an async nearest neighbor call nn_args *na = New nn_args(); na->ip = ip(); na->id = id(); na->alpha = i; nn_return *nr = New nn_return(); record_stat(STAT_NN, 1, 1); Time timeout = MAXTIME; if( _rt->contains( ni._id ) ) { timeout = _rtt_timeout_factor*_rt->get_time( ni._id ); } unsigned rpc = asyncRPC( ni._addr, &Tapestry::handle_nn, na, nr, timeout ); assert(rpc); nn_resultmap.insert(rpc, New nn_callinfo(ni._addr, na, nr)); nn_rpcset.insert(rpc); num_nncalls++; } multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, false ); for( uint j = 0; j < num_nncalls; j++ ) { bool ok; unsigned donerpc = rcvRPC( &nn_rpcset, ok ); nn_callinfo *ncall = nn_resultmap[donerpc]; nn_return nnr = *(ncall->nr); TapDEBUG(3) << "done with nn with " << ncall->ip << endl; if( ok ) { record_stat( STAT_NN, nnr.nodelist.size(), 0 ); for( uint k = 0; k < nnr.nodelist.size(); k++ ) { // make sure this one isn't on there yet // TODO: make this more efficient. Maybe use a hash set or something NodeInfo *currseed = nnr.nodelist[k]; // don't add ourselves, duh if( currseed->_id == id() ) { delete currseed; continue; } bool add = true; for( uint l = 0; l < seeds.size(); l++ ) { if( *currseed == *(seeds[l]) ) { add = false; break; } } if( add ) { seeds.push_back( currseed ); TapDEBUG(3) << " has a seed of " << currseed->_addr << " and " << print_guid( currseed->_id ) << endl; } else { delete currseed; } } TapDEBUG(3) << "done with adding seeds with " << ncall->ip << endl; } // TODO: for some reason the compiler gets mad if I try to delete nr // in the destructor delete ncall->nr; ncall->nr = NULL; delete ncall; } // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; // need to delete all the seeds for( uint j = 0; j < seeds.size(); j++ ) { NodeInfo *currseed = seeds[j]; delete currseed; } return; } // now that we have all the seeds we want from this one, // see if they are in the closest k NodeInfo * closestk[_k]; // initialize to NULL for( uint k = 0; k < _k; k++ ) { closestk[k] = NULL; } bool closest_full = false; // add these guys to routing table map< IPAddress, Time> timing; TapDEBUG(3) << "About to add seeds to the routing table." << endl; multi_add_to_rt( &seeds, &timing ); TapDEBUG(3) << "Done with adding seeds to the routing table." << endl; // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; // need to delete all the seeds for( uint j = 0; j < seeds.size(); j++ ) { NodeInfo *currseed = seeds[j]; delete currseed; } return; } for( uint j = 0; j < seeds.size(); j++ ) { NodeInfo *currseed = seeds[j]; // add them all to the routing table (this gets us the ping time for free TapDEBUG(3) << "about to get distance for " << currseed->_addr << endl; //add_to_rt( currseed->_addr, currseed->_id ); TapDEBUG(3) << "added to rt for " << currseed->_addr << endl; //bool ok = false; currseed->_distance = timing[currseed->_addr]; TapDEBUG(3) << "got distance for " << currseed->_addr << endl; // is there anyone on the list farther than you? if so, replace bool added = false; for( uint k = 0; k < _k; k++ ) { NodeInfo * currclose = closestk[k]; if( currclose == NULL || (closest_full && currclose->_distance > currseed->_distance ) ) { if( currclose != NULL ) { delete currclose; } closestk[k] = currseed; added = true; if( k == _k - 1 ) { closest_full = true; } //TapDEBUG(2) << "close is " << currseed->_addr << endl; break; } } if( !added ) { // we're throwing this person away, so delete the memory delete currseed; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -