📄 tapestry.c
字号:
} // make sure we haven't crashed and/or started another join if( !alive() || _join_num != curr_join ) { TapDEBUG(0) << "Old join " << curr_join << " aborting" << endl; // need to delete all the currclose now for( uint j = 0; j < _k; j++ ) { NodeInfo *currseed = closestk[j]; delete currseed; } return; } TapDEBUG(2) << "gathered closest for level " << i << endl; // these k are the next initlist for( uint l = 0; l < initlist.size(); l++ ) { delete initlist[l]; } initlist.clear(); seeds.clear(); for( uint k = 0; k < _k; k++ ) { NodeInfo *currclose = closestk[k]; if( currclose != NULL ) { TapDEBUG(2) << "close is " << currclose->_addr << endl; initlist.push_back( currclose ); } } } for( uint l = 0; l < initlist.size(); l++ ) { delete initlist[l]; } initlist.clear(); have_joined(); TapDEBUG(5) << "j exit" << endl; TapDEBUG(2) << "join done" << endl; TapDEBUG(2) << *_rt << endl;}voidTapestry::handle_join(join_args *args, join_return *ret){ TapDEBUG(5) << "hj enter" << endl; TapDEBUG(2) << "got a join message from " << args->ip << "/" << print_guid(args->id) << endl; // not allowed to participate in your own join! if( args->ip == ip() ) { ret->failed = true; TapDEBUG(5) << "hj exit" << endl; return; } // if our join has not yet finished, we must delay the handling of this // person's join. while( !joined ) { _waiting_for_join->wait(); // hmmm. if we're now dead, indicate some kind of timeout probably if( !alive() ) { ret->failed = true; TapDEBUG(5) << "hj exit" << endl; return; //TODO: ???? } } // route toward the root IPAddress *ips = New IPAddress[_redundant_lookup_num]; for( uint i = 0; i < _redundant_lookup_num; i++ ) { ips[i] = 0; } GUID *ids = New GUID[_redundant_lookup_num]; for( uint i = 0; i < _redundant_lookup_num; i++ ) { ids[i] = 0; } ips[0] = args->ip; next_hop( args->id, &ips, &ids, _redundant_lookup_num ); uint i = 0; for( ; i < _redundant_lookup_num; i++ ) { IPAddress next = ips[i]; if( next == 0 ) { continue; } if( next == ip() ) { // we are the surrogate root for this node, start the integration TapDEBUG(2) << "is the surrogate root for " << args->ip << endl; // start by sending the new node all of the nodes in your table // up to the digit you share int alpha = guid_compare( args->id, id_digits() ); vector<NodeInfo *> thisrow; bool stop = true; for( int i = 0; i <= alpha; i++ ) { for( uint j = 0; j < _base; j++ ) { NodeInfo *ni = _rt->read( i, j ); if( ni != NULL ) { // keep going if there is someone else on this level if( ni->_addr != ip() ) { stop = false; } thisrow.push_back( ni ); } } if( stop ) break; stop = true; } nodelist_args na; na.nodelist = thisrow; nodelist_return nr; record_stat(STAT_NODELIST, na.nodelist.size(), 0); unsigned rpc = asyncRPC( args->ip, &Tapestry::handle_nodelist, &na, &nr, MAXTIME ); // start the multicast mc_args mca; mca.new_ip = args->ip; mca.new_id = args->id; mca.alpha = alpha; mca.from_lock = false; mc_return mcr; // make the watchlist vector<bool *> wl; for( int i = 0; i < alpha+1; i++ ) { bool *level = New bool[_base]; wl.push_back(level); for( uint j = 0; j < _base; j++ ) { wl[i][j] = false; } } mca.watchlist = wl; handle_mc( &mca, &mcr ); // finish up the nodelist rpc RPCSet rpcset; rpcset.insert(rpc); bool ok; rcvRPC( &rpcset, ok ); if( ok ) { record_stat(STAT_NODELIST, 0, 0); } else { if( retryRPC( args->ip, &Tapestry::handle_nodelist, &na, &nr, STAT_NODELIST, na.nodelist.size(), 0 ) ) { record_stat(STAT_NODELIST, 0, 0); } } // free the nodelist for( uint i = 0; i < na.nodelist.size(); i++ ) { delete na.nodelist[i]; } // free the bools! for( int i = 0; i < alpha+1; i++ ) { bool *level = wl[i]; delete level; } TapDEBUG(2) << "surrogate for " << args->ip << " is done with handlejoin" << endl; ret->surr_id = id(); break; } else { // not the surrogate // recursive routing yo TapDEBUG(2) << "Forwarding join req for " << args->ip << " to " << next << endl; bool succ = retryRPC( next, &Tapestry::handle_join, args, ret, STAT_JOIN, 1, 0); if( succ ) { record_stat(STAT_JOIN, 1, 1); } if( succ && !ret->failed ) { // success! break; } else { ret->failed = false; } } } // if we were never successful, set the failed flag if( i == _redundant_lookup_num ) { ret->failed = true; } delete [] ips; delete [] ids; TapDEBUG(5) << "hj exit" << endl;}void Tapestry::handle_nodelist(nodelist_args *args, nodelist_return *ret){ TapDEBUG(5) << "nhl enter" << endl; TapDEBUG(3) << "handling a nodelist message" << endl; // add each to your routing table multi_add_to_rt( &(args->nodelist), NULL ); /* for( uint i = 0; i < args->nodelist.size(); i++ ) { NodeInfo * curr_node = args->nodelist[i]; add_to_rt( curr_node->_addr, curr_node->_id ); } */ TapDEBUG(5) << "nhl exit" << endl;}voidTapestry::handle_mc(mc_args *args, mc_return *ret){ TapDEBUG(5) << "mc enter" << endl; TapDEBUG(2) << "got multicast message for " << args->new_ip << endl; // not allowed to participate in your own join! if( args->new_ip == ip() ) { TapDEBUG(5) << "mc exit" << endl; return; } // lock this node _rt->set_lock( args->new_ip, args->new_id ); // make sure that it knows about you as well mcnotify_args mca; mca.ip = ip(); mca.id = id(); mcnotify_return mcr; // check the watchlist and see if you know about any nodes that // can fill those prefixes vector<NodeInfo *> nodelist; for( uint i = 0; i < args->watchlist.size(); i++ ) { for( uint j = 0; j < _base; j++ ) { if( !args->watchlist[i][j] ) { NodeInfo * ni = _rt->read( i, j ); if( ni != NULL && ni->_addr != ip() && ni->_addr != args->new_ip ) { nodelist.push_back( ni ); args->watchlist[i][j] = true; } else { if( ni != NULL ) { delete ni; } } } } } mca.nodelist = nodelist; record_stat(STAT_MCNOTIFY, 1+nodelist.size(), 0); unsigned mcnrpc = asyncRPC( args->new_ip, &Tapestry::handle_mcnotify, &mca, &mcr, MAXTIME ); // don't go on if this is from a lock if( !args->from_lock ) { RPCSet rpcset; HashMap<unsigned, mc_callinfo*> resultmap; unsigned int numcalls = 0; // then, find any other node that shares this prefix and multicast for( uint i = args->alpha; i < _digits_per_id; i++ ) { for( uint j = 0; j < _base; j++ ) { NodeInfo *ni = _rt->read( i, j ); // don't RPC to ourselves or to the new node if( ni == NULL || ni->_addr == ip() || ni->_addr == args->new_ip ) { if( ni != NULL ) { delete ni; } continue; } else { mc_args *mca = New mc_args(); mc_return *mcr = New mc_return(); mca->new_ip = args->new_ip; mca->new_id = args->new_id; mca->alpha = i + 1; mca->from_lock = false; mca->watchlist = args->watchlist; TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << ni->_addr << "/" << print_guid( ni->_id ) << endl; record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base); unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr, _rtt_timeout_factor*_rt->get_time(ni->_id)); assert(rpc); resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr)); rpcset.insert(rpc); numcalls++; delete ni; } } } // also, multicast to any locks that might need it vector <NodeInfo *> * locks = _rt->get_locks( args->new_id ); for( uint i = 0; i < locks->size(); i++ ) { NodeInfo *ni = (*locks)[i]; if( ni->_addr != args->new_ip ) { mc_args *mca = New mc_args(); mc_return *mcr = New mc_return(); mca->new_ip = args->new_ip; mca->new_id = args->new_id; mca->alpha = args->alpha; mca->from_lock = true; mca->watchlist = args->watchlist; TapDEBUG(2) << "multicasting info for " << args->new_ip << " to " << ni->_addr << "/" << print_guid( ni->_id ) << " as a lock " << endl; record_stat(STAT_MC, 1, 2+mca->watchlist.size()*_base); Time timeout = MAXTIME; if( _rt->contains( ni->_id ) ) { timeout = _rtt_timeout_factor*_rt->get_time( ni->_id ); } unsigned rpc = asyncRPC( ni->_addr, &Tapestry::handle_mc, mca, mcr, timeout ); assert(rpc); resultmap.insert(rpc, New mc_callinfo(ni->_addr, mca, mcr)); rpcset.insert(rpc); numcalls++; } } Time startmc_time = now(); // wait for them all to return while( rpcset.size() > 0 ) { bool ok; unsigned donerpc = rcvRPC( &rpcset, ok ); if( ok || now() >= startmc_time + _declare_dead_time ) { if( ok ) { record_stat(STAT_MC, 0, 0); } mc_callinfo *ci = resultmap[donerpc]; TapDEBUG(2) << "mc to " << ci->ip << " about " << args->new_ip << " is done. ok = " << ok << endl; delete ci; } else { // do it again sam mc_callinfo *ci = resultmap[donerpc]; resultmap.remove( donerpc ); GUID ci_id = get_id_from_ip(ci->ip); Time timeout = MAXTIME; if( _rt->contains( ci_id ) ) { timeout = _rtt_timeout_factor*_rt->get_time( ci_id ); } if( timeout == 0 ) { TapDEBUG(2) << "Timeout of 0 for node " << ci->ip << "/" << print_guid(ci_id) << "; why?" << endl; } record_stat(STAT_MC, 1, 2+ci->ma->watchlist.size()*_base); unsigned rpc = asyncRPC( ci->ip, &Tapestry::handle_mc, ci->ma, ci->mr, timeout ); assert(rpc); rpcset.insert(rpc); resultmap.insert(rpc, ci); } } } // finish up mcnotify RPCSet mcnrpcset; mcnrpcset.insert(mcnrpc); bool ok; rcvRPC( &mcnrpcset, ok ); if( ok ) { record_stat(STAT_MCNOTIFY, 0, 0); } else { ok = retryRPC( args->new_ip, &Tapestry::handle_mcnotify, &mca, &mcr, STAT_MCNOTIFY, 1+nodelist.size(), 0); if( ok ) { record_stat(STAT_MCNOTIFY, 0, 0); } } // free the nodelist for( uint i = 0; i < nodelist.size(); i++ ) { delete nodelist[i]; } if( !ok ) { _rt->remove_lock( args->new_ip, args->new_id ); TapDEBUG(3) << "Notify to new node failed, abandoning mc!" << endl; TapDEBUG(5) << "mc exit" << endl; return; } // we wait until here to add the new node to your table // since other simultaneous inserts will require sending the mc to // at least one unlocked pointer, and if this guy is in the routing table // already, we may send to it even though its locked. add_to_rt( args->new_ip, args->new_id ); _rt->remove_lock( args->new_ip, args->new_id ); TapDEBUG(2) << "multicast done for " << args->new_ip << endl; // TODO: object pointer transferal TapDEBUG(5) << "mc exit" << endl;}void Tapestry::handle_nn(nn_args *args, nn_return *ret){ TapDEBUG(5) << "nn enter" << endl; // send back all backward pointers vector<NodeInfo *> nns; vector<NodeInfo *> *bps = _rt->get_backpointers( args->alpha ); for( uint i = 0; i < bps->size() && (!_nn_random || i < _k); i++ ) { uint index = i; // pick the nodes randomly if( _nn_random ) { bool chosen; do { chosen = true; index = random()%bps->size(); for( uint j = 0; j < nns.size(); j++ ) { if( nns[j]->_id == (*bps)[index]->_id ) { chosen = false; break; } } if( !chosen ) { TapDEBUG(5) << "Couldn't choose " << index << "(" << (*bps)[index]->_addr << "/" << print_guid((*bps)[index]->_id) << ") for element " << i << " out of " << bps->size() << "; trying again" << endl; } else { TapDEBUG(5) << "Chose " << index << "(" << (*bps)[index]->_addr << "/" << print_guid((*bps)[index]->_id) << ") for element " << i << endl; } } while( !chosen ); } NodeInfo *newnode = New NodeInfo( ((*bps)[index])->_addr, ((*bps)[index])->_id ); nns.push_back( newnode ); } // send all forward pointers at that level for( uint i = 0; i < _base; i++ ) { NodeInfo *newnode = _rt->read( args->alpha, i ); if( newnode != NULL ) { nns.push_back( newnode ); } } // add yourself to the list nns.push_back( New NodeInfo( ip(), id() ) ); ret->nodelist = nns;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -