📄 tapestry.c
字号:
} ips[0] = args->ip; next_hop( args->id, &ips, _redundant_lookup_num ); uint i = 0; for( ; i < _redundant_lookup_num; i++ ) { IPAddress next = ips[i]; if( next == 0 ) { continue; } if( next == ip() ) { // we are the surrogate root for this node, start the integration TapDEBUG(2) << "is the surrogate root for " << args->ip << endl; // start by sending the new node all of the nodes in your table // up to the digit you share int alpha = guid_compare( args->id, id_digits() ); vector<NodeInfo *> thisrow; bool stop = true; for( int i = 0; i <= alpha; i++ ) { for( uint j = 0; j < _base; j++ ) { NodeInfo *ni = _rt->read( i, j ); if( ni != NULL ) { // keep going if there is someone else on this level if( ni->_addr != ip() ) { stop = false; } thisrow.push_back( ni ); } } if( stop ) break; stop = true; } nodelist_args na; na.nodelist = thisrow; nodelist_return nr; record_stat(STAT_NODELIST, na.nodelist.size(), 0); unsigned rpc = asyncRPC( args->ip, &Tapestry::handle_nodelist, &na, &nr, MAXTIME ); // start the multicast mc_args mca; mca.new_ip = args->ip; mca.new_id = args->id; mca.alpha = alpha; mca.from_lock = false; mc_return mcr; // make the watchlist vector<bool *> wl; for( int i = 0; i < alpha+1; i++ ) { bool *level = New bool[_base]; wl.push_back(level); for( uint j = 0; j < _base; j++ ) { wl[i][j] = false; } } mca.watchlist = wl; handle_mc( &mca, &mcr ); // finish up the nodelist rpc RPCSet rpcset; rpcset.insert(rpc); bool ok; rcvRPC( &rpcset, ok ); if( ok ) { record_stat(STAT_NODELIST, 0, 0); } else { if( retryRPC( args->ip, &Tapestry::handle_nodelist, &na, &nr, STAT_NODELIST, na.nodelist.size(), 0 ) ) { record_stat(STAT_NODELIST, 0, 0); } } // free the nodelist for( uint i = 0; i < na.nodelist.size(); i++ ) { delete na.nodelist[i]; } // free the bools! for( int i = 0; i < alpha+1; i++ ) { bool *level = wl[i]; delete level; } TapDEBUG(2) << "surrogate for " << args->ip << " is done with handlejoin" << endl; ret->surr_id = id(); break; } else { // not the surrogate // recursive routing yo TapDEBUG(2) << "Forwarding join req for " << args->ip << " to " << next << endl; bool succ = retryRPC( next, &Tapestry::handle_join, args, ret, STAT_JOIN, 1, 0); if( succ ) { record_stat(STAT_JOIN, 1, 1); } if( succ && !ret->failed ) { // success! break; } else { ret->failed = false; } } } // if we were never successful, set the failed flag if( i == _redundant_lookup_num ) { ret->failed = true; } delete ips; TapDEBUG(5) << "hj exit" << endl;}void Tapestry::handle_nodelist(nodelist_args *args, nodelist_return *ret){ TapDEBUG(5) << "nhl enter" << endl; TapDEBUG(3) << "handling a nodelist message" << endl; // add each to your routing table multi_add_to_rt( &(args->nodelist), NULL ); /* for( uint i = 0; i < args->nodelist.size(); i++ ) { NodeInfo * curr_node = args->nodelist[i]; add_to_rt( curr_node->_addr, curr_node->_id ); } */ TapDEBUG(5) << "nhl exit" << endl;}voidTapestry::handle_mc(mc_args *args, mc_return *ret){ TapDEBUG(5) << "mc enter" << endl; TapDEBUG(2) << "got multicast message for " << args->new_ip << endl; // not allowed to participate in your own join! if( args->new_ip == ip() ) { TapDEBUG(5) << "mc exit" << endl; return; } // lock this node _rt->set_lock( args->new_ip, args->new_id ); // make sure that it knows about you as well mcnotify_args mca; mca.ip = ip(); mca.id = id(); mcnotify_return mcr; // check the watchlist and see if you know about any nodes that // can fill those prefixes vector<NodeInfo *> nodelist; for( uint i = 0; i < args->watchlist.size(); i++ ) { for( uint j = 0; j < _base; j++ ) { if( !args->watchlist[i][j] ) { NodeInfo * ni = _rt->read( i, j ); if( ni != NULL && ni->_addr != ip() && ni->_addr != args->new_ip ) { nodelist.push_back( ni ); args->watchlist[i][j] = true; } else { if( ni != NULL ) { delete ni; } } } } } mca.nodelist = nodelist; record_stat(STAT_MCNOTIFY, 1+nodelist.size(), 0); unsigned mcnrpc = asyncRPC( args->new_ip, &Tapestry::handle_mcnotify, &mca, &mcr, MAXTIME ); // don't go on if this is from a lock if( !args->from_lock ) { RPCSet rpcset; HashMap<unsigned, mc_callinfo*> resultmap; unsigned int numcalls = 0; // then, find any other node that shares this prefix and multicast for( uint i = args->alpha; i < _digits_per_id; i++ ) { for( uint j = 0; j < _base; j++ ) { NodeInfo *ni = _rt->read( i, j ); // don't RPC to ourselves or to the new node if( ni == NULL || ni->_addr == ip() || ni->_addr == args->new_ip ) { if( ni != NULL ) { delete ni; } continue; } else { mc_args *mca = New mc_args(); mc_return *mcr = New mc_return(); mca->new_ip = args->new_ip; mca->new_id = args->new_id; mca->alpha = i + 1; mca->from_lock = false; mca->watchlist = args->watchlist; TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << ni->_addr << "/" << print_guid( ni->_id ) << endl; record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base); unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr, _rtt_timeout_factor*_rt->get_time(ni->_id)); assert(rpc); resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr)); rpcset.insert(rpc); numcalls++; delete ni; } } } // also, multicast to any locks that might need it vector <NodeInfo *> * locks = _rt->get_locks( args->new_id ); for( uint i = 0; i < locks->size(); i++ ) { NodeInfo *ni = (*locks)[i]; if( ni->_addr != args->new_ip ) { mc_args *mca = New mc_args(); mc_return *mcr = New mc_return(); mca->new_ip = args->new_ip; mca->new_id = args->new_id; mca->alpha = args->alpha; mca->from_lock = true; mca->watchlist = args->watchlist; TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << ni->_addr << "/" << print_guid( ni->_id ) << " as a lock " << endl; record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base); Time timeout = MAXTIME; if( _rt->contains( ni->_id ) ) { timeout = _rtt_timeout_factor*_rt->get_time( ni->_id ); } unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr, timeout ); assert(rpc); resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr)); rpcset.insert(rpc); numcalls++; } } Time startmc_time = now(); // wait for them all to return while( rpcset.size() > 0 ) { bool ok; unsigned donerpc = rcvRPC( &rpcset, ok ); if( ok || now() >= startmc_time + _declare_dead_time ) { if( ok ) { record_stat(STAT_MC, 0, 0); } mc_callinfo *ci = resultmap[donerpc]; TapDEBUG(2) << "mc to " << ci->ip << " about " << args->new_ip << " is done. ok = " << ok << endl; delete ci; } else { // do it again sam mc_callinfo *ci = resultmap[donerpc]; resultmap.remove( donerpc ); GUID ci_id = ((Tapestry *)Network::Instance()->getnode(ci->ip))->id(); Time timeout = MAXTIME; if( _rt->contains( ci_id ) ) { timeout = _rtt_timeout_factor*_rt->get_time( ci_id ); } record_stat(STAT_MC, 1, 2+ci->ma->watchlist.size()*_base); unsigned rpc = asyncRPC( ci->ip, &Tapestry::handle_mc, ci->ma, ci->mr, timeout ); assert(rpc); rpcset.insert(rpc); resultmap.insert(rpc, ci); } } } // finish up mcnotify RPCSet mcnrpcset; mcnrpcset.insert(mcnrpc); bool ok; rcvRPC( &mcnrpcset, ok ); if( ok ) { record_stat(STAT_MCNOTIFY, 0, 0); } else { ok = retryRPC( args->new_ip, &Tapestry::handle_mcnotify, &mca, &mcr, STAT_MCNOTIFY, 1+nodelist.size(), 0); if( ok ) { record_stat(STAT_MCNOTIFY, 0, 0); } } // free the nodelist for( uint i = 0; i < nodelist.size(); i++ ) { delete nodelist[i]; } if( !ok ) { _rt->remove_lock( args->new_ip, args->new_id ); TapDEBUG(3) << "Notify to new node failed, abandoning mc!" << endl; TapDEBUG(5) << "mc exit" << endl; return; } // we wait until here to add the new node to your table // since other simultaneous inserts will require sending the mc to // at least one unlocked pointer, and if this guy is in the routing table // already, we may send to it even though its locked. add_to_rt( args->new_ip, args->new_id ); _rt->remove_lock( args->new_ip, args->new_id ); TapDEBUG(2) << "multicast done for " << args->new_ip << endl; // TODO: object pointer transferal TapDEBUG(5) << "mc exit" << endl;}void Tapestry::handle_nn(nn_args *args, nn_return *ret){ TapDEBUG(5) << "nn enter" << endl; // send back all backward pointers vector<NodeInfo *> nns; vector<NodeInfo *> *bps = _rt->get_backpointers( args->alpha ); for( uint i = 0; i < bps->size(); i++ ) { NodeInfo *newnode = New NodeInfo( ((*bps)[i])->_addr, ((*bps)[i])->_id ); nns.push_back( newnode ); } // send all forward pointers at that level for( uint i = 0; i < _base; i++ ) { NodeInfo *newnode = _rt->read( args->alpha, i ); if( newnode != NULL ) { nns.push_back( newnode ); } } // add yourself to the list nns.push_back( New NodeInfo( ip(), id() ) ); ret->nodelist = nns; // finally, add this guy to our table add_to_rt( args->ip, args->id ); TapDEBUG(5) << "nn exit" << endl;}void Tapestry::handle_repair(repair_args *args, repair_return *ret){ TapDEBUG(5) << "rep enter" << endl; // send back all backward pointers vector<NodeInfo> nns; for( uint j = 0; j < args->bad_ids->size(); j++ ) { GUID bad = (*(args->bad_ids))[j]; uint level = (*(args->levels))[j]; uint digit = (*(args->digits))[j]; assert( level < _digits_per_id && digit < _base ); // TODO: limit the number? RouteEntry *re = _rt->get_entry( level, digit ); for( uint i = 0; re != NULL && i < re->size(); i++ ) { NodeInfo *next = re->get_at(i); if( next != NULL && next->_addr != ip() && next->_id != bad ) { nns.push_back( *next ); } } } ret->nodelist = nns; TapDEBUG(5) << "rep exit" << endl;}void Tapestry::handle_ping(ping_args *args, ping_return *ret){ TapDEBUG(4) << "pinged." << endl; // do nothing}void Tapestry::handle_mcnotify(mcnotify_args *args, mcnotify_return *ret){ TapDEBUG(5) << "mcn enter" << endl; TapDEBUG(3) << "got mcnotify from " << args->ip << endl; NodeInfo *mc_node = New NodeInfo( args->ip, args->id ); initlist.push_back( mc_node ); // add all the nodes on the nodelist as well nodelist_args na; na.nodelist = args->nodelist; nodelist_return nr; handle_nodelist( &na, &nr ); TapDEBUG(5) << "mcn exit" << endl;}void Tapestry::add_to_rt( IPAddress new_ip, GUID new_id ){ // first get the distance to this node // TODO: asynchronous pings would be nice // TODO: also, maybe a ping cache of some kind bool ok = true; Time distance = ping( new_ip, new_id, ok ); if( ok ) { // the RoutingTable takes care of placing (and removing) backpointers // for us _rt->add( new_ip, new_id, distance ); if( _lookup_learn ) { _cachebag->remove( new_id, false ); } } return;}boolTapestry::stabilized(vector<GUID> lid){ // if we don't think we've joined, we can't possibly be stable if( !joined ) { TapDEBUG(1) << "Haven't even joined yet." << endl; return false; } // for every GUID, make sure it either exists in your routing table, // or someone else does // TODO: do we need locking in this function? for( uint i = 0; i < lid.size(); i++ ) { GUID currguid = lid[i]; if( !_rt->contains(currguid) ) { int match = guid_compare( currguid, id_digits() ); if( match == -1 ) { TapDEBUG(1) << "doesn't have itself in the routing table!" << endl; return false; } else { NodeInfo * ni = _rt->read( match, get_digit( currguid, match ) ); if( ni == NULL ) { TapDEBUG(1) << "has a hole in the routing table at (" << match << "," << get_digit( currguid, match ) << ") where " << print_guid( currguid ) << " would fit." << endl; return false; } else { delete ni; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -