📄 tapestry.c
字号:
} while( now() < starttime + _declare_dead_time ) { if( dst != ip() ) { record_stat( type, num_args_id, num_args_else); } bool succ = doRPC(dst, fn, args, ret, timeout); if (succ) { return true; } timeout *= 2; } return false;}voidTapestry::check_node_loop(void * args){ ping_args pa; ping_return pr; // use ping callinfos RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; bool waiting = false; unsigned condvar_token = 1; TapDEBUG(3) << "Entered check_node_loop" << endl; while( true ) { if( !waiting ) { // initialize the Channel and add it to the condition variable // add it to the rpc map right away so no one else takes the 1 token Channel *c = chancreate(sizeof(int*), 0); assert( !_rpcmap[condvar_token] ); _rpcmap.insert( condvar_token, New RPCHandle(c, New Packet()) ); _check_nodes_waiting->wait_noblock(c); ping_rpcset.insert(condvar_token); waiting = true; // this RPCHandle, channel and packet will get deleted by // Node::_deleteRPC, called from rcvRPC TapDEBUG(3) << "Starting loop in check_node_loop" << endl; // check for any new IDs for( vector<check_node_args *>::iterator i=_check_nodes->begin(); i != _check_nodes->end(); i++ ) { check_node_args *check = *i; assert( check ); IPAddress checkip = check->ip; GUID checkid = ((Tapestry *)Network::Instance()->getnode(checkip))->id(); Time timeout; if( checkid != ip() && _rt->contains( checkid ) ) { timeout = _rtt_timeout_factor * _rt->get_time( checkid ); } else { timeout = MAXTIME; } // start an asyncRPC and save the info if( checkid != ip() ) { record_stat( STAT_PING, 0, 0); } unsigned rpc = asyncRPC( checkip, &Tapestry::handle_ping, &pa, &pr, timeout ); assert(rpc); ping_callinfo *pi = New ping_callinfo( checkip, checkid, now() ); pi->last_timeout = timeout; ping_resultmap.insert(rpc, pi); ping_rpcset.insert(rpc); _rt->set_timeout( pi->id, true ); // now try to find a better node to put in your table from the cache if( _lookup_learn ) { int digit = guid_compare( id(), pi->id ); assert( digit != -1 ); uint val = get_digit( pi->id, digit ); NodeInfo *n = _cachebag->read( digit, val ); if( n != NULL ) { _cachebag->remove( n->_id, false ); _rt->add( n->_addr, n->_id, n->_distance, false); delete n; } } delete check; } _check_nodes->clear(); } // now be extremely clever and select over both the asyncRPCs and the // channel that's waiting for new nodes to check bool ok; uint rpc = rcvRPC( &ping_rpcset, ok ); // TODO: what happens if we get an RPC back after dying/re-joining??? // if it's not the condvar one, do some stuff, otherwise go directly // to the top of the loop and add more nodes if( rpc != condvar_token ) { assert(rpc); ping_callinfo *pi = ping_resultmap[rpc]; if( ok ) { // this RPC succeeded, so this node wasn't dead after all. apologize. if( pi->ip != ip() ) { record_stat( STAT_PING, 0, 0); } _rt->set_timeout( pi->id, false ); delete pi; } else if( now() - pi->pingstart >= _declare_dead_time ) { // this node is officially dead. make it so. _rt->remove( pi->id, false ); _rt->remove_backpointer( pi->ip, pi->id ); _recently_dead.push_back(pi->id); TapDEBUG(2) << "Declaring (" << pi->ip << "/" << print_guid(pi->id) << ") dead in check_node_loop" << endl; delete pi; } else { // otherwise schedule this person again, doubling the timeout pi->last_timeout *= 2; if( pi->ip != ip() ) { record_stat( STAT_PING, 0, 0); } unsigned newrpc = asyncRPC( pi->ip, &Tapestry::handle_ping, &pa, &pr, pi->last_timeout ); assert(newrpc); ping_resultmap.insert(newrpc, pi); ping_rpcset.insert(newrpc); } } else { TapDEBUG(3) << "Notified via condvar in check_node_loop" << endl; waiting = false; } }}voidTapestry::have_joined(){ joined = true; _joining = false; _waiting_for_join->notifyAll(); if( _verbose ) { TapDEBUG(0) << "Finishing joining." << endl; } if( !_stab_scheduled && _stabtimer > 0 ) { delaycb( random()%_stabtimer, &Tapestry::check_rt, (void *) 0 ); _stab_scheduled = true; }}// External event that tells a node to contact the well-known node// and try to join.voidTapestry::join(Args *args){ TapDEBUG(5) << "j enter" << endl; IPAddress wellknown_ip = args->nget<IPAddress>("wellknown"); TapDEBUG(3) << ip() << " Wellknown: " << wellknown_ip << endl; if( _join_num == 0 && wellknown_ip == ip() ) { notifyObservers(); } else if( _join_num > 0 ) { notifyObservers( (ObserverInfo *) "join" ); } if( _joining ) { TapDEBUG(0) << "Tried to join while joining -- ignoring" << endl; return; } uint curr_join = ++_join_num; // might already be joined if init_state was used if( joined ) { TapDEBUG(5) << "j exit" << endl; return; } _joining = true; if( _verbose ) { TapDEBUG(0) << "Tapestry join " << curr_join << endl; } // if we're the well known node, we're done if( ip() == wellknown_ip ) { have_joined(); TapDEBUG(5) << "j exit" << endl; return; } // contact the well known machine, and have it start routing to the surrogate join_args ja; ja.ip = ip(); ja.id = id(); join_return jr; uint attempts = 30; // keep attempting to join until you can't attempt no mo' do { attempts--; jr.failed = false; bool succ = retryRPC( wellknown_ip, &Tapestry::handle_join, &ja, &jr, STAT_JOIN, 1, 0); // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; return; } if( !succ ) { TapDEBUG(0) << "Well known ip died! BAD!" << endl; return; } else { record_stat(STAT_JOIN, 1, 1); } } while( jr.failed && attempts >= 0 ); if( jr.failed ) { TapDEBUG(0) << "All my joins failed! BAD!" << endl; return; } // now that the multicast is over, it's time for nearest neighbor // ping everyone on the initlist int init_level = guid_compare( jr.surr_id, id_digits() ); vector<NodeInfo *> seeds; TapDEBUG(2) << "init level of " << init_level << endl; for( int i = init_level; i >= 0; i-- ) { // go through each member of the init list, add them to your routing // table, and ask them for their forward and backward pointers // to do this, set off two sets of asynchRPCs at once, one to ping all // the nodes and find out their distances, and another to get the nearest // neighbors. these can't easily be combined since the nearest neighbor // call does a ping itself (so measured rtt would be double). RPCSet ping_rpcset; HashMap<unsigned, ping_callinfo*> ping_resultmap; Time before_ping = now(); TapDEBUG(3) << "initing level " << i << " out of " << init_level << " size = " << initlist.size() << endl; multi_add_to_rt_start( &ping_rpcset, &ping_resultmap, &initlist, NULL, true ); RPCSet nn_rpcset; HashMap<unsigned, nn_callinfo*> nn_resultmap; unsigned int num_nncalls = 0; for( uint j = 0; j < initlist.size(); j++ ) { NodeInfo ni = *(initlist[j]); // also do an async nearest neighbor call nn_args *na = New nn_args(); na->ip = ip(); na->id = id(); na->alpha = i; nn_return *nr = New nn_return(); record_stat(STAT_NN, 1, 1); Time timeout = MAXTIME; if( _rt->contains( ni._id ) ) { timeout = _rtt_timeout_factor*_rt->get_time( ni._id ); } unsigned rpc = asyncRPC( ni._addr, &Tapestry::handle_nn, na, nr, timeout ); assert(rpc); nn_resultmap.insert(rpc, New nn_callinfo(ni._addr, na, nr)); nn_rpcset.insert(rpc); num_nncalls++; } multi_add_to_rt_end( &ping_rpcset, &ping_resultmap, before_ping, NULL, false ); for( uint j = 0; j < num_nncalls; j++ ) { bool ok; unsigned donerpc = rcvRPC( &nn_rpcset, ok ); nn_callinfo *ncall = nn_resultmap[donerpc]; nn_return nnr = *(ncall->nr); TapDEBUG(2) << "done with nn with " << ncall->ip << endl; if( ok ) { record_stat( STAT_NN, nnr.nodelist.size(), 0 ); for( uint k = 0; k < nnr.nodelist.size(); k++ ) { // make sure this one isn't on there yet // TODO: make this more efficient. Maybe use a hash set or something NodeInfo *currseed = nnr.nodelist[k]; // don't add ourselves, duh if( currseed->_id == id() ) { delete currseed; continue; } bool add = true; for( uint l = 0; l < seeds.size(); l++ ) { if( *currseed == *(seeds[l]) ) { add = false; break; } } if( add ) { seeds.push_back( currseed ); TapDEBUG(3) << " has a seed of " << currseed->_addr << " and " << print_guid( currseed->_id ) << endl; } else { delete currseed; } } TapDEBUG(3) << "done with adding seeds with " << ncall->ip << endl; } // TODO: for some reason the compiler gets mad if I try to delete nr // in the destructor delete ncall->nr; ncall->nr = NULL; delete ncall; } // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; // need to delete all the seeds for( uint j = 0; j < seeds.size(); j++ ) { NodeInfo *currseed = seeds[j]; delete currseed; } return; } // now that we have all the seeds we want from this one, // see if they are in the closest k NodeInfo * closestk[_k]; // initialize to NULL for( uint k = 0; k < _k; k++ ) { closestk[k] = NULL; } bool closest_full = false; // add these guys to routing table map< IPAddress, Time> timing; TapDEBUG(3) << "About to add seeds to the routing table." << endl; multi_add_to_rt( &seeds, &timing ); TapDEBUG(3) << "Done with adding seeds to the routing table." << endl; // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; // need to delete all the seeds for( uint j = 0; j < seeds.size(); j++ ) { NodeInfo *currseed = seeds[j]; delete currseed; } return; } for( uint j = 0; j < seeds.size(); j++ ) { NodeInfo *currseed = seeds[j]; // add them all to the routing table (this gets us the ping time for free TapDEBUG(3) << "about to get distance for " << currseed->_addr << endl; //add_to_rt( currseed->_addr, currseed->_id ); TapDEBUG(3) << "added to rt for " << currseed->_addr << endl; //bool ok = false; currseed->_distance = timing[currseed->_addr]; TapDEBUG(3) << "got distance for " << currseed->_addr << endl; // is there anyone on the list farther than you? if so, replace bool added = false; for( uint k = 0; k < _k; k++ ) { NodeInfo * currclose = closestk[k]; if( currclose == NULL || (closest_full && currclose->_distance > currseed->_distance ) ) { if( currclose != NULL ) { delete currclose; } closestk[k] = currseed; added = true; if( k == _k - 1 ) { closest_full = true; } //TapDEBUG(2) << "close is " << currseed->_addr << endl; break; } } if( !added ) { // we're throwing this person away, so delete the memory delete currseed; } } // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; // need to delete all the currclose now for( uint j = 0; j < _k; j++ ) { NodeInfo *currseed = closestk[j]; delete currseed; } return; } TapDEBUG(2) << "gathered closest for level " << i << endl; // these k are the next initlist for( uint l = 0; l < initlist.size(); l++ ) { delete initlist[l]; } initlist.clear(); seeds.clear(); for( uint k = 0; k < _k; k++ ) { NodeInfo *currclose = closestk[k]; if( currclose != NULL ) { TapDEBUG(2) << "close is " << currclose->_addr << endl; initlist.push_back( currclose ); } } } for( uint l = 0; l < initlist.size(); l++ ) { delete initlist[l]; } initlist.clear(); have_joined(); TapDEBUG(5) << "j exit" << endl; TapDEBUG(2) << "join done" << endl; TapDEBUG(2) << *_rt << endl;}voidTapestry::handle_join(join_args *args, join_return *ret){ TapDEBUG(5) << "hj enter" << endl; TapDEBUG(2) << "got a join message from " << args->ip << "/" << print_guid(args->id) << endl; // not allowed to participate in your own join! if( args->ip == ip() ) { ret->failed = true; TapDEBUG(5) << "hj exit" << endl; return; } // if our join has not yet finished, we must delay the handling of this // person's join. while( !joined ) { _waiting_for_join->wait(); // hmmm. if we're now dead, indicate some kind of timeout probably if( !alive() ) { ret->failed = true; TapDEBUG(5) << "hj exit" << endl; return; //TODO: ???? } } // route toward the root IPAddress *ips = New IPAddress[_redundant_lookup_num]; for( uint i = 0; i < _redundant_lookup_num; i++ ) { ips[i] = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -